Files
tophux_scrape/product/integrated_product_system.py

757 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
全功能产品抓取与分析系统
整合integrated_scraper.py和product_ai_analysis.py的功能
支持从ProductHunt抓取数据并进行AI分析
"""
import argparse
import asyncio
import json
import os
import re
import sqlite3
import sys
import time
from contextlib import closing
from datetime import datetime
from typing import List, Tuple, Optional

import requests
from loguru import logger
from tqdm import tqdm
# Logging setup: drop every existing loguru handler and log INFO and above to
# stderr with a coloured, column-aligned format.
_CONSOLE_LOG_FORMAT = (
    "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
    "<level>{level: <8}</level> | "
    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
    "<level>{message}</level>"
)
logger.remove()
logger.add(sys.stderr, format=_CONSOLE_LOG_FORMAT, level="INFO")
# Load ProductHuntScraper from the sibling file "playwright-get-data.py". The
# hyphens in that file name rule out a normal import statement, so the module
# is loaded from its path with importlib.
import importlib.util
playwright_data_path = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "playwright-get-data.py"
)
spec = importlib.util.spec_from_file_location("playwright_get_data", playwright_data_path)
if spec is None or spec.loader is None:
    raise ImportError(f"Cannot load ProductHuntScraper module from {playwright_data_path}")
playwright_get_data = importlib.util.module_from_spec(spec)
# Register the module before executing it, as in the importlib "import a
# source file directly" recipe. Code in the loaded module that looks itself up
# in sys.modules (for example pickle, or dataclasses with postponed
# annotations) then works.
sys.modules[spec.name] = playwright_get_data
try:
    spec.loader.exec_module(playwright_get_data)
except BaseException:
    # Mirror normal import semantics: don't leave a half-initialised module behind.
    sys.modules.pop(spec.name, None)
    raise
ProductHuntScraper = playwright_get_data.ProductHuntScraper
class IntegratedProductSystem:
    """Scrape ProductHunt products and analyse them with a local Ollama model.

    Workflow:
      1. Read ProductHunt links from the ``articles`` table of the tophub
         database.
      2. Scrape each product page through an already-running Chrome instance
         (``ProductHuntScraper``) and store the result in ``products``.
      3. Ask the Ollama model whether one person with AI help could build the
         product, and store the reply plus a difficulty score in
         ``product_analysis``.
    """

    # Keyword buckets used when the difficulty text has no usable number.
    # They are checked in order and the first bucket with a hit wins.
    _DIFFICULTY_KEYWORDS = (
        (("较高", "很高", "偏高", "很难", "非常难", "较难", "复杂"), 85),
        (("中等", "一般", "适中", "普通"), 60),
        (("较低", "很低", "偏低", "简单", "容易", "较易"), 35),
    )
    # Score used when nothing better can be derived from the AI reply.
    _DEFAULT_DIFFICULTY_SCORE = 50
    # Numeric-score patterns, most specific first. ``\b`` is deliberately not
    # used: in Python's Unicode regexes CJK characters are word characters, so
    # ``\b`` never matches between "难度" and "85".
    _SCORE_PATTERNS = (
        re.compile(r"(\d+)\s*分"),
        re.compile(r"难度\D{0,3}?(\d+)"),
        re.compile(r"(\d+)"),
    )
    # Reasoning models (e.g. qwen3) may prefix replies with a <think>...</think>
    # section. It is removed before parsing so that '/' characters inside it
    # cannot shift the name/intro/difficulty fields.
    _THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", re.DOTALL)

    def __init__(self, tophub_db_path=None, product_db_path=None, debug_port=9222,
                 limit=0, skip_duplicates=True, api_key="",
                 model="qwen3:8b", api_url="http://localhost:11434/api/generate",
                 request_timeout=60):
        """Store database paths and runtime settings.

        Args:
            tophub_db_path: Path of the tophub database holding ``articles``.
                Defaults to ``tophub_data.db`` in this file's parent directory.
            product_db_path: Path of the product database. Defaults to
                ``products.db`` next to this file.
            debug_port: Remote-debugging port of the running Chrome instance.
            limit: Maximum number of ProductHunt links read from the tophub
                database; 0 (or any non-positive value) means no limit.
            skip_duplicates: Skip URLs already present in ``products``.
            api_key: API key; unused by Ollama, kept for compatibility.
            model: Ollama model used for the analysis.
            api_url: Ollama ``/api/generate`` endpoint.
            request_timeout: Timeout in seconds for each Ollama request.
        """
        module_dir = os.path.dirname(os.path.abspath(__file__))
        self.tophub_db_path = tophub_db_path or os.path.join(
            os.path.dirname(module_dir), "tophub_data.db")
        self.product_db_path = product_db_path or os.path.join(module_dir, "products.db")
        self.debug_port = debug_port
        self.limit = limit
        self.skip_duplicates = skip_duplicates
        self.api_key = api_key
        self.api_url = api_url
        self.model = model
        self.request_timeout = request_timeout
        # URLs handled by the most recent run_scraping() call.
        self.product_urls = []
        logger.info(f"初始化全功能产品系统,数据库: {self.product_db_path}")

    def connect_to_database(self) -> sqlite3.Connection:
        """Open and return a connection to the product database.

        Raises:
            Exception: Whatever ``sqlite3.connect`` raises (logged first).
        """
        try:
            conn = sqlite3.connect(self.product_db_path)
            logger.success(f"成功连接到数据库: {self.product_db_path}")
            return conn
        except Exception as e:
            logger.error(f"连接数据库失败: {e}")
            raise

    def init_database(self):
        """Create the ``products`` and ``product_analysis`` tables if missing.

        Errors are logged, not raised.
        """
        logger.info("正在初始化产品数据库...")
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                # Scraped product pages, one row per URL.
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS products (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        url TEXT NOT NULL UNIQUE,
                        name TEXT,
                        introduction TEXT,
                        user_count TEXT,
                        maker_link TEXT,
                        maker_statement TEXT,
                        created_at TEXT NOT NULL,
                        updated_at TEXT NOT NULL
                    )
                ''')
                # AI analysis results.
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS product_analysis (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        original_name TEXT,
                        product_intro TEXT,
                        development_difficulty TEXT,
                        ai_response TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        difficulty_score INTEGER,
                        product_link TEXT
                    )
                ''')
                conn.commit()
            logger.success("产品数据库初始化完成")
        except Exception as e:
            logger.error(f"初始化数据库失败: {e}")

    def query_producthunt_urls(self, limit=None):
        """Return distinct ProductHunt URLs from the tophub ``articles`` table.

        Args:
            limit: Maximum number of URLs; ``None`` uses ``self.limit``.
                Falsy or non-positive values mean no limit.

        Returns:
            A list of URLs. It is empty if the database file is missing or the
            query fails; the error is logged.
        """
        if limit is None:
            limit = self.limit
        logger.info(f"正在查询tophub_data.db数据库限制: {limit}")
        if not os.path.isfile(self.tophub_db_path):
            # sqlite3.connect would otherwise silently create an empty file.
            logger.error(f"查询数据库失败: 数据库文件不存在 {self.tophub_db_path}")
            return []
        sql = "SELECT DISTINCT url FROM articles WHERE url LIKE '%producthunt.com%'"
        params = ()
        if limit and limit > 0:
            sql += " LIMIT ?"
            params = (limit,)
        try:
            with closing(sqlite3.connect(self.tophub_db_path)) as conn:
                urls = [row[0] for row in conn.execute(sql, params)]
        except Exception as e:
            logger.error(f"查询数据库失败: {e}")
            return []
        logger.success(f"找到 {len(urls)} 个包含producthunt.com的链接")
        return urls

    def check_duplicate(self, url):
        """Return True if ``url`` is already stored in ``products``.

        On database errors the error is logged and False is returned.
        """
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                row = conn.execute(
                    "SELECT 1 FROM products WHERE url = ? LIMIT 1", (url,)).fetchone()
            return row is not None
        except Exception as e:
            logger.error(f"检查重复失败: {e}")
            return False

    def save_product_info(self, product_info):
        """Insert or update one scraped product, keyed by its URL.

        Args:
            product_info: Dict with a required ``'url'`` key and optional
                ``name``, ``introduction``, ``user_count``, ``maker_link`` and
                ``maker_statement`` keys.

        Returns:
            True on success, or False if the write failed (the error is logged).
        """
        try:
            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            fields = (
                product_info.get('name'),
                product_info.get('introduction'),
                product_info.get('user_count'),
                product_info.get('maker_link'),
                product_info.get('maker_statement'),
            )
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                existing = conn.execute(
                    "SELECT id FROM products WHERE url = ?", (product_info['url'],)).fetchone()
                if existing:
                    conn.execute('''
                        UPDATE products SET
                        name = ?, introduction = ?, user_count = ?,
                        maker_link = ?, maker_statement = ?, updated_at = ?
                        WHERE url = ?
                    ''', (*fields, current_time, product_info['url']))
                    logger.info(f"更新产品信息: {product_info.get('name', '未知')}")
                else:
                    conn.execute('''
                        INSERT INTO products
                        (url, name, introduction, user_count, maker_link, maker_statement, created_at, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (product_info['url'], *fields, current_time, current_time))
                    logger.info(f"新增产品信息: {product_info.get('name', '未知')}")
                conn.commit()
            return True
        except Exception as e:
            logger.error(f"保存产品信息失败: {e}")
            return False

    async def scrape_product_info(self, url):
        """Scrape one ProductHunt page with ``ProductHuntScraper``.

        Returns:
            The extracted info dict with ``'url'`` added, or a falsy value when
            connecting, navigating or extraction fails. The browser connection
            is closed in every case where it was opened.
        """
        scraper = None
        connected = False
        try:
            logger.info(f"开始抓取: {url}")
            scraper = ProductHuntScraper(debug_port=self.debug_port)
            # Attach to the Chrome instance that is already running.
            connected = await scraper.connect_to_existing_chrome()
            if not connected:
                logger.error("连接Chrome失败跳过此URL")
                return None
            if not await scraper.navigate_to_producthunt(url):
                logger.error("导航到页面失败跳过此URL")
                return None
            product_info = await scraper.extract_product_info()
            if product_info:
                product_info['url'] = url
                logger.success(f"成功提取产品信息: {product_info.get('name', '未知')}")
            else:
                logger.error("提取产品信息失败")
            return product_info
        except Exception as e:
            logger.error(f"抓取产品信息失败: {e}")
            return None
        finally:
            # Release the connection even if navigation or extraction raised.
            if scraper is not None and connected:
                try:
                    await scraper.close()
                except Exception as e:
                    logger.warning(f"关闭浏览器连接失败: {e}")

    def get_product_data(self, conn: sqlite3.Connection) -> List[Tuple]:
        """Return ``(id, name, introduction)`` rows with non-empty name and intro.

        Raises:
            Exception: Any database error (logged first).
        """
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, name, introduction
                FROM products
                WHERE name IS NOT NULL AND introduction IS NOT NULL
                AND name != '' AND introduction != ''
            """)
            products = cursor.fetchall()
            logger.info(f"从数据库获取到 {len(products)} 个产品")
            # Log a few rows as a sanity check.
            for i, (product_id, name, intro) in enumerate(products[:3], 1):
                logger.info(f"示例产品{i}: ID={product_id}, 名称='{name}', 简介='{intro[:50]}...'")
            return products
        except Exception as e:
            logger.error(f"获取产品数据失败: {e}")
            raise

    def call_ollama_ai_api(self, name: str, introduction: str) -> Optional[str]:
        """Ask the Ollama model to translate the intro and rate build difficulty.

        Returns:
            The model's ``response`` text, or None if the request fails or the
            status code is not 200 (the error is logged).
        """
        prompt = f"这个是【{name}】,简介内容是【{introduction}】。请把产品的简介翻译成中文并返回假设一个人加上AI辅助能否开发这个产品请详细回答。返回的内容是产品名称/产品简介/开发难度。返回的例子一notion/这个是笔记产品等等/一个人开发难度较高"
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
        }
        try:
            logger.info(f"调用Ollama AI API分析产品: {name}")
            # json= makes requests send UTF-8 encoded bytes with a JSON content
            # type. A str body built with ensure_ascii=False can fail on stacks
            # that encode str bodies as Latin-1.
            response = requests.post(self.api_url, json=payload, timeout=self.request_timeout)
            if response.status_code != 200:
                logger.error(f"API调用失败: {response.status_code}, {response.text}")
                return None
            content = response.json().get("response", "")
            logger.success(f"API调用成功: {name}")
            return content
        except Exception as e:
            logger.error(f"调用Ollama AI API时出错: {e}")
            return None

    @classmethod
    def _score_difficulty(cls, difficulty: str) -> int:
        """Turn a free-text difficulty rating into a 0-100 score.

        Numbers in 0..100 are preferred, tried in this order: "70分", then
        "难度85", then any bare number. Otherwise the first matching keyword
        bucket decides, and 50 is the fallback.
        """
        for pattern in cls._SCORE_PATTERNS:
            for match in pattern.finditer(difficulty):
                value = int(match.group(1))
                if 0 <= value <= 100:
                    return value
        lowered = difficulty.lower()
        for keywords, score in cls._DIFFICULTY_KEYWORDS:
            if any(keyword in lowered for keyword in keywords):
                return score
        return cls._DEFAULT_DIFFICULTY_SCORE

    def parse_ai_response(self, response: str) -> Tuple[str, str, int]:
        """Parse an AI reply of the form ``name/intro/difficulty``.

        The name part is ignored; callers keep the original product name.

        Returns:
            ``(product_intro, difficulty_text, difficulty_score)``.
            - If the reply has fewer than three '/'-separated parts, the intro
              is empty, the reply (with any think block removed) becomes the
              difficulty text, and the score is 50.
            - On unexpected errors ``("", response, 50)`` is returned.
        """
        try:
            cleaned = self._THINK_BLOCK_RE.sub("", response).strip()
            parts = cleaned.split('/')
            if len(parts) < 3:
                logger.warning(f"响应格式不符合预期: {response}")
                return "", cleaned, self._DEFAULT_DIFFICULTY_SCORE
            product_intro = parts[1].strip()
            difficulty = parts[2].strip()
            logger.info(f"解析结果: 简介='{product_intro[:30]}...', 难度='{difficulty}'")
            difficulty_score = self._score_difficulty(difficulty)
            logger.info(f"提取到难度分数: {difficulty_score}")
            return product_intro, difficulty, difficulty_score
        except Exception as e:
            logger.error(f"解析AI响应失败: {e}")
            return "", response, self._DEFAULT_DIFFICULTY_SCORE

    def check_product_exists_in_analysis(self, conn: sqlite3.Connection, original_name: str) -> bool:
        """Return True if ``product_analysis`` already has a row for this name.

        On database errors the error is logged and False is returned.
        """
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT COUNT(*) FROM product_analysis
                WHERE original_name = ?
            """, (original_name,))
            exists = cursor.fetchone()[0] > 0
            if exists:
                logger.info(f"产品 '{original_name}' 已存在,跳过分析")
            return exists
        except Exception as e:
            logger.error(f"检查产品存在性失败: {e}")
            return False

    def save_analysis_result(self, conn: sqlite3.Connection,
                             original_name: str, difficulty: str,
                             ai_response: str, difficulty_score: Optional[int] = None,
                             product_link: Optional[str] = None):
        """Insert one analysis row and commit.

        Args:
            difficulty_score: Stored as 50 when None.
            product_link: URL of the scraped product page, if known.

        Raises:
            Exception: Any database error (logged first).
        """
        try:
            if difficulty_score is None:
                difficulty_score = self._DEFAULT_DIFFICULTY_SCORE
            conn.execute("""
                INSERT INTO product_analysis
                (original_name, development_difficulty, difficulty_score, ai_response, product_link)
                VALUES (?, ?, ?, ?, ?)
            """, (original_name, difficulty, difficulty_score, ai_response, product_link))
            conn.commit()
            logger.success(f"保存分析结果成功: {original_name}, 难度分数: {difficulty_score}")
        except Exception as e:
            logger.error(f"保存分析结果失败: {e}")
            raise

    @staticmethod
    def _load_product_links(conn: sqlite3.Connection) -> dict:
        """Map ``products.id`` to ``products.url``."""
        return dict(conn.execute("SELECT id, url FROM products").fetchall())

    def analyze_products(self, max_products: int = None):
        """Run the AI analysis for products not yet in ``product_analysis``.

        Args:
            max_products: Analyse at most this many products, in query order.
                ``None`` analyses all of them.

        Products whose name is already analysed are skipped. A failure on one
        product is logged and the loop continues. Other errors are logged, not
        raised.
        """
        if max_products is None:
            logger.info("开始分析所有产品数据")
        else:
            logger.info(f"开始分析产品数据,最大数量: {max_products}")
        conn = None
        try:
            conn = self.connect_to_database()
            products = self.get_product_data(conn)
            if not products:
                logger.warning("没有找到可分析的产品数据")
                return
            products_to_analyze = products if max_products is None else products[:max_products]
            total = len(products_to_analyze)
            logger.info(f"准备分析 {total} 个产品")
            # Lets each analysis row record the URL it was scraped from.
            product_links = self._load_product_links(conn)
            success_count = 0
            skip_count = 0
            for i, (product_id, name, introduction) in enumerate(products_to_analyze, 1):
                logger.info(f"\n分析进度: {i}/{total} - {name}")
                if self.check_product_exists_in_analysis(conn, name):
                    skip_count += 1
                    logger.info(f"跳过已存在产品,当前进度: {i}/{total}")
                    continue
                logger.info(f"正在提交API请求... 进度: {i}/{total}")
                ai_response = self.call_ollama_ai_api(name, introduction)
                if ai_response:
                    logger.info("API调用成功正在处理数据...")
                    # The AI's intro is not stored separately; it is part of ai_response.
                    _, difficulty, difficulty_score = self.parse_ai_response(ai_response)
                    try:
                        self.save_analysis_result(conn, name, difficulty, ai_response,
                                                  difficulty_score,
                                                  product_link=product_links.get(product_id))
                    except sqlite3.Error:
                        # The cause is already logged; keep going with the next product.
                        logger.error(f"分析失败: {name}")
                    else:
                        success_count += 1
                        logger.success(f"产品 '{name}' 分析完成,进度: {i}/{total}")
                else:
                    logger.error(f"分析失败: {name}")
                # Throttle requests to the local model.
                logger.info("数据处理完成等待2秒后继续...")
                time.sleep(2)
            logger.success(f"分析完成! 成功分析 {success_count} 个产品,跳过 {skip_count} 个已存在产品")
        except Exception as e:
            logger.error(f"分析过程中出错: {e}")
        finally:
            if conn:
                conn.close()
                logger.info("数据库连接已关闭")

    async def run_scraping(self, urls=None):
        """Scrape ProductHunt pages and store them in ``products``.

        Args:
            urls: URLs to scrape. ``None`` reads them from the tophub database
                via ``query_producthunt_urls``.

        Returns:
            False if there was nothing to scrape, otherwise True. Individual
            failures are counted and logged, not raised.
        """
        logger.info("=== 开始ProductHunt数据抓取 ===")
        self.product_urls = self.query_producthunt_urls() if urls is None else urls
        if not self.product_urls:
            logger.error("未找到要抓取的ProductHunt链接")
            return False
        logger.info(f"找到 {len(self.product_urls)} 个ProductHunt链接")
        success_count = 0
        skip_count = 0
        error_count = 0
        with tqdm(total=len(self.product_urls), desc="抓取ProductHunt链接") as pbar:
            for url in self.product_urls:
                logger.info(f"处理URL: {url}")
                if self.skip_duplicates and self.check_duplicate(url):
                    logger.info(f"URL已存在跳过: {url}")
                    skip_count += 1
                    pbar.update(1)
                    continue
                product_info = await self.scrape_product_info(url)
                if product_info:
                    if self.save_product_info(product_info):
                        logger.success(f"成功保存产品信息: {product_info.get('name', '未知')}")
                        success_count += 1
                    else:
                        logger.error(f"保存产品信息失败: {url}")
                        error_count += 1
                else:
                    logger.error(f"抓取产品信息失败: {url}")
                    error_count += 1
                pbar.update(1)
        self.show_scraping_results(success_count, skip_count, error_count)
        logger.success("=== ProductHunt数据抓取完成 ===")
        return True

    def show_scraping_results(self, success_count, skip_count, error_count):
        """Log the run counters plus the 10 most recently updated products."""
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                total_count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
                recent_products = conn.execute(
                    "SELECT name, url FROM products ORDER BY updated_at DESC LIMIT 10").fetchall()
            logger.info("=== 抓取结果统计 ===")
            logger.info(f"成功抓取: {success_count} 个产品")
            logger.info(f"跳过重复: {skip_count} 个链接")
            logger.info(f"抓取失败: {error_count} 个链接")
            logger.info(f"数据库中的产品总数: {total_count}")
            if recent_products:
                logger.info("最新抓取的产品:")
                for name, url in recent_products:
                    logger.info(f" - {name}: {url}")
            else:
                logger.info("数据库中暂无产品记录")
        except Exception as e:
            logger.error(f"显示抓取结果失败: {e}")

    def analyze_missing_scores(self):
        """Fill in ``difficulty_score`` for analysis rows that lack one.

        The score is first re-derived from the stored AI reply. Only rows
        without a stored reply trigger a new Ollama request. That request uses
        the stored intro, or else the scraped introduction of the product with
        the same name. Errors are logged, not raised.
        """
        logger.info("=== 开始分析缺失难度分数的产品 ===")
        conn = None
        try:
            conn = self.connect_to_database()
            cursor = conn.cursor()
            # save_analysis_result never writes product_intro, so fall back to
            # the scraped introduction when a re-analysis is needed.
            cursor.execute("""
                SELECT pa.id, pa.original_name,
                       COALESCE(pa.product_intro,
                                (SELECT p.introduction FROM products p
                                 WHERE p.name = pa.original_name LIMIT 1)),
                       pa.ai_response
                FROM product_analysis pa
                WHERE pa.difficulty_score IS NULL OR pa.difficulty_score = ''
            """)
            rows = cursor.fetchall()
            total = len(rows)
            logger.info(f"找到 {total} 个缺失难度分数的产品")
            if not rows:
                logger.info("没有发现缺失难度分数的产品")
                return
            updated_count = 0
            for i, (analysis_id, name, introduction, ai_response) in enumerate(rows, 1):
                logger.info(f"处理缺失分数的产品 {i}/{total}: {name}")
                difficulty_score = None
                called_api = False
                if ai_response:
                    _, _, difficulty_score = self.parse_ai_response(ai_response)
                    logger.info(f"从现有AI响应中提取分数: {difficulty_score}")
                if difficulty_score is None:
                    logger.info(f"重新调用API分析产品: {name}")
                    called_api = True
                    ai_response = self.call_ollama_ai_api(name, introduction or "")
                    if ai_response:
                        _, _, difficulty_score = self.parse_ai_response(ai_response)
                        cursor.execute(
                            "UPDATE product_analysis SET ai_response = ? WHERE id = ?",
                            (ai_response, analysis_id))
                if difficulty_score is not None:
                    cursor.execute(
                        "UPDATE product_analysis SET difficulty_score = ? WHERE id = ?",
                        (difficulty_score, analysis_id))
                    conn.commit()
                    updated_count += 1
                    logger.success(f"成功更新产品 '{name}' 的难度分数为 {difficulty_score}")
                else:
                    logger.warning(f"无法为产品 '{name}' 确定难度分数")
                # Throttle only after real API calls; re-parsing is local.
                if called_api and i < total:
                    time.sleep(2)
            logger.success(f"缺失分数分析完成! 成功更新 {updated_count} 个产品的难度分数")
        except Exception as e:
            logger.error(f"分析缺失分数过程中出错: {e}")
        finally:
            if conn:
                conn.close()
                logger.info("数据库连接已关闭")

    async def run_full_workflow_async(self, max_products=None, analyze_only=False):
        """Create tables, optionally scrape, analyse, then back-fill scores.

        Args:
            max_products: Passed to ``analyze_products``.
            analyze_only: Skip the scraping step when True.
        """
        logger.info("=== 开始全功能产品系统工作流程 ===")
        self.init_database()
        if not analyze_only:
            logger.info("步骤1: 开始抓取ProductHunt数据...")
            await self.run_scraping()
        else:
            logger.info("跳过抓取步骤,直接进行分析")
        logger.info("步骤2: 开始AI分析产品数据...")
        self.analyze_products(max_products)
        logger.info("步骤3: 开始分析并补充缺失的难度分数...")
        self.analyze_missing_scores()
        logger.success("=== 全功能产品系统工作流程完成 ===")

    def run_full_workflow(self, max_products=None, analyze_only=False):
        """Synchronous entry point for ``run_full_workflow_async``.

        ``asyncio.run`` closes its loop and resets the thread's current loop
        afterwards. Must not be called from inside a running event loop.
        """
        asyncio.run(self.run_full_workflow_async(max_products, analyze_only))
def parse_arguments():
    """Build the command-line parser and parse ``sys.argv``.

    Returns:
        argparse.Namespace with the database paths, Chrome debug port, scrape
        limit, duplicate switch, explicit URLs, log file, analysis cap and the
        analyze-only switch.
    """
    cli = argparse.ArgumentParser(description="全功能产品抓取与分析系统")
    add = cli.add_argument
    # Database locations.
    add("--tophub-db", default=None, help="tophub数据库路径")
    add("--product-db", default=None, help="产品数据库路径")
    # Scraping options.
    add("--debug-port", type=int, default=9222, help="Chrome调试端口")
    add("--limit", type=int, default=0, help="抓取链接数量限制")
    add("--no-skip-duplicates", action="store_true", help="不跳过重复URL")
    add("--urls", nargs="+", help="指定要抓取的URL列表")
    # Logging and analysis options.
    add("--log-file", default="integrated_product_system.log", help="日志文件路径")
    add("--max-products", type=int, default=None, help="最大分析产品数量")
    add("--analyze-only", action="store_true", help="仅进行分析,不抓取数据")
    return cli.parse_args()
async def main():
    """Command-line entry point.

    Steps:
      1. Parse arguments and add a rotating file log sink.
      2. On Windows, run ``start_chrome.bat`` from this file's directory.
      3. Either scrape the given ``--urls`` and analyse them, or run the full
         workflow.
    """
    import subprocess

    args = parse_arguments()
    # Also write logs to a size-rotated file.
    logger.add(args.log_file, level="INFO", rotation="10 MB")

    chrome_bat_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "start_chrome.bat")
    logger.info(f"正在运行Chrome启动脚本: {chrome_bat_path}")
    if not os.path.isfile(chrome_bat_path):
        # Check up front: with shell=True a missing script shows up as a
        # shell error, never as FileNotFoundError.
        logger.error(f"未找到Chrome启动脚本: {chrome_bat_path}")
    elif os.name != "nt":
        # A .bat script can only be executed by the Windows shell.
        logger.warning(f"当前系统不是Windows,跳过Chrome启动脚本: {chrome_bat_path}")
    else:
        try:
            # Block until the batch file exits.
            subprocess.run([chrome_bat_path], check=True, shell=True)
            logger.success("Chrome启动脚本执行成功")
        except (subprocess.CalledProcessError, OSError) as e:
            logger.error(f"Chrome启动脚本执行失败: {e}")

    system = IntegratedProductSystem(
        tophub_db_path=args.tophub_db,
        product_db_path=args.product_db,
        debug_port=args.debug_port,
        limit=args.limit,
        skip_duplicates=not args.no_skip_duplicates,
        api_key=""  # Ollama needs no API key
    )

    if args.urls:
        # This path bypasses run_full_workflow_async, which is where the tables
        # are normally created. Create them here too so a fresh database can
        # store the scraped products.
        system.init_database()
        await system.run_scraping(urls=args.urls)
        system.analyze_products(max_products=args.max_products)
    else:
        await system.run_full_workflow_async(max_products=args.max_products, analyze_only=args.analyze_only)
if __name__ == "__main__":
    # Script entry point: drive the async CLI to completion on a new event loop.
    entry = main()
    asyncio.run(entry)