#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
全功能产品抓取与分析系统
整合integrated_scraper.py和product_ai_analysis.py的功能
支持从ProductHunt抓取数据并进行AI分析
"""
import argparse
import asyncio
import json
import os
import re
import sqlite3
import sys
import time
from contextlib import closing
from datetime import datetime
from typing import List, Optional, Tuple

import requests
from loguru import logger
from tqdm import tqdm
# Logging: drop loguru's default handler, then send INFO-and-above to stderr
# using a "time | level | location - message" layout.
logger.remove()
logger.add(
    sys.stderr,
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
    level="INFO",
)

# Load ProductHuntScraper from "playwright-get-data.py" in this file's
# directory. The hyphens in the file name make a plain `import` statement
# impossible, so the module is loaded by path through importlib.
import importlib.util

playwright_data_path = os.path.join(
    os.path.dirname(__file__),
    "playwright-get-data.py",
)
spec = importlib.util.spec_from_file_location(
    "playwright_get_data",
    playwright_data_path,
)
playwright_get_data = importlib.util.module_from_spec(spec)
spec.loader.exec_module(playwright_get_data)
ProductHuntScraper = getattr(playwright_get_data, "ProductHuntScraper")
class IntegratedProductSystem:
    """Scrape ProductHunt pages and analyse the products with a local Ollama model.

    Data flow:
      * ``query_producthunt_urls`` reads ProductHunt links from the ``articles``
        table of the tophub database;
      * ``run_scraping`` scrapes each link with ``ProductHuntScraper`` and stores
        the result in the ``products`` table of the product database;
      * ``analyze_products`` asks the model for a Chinese introduction and a
        one-person-plus-AI development-difficulty assessment, stored in
        ``product_analysis``;
      * ``analyze_missing_scores`` / ``analyze_follower_counts`` back-fill the
        ``difficulty_score`` and ``follows`` columns.
    """

    # Columns an existing product_analysis table may lack (e.g. one created
    # before they were introduced); init_database() adds any that are missing.
    _ANALYSIS_OPTIONAL_COLUMNS = (
        ("product_intro", "TEXT"),
        ("difficulty_score", "INTEGER"),
        ("product_link", "TEXT"),
        ("follows", "INTEGER"),
    )

    # Some models (e.g. qwen3, depending on Ollama version/settings) may put a
    # <think>...</think> reasoning section before the answer. It is removed
    # before parsing, because it may contain '/' or stray numbers.
    _THINK_RE = re.compile(r"<think>.*?</think>", re.DOTALL)

    # A difficulty score in free text: "85分", "难度85", or a standalone number.
    # Python str regexes use a Unicode-aware \b, and CJK characters count as
    # word characters. So \b can never sit between a digit and 分 / between 度
    # and a digit, and the first two alternatives deliberately avoid \b.
    _SCORE_RE = re.compile(r"(\d+)\s*分|难度\s*(\d+)|\b(\d+)\b")

    # Keyword fallbacks for the difficulty score, checked in order (first hit wins).
    _DIFFICULTY_KEYWORDS = (
        (("高", "很难", "非常难", "复杂"), 85),
        (("中", "一般", "适中", "普通"), 60),
        (("低", "简单", "容易"), 35),
    )
    _DEFAULT_DIFFICULTY_SCORE = 50

    # Follower strings handled without the LLM, following the same rules the
    # conversion prompt states: "53 followers", "1.9K followers" (x1000),
    # "2.5M followers" (x1000000), plus comma-grouped "1,234 followers".
    _USER_COUNT_RE = re.compile(
        r"^\s*(\d{1,3}(?:,\d{3})+|\d+)(\.\d+)?\s*([km])?\s*(?:followers?)?\s*$",
        re.IGNORECASE,
    )
    _USER_COUNT_MULTIPLIERS = {"": 1, "k": 1_000, "m": 1_000_000}

    def __init__(self, tophub_db_path=None, product_db_path=None, debug_port=9222,
                 limit=0, skip_duplicates=True, api_key="",
                 api_url="http://localhost:11434/api/generate", model="qwen3:8b"):
        """Store configuration; no database or network access happens here.

        Args:
            tophub_db_path: tophub database path. Defaults to ``tophub_data.db``
                in the parent directory of this file's directory.
            product_db_path: product database path. Defaults to ``products.db``
                next to this file.
            debug_port: Chrome remote-debugging port handed to ProductHuntScraper.
            limit: maximum number of ProductHunt URLs to read. 0 (or less)
                means no limit.
            skip_duplicates: skip URLs already present in ``products``.
            api_key: API key. Unused by Ollama and kept for compatibility.
            api_url: Ollama ``/api/generate`` endpoint.
            model: Ollama model name used for every prompt.
        """
        # abspath: for a relative __file__ (e.g. "script.py"), dirname() yields
        # "", so dirname(dirname(__file__)) would silently resolve to the CWD.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        self.tophub_db_path = tophub_db_path or os.path.join(
            os.path.dirname(script_dir), "tophub_data.db")
        self.product_db_path = product_db_path or os.path.join(script_dir, "products.db")
        self.debug_port = debug_port
        self.limit = limit
        self.skip_duplicates = skip_duplicates
        self.api_key = api_key
        self.api_url = api_url
        self.model = model
        self.product_urls = []
        logger.info(f"初始化全功能产品系统,数据库: {self.product_db_path}")

    def connect_to_database(self) -> sqlite3.Connection:
        """Open and return a connection to the product database (caller closes it).

        Raises:
            Exception: whatever ``sqlite3.connect`` raises, after logging it.
        """
        try:
            conn = sqlite3.connect(self.product_db_path)
            logger.success(f"成功连接到数据库: {self.product_db_path}")
            return conn
        except Exception as e:
            logger.error(f"连接数据库失败: {e}")
            raise

    def init_database(self):
        """Create the ``products`` and ``product_analysis`` tables if missing.

        Also adds optional ``product_analysis`` columns that an existing table
        lacks. Errors are logged, not raised.
        """
        logger.info("正在初始化产品数据库...")
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                cursor = conn.cursor()
                # Scraped product pages, one row per URL.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS products (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        url TEXT NOT NULL UNIQUE,
                        name TEXT,
                        introduction TEXT,
                        user_count TEXT,
                        maker_link TEXT,
                        maker_statement TEXT,
                        created_at TEXT NOT NULL,
                        updated_at TEXT NOT NULL
                    )
                ''')
                # AI analysis results.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS product_analysis (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        original_name TEXT,
                        product_intro TEXT,
                        development_difficulty TEXT,
                        ai_response TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        difficulty_score INTEGER,
                        product_link TEXT,
                        follows INTEGER
                    )
                ''')
                # Migrate pre-existing tables that lack newer columns.
                cursor.execute("PRAGMA table_info(product_analysis)")
                existing_columns = {col[1] for col in cursor.fetchall()}
                for column, column_type in self._ANALYSIS_OPTIONAL_COLUMNS:
                    if column not in existing_columns:
                        # Names/types come from the class constant, not from input.
                        cursor.execute(
                            f"ALTER TABLE product_analysis ADD COLUMN {column} {column_type}")
                        logger.info(f"已为product_analysis表添加{column}字段")
                conn.commit()
            logger.success("产品数据库初始化完成")
        except Exception as e:
            logger.error(f"初始化数据库失败: {e}")

    def query_producthunt_urls(self, limit=None):
        """Return distinct ``articles.url`` values containing "producthunt.com".

        Args:
            limit: maximum number of URLs. ``None`` means use ``self.limit``;
                0 or a negative value means no limit.

        Returns:
            List of URL strings. Empty list on database errors (logged).
        """
        if limit is None:
            limit = self.limit
        logger.info(f"正在查询tophub_data.db数据库,限制: {limit}条")
        sql = "SELECT DISTINCT url FROM articles WHERE url LIKE '%producthunt.com%'"
        params = ()
        if limit and limit > 0:
            sql += " LIMIT ?"
            params = (limit,)
        try:
            with closing(sqlite3.connect(self.tophub_db_path)) as conn:
                urls = [row[0] for row in conn.execute(sql, params)]
            logger.success(f"找到 {len(urls)} 个包含producthunt.com的链接")
            return urls
        except Exception as e:
            logger.error(f"查询数据库失败: {e}")
            return []

    def check_duplicate(self, url):
        """Return True if ``url`` is already stored in ``products``.

        Returns False on database errors (logged).
        """
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                row = conn.execute(
                    "SELECT COUNT(*) FROM products WHERE url = ?", (url,)).fetchone()
            return row[0] > 0
        except Exception as e:
            logger.error(f"检查重复失败: {e}")
            return False

    def save_product_info(self, product_info):
        """Insert or update a ``products`` row keyed by ``product_info['url']``.

        Args:
            product_info: dict with a required ``url`` key. The optional keys
                are name, introduction, user_count, maker_link and
                maker_statement.

        Returns:
            True on success, False on any error (logged).
        """
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                cursor = conn.cursor()
                current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                cursor.execute("SELECT id FROM products WHERE url = ?", (product_info['url'],))
                if cursor.fetchone():
                    # Existing URL: refresh every scraped field and updated_at.
                    cursor.execute('''
                        UPDATE products SET
                        name = ?, introduction = ?, user_count = ?,
                        maker_link = ?, maker_statement = ?, updated_at = ?
                        WHERE url = ?
                    ''', (
                        product_info.get('name'),
                        product_info.get('introduction'),
                        product_info.get('user_count'),
                        product_info.get('maker_link'),
                        product_info.get('maker_statement'),
                        current_time,
                        product_info['url'],
                    ))
                    logger.info(f"更新产品信息: {product_info.get('name', '未知')}")
                else:
                    cursor.execute('''
                        INSERT INTO products
                        (url, name, introduction, user_count, maker_link, maker_statement, created_at, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        product_info['url'],
                        product_info.get('name'),
                        product_info.get('introduction'),
                        product_info.get('user_count'),
                        product_info.get('maker_link'),
                        product_info.get('maker_statement'),
                        current_time,
                        current_time,
                    ))
                    logger.info(f"新增产品信息: {product_info.get('name', '未知')}")
                conn.commit()
            return True
        except Exception as e:
            logger.error(f"保存产品信息失败: {e}")
            return False

    async def scrape_product_info(self, url):
        """Scrape one ProductHunt page via ProductHuntScraper.

        Returns:
            The scraper's product dict with ``url`` added, or None / a falsy
            value when connecting, navigating or extracting fails.
        """
        scraper = None
        connected = False
        try:
            logger.info(f"开始抓取: {url}")
            scraper = ProductHuntScraper(debug_port=self.debug_port)
            connected = await scraper.connect_to_existing_chrome()
            if not connected:
                logger.error("连接Chrome失败,跳过此URL")
                return None
            if not await scraper.navigate_to_producthunt(url):
                logger.error("导航到页面失败,跳过此URL")
                return None
            product_info = await scraper.extract_product_info()
            if product_info:
                product_info['url'] = url
                logger.success(f"成功提取产品信息: {product_info.get('name', '未知')}")
            else:
                logger.error("提取产品信息失败")
            return product_info
        except Exception as e:
            logger.error(f"抓取产品信息失败: {e}")
            return None
        finally:
            # Release the browser connection on every path once it was opened,
            # including when extraction raised.
            if connected:
                try:
                    await scraper.close()
                except Exception as e:
                    logger.warning(f"关闭浏览器连接失败: {e}")

    def get_product_data(self, conn: sqlite3.Connection) -> List[Tuple]:
        """Return (id, name, introduction, user_count, url) rows for analysable products.

        Only rows with a non-empty name and introduction are returned.
        """
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, name, introduction, user_count, url
                FROM products
                WHERE name IS NOT NULL AND introduction IS NOT NULL
                AND name != '' AND introduction != ''
            """)
            products = cursor.fetchall()
            logger.info(f"从数据库获取到 {len(products)} 个产品")
            # Log a few sample rows for a quick sanity check.
            for i, (product_id, name, intro, user_count, url) in enumerate(products[:3], 1):
                logger.info(f"示例产品{i}: ID={product_id}, 名称='{name}', 简介='{intro[:50]}...', 用户数='{user_count}', URL='{url}'")
            return products
        except Exception as e:
            logger.error(f"获取产品数据失败: {e}")
            raise

    def _post_generate(self, prompt: str, timeout: float):
        """POST a non-streaming generate request to Ollama and return the requests Response.

        ``json=`` makes requests serialize and UTF-8-encode the body itself.
        A ``str`` passed via ``data=`` leaves encoding to the lower HTTP layers.
        With some urllib3 versions that ends in http.client's ISO-8859-1
        encoding, which fails on Chinese prompts.
        """
        payload = {"model": self.model, "prompt": prompt, "stream": False}
        return requests.post(self.api_url, json=payload, timeout=timeout)

    def call_ollama_ai_api(self, name: str, introduction: str) -> Optional[str]:
        """Ask the model to translate the intro and assess development difficulty.

        Returns:
            The model's ``response`` text, or None on HTTP/transport errors (logged).
        """
        try:
            prompt = f"这个是【{name}】,简介内容是【{introduction}】。请把产品的简介翻译成中文,并返回假设一个人加上AI辅助能否开发这个产品,请详细回答。返回的内容是产品名称/产品简介/开发难度。返回的例子一:notion/这个是笔记产品等等/一个人开发难度较高"
            logger.info(f"调用Ollama AI API分析产品: {name}")
            response = self._post_generate(prompt, timeout=60)
            if response.status_code == 200:
                content = response.json().get("response", "")
                logger.success(f"API调用成功: {name}")
                return content
            logger.error(f"API调用失败: {response.status_code}, {response.text}")
            return None
        except Exception as e:
            logger.error(f"调用Ollama AI API时出错: {e}")
            return None

    @classmethod
    def _strip_think(cls, text: str) -> str:
        """Remove any ``<think>...</think>`` sections from model output."""
        return cls._THINK_RE.sub("", text or "")

    @classmethod
    def _parse_user_count_locally(cls, user_count: str) -> Optional[int]:
        """Convert strings like "1.9K followers" to an int without the LLM.

        Returns None when the text does not match the expected format.
        """
        match = cls._USER_COUNT_RE.match(user_count or "")
        if not match:
            return None
        whole, fraction, suffix = match.groups()
        value = float(whole.replace(",", "") + (fraction or ""))
        # round(), not int(): e.g. 1.005 * 1000 == 1004.9999999999999 in floats.
        return round(value * cls._USER_COUNT_MULTIPLIERS[(suffix or "").lower()])

    def convert_user_count_to_number(self, user_count: str) -> Optional[int]:
        """Convert a follower-count string such as "53 followers" or "1.9K followers" to an int.

        Well-formed strings are parsed locally. Anything else is sent to the
        model, and the first number in its reply is used.

        Returns:
            The number, or None if the input is empty or conversion fails.
        """
        if not user_count or user_count.strip() == "":
            logger.info(f"空的用户数量: {user_count}")
            return None

        local_value = self._parse_user_count_locally(user_count)
        if local_value is not None:
            logger.info(f"本地解析用户数量: {user_count} → {local_value}")
            return local_value

        try:
            logger.info(f"正在转换用户数量: {user_count}")
            prompt = f"请将以下用户数量文本转换为纯数字,不要包含任何其他内容:\n{user_count}\n\n转换规则:\n- 直接数字:如'53 followers' → 53\n- K表示千:如'1.9K followers' → 1900\n- M表示百万:如'2.5M followers' → 2500000\n- 只返回数字,不要添加任何单位或解释"
            response = self._post_generate(prompt, timeout=30)
            if response.status_code != 200:
                logger.error(f"Ollama API调用失败: {response.status_code}, {response.text}")
                return None
            converted = self._strip_think(response.json().get("response", "")).strip()
            logger.success(f"成功转换用户数量: {user_count} → {converted}")
            # Accept digit grouping such as "1,900" (previously read as 1).
            number_match = re.search(r'\d[\d,]*(?:\.\d+)?', converted)
            if not number_match:
                logger.error(f"无法从转换结果中提取数字: {converted}")
                return None
            return round(float(number_match.group().replace(",", "")))
        except Exception as e:
            logger.error(f"转换用户数量时出错: {e}")
            return None

    @classmethod
    def _extract_difficulty_score(cls, difficulty: str) -> int:
        """Derive a 0-100 difficulty score from a free-text difficulty description.

        The first in-range number found by ``_SCORE_RE`` wins. Otherwise the
        keyword heuristics are used, and otherwise the default of 50.
        """
        for match in cls._SCORE_RE.finditer(difficulty):
            # Exactly one alternative (and hence one group) matched.
            value = int(next(group for group in match.groups() if group))
            if 0 <= value <= 100:
                return value
        lowered = difficulty.lower()
        for keywords, score in cls._DIFFICULTY_KEYWORDS:
            if any(keyword in lowered for keyword in keywords):
                return score
        return cls._DEFAULT_DIFFICULTY_SCORE

    def parse_ai_response(self, response: str) -> Tuple[str, str, int]:
        """Split a "name/intro/difficulty" model reply.

        Returns:
            ``(product_intro, difficulty_text, difficulty_score)``. The name
            part is ignored; callers use their own product name. When the
            reply does not have at least three '/'-separated parts, returns
            ``("", response, 50)``.
        """
        try:
            parts = self._strip_think(response).split('/')
            if len(parts) >= 3:
                product_intro = parts[1].strip()
                # Keep everything after the second '/', so a difficulty text
                # that itself contains '/' is not truncated.
                difficulty = '/'.join(parts[2:]).strip()
                logger.info(f"解析结果: 简介='{product_intro[:30]}...', 难度='{difficulty}'")
                difficulty_score = self._extract_difficulty_score(difficulty)
                logger.info(f"提取到难度分数: {difficulty_score}")
                return product_intro, difficulty, difficulty_score
            logger.warning(f"响应格式不符合预期: {response}")
            return "", response, self._DEFAULT_DIFFICULTY_SCORE
        except Exception as e:
            logger.error(f"解析AI响应失败: {e}")
            return "", response, self._DEFAULT_DIFFICULTY_SCORE

    def check_product_exists_in_analysis(self, conn: sqlite3.Connection, original_name: str) -> bool:
        """Return True if ``product_analysis`` already has a row for ``original_name``.

        Returns False on errors (logged).
        """
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT COUNT(*) FROM product_analysis
                WHERE original_name = ?
            """, (original_name,))
            exists = cursor.fetchone()[0] > 0
            if exists:
                logger.info(f"产品 '{original_name}' 已存在,跳过分析")
            return exists
        except Exception as e:
            logger.error(f"检查产品存在性失败: {e}")
            return False

    def save_analysis_result(self, conn: sqlite3.Connection,
                             original_name: str, difficulty: str,
                             ai_response: str, difficulty_score: Optional[int] = None,
                             product_link: Optional[str] = None, follows: Optional[int] = None,
                             product_intro: Optional[str] = None):
        """Insert one ``product_analysis`` row and commit.

        Args:
            difficulty_score: stored as 50 when None.
            product_intro: parsed (translated) introduction; optional.

        Raises:
            Exception: any database error, after logging it.
        """
        try:
            if difficulty_score is None:
                difficulty_score = self._DEFAULT_DIFFICULTY_SCORE
            conn.execute("""
                INSERT INTO product_analysis
                (original_name, product_intro, development_difficulty, difficulty_score,
                 ai_response, product_link, follows)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (original_name, product_intro, difficulty, difficulty_score,
                  ai_response, product_link, follows))
            conn.commit()
            logger.success(f"保存分析结果成功: {original_name}, 难度分数: {difficulty_score}, 关注数: {follows}")
        except Exception as e:
            logger.error(f"保存分析结果失败: {e}")
            raise

    def analyze_products(self, max_products: Optional[int] = None):
        """Analyse products that have no ``product_analysis`` row yet.

        Args:
            max_products: analyse at most this many candidate rows
                (None = all). Already-analysed products still count toward
                the slice.
        """
        if max_products is None:
            logger.info("开始分析所有产品数据")
        else:
            logger.info(f"开始分析产品数据,最大数量: {max_products}")
        conn = None
        try:
            conn = self.connect_to_database()
            products = self.get_product_data(conn)
            if not products:
                logger.warning("没有找到可分析的产品数据")
                return
            products_to_analyze = products if max_products is None else products[:max_products]
            total = len(products_to_analyze)
            logger.info(f"准备分析 {total} 个产品")
            success_count = 0
            skip_count = 0
            for i, (_original_id, name, introduction, user_count, url) in enumerate(products_to_analyze, 1):
                logger.info(f"\n分析进度: {i}/{total} - {name}")
                if self.check_product_exists_in_analysis(conn, name):
                    skip_count += 1
                    logger.info(f"跳过已存在产品,当前进度: {i}/{total}")
                    continue
                logger.info(f"正在提交API请求... 进度: {i}/{total}")
                ai_response = self.call_ollama_ai_api(name, introduction)
                if ai_response:
                    logger.info("API调用成功,正在处理数据...")
                    product_intro, difficulty, difficulty_score = self.parse_ai_response(ai_response)
                    follows = self.convert_user_count_to_number(user_count) if user_count else None
                    self.save_analysis_result(conn, name, difficulty, ai_response,
                                              difficulty_score, url, follows,
                                              product_intro=product_intro)
                    success_count += 1
                    logger.success(f"产品 '{name}' 分析完成,进度: {i}/{total}")
                else:
                    logger.error(f"分析失败: {name}")
                # Throttle requests to the local model.
                logger.info("数据处理完成,等待2秒后继续...")
                time.sleep(2)
            logger.success(f"分析完成! 成功分析 {success_count} 个产品,跳过 {skip_count} 个已存在产品")
        except Exception as e:
            logger.error(f"分析过程中出错: {e}")
        finally:
            if conn:
                conn.close()
                logger.info("数据库连接已关闭")

    async def run_scraping(self, urls=None):
        """Scrape ``urls`` (or the URLs from the tophub DB) and store them in ``products``.

        Returns:
            False if there was nothing to scrape, True otherwise.
        """
        logger.info("=== 开始ProductHunt数据抓取 ===")
        self.product_urls = self.query_producthunt_urls() if urls is None else urls
        if not self.product_urls:
            logger.error("未找到要抓取的ProductHunt链接")
            return False
        logger.info(f"找到 {len(self.product_urls)} 个ProductHunt链接")
        success_count = 0
        skip_count = 0
        error_count = 0
        with tqdm(total=len(self.product_urls), desc="抓取ProductHunt链接") as pbar:
            for url in self.product_urls:
                logger.info(f"处理URL: {url}")
                if self.skip_duplicates and self.check_duplicate(url):
                    logger.info(f"URL已存在,跳过: {url}")
                    skip_count += 1
                    pbar.update(1)
                    continue
                product_info = await self.scrape_product_info(url)
                if not product_info:
                    logger.error(f"抓取产品信息失败: {url}")
                    error_count += 1
                elif self.save_product_info(product_info):
                    logger.success(f"成功保存产品信息: {product_info.get('name', '未知')}")
                    success_count += 1
                else:
                    logger.error(f"保存产品信息失败: {url}")
                    error_count += 1
                pbar.update(1)
        self.show_scraping_results(success_count, skip_count, error_count)
        logger.success("=== ProductHunt数据抓取完成 ===")
        return True

    def show_scraping_results(self, success_count, skip_count, error_count):
        """Log run counters, the total product count and the 10 most recently updated products."""
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                total_count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
                recent_products = conn.execute(
                    "SELECT name, url FROM products ORDER BY updated_at DESC LIMIT 10").fetchall()
            logger.info("=== 抓取结果统计 ===")
            logger.info(f"成功抓取: {success_count} 个产品")
            logger.info(f"跳过重复: {skip_count} 个链接")
            logger.info(f"抓取失败: {error_count} 个链接")
            logger.info(f"数据库中的产品总数: {total_count}")
            if recent_products:
                logger.info("最新抓取的产品:")
                for name, url in recent_products:
                    logger.info(f" - {name}: {url}")
            else:
                logger.info("数据库中暂无产品记录")
        except Exception as e:
            logger.error(f"显示抓取结果失败: {e}")

    def analyze_missing_scores(self):
        """Fill ``difficulty_score`` for analysis rows where it is NULL or ''.

        The score is re-parsed from the stored AI response when present.
        Otherwise the model is queried again.
        """
        logger.info("=== 开始分析缺失难度分数的产品 ===")
        conn = None
        try:
            conn = self.connect_to_database()
            cursor = conn.cursor()
            # product_intro may be empty; fall back to the scraped introduction.
            cursor.execute("""
                SELECT pa.id, pa.original_name,
                       COALESCE(NULLIF(pa.product_intro, ''),
                                (SELECT p.introduction FROM products p
                                 WHERE p.name = pa.original_name LIMIT 1)),
                       pa.ai_response
                FROM product_analysis pa
                WHERE pa.difficulty_score IS NULL OR pa.difficulty_score = ''
            """)
            rows = cursor.fetchall()
            logger.info(f"找到 {len(rows)} 个缺失难度分数的产品")
            if not rows:
                logger.info("没有发现缺失难度分数的产品")
                return
            updated_count = 0
            for i, (analysis_id, name, introduction, ai_response) in enumerate(rows, 1):
                logger.info(f"处理缺失分数的产品 {i}/{len(rows)}: {name}")
                difficulty_score = None
                if ai_response:
                    try:
                        # parse_ai_response returns a 3-tuple.
                        _, _, difficulty_score = self.parse_ai_response(ai_response)
                        logger.info(f"从现有AI响应中提取分数: {difficulty_score}")
                    except Exception as e:
                        logger.error(f"从现有响应提取分数失败: {e}")
                if difficulty_score is None:
                    logger.info(f"重新调用API分析产品: {name}")
                    ai_response = self.call_ollama_ai_api(name, introduction)
                    if ai_response:
                        _, _, difficulty_score = self.parse_ai_response(ai_response)
                        cursor.execute("""
                            UPDATE product_analysis
                            SET ai_response = ?
                            WHERE id = ?
                        """, (ai_response, analysis_id))
                if difficulty_score is not None:
                    cursor.execute("""
                        UPDATE product_analysis
                        SET difficulty_score = ?
                        WHERE id = ?
                    """, (difficulty_score, analysis_id))
                    conn.commit()
                    updated_count += 1
                    logger.success(f"成功更新产品 '{name}' 的难度分数为 {difficulty_score}")
                else:
                    logger.warning(f"无法为产品 '{name}' 确定难度分数")
                # Throttle requests to the local model.
                if i < len(rows):
                    time.sleep(2)
            logger.success(f"缺失分数分析完成! 成功更新 {updated_count} 个产品的难度分数")
        except Exception as e:
            logger.error(f"分析缺失分数过程中出错: {e}")
        finally:
            if conn:
                conn.close()
                logger.info("数据库连接已关闭")

    def analyze_follower_counts(self):
        """Recompute ``product_analysis.follows`` from ``products.user_count`` (matched by name)."""
        logger.info("=== 开始分析产品关注数 ===")
        conn = None
        try:
            conn = self.connect_to_database()
            cursor = conn.cursor()
            cursor.execute("""
                SELECT p.id, p.name, p.user_count, pa.id as analysis_id
                FROM products p
                LEFT JOIN product_analysis pa ON p.name = pa.original_name
                WHERE p.user_count IS NOT NULL AND p.user_count != ''
            """)
            products = cursor.fetchall()
            logger.info(f"找到 {len(products)} 个需要更新关注数的产品")
            if not products:
                logger.info("没有发现需要更新关注数的产品")
                return
            updated_count = 0
            for i, (_product_id, name, user_count, analysis_id) in enumerate(products, 1):
                logger.info(f"处理产品关注数 {i}/{len(products)}: {name}, 用户数: {user_count}")
                if not analysis_id:
                    logger.info(f"产品 '{name}' 没有对应的分析记录,跳过")
                    continue
                # Only values the local parser cannot handle reach the model.
                used_api = self._parse_user_count_locally(user_count) is None
                follows = self.convert_user_count_to_number(user_count)
                if follows is not None:
                    cursor.execute("""
                        UPDATE product_analysis
                        SET follows = ?
                        WHERE id = ?
                    """, (follows, analysis_id))
                    conn.commit()
                    updated_count += 1
                    logger.success(f"成功更新产品 '{name}' 的关注数为 {follows}")
                else:
                    logger.warning(f"无法为产品 '{name}' 转换关注数")
                # Throttle only when the model was actually called.
                if used_api and i < len(products):
                    time.sleep(2)
            logger.success(f"关注数分析完成! 成功更新 {updated_count} 个产品的关注数")
        except Exception as e:
            logger.error(f"分析关注数过程中出错: {e}")
        finally:
            if conn:
                conn.close()
                logger.info("数据库连接已关闭")

    async def run_full_workflow_async(self, max_products=None, analyze_only=False):
        """Run init, optional scraping, analysis, score back-fill and follower update in order."""
        logger.info("=== 开始全功能产品系统工作流程 ===")
        self.init_database()
        if not analyze_only:
            logger.info("步骤1: 开始抓取ProductHunt数据...")
            await self.run_scraping()
        else:
            logger.info("跳过抓取步骤,直接进行分析")
        logger.info("步骤2: 开始AI分析产品数据...")
        self.analyze_products(max_products)
        logger.info("步骤3: 开始分析并补充缺失的难度分数...")
        self.analyze_missing_scores()
        logger.info("步骤4: 开始分析并更新产品关注数...")
        self.analyze_follower_counts()
        logger.success("=== 全功能产品系统工作流程完成 ===")

    def run_full_workflow(self, max_products=None, analyze_only=False):
        """Synchronous entry point for ``run_full_workflow_async``.

        ``asyncio.run`` creates a fresh loop, runs the workflow, then cancels
        leftover tasks and closes the loop. It raises RuntimeError when called
        from inside a running event loop.
        """
        asyncio.run(self.run_full_workflow_async(max_products, analyze_only))
def parse_arguments():
    """Parse ``sys.argv`` and return the resulting ``argparse.Namespace``.

    Options cover the two database paths, the Chrome debug port, the URL
    limit, duplicate handling, an explicit URL list, the log file, the
    analysis cap, and analyse-only mode.
    """
    parser = argparse.ArgumentParser(description="全功能产品抓取与分析系统")
    option_table = (
        ("--tophub-db", {"help": "tophub数据库路径", "default": None}),
        ("--product-db", {"help": "产品数据库路径", "default": None}),
        ("--debug-port", {"type": int, "help": "Chrome调试端口", "default": 9222}),
        ("--limit", {"type": int, "help": "抓取链接数量限制", "default": 0}),
        ("--no-skip-duplicates", {"action": "store_true", "help": "不跳过重复URL"}),
        ("--urls", {"nargs": "+", "help": "指定要抓取的URL列表"}),
        ("--log-file", {"help": "日志文件路径", "default": "integrated_product_system.log"}),
        ("--max-products", {"type": int, "help": "最大分析产品数量", "default": None}),
        ("--analyze-only", {"action": "store_true", "help": "仅进行分析,不抓取数据"}),
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser.parse_args()
async def main():
    """Command-line entry point.

    Parses arguments and adds a rotating log file. Starts ``start_chrome.bat``
    (next to this file) without waiting for it. Then either scrapes and
    analyses the URLs given with ``--urls``, or runs the full workflow.
    """
    args = parse_arguments()
    # Mirror log output into a file, rotated at 10 MB.
    logger.add(args.log_file, level="INFO", rotation="10 MB")

    import subprocess
    chrome_bat_path = os.path.join(os.path.dirname(__file__), "start_chrome.bat")
    logger.info(f"正在运行Chrome启动脚本: {chrome_bat_path}")
    if not os.path.isfile(chrome_bat_path):
        # With shell=True the shell, not Python, resolves the script, so a
        # missing file would not raise FileNotFoundError from Popen.
        logger.error(f"未找到Chrome启动脚本: {chrome_bat_path}")
    else:
        try:
            # Fire and forget: Popen returns without waiting for the script.
            subprocess.Popen([chrome_bat_path], shell=True)
            logger.success("Chrome启动脚本已启动")
        except FileNotFoundError:
            logger.error(f"未找到Chrome启动脚本: {chrome_bat_path}")
        except Exception as e:
            logger.error(f"Chrome启动脚本启动失败: {e}")

    system = IntegratedProductSystem(
        tophub_db_path=args.tophub_db,
        product_db_path=args.product_db,
        debug_port=args.debug_port,
        limit=args.limit,
        skip_duplicates=not args.no_skip_duplicates,
        api_key="",  # Ollama does not need an API key
    )

    if args.urls:
        # The full workflow initialises the schema itself. This path must do
        # it explicitly, or saving into a fresh database fails.
        system.init_database()
        await system.run_scraping(urls=args.urls)
        system.analyze_products(max_products=args.max_products)
    else:
        await system.run_full_workflow_async(max_products=args.max_products,
                                             analyze_only=args.analyze_only)


if __name__ == "__main__":
    asyncio.run(main())