#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Full-featured ProductHunt scraper.

Reads ProductHunt links from the tophub SQLite database, scrapes each page
through the ``ProductHuntScraper`` helper defined in ``playwright-get-data.py``
(used to get past Cloudflare challenges), and stores the extracted product
information in a second SQLite database.
"""

import sqlite3
import asyncio
import os
import argparse
from contextlib import closing
from datetime import datetime
from loguru import logger
from tqdm import tqdm
import sys

# Used to load playwright-get-data.py, whose file name contains hyphens and
# therefore cannot be imported with a regular ``import`` statement.
import importlib.util

# Dynamically import playwright-get-data.py from this script's directory.
playwright_data_path = os.path.join(os.path.dirname(__file__), "playwright-get-data.py")
spec = importlib.util.spec_from_file_location("playwright_get_data", playwright_data_path)
playwright_get_data = importlib.util.module_from_spec(spec)
spec.loader.exec_module(playwright_get_data)
ProductHuntScraper = playwright_get_data.ProductHuntScraper

# Configure logging: drop loguru's default sink and log INFO+ to stderr.
logger.remove()
logger.add(
    sys.stderr,
    level="INFO",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
)


class ProductHuntScraperFull:
    """Scrape ProductHunt pages and persist the results to SQLite."""

    def __init__(self, tophub_db_path=None, product_db_path=None, debug_port=9222,
                 limit=0, skip_duplicates=True):
        """Initialise the scraper.

        Args:
            tophub_db_path: Path to the tophub database. Defaults to
                ``tophub_data.db`` in the parent of this script's directory.
            product_db_path: Path to the product database. Defaults to
                ``products.db`` next to this script.
            debug_port: Chrome remote-debugging port.
            limit: Maximum number of links to read from the tophub database;
                ``0`` (or a negative value) means no limit.
            skip_duplicates: Whether URLs already present in the product
                database are skipped.
        """
        script_dir = os.path.dirname(__file__)

        if tophub_db_path:
            self.tophub_db_path = tophub_db_path
        else:
            self.tophub_db_path = os.path.join(os.path.dirname(script_dir), "tophub_data.db")

        if product_db_path:
            self.product_db_path = product_db_path
        else:
            self.product_db_path = os.path.join(script_dir, "products.db")

        self.debug_port = debug_port
        self.limit = limit
        self.skip_duplicates = skip_duplicates
        self.product_urls = []

    def query_producthunt_urls(self, limit=None):
        """Return the URLs in the tophub database that contain producthunt.com.

        Args:
            limit: Maximum number of URLs to return. ``None`` falls back to
                ``self.limit``; ``0`` or a negative value returns every match.

        Returns:
            A list of URL strings, or an empty list if the query fails.
        """
        if limit is None:
            limit = self.limit

        logger.info(f"正在查询tophub_data.db数据库,限制: {limit}条")

        query = "SELECT url FROM articles WHERE url LIKE '%producthunt.com%'"
        params = ()
        # Only a positive limit restricts the result; 0 keeps the historical
        # "return everything" behaviour.
        if limit and limit > 0:
            query += " LIMIT ?"
            params = (limit,)

        try:
            # closing() guarantees the connection is released even when the
            # query raises (e.g. the articles table is missing).
            with closing(sqlite3.connect(self.tophub_db_path)) as conn:
                cursor = conn.cursor()
                cursor.execute(query, params)
                urls = [row[0] for row in cursor.fetchall()]

            logger.success(f"找到 {len(urls)} 个包含producthunt.com的链接")
            return urls
        except Exception as e:
            logger.error(f"查询数据库失败: {e}")
            return []

    def init_product_database(self):
        """Create the ``products`` table if it does not exist yet.

        Failures are logged and not re-raised.
        """
        logger.info("正在初始化产品数据库...")

        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                cursor = conn.cursor()
                # Create the product information table.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS products (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        url TEXT NOT NULL UNIQUE,
                        name TEXT,
                        introduction TEXT,
                        user_count TEXT,
                        maker_link TEXT,
                        maker_statement TEXT,
                        created_at TEXT NOT NULL,
                        updated_at TEXT NOT NULL
                    )
                ''')
                conn.commit()

            logger.success("产品数据库初始化完成")
        except Exception as e:
            logger.error(f"初始化数据库失败: {e}")

    def check_duplicate(self, url):
        """Return True if ``url`` is already stored in the product database.

        Returns False when the lookup itself fails, so the caller will try to
        scrape the URL rather than silently dropping it.
        """
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                cursor = conn.cursor()
                # Existence probe: stop at the first matching row.
                cursor.execute("SELECT 1 FROM products WHERE url = ? LIMIT 1", (url,))
                return cursor.fetchone() is not None
        except Exception as e:
            logger.error(f"检查重复失败: {e}")
            return False

    def save_product_info(self, product_info):
        """Insert or update one product record, keyed by its URL.

        Args:
            product_info: Mapping that must contain ``'url'`` and may contain
                ``'name'``, ``'introduction'``, ``'user_count'``,
                ``'maker_link'`` and ``'maker_statement'``.

        Returns:
            True if the record was written, False on any failure (including a
            missing ``'url'`` key).
        """
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                cursor = conn.cursor()
                current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                # Check whether a record for this URL already exists.
                cursor.execute("SELECT id FROM products WHERE url = ?", (product_info['url'],))
                existing = cursor.fetchone()

                if existing:
                    # Update the existing record; created_at is left untouched.
                    cursor.execute('''
                        UPDATE products
                        SET name = ?, introduction = ?, user_count = ?,
                            maker_link = ?, maker_statement = ?, updated_at = ?
                        WHERE url = ?
                    ''', (
                        product_info.get('name'),
                        product_info.get('introduction'),
                        product_info.get('user_count'),
                        product_info.get('maker_link'),
                        product_info.get('maker_statement'),
                        current_time,
                        product_info['url'],
                    ))
                    logger.info(f"更新产品信息: {product_info.get('name', '未知')}")
                else:
                    # Insert a new record.
                    cursor.execute('''
                        INSERT INTO products
                        (url, name, introduction, user_count, maker_link, maker_statement,
                         created_at, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        product_info['url'],
                        product_info.get('name'),
                        product_info.get('introduction'),
                        product_info.get('user_count'),
                        product_info.get('maker_link'),
                        product_info.get('maker_statement'),
                        current_time,
                        current_time,
                    ))
                    logger.info(f"新增产品信息: {product_info.get('name', '未知')}")

                conn.commit()
            return True
        except Exception as e:
            logger.error(f"保存产品信息失败: {e}")
            return False

    async def scrape_product_info(self, url):
        """Scrape one ProductHunt page via ``ProductHuntScraper``.

        Args:
            url: The ProductHunt page to scrape.

        Returns:
            The extracted product info with ``'url'`` added, or ``None`` (or
            whatever falsy value the extractor returned) when connecting,
            navigating or extracting fails.
        """
        try:
            logger.info(f"开始抓取: {url}")

            # Create a ProductHuntScraper bound to the configured debug port.
            scraper = ProductHuntScraper(debug_port=self.debug_port)

            # Connect to the already-running Chrome instance.
            connected = await scraper.connect_to_existing_chrome()
            if not connected:
                logger.error("连接Chrome失败,跳过此URL")
                return None

            # From here on a connection is open: always close it, including
            # when navigation or extraction raises.
            try:
                # Navigate to the ProductHunt page.
                navigated = await scraper.navigate_to_producthunt(url)
                if not navigated:
                    logger.error("导航到页面失败,跳过此URL")
                    return None

                # Extract the product information.
                product_info = await scraper.extract_product_info()
                if product_info:
                    product_info['url'] = url
                    logger.success(f"成功提取产品信息: {product_info.get('name', '未知')}")
                else:
                    logger.error("提取产品信息失败")

                return product_info
            finally:
                await scraper.close()

        except Exception as e:
            logger.error(f"抓取产品信息失败: {e}")
            return None

    async def run_scraping(self, urls=None):
        """Run the whole scraping job.

        Args:
            urls: Explicit list of URLs to scrape. When ``None`` the URLs are
                read from the tophub database.

        Returns:
            False when there is nothing to scrape, True otherwise.
        """
        logger.info("=== 开始ProductHunt数据抓取 ===")

        # Initialise the product database.
        self.init_product_database()

        # Determine the list of URLs to scrape.
        if urls is None:
            self.product_urls = self.query_producthunt_urls()
        else:
            self.product_urls = urls

        if not self.product_urls:
            logger.error("未找到要抓取的ProductHunt链接")
            return False

        logger.info(f"找到 {len(self.product_urls)} 个ProductHunt链接")

        # Result counters.
        success_count = 0
        skip_count = 0
        error_count = 0

        # Show progress with a progress bar.
        with tqdm(total=len(self.product_urls), desc="抓取ProductHunt链接") as pbar:
            for url in self.product_urls:
                logger.info(f"处理URL: {url}")

                # Skip URLs that are already stored.
                if self.skip_duplicates and self.check_duplicate(url):
                    logger.info(f"URL已存在,跳过: {url}")
                    skip_count += 1
                    pbar.update(1)
                    continue

                # Scrape the product information.
                product_info = await self.scrape_product_info(url)

                if product_info:
                    # Save it to the database.
                    if self.save_product_info(product_info):
                        logger.success(f"成功保存产品信息: {product_info.get('name', '未知')}")
                        success_count += 1
                    else:
                        logger.error(f"保存产品信息失败: {url}")
                        error_count += 1
                else:
                    logger.error(f"抓取产品信息失败: {url}")
                    error_count += 1

                pbar.update(1)

        # Show the summary.
        self.show_scraping_results(success_count, skip_count, error_count)

        logger.success("=== ProductHunt数据抓取完成 ===")
        return True

    def show_scraping_results(self, success_count, skip_count, error_count):
        """Log the run counters, the table size and the 10 latest products.

        Args:
            success_count: Number of products scraped and saved in this run.
            skip_count: Number of URLs skipped as duplicates.
            error_count: Number of URLs that failed to scrape or save.
        """
        try:
            with closing(sqlite3.connect(self.product_db_path)) as conn:
                cursor = conn.cursor()

                # Count the products in the database.
                cursor.execute("SELECT COUNT(*) FROM products")
                total_count = cursor.fetchone()[0]

                # Fetch the most recently updated products.
                cursor.execute("SELECT name, url FROM products ORDER BY updated_at DESC LIMIT 10")
                recent_products = cursor.fetchall()

            logger.info("=== 抓取结果统计 ===")
            logger.info(f"成功抓取: {success_count} 个产品")
            logger.info(f"跳过重复: {skip_count} 个链接")
            logger.info(f"抓取失败: {error_count} 个链接")
            logger.info(f"数据库中的产品总数: {total_count}")

            if recent_products:
                logger.info("最新抓取的产品:")
                for name, url in recent_products:
                    logger.info(f"  - {name}: {url}")
            else:
                logger.info("数据库中暂无产品记录")

        except Exception as e:
            logger.error(f"显示抓取结果失败: {e}")


def parse_arguments():
    """Parse the command-line arguments and return the resulting namespace."""
    parser = argparse.ArgumentParser(description="全功能ProductHunt数据抓取器")
    parser.add_argument("--tophub-db", help="tophub数据库路径", default=None)
    parser.add_argument("--product-db", help="产品数据库路径", default=None)
    parser.add_argument("--debug-port", type=int, help="Chrome调试端口", default=9222)
    parser.add_argument("--limit", type=int, help="抓取链接数量限制", default=0)
    parser.add_argument("--no-skip-duplicates", action="store_true", help="不跳过重复URL")
    parser.add_argument("--urls", nargs="+", help="指定要抓取的URL列表")
    parser.add_argument("--log-file", help="日志文件路径", default="producthunt_scraper.log")

    return parser.parse_args()


async def main():
    """Entry point: parse arguments, configure file logging, run the scraper."""
    args = parse_arguments()

    # Additionally log to a file, rotated at 10 MB.
    logger.add(args.log_file, level="INFO", rotation="10 MB")

    # Create the scraper instance.
    scraper = ProductHuntScraperFull(
        tophub_db_path=args.tophub_db,
        product_db_path=args.product_db,
        debug_port=args.debug_port,
        limit=args.limit,
        skip_duplicates=not args.no_skip_duplicates,
    )

    # Run the scraping job, with explicit URLs when given on the command line.
    if args.urls:
        await scraper.run_scraping(urls=args.urls)
    else:
        await scraper.run_scraping()


if __name__ == "__main__":
    # Run the async main function.
    asyncio.run(main())