#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
全功能产品抓取与分析系统

整合integrated_scraper.py和product_ai_analysis.py的功能,
支持从ProductHunt抓取数据并进行AI分析。
"""
import argparse
import asyncio
import json
import os
import re
import sqlite3
import sys
import time
from contextlib import closing
from datetime import datetime
from typing import List, Optional, Tuple

import requests
from loguru import logger
from tqdm import tqdm
# Configure logging: replace loguru's default sink with a formatted stderr sink.
logger.remove()
logger.add(sys.stderr, level="INFO", format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>")

# Load playwright-get-data.py dynamically: its file name contains hyphens, so it
# cannot be imported with a regular ``import`` statement.
import importlib.util

# Resolve next to this file's absolute location so the import does not depend
# on the current working directory (``__file__`` may be relative on older Pythons).
playwright_data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "playwright-get-data.py")
spec = importlib.util.spec_from_file_location("playwright_get_data", playwright_data_path)
if spec is None or spec.loader is None:
    raise ImportError(f"无法加载模块: {playwright_data_path}")
playwright_get_data = importlib.util.module_from_spec(spec)
spec.loader.exec_module(playwright_get_data)
ProductHuntScraper = playwright_get_data.ProductHuntScraper
class IntegratedProductSystem:
    """Scrape ProductHunt product pages and analyze them with a local Ollama model.

    The full workflow (``run_full_workflow_async``) reads ProductHunt links from
    the tophub database, scrapes each page into the ``products`` table, asks the
    model how hard each product would be for one person plus AI to build, and
    stores the verdict in the ``product_analysis`` table.
    """

    # Defaults for the local Ollama endpoint; both can be overridden in __init__.
    DEFAULT_MODEL = "qwen3:8b"
    DEFAULT_API_URL = "http://localhost:11434/api/generate"

    # Reasoning models such as qwen3 may prepend a <think>...</think> section to
    # the answer; numbers inside it must not be mistaken for the real result.
    _THINK_PATTERN = re.compile(r"<think>.*?</think>", re.DOTALL)

    # Follower counts such as "53 followers", "1.9K followers", "2.5M" or
    # "1,234 followers". The look-arounds keep the number whole and reject a
    # suffix letter that is really the start of a word (the "b" in "bookmarks").
    _USER_COUNT_PATTERN = re.compile(
        r"(?<![\d.,])(\d+(?:,\d{3})*(?:\.\d+)?)(?!\d)\s*([KkMmBb])?(?![A-Za-z])"
    )
    _USER_COUNT_MULTIPLIERS = {"k": 1_000, "m": 1_000_000, "b": 1_000_000_000}

    # Numeric difficulty scores, tried in priority order. Python's ``\b`` treats
    # CJK characters as word characters, so the first two patterns avoid it.
    _SCORE_PATTERNS = (
        re.compile(r"(?<!\d)(\d{1,3})\s*分(?!钟)"),       # "85分", but not "10分钟"
        re.compile(r"难度\s*[:：]?\s*(\d{1,3})(?!\d)"),    # "难度85" / "难度：85"
        re.compile(r"\b(\d+)\b"),                         # bare number delimited by non-word chars
    )

    # SQL expression yielding a product's introduction for a product_analysis row
    # aliased ``pa``: prefer the scraped original text, fall back to the stored one.
    _INTRO_SQL = """COALESCE(
                (SELECT p.introduction FROM products p
                 WHERE p.name = pa.original_name
                   AND p.introduction IS NOT NULL AND p.introduction != ''
                 LIMIT 1),
                pa.product_intro
            )"""

    def __init__(self, tophub_db_path=None, product_db_path=None, debug_port=9222,
                 limit=0, skip_duplicates=True, api_key="",
                 model=DEFAULT_MODEL, api_url=DEFAULT_API_URL):
        """
        Initialize the system.

        Args:
            tophub_db_path: path of the tophub database; defaults to
                ``tophub_data.db`` in the parent directory of this file.
            product_db_path: path of the product database; defaults to
                ``products.db`` next to this file.
            debug_port: Chrome remote-debugging port used by the scraper.
            limit: maximum number of ProductHunt links to query; 0 (or any
                non-positive value) means no limit.
            skip_duplicates: skip URLs that already exist in ``products``.
            api_key: API key (not needed by Ollama; kept for compatibility).
            model: Ollama model name used for every request.
            api_url: Ollama ``/api/generate`` endpoint.
        """
        # Resolve defaults from this file's absolute location so they do not
        # depend on the current working directory.
        base_dir = os.path.dirname(os.path.abspath(__file__))
        if tophub_db_path:
            self.tophub_db_path = tophub_db_path
        else:
            self.tophub_db_path = os.path.join(os.path.dirname(base_dir), "tophub_data.db")

        if product_db_path:
            self.product_db_path = product_db_path
        else:
            self.product_db_path = os.path.join(base_dir, "products.db")

        self.debug_port = debug_port
        self.limit = limit
        self.skip_duplicates = skip_duplicates
        self.api_key = api_key
        self.model = model
        self.api_url = api_url
        self.product_urls = []

        logger.info(f"初始化全功能产品系统,数据库: {self.product_db_path}")

    # ------------------------------------------------------------------
    # Pure helpers (no I/O, no logging)
    # ------------------------------------------------------------------

    @classmethod
    def _strip_think(cls, text) -> str:
        """Return *text* with every ``<think>...</think>`` section removed, stripped.

        ``None`` or empty input yields ``""``.
        """
        if not text:
            return ""
        return cls._THINK_PATTERN.sub("", text).strip()

    @classmethod
    def _parse_user_count(cls, text) -> Optional[int]:
        """Parse a follower-count string such as ``"1.9K followers"`` into an int.

        Supports plain numbers, thousands separators ("1,234") and K/M/B
        suffixes (case-insensitive). Returns ``None`` when no number is found.
        """
        if text is None:
            return None
        match = cls._USER_COUNT_PATTERN.search(str(text))
        if match is None:
            return None
        value = float(match.group(1).replace(",", ""))
        multiplier = cls._USER_COUNT_MULTIPLIERS.get((match.group(2) or "").lower(), 1)
        # round() guards against float artefacts such as 1.9 * 1000 = 1899.999...
        return int(round(value * multiplier))

    @classmethod
    def _extract_difficulty_score(cls, text) -> Optional[int]:
        """Return the first explicit 0-100 difficulty score found in *text*, or ``None``.

        Patterns are tried in priority order (see ``_SCORE_PATTERNS``); values
        outside 0-100 are ignored.
        """
        if not text:
            return None
        for pattern in cls._SCORE_PATTERNS:
            for match in pattern.finditer(text):
                value = int(match.group(1))
                if 0 <= value <= 100:
                    return value
        return None

    @classmethod
    def _score_from_difficulty(cls, difficulty) -> int:
        """Map a difficulty description to a score.

        Uses an explicit number when present, otherwise keywords:
        high -> 85, medium -> 60, low -> 35, unknown -> 50.
        """
        score = cls._extract_difficulty_score(difficulty)
        if score is not None:
            return score
        lowered = (difficulty or "").lower()
        if any(keyword in lowered for keyword in ('高', '很难', '非常难', '复杂')):
            return 85
        if any(keyword in lowered for keyword in ('中', '一般', '适中', '普通')):
            return 60
        if any(keyword in lowered for keyword in ('低', '简单', '容易')):
            return 35
        return 50  # default: medium difficulty

    @staticmethod
    def _slugify(name: str) -> str:
        """Lower-case *name* and join its ASCII alphanumeric runs with '-' ("Notion AI" -> "notion-ai")."""
        return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")

    @classmethod
    def _find_matching_url(cls, name, urls) -> Optional[str]:
        """Return the first URL containing *name* (raw, then slugified), or ``None``.

        The raw lower-cased name is tried against all URLs before the slug form,
        so exact-name matches win. Empty/``None`` names never match.
        """
        if not name:
            return None
        lowered_urls = [(url, url.lower()) for url in urls]
        for candidate in (name.lower(), cls._slugify(name)):
            if not candidate:
                continue
            for url, lowered in lowered_urls:
                if candidate in lowered:
                    return url
        return None

    # ------------------------------------------------------------------
    # Database access
    # ------------------------------------------------------------------

    def connect_to_database(self) -> sqlite3.Connection:
        """Open and return a connection to the product database; re-raises on failure."""
        try:
            conn = sqlite3.connect(self.product_db_path)
            logger.success(f"成功连接到数据库: {self.product_db_path}")
            return conn
        except Exception as e:
            logger.error(f"连接数据库失败: {e}")
            raise

    def init_database(self):
        """Create the ``products`` and ``product_analysis`` tables if missing.

        Also adds the ``follows`` column to older ``product_analysis`` tables.
        Errors are logged, not raised.
        """
        logger.info("正在初始化产品数据库...")

        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                cursor = conn.cursor()

                # Scraped product information.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS products (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        url TEXT NOT NULL UNIQUE,
                        name TEXT,
                        introduction TEXT,
                        user_count TEXT,
                        maker_link TEXT,
                        maker_statement TEXT,
                        created_at TEXT NOT NULL,
                        updated_at TEXT NOT NULL
                    )
                ''')

                # AI analysis results.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS product_analysis (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        original_name TEXT,
                        product_intro TEXT,
                        development_difficulty TEXT,
                        ai_response TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        difficulty_score INTEGER,
                        product_link TEXT,
                        follows INTEGER
                    )
                ''')

                # Migrate tables created before the follows column existed.
                cursor.execute("PRAGMA table_info(product_analysis)")
                columns = {col[1] for col in cursor.fetchall()}
                if 'follows' not in columns:
                    cursor.execute("ALTER TABLE product_analysis ADD COLUMN follows INTEGER")
                    logger.info("已为product_analysis表添加follows字段")

                conn.commit()
            logger.success("产品数据库初始化完成")

        except Exception as e:
            logger.error(f"初始化数据库失败: {e}")

    def query_producthunt_urls(self, limit=None):
        """Return distinct producthunt.com URLs from the tophub ``articles`` table.

        Args:
            limit: maximum number of URLs; ``None`` uses ``self.limit``, and a
                non-positive value means no limit.

        Returns:
            List of URLs (empty on error).
        """
        if limit is None:
            limit = self.limit

        logger.info(f"正在查询tophub_data.db数据库,限制: {limit}条")

        try:
            sql = "SELECT DISTINCT url FROM articles WHERE url LIKE '%producthunt.com%'"
            params = ()
            if limit and limit > 0:
                sql += " LIMIT ?"
                params = (int(limit),)

            with closing(sqlite3.connect(self.tophub_db_path)) as conn:
                urls = [row[0] for row in conn.execute(sql, params)]

            logger.success(f"找到 {len(urls)} 个包含producthunt.com的链接")
            return urls

        except Exception as e:
            logger.error(f"查询数据库失败: {e}")
            return []

    def check_duplicate(self, url):
        """Return True if *url* already exists in ``products`` (False on error)."""
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                row = conn.execute("SELECT 1 FROM products WHERE url = ? LIMIT 1", (url,)).fetchone()
            return row is not None

        except Exception as e:
            logger.error(f"检查重复失败: {e}")
            return False

    def save_product_info(self, product_info):
        """Insert or update a scraped product keyed by its ``url``.

        Args:
            product_info: dict with a required ``url`` key and optional ``name``,
                ``introduction``, ``user_count``, ``maker_link``, ``maker_statement``.

        Returns:
            True on success, False on error.
        """
        try:
            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            url = product_info['url']
            fields = (
                product_info.get('name'),
                product_info.get('introduction'),
                product_info.get('user_count'),
                product_info.get('maker_link'),
                product_info.get('maker_statement'),
            )

            with closing(sqlite3.connect(self.product_db_path)) as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT id FROM products WHERE url = ?", (url,))

                if cursor.fetchone():
                    cursor.execute('''
                        UPDATE products SET
                            name = ?, introduction = ?, user_count = ?,
                            maker_link = ?, maker_statement = ?, updated_at = ?
                        WHERE url = ?
                    ''', (*fields, current_time, url))
                    logger.info(f"更新产品信息: {product_info.get('name', '未知')}")
                else:
                    cursor.execute('''
                        INSERT INTO products
                        (url, name, introduction, user_count, maker_link, maker_statement, created_at, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (url, *fields, current_time, current_time))
                    logger.info(f"新增产品信息: {product_info.get('name', '未知')}")

                conn.commit()
            return True

        except Exception as e:
            logger.error(f"保存产品信息失败: {e}")
            return False

    # ------------------------------------------------------------------
    # Scraping
    # ------------------------------------------------------------------

    async def scrape_product_info(self, url):
        """Scrape one ProductHunt page with ``ProductHuntScraper``.

        Returns:
            The scraper's product-info dict with ``url`` added, or None on failure.
        """
        scraper = None
        connected = False
        try:
            logger.info(f"开始抓取: {url}")

            scraper = ProductHuntScraper(debug_port=self.debug_port)

            # Attach to an already running Chrome instance.
            connected = await scraper.connect_to_existing_chrome()
            if not connected:
                logger.error("连接Chrome失败,跳过此URL")
                return None

            if not await scraper.navigate_to_producthunt(url):
                logger.error("导航到页面失败,跳过此URL")
                return None

            product_info = await scraper.extract_product_info()
            if product_info:
                product_info['url'] = url
                logger.success(f"成功提取产品信息: {product_info.get('name', '未知')}")
            else:
                logger.error("提取产品信息失败")

            return product_info

        except Exception as e:
            logger.error(f"抓取产品信息失败: {e}")
            return None

        finally:
            # Close the connection on every path once it was established,
            # including when navigation/extraction raised.
            if scraper is not None and connected:
                try:
                    await scraper.close()
                except Exception as e:
                    logger.warning(f"关闭抓取器失败: {e}")

    # ------------------------------------------------------------------
    # AI analysis
    # ------------------------------------------------------------------

    def get_product_data(self, conn: sqlite3.Connection) -> List[Tuple]:
        """Return ``(id, name, introduction, user_count, url)`` rows with non-empty name and introduction."""
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, name, introduction, user_count, url
                FROM products
                WHERE name IS NOT NULL AND introduction IS NOT NULL
                AND name != '' AND introduction != ''
            """)

            products = cursor.fetchall()
            logger.info(f"从数据库获取到 {len(products)} 个产品")

            # Log the first few rows as a sanity check.
            for i, (product_id, name, intro, user_count, url) in enumerate(products[:3], 1):
                logger.info(f"示例产品{i}: ID={product_id}, 名称='{name}', 简介='{intro[:50]}...', 用户数='{user_count}', URL='{url}'")

            return products

        except Exception as e:
            logger.error(f"获取产品数据失败: {e}")
            raise

    def _post_ollama(self, prompt: str, timeout: int):
        """POST *prompt* to the Ollama generate endpoint (non-streaming) and return the response.

        Raises whatever ``requests.post`` raises (connection errors, timeouts).
        """
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
        }
        return requests.post(
            self.api_url,
            headers={"Content-Type": "application/json"},
            # Send UTF-8 bytes explicitly: a str body can be encoded as ISO-8859-1
            # by older urllib3/http.client versions, which fails on Chinese text.
            data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
            timeout=timeout,
        )

    def call_ollama_ai_api(self, name: str, introduction: str) -> Optional[str]:
        """Ask the model to translate the introduction and judge development difficulty.

        Returns:
            The raw model response text, or None on HTTP/network error.
        """
        try:
            prompt = f"这个是【{name}】,简介内容是【{introduction}】。请把产品的简介翻译成中文,并返回假设一个人加上AI辅助能否开发这个产品,请详细回答。返回的内容是产品名称/产品简介/开发难度。返回的例子一:notion/这个是笔记产品等等/一个人开发难度较高"

            logger.info(f"调用Ollama AI API分析产品: {name}")
            response = self._post_ollama(prompt, timeout=60)

            if response.status_code == 200:
                content = response.json().get("response", "")
                logger.success(f"API调用成功: {name}")
                return content

            logger.error(f"API调用失败: {response.status_code}, {response.text}")
            return None

        except Exception as e:
            logger.error(f"调用Ollama AI API时出错: {e}")
            return None

    def convert_user_count_to_number(self, user_count: str) -> Optional[int]:
        """Convert a follower-count text into an integer.

        Common formats ("53 followers", "1.9K followers", "2.5M", "1,234") are
        parsed locally; only unrecognised text is sent to the Ollama model.

        Args:
            user_count: follower-count text, e.g. "53 followers" or "1.9K followers".

        Returns:
            The converted number, or None if conversion fails.
        """
        if user_count is None or str(user_count).strip() == "":
            logger.info(f"空的用户数量: {user_count}")
            return None

        logger.info(f"正在转换用户数量: {user_count}")

        local_value = self._parse_user_count(user_count)
        if local_value is not None:
            logger.success(f"成功转换用户数量: {user_count} → {local_value}")
            return local_value

        try:
            prompt = f"请将以下用户数量文本转换为纯数字,不要包含任何其他内容:\n{user_count}\n\n转换规则:\n- 直接数字:如'53 followers' → 53\n- K表示千:如'1.9K followers' → 1900\n- M表示百万:如'2.5M followers' → 2500000\n- 只返回数字,不要添加任何单位或解释"

            response = self._post_ollama(prompt, timeout=30)

            if response.status_code != 200:
                logger.error(f"Ollama API调用失败: {response.status_code}, {response.text}")
                return None

            # Drop any reasoning section so its intermediate numbers are ignored.
            converted = self._strip_think(response.json().get("response", ""))
            logger.success(f"成功转换用户数量: {user_count} → {converted}")

            number = self._parse_user_count(converted)
            if number is None:
                logger.error(f"无法从转换结果中提取数字: {converted}")
            return number

        except Exception as e:
            logger.error(f"转换用户数量时出错: {e}")
            return None

    def parse_ai_response(self, response: str) -> Tuple[str, str, int]:
        """Parse a "name/intro/difficulty" model response.

        Returns:
            ``(product_intro, difficulty, difficulty_score)``. When the response
            does not have three '/'-separated parts, the whole (think-stripped)
            text becomes the difficulty and the score defaults to 50.
        """
        try:
            cleaned = self._strip_think(response)
            # maxsplit=2 keeps any '/' inside the difficulty description intact.
            parts = cleaned.split('/', 2)

            if len(parts) >= 3:
                # parts[0] (the product name) is ignored; callers use the original name.
                product_intro = parts[1].strip()
                difficulty = parts[2].strip()
                logger.info(f"解析结果: 简介='{product_intro[:30]}...', 难度='{difficulty}'")

                difficulty_score = self._score_from_difficulty(difficulty)
                logger.info(f"提取到难度分数: {difficulty_score}")
            else:
                logger.warning(f"响应格式不符合预期: {response}")
                product_intro = ""
                difficulty = cleaned
                difficulty_score = 50  # default: medium difficulty

            return product_intro, difficulty, difficulty_score

        except Exception as e:
            logger.error(f"解析AI响应失败: {e}")
            return "", response, 50

    def check_product_exists_in_analysis(self, conn: sqlite3.Connection, original_name: str) -> bool:
        """Return True if ``product_analysis`` already has a row for *original_name* (False on error)."""
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT COUNT(*) FROM product_analysis
                WHERE original_name = ?
            """, (original_name,))

            exists = cursor.fetchone()[0] > 0
            if exists:
                logger.info(f"产品 '{original_name}' 已存在,跳过分析")
            return exists

        except Exception as e:
            logger.error(f"检查产品存在性失败: {e}")
            return False

    def save_analysis_result(self, conn: sqlite3.Connection,
                             original_name: str, difficulty: str,
                             ai_response: str, difficulty_score: int = None,
                             product_link: str = None, follows: int = None,
                             product_intro: str = None):
        """Insert one analysis row and commit; re-raises on failure.

        A missing ``difficulty_score`` is stored as 50. ``product_intro`` (the
        translated introduction) is optional for backward compatibility.
        """
        try:
            if difficulty_score is None:
                difficulty_score = 50

            conn.execute("""
                INSERT INTO product_analysis
                (original_name, product_intro, development_difficulty, difficulty_score, ai_response, product_link, follows)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (original_name, product_intro, difficulty, difficulty_score, ai_response, product_link, follows))
            conn.commit()

            logger.success(f"保存分析结果成功: {original_name}, 难度分数: {difficulty_score}, 关注数: {follows}")

        except Exception as e:
            logger.error(f"保存分析结果失败: {e}")
            raise

    def analyze_products(self, max_products: int = None):
        """Analyze scraped products not yet in ``product_analysis``.

        Args:
            max_products: analyze at most this many products; None means all.
        """
        if max_products is None:
            logger.info("开始分析所有产品数据")
        else:
            logger.info(f"开始分析产品数据,最大数量: {max_products}")

        conn = None
        try:
            conn = self.connect_to_database()
            products = self.get_product_data(conn)

            if not products:
                logger.warning("没有找到可分析的产品数据")
                return

            products_to_analyze = products[:max_products] if max_products is not None else products
            total = len(products_to_analyze)
            logger.info(f"准备分析 {total} 个产品")

            success_count = 0
            skip_count = 0
            for i, (original_id, name, introduction, user_count, url) in enumerate(products_to_analyze, 1):
                logger.info(f"\n分析进度: {i}/{total} - {name}")

                if self.check_product_exists_in_analysis(conn, name):
                    skip_count += 1
                    logger.info(f"跳过已存在产品,当前进度: {i}/{total}")
                    continue

                logger.info(f"正在提交API请求... 进度: {i}/{total}")
                ai_response = self.call_ollama_ai_api(name, introduction)

                if ai_response:
                    logger.info("API调用成功,正在处理数据...")

                    product_intro, difficulty, difficulty_score = self.parse_ai_response(ai_response)

                    follows = self.convert_user_count_to_number(user_count) if user_count else None

                    self.save_analysis_result(conn, name, difficulty, ai_response, difficulty_score,
                                              url, follows, product_intro=product_intro)
                    success_count += 1
                    logger.success(f"产品 '{name}' 分析完成,进度: {i}/{total}")
                else:
                    logger.error(f"分析失败: {name}")

                # Throttle requests to the local model.
                logger.info("数据处理完成,等待2秒后继续...")
                time.sleep(2)

            logger.success(f"分析完成! 成功分析 {success_count} 个产品,跳过 {skip_count} 个已存在产品")

        except Exception as e:
            logger.error(f"分析过程中出错: {e}")
        finally:
            if conn:
                conn.close()
                logger.info("数据库连接已关闭")

    # ------------------------------------------------------------------
    # Scraping workflow
    # ------------------------------------------------------------------

    async def run_scraping(self, urls=None):
        """Scrape *urls* (or the URLs queried from tophub) and save them.

        Returns:
            False if there was nothing to scrape, True otherwise.
        """
        logger.info("=== 开始ProductHunt数据抓取 ===")

        self.product_urls = self.query_producthunt_urls() if urls is None else urls

        if not self.product_urls:
            logger.error("未找到要抓取的ProductHunt链接")
            return False

        logger.info(f"找到 {len(self.product_urls)} 个ProductHunt链接")

        success_count = 0
        skip_count = 0
        error_count = 0

        with tqdm(total=len(self.product_urls), desc="抓取ProductHunt链接") as pbar:
            for url in self.product_urls:
                logger.info(f"处理URL: {url}")

                if self.skip_duplicates and self.check_duplicate(url):
                    logger.info(f"URL已存在,跳过: {url}")
                    skip_count += 1
                    pbar.update(1)
                    continue

                product_info = await self.scrape_product_info(url)

                if product_info:
                    if self.save_product_info(product_info):
                        logger.success(f"成功保存产品信息: {product_info.get('name', '未知')}")
                        success_count += 1
                    else:
                        logger.error(f"保存产品信息失败: {url}")
                        error_count += 1
                else:
                    logger.error(f"抓取产品信息失败: {url}")
                    error_count += 1

                pbar.update(1)

        self.show_scraping_results(success_count, skip_count, error_count)

        logger.success("=== ProductHunt数据抓取完成 ===")
        return True

    def show_scraping_results(self, success_count, skip_count, error_count):
        """Log scraping counters, the total product count and the 10 most recently updated products."""
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT COUNT(*) FROM products")
                total_count = cursor.fetchone()[0]

                cursor.execute("SELECT name, url FROM products ORDER BY updated_at DESC LIMIT 10")
                recent_products = cursor.fetchall()

            logger.info("=== 抓取结果统计 ===")
            logger.info(f"成功抓取: {success_count} 个产品")
            logger.info(f"跳过重复: {skip_count} 个链接")
            logger.info(f"抓取失败: {error_count} 个链接")
            logger.info(f"数据库中的产品总数: {total_count}")

            if recent_products:
                logger.info("最新抓取的产品:")
                for name, url in recent_products:
                    logger.info(f"  - {name}: {url}")
            else:
                logger.info("数据库中暂无产品记录")

        except Exception as e:
            logger.error(f"显示抓取结果失败: {e}")

    # ------------------------------------------------------------------
    # Maintenance passes
    # ------------------------------------------------------------------

    def analyze_missing_scores(self):
        """Fill in ``difficulty_score`` for analysis rows where it is NULL or empty.

        The score is re-derived from the stored AI response when there is one;
        otherwise the model is called again and the response and difficulty
        are updated too.
        """
        logger.info("=== 开始分析缺失难度分数的产品 ===")

        conn = None
        try:
            conn = self.connect_to_database()
            cursor = conn.cursor()

            cursor.execute(f"""
                SELECT pa.id, pa.original_name, {self._INTRO_SQL}, pa.ai_response
                FROM product_analysis pa
                WHERE pa.difficulty_score IS NULL OR pa.difficulty_score = ''
            """)

            rows = cursor.fetchall()
            logger.info(f"找到 {len(rows)} 个缺失难度分数的产品")

            if not rows:
                logger.info("没有发现缺失难度分数的产品")
                return

            updated_count = 0
            for i, (analysis_id, name, introduction, ai_response) in enumerate(rows, 1):
                logger.info(f"处理缺失分数的产品 {i}/{len(rows)}: {name}")

                difficulty_score = None
                called_api = False

                if ai_response:
                    try:
                        _, _, difficulty_score = self.parse_ai_response(ai_response)
                        logger.info(f"从现有AI响应中提取分数: {difficulty_score}")
                    except Exception as e:
                        logger.error(f"从现有响应提取分数失败: {e}")

                if difficulty_score is None:
                    logger.info(f"重新调用API分析产品: {name}")
                    called_api = True
                    new_response = self.call_ollama_ai_api(name, introduction or "")
                    if new_response:
                        _, new_difficulty, difficulty_score = self.parse_ai_response(new_response)
                        cursor.execute("""
                            UPDATE product_analysis
                            SET ai_response = ?, development_difficulty = ?
                            WHERE id = ?
                        """, (new_response, new_difficulty, analysis_id))

                if difficulty_score is not None:
                    cursor.execute("""
                        UPDATE product_analysis
                        SET difficulty_score = ?
                        WHERE id = ?
                    """, (difficulty_score, analysis_id))
                    conn.commit()
                    updated_count += 1
                    logger.success(f"成功更新产品 '{name}' 的难度分数为 {difficulty_score}")
                else:
                    logger.warning(f"无法为产品 '{name}' 确定难度分数")

                # Throttle only when the model was actually called.
                if called_api and i < len(rows):
                    time.sleep(2)

            logger.success(f"缺失分数分析完成! 成功更新 {updated_count} 个产品的难度分数")

        except Exception as e:
            logger.error(f"分析缺失分数过程中出错: {e}")
        finally:
            if conn:
                conn.close()
                logger.info("数据库连接已关闭")

    def analyze_follower_counts(self):
        """Fill ``product_analysis.follows`` from ``products.user_count`` where follows is NULL or empty."""
        logger.info("=== 开始分析产品关注数 ===")

        conn = None
        try:
            conn = self.connect_to_database()
            cursor = conn.cursor()

            cursor.execute("""
                SELECT p.id, p.name, p.user_count, pa.id as analysis_id, pa.follows
                FROM products p
                LEFT JOIN product_analysis pa ON p.name = pa.original_name
                WHERE p.user_count IS NOT NULL AND p.user_count != ''
                AND pa.id IS NOT NULL
                AND (pa.follows IS NULL OR pa.follows = '')
            """)

            products = cursor.fetchall()
            logger.info(f"找到 {len(products)} 个需要更新关注数的产品")

            if not products:
                logger.info("没有发现需要更新关注数的产品")
                return

            updated_count = 0
            for i, (_product_id, name, user_count, analysis_id, _current_follows) in enumerate(products, 1):
                logger.info(f"处理产品关注数 {i}/{len(products)}: {name}, 用户数: {user_count}")

                if not analysis_id:
                    logger.info(f"产品 '{name}' 没有对应的分析记录,跳过")
                    continue

                # Local parsing is instant; only the LLM fallback needs throttling.
                needs_api = self._parse_user_count(user_count) is None
                follows = self.convert_user_count_to_number(user_count)

                if follows is not None:
                    cursor.execute("""
                        UPDATE product_analysis
                        SET follows = ?
                        WHERE id = ?
                    """, (follows, analysis_id))
                    conn.commit()
                    updated_count += 1
                    logger.success(f"成功更新产品 '{name}' 的关注数为 {follows}")
                else:
                    logger.warning(f"无法为产品 '{name}' 转换关注数")

                if needs_api and i < len(products):
                    time.sleep(2)

            logger.success(f"关注数分析完成! 成功更新 {updated_count} 个产品的关注数")

        except Exception as e:
            logger.error(f"分析关注数过程中出错: {e}")
        finally:
            if conn:
                conn.close()
                logger.info("数据库连接已关闭")

    def reanalyze_invalid_difficulty_scores(self):
        """Re-run the difficulty analysis for rows whose ``difficulty_score`` is 1.

        When the new description says the product is hard but its score is
        below 70, the score is lifted into the 70-90 range.
        """
        logger.info("=== 开始重新分析无效难度评分 ===")

        conn = None
        try:
            conn = self.connect_to_database()
            cursor = conn.cursor()

            cursor.execute(f"""
                SELECT pa.id, pa.original_name, {self._INTRO_SQL}
                FROM product_analysis pa
                WHERE pa.difficulty_score = 1
            """)

            invalid_records = cursor.fetchall()
            logger.info(f"找到 {len(invalid_records)} 条difficulty_score为1的记录需要重新分析")

            if not invalid_records:
                logger.info("没有发现需要重新分析的无效难度评分记录")
                return

            updated_count = 0
            for i, (analysis_id, name, introduction) in enumerate(invalid_records, 1):
                logger.info(f"重新分析记录 {i}/{len(invalid_records)}: {name}")
                logger.info(f"重新调用Ollama API分析产品难度: {name}")

                intro_text = introduction or ""
                prompt = f"这个是【{name}】,简介内容是【{intro_text}】。请重新分析这个产品的开发难度,特别是对于一个人加上AI辅助能否开发这个产品,请详细回答。返回的内容是产品名称/产品简介/开发难度。返回的例子一:notion/这个是笔记产品等等/一个人开发难度较高"

                try:
                    response = self._post_ollama(prompt, timeout=60)

                    if response.status_code == 200:
                        new_ai_response = response.json().get("response", "").strip()
                        logger.success(f"成功重新分析产品 '{name}'")

                        _, new_difficulty, new_difficulty_score = self.parse_ai_response(new_ai_response)

                        # Keep "hard" verdicts in the 70-90 range.
                        difficulty_lower = (new_difficulty or "").lower()
                        is_hard = any(keyword in difficulty_lower for keyword in ['高', '很难', '非常难', '复杂', '困难'])
                        if is_hard and new_difficulty_score < 70:
                            new_difficulty_score = max(70, min(90, new_difficulty_score + 60))
                            logger.info(f"调整很难产品的难度分数为: {new_difficulty_score} (70-90区间)")

                        cursor.execute("""
                            UPDATE product_analysis
                            SET development_difficulty = ?,
                                difficulty_score = ?,
                                ai_response = ?
                            WHERE id = ?
                        """, (new_difficulty, new_difficulty_score, new_ai_response, analysis_id))
                        conn.commit()

                        updated_count += 1
                        logger.success(f"成功更新产品 '{name}' 的难度分数为 {new_difficulty_score}")
                    else:
                        logger.error(f"API调用失败: {response.status_code}, {response.text}")
                except Exception as e:
                    logger.error(f"重新分析产品 '{name}' 失败: {e}")

                if i < len(invalid_records):
                    time.sleep(2)

            logger.success(f"无效难度评分重新分析完成! 成功更新 {updated_count} 条记录")

        except Exception as e:
            logger.error(f"重新分析无效难度评分过程中出错: {e}")
        finally:
            if conn:
                conn.close()
                logger.info("数据库连接已关闭")

    def fill_missing_product_links(self):
        """Fill empty ``product_analysis.product_link`` values from the tophub database.

        A URL matches when it contains the product name (lower-cased), or failing
        that its slug form ("Notion AI" -> "notion-ai").
        """
        logger.info("=== 开始补全缺失的product_link字段 ===")

        # Honour the configured path (e.g. --tophub-db) instead of recomputing the default.
        tophub_db_path = self.tophub_db_path
        if not os.path.exists(tophub_db_path):
            logger.error(f"tophub_data.db不存在: {tophub_db_path}")
            return

        conn_product = None
        conn_tophub = None
        try:
            conn_product = self.connect_to_database()
            cursor_product = conn_product.cursor()

            conn_tophub = sqlite3.connect(tophub_db_path)
            cursor_tophub = conn_tophub.cursor()
            logger.success(f"成功连接到tophub_data.db: {tophub_db_path}")

            cursor_product.execute("""
                SELECT id, original_name
                FROM product_analysis
                WHERE product_link IS NULL OR product_link = ''
            """)

            missing_link_records = cursor_product.fetchall()
            logger.info(f"找到 {len(missing_link_records)} 条product_link为空的记录需要补全")

            if not missing_link_records:
                logger.info("没有发现需要补全product_link的记录")
                return

            cursor_tophub.execute("SELECT url FROM articles WHERE url LIKE '%producthunt.com%'")
            tophub_urls = [row[0] for row in cursor_tophub.fetchall()]
            logger.info(f"从tophub_data.db获取到 {len(tophub_urls)} 个producthunt链接")

            if not tophub_urls:
                logger.error("从tophub_data.db中没有找到producthunt链接")
                return

            updated_count = 0
            for i, (analysis_id, original_name) in enumerate(missing_link_records, 1):
                logger.info(f"处理记录 {i}/{len(missing_link_records)}: {original_name}")

                matched_url = self._find_matching_url(original_name, tophub_urls)

                if matched_url:
                    cursor_product.execute("""
                        UPDATE product_analysis
                        SET product_link = ?
                        WHERE id = ?
                    """, (matched_url, analysis_id))
                    conn_product.commit()
                    updated_count += 1
                    logger.success(f"成功为产品 '{original_name}' 补全链接: {matched_url}")
                else:
                    logger.warning(f"无法为产品 '{original_name}' 找到匹配的链接")

            logger.success(f"product_link补全完成! 成功更新 {updated_count} 条记录")

        except Exception as e:
            logger.error(f"补全product_link过程中出错: {e}")
        finally:
            if conn_product:
                conn_product.close()
            if conn_tophub:
                conn_tophub.close()
            logger.info("数据库连接已关闭")

    # ------------------------------------------------------------------
    # Entry points
    # ------------------------------------------------------------------

    async def run_full_workflow_async(self, max_products=None, analyze_only=False):
        """Run the full pipeline.

        Steps: scrape (unless *analyze_only*), analyze, fill missing scores,
        update follower counts, re-analyze score-1 rows, then fill missing
        product links.
        """
        logger.info("=== 开始全功能产品系统工作流程 ===")

        self.init_database()

        if not analyze_only:
            logger.info("步骤1: 开始抓取ProductHunt数据...")
            await self.run_scraping()
        else:
            logger.info("跳过抓取步骤,直接进行分析")

        logger.info("步骤2: 开始AI分析产品数据...")
        self.analyze_products(max_products)

        logger.info("步骤3: 开始分析并补充缺失的难度分数...")
        self.analyze_missing_scores()

        logger.info("步骤4: 开始分析并更新产品关注数...")
        self.analyze_follower_counts()

        logger.info("步骤5: 开始重新分析invalid难度评分...")
        self.reanalyze_invalid_difficulty_scores()

        logger.info("步骤6: 开始补全缺失的product_link字段...")
        self.fill_missing_product_links()

        logger.success("=== 全功能产品系统工作流程完成 ===")

    def run_full_workflow(self, max_products=None, analyze_only=False):
        """Synchronous wrapper around ``run_full_workflow_async``."""
        # asyncio.run creates a fresh event loop, runs the coroutine and closes
        # the loop without leaving a closed loop installed as the current one.
        asyncio.run(self.run_full_workflow_async(max_products, analyze_only))
def parse_arguments():
    """Build the command-line interface and parse ``sys.argv``.

    Returns:
        argparse.Namespace with ``tophub_db``, ``product_db``, ``debug_port``,
        ``limit``, ``no_skip_duplicates``, ``urls``, ``log_file``,
        ``max_products`` and ``analyze_only``.
    """
    cli = argparse.ArgumentParser(description="全功能产品抓取与分析系统")

    # (flag, add_argument keyword options) in help-output order.
    option_specs = (
        ("--tophub-db", dict(help="tophub数据库路径", default=None)),
        ("--product-db", dict(help="产品数据库路径", default=None)),
        ("--debug-port", dict(type=int, help="Chrome调试端口", default=9222)),
        ("--limit", dict(type=int, help="抓取链接数量限制", default=0)),
        ("--no-skip-duplicates", dict(action="store_true", help="不跳过重复URL")),
        ("--urls", dict(nargs="+", help="指定要抓取的URL列表")),
        ("--log-file", dict(help="日志文件路径", default="integrated_product_system.log")),
        ("--max-products", dict(type=int, help="最大分析产品数量", default=None)),
        ("--analyze-only", dict(action="store_true", help="仅进行分析,不抓取数据")),
    )
    for flag, options in option_specs:
        cli.add_argument(flag, **options)

    return cli.parse_args()
async def main():
    """Command-line entry point.

    Parses arguments, adds a rotating log file, launches the Chrome start-up
    script when scraping is needed, then either scrapes the given ``--urls``
    and analyzes, or runs the full workflow.
    """
    args = parse_arguments()

    # Also log to a file, rotating every 10 MB.
    logger.add(args.log_file, level="INFO", rotation="10 MB")

    # Launch start_chrome.bat without waiting for it; the scraper attaches to
    # that Chrome instance over the debug port. Not needed for analysis only.
    import subprocess
    chrome_bat_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "start_chrome.bat")
    if args.analyze_only and not args.urls:
        logger.info("仅分析模式,跳过Chrome启动脚本")
    elif not os.path.exists(chrome_bat_path):
        # With shell=True, Popen would not raise FileNotFoundError for a missing
        # script, so check explicitly.
        logger.error(f"未找到Chrome启动脚本: {chrome_bat_path}")
    else:
        logger.info(f"正在运行Chrome启动脚本: {chrome_bat_path}")
        try:
            subprocess.Popen([chrome_bat_path], shell=True)
            logger.success("Chrome启动脚本已启动")
        except Exception as e:
            logger.error(f"Chrome启动脚本启动失败: {e}")

    system = IntegratedProductSystem(
        tophub_db_path=args.tophub_db,
        product_db_path=args.product_db,
        debug_port=args.debug_port,
        limit=args.limit,
        skip_duplicates=not args.no_skip_duplicates,
        api_key=""  # Ollama needs no API key
    )

    if args.urls:
        # This path bypasses run_full_workflow_async, which is what normally
        # creates the tables, so make sure they exist before scraping into them.
        system.init_database()
        await system.run_scraping(urls=args.urls)
        system.analyze_products(max_products=args.max_products)
    else:
        await system.run_full_workflow_async(max_products=args.max_products, analyze_only=args.analyze_only)
if __name__ == "__main__":
    # Script entry point: drive the async main() coroutine on a fresh event loop.
    asyncio.run(main())