#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
全功能ProductHunt数据抓取器
使用playwright-get-data.py中的专业功能绕过Cloudflare挑战
"""
import sqlite3
import asyncio
import os
import argparse
from datetime import datetime
from loguru import logger
from tqdm import tqdm
import sys
# 导入playwright-get-data.py中的功能
import importlib.util
# 动态导入playwright-get-data.py
playwright_data_path = os.path.join(os.path.dirname(__file__), "playwright-get-data.py")
spec = importlib.util.spec_from_file_location("playwright_get_data", playwright_data_path)
playwright_get_data = importlib.util.module_from_spec(spec)
spec.loader.exec_module(playwright_get_data)
ProductHuntScraper = playwright_get_data.ProductHuntScraper
# 配置日志
logger.remove()
logger.add(sys.stderr, level="INFO", format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}")
class ProductHuntScraperFull:
"""全功能ProductHunt数据抓取器"""
def __init__(self, tophub_db_path=None, product_db_path=None, debug_port=9222, limit=10, skip_duplicates=True):
"""
初始化抓取器
Args:
tophub_db_path: tophub数据库路径
product_db_path: 产品数据库路径
debug_port: Chrome调试端口
limit: 抓取链接数量限制
skip_duplicates: 是否跳过已存在的URL
"""
if tophub_db_path:
self.tophub_db_path = tophub_db_path
else:
self.tophub_db_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "tophub_data.db")
if product_db_path:
self.product_db_path = product_db_path
else:
self.product_db_path = os.path.join(os.path.dirname(__file__), "products.db")
self.debug_port = debug_port
self.limit = limit
self.skip_duplicates = skip_duplicates
self.product_urls = []
def query_producthunt_urls(self, limit=None):
"""查询包含producthunt.com的链接"""
if limit is None:
limit = self.limit
logger.info(f"正在查询tophub_data.db数据库,限制: {limit}条")
try:
conn = sqlite3.connect(self.tophub_db_path)
cursor = conn.cursor()
# 查询包含producthunt.com的链接
if limit > 0:
cursor.execute("SELECT url FROM articles WHERE url LIKE '%producthunt.com%' LIMIT ?", (limit,))
else:
cursor.execute("SELECT url FROM articles WHERE url LIKE '%producthunt.com%'")
urls = [row[0] for row in cursor.fetchall()]
conn.close()
logger.success(f"找到 {len(urls)} 个包含producthunt.com的链接")
return urls
except Exception as e:
logger.error(f"查询数据库失败: {e}")
return []
def init_product_database(self):
"""初始化产品数据库"""
logger.info("正在初始化产品数据库...")
try:
conn = sqlite3.connect(self.product_db_path)
cursor = conn.cursor()
# 创建产品信息表
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT NOT NULL UNIQUE,
name TEXT,
introduction TEXT,
user_count TEXT,
maker_link TEXT,
maker_statement TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
''')
conn.commit()
conn.close()
logger.success("产品数据库初始化完成")
except Exception as e:
logger.error(f"初始化数据库失败: {e}")
def check_duplicate(self, url):
"""检查URL是否已存在"""
try:
conn = sqlite3.connect(self.product_db_path)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM products WHERE url = ?", (url,))
count = cursor.fetchone()[0]
conn.close()
return count > 0
except Exception as e:
logger.error(f"检查重复失败: {e}")
return False
def save_product_info(self, product_info):
"""保存产品信息到数据库"""
try:
conn = sqlite3.connect(self.product_db_path)
cursor = conn.cursor()
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 检查是否已存在
cursor.execute("SELECT id FROM products WHERE url = ?", (product_info['url'],))
existing = cursor.fetchone()
if existing:
# 更新现有记录
cursor.execute('''
UPDATE products SET
name = ?, introduction = ?, user_count = ?,
maker_link = ?, maker_statement = ?, updated_at = ?
WHERE url = ?
''', (
product_info.get('name'),
product_info.get('introduction'),
product_info.get('user_count'),
product_info.get('maker_link'),
product_info.get('maker_statement'),
current_time,
product_info['url']
))
logger.info(f"更新产品信息: {product_info.get('name', '未知')}")
else:
# 插入新记录
cursor.execute('''
INSERT INTO products
(url, name, introduction, user_count, maker_link, maker_statement, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
product_info['url'],
product_info.get('name'),
product_info.get('introduction'),
product_info.get('user_count'),
product_info.get('maker_link'),
product_info.get('maker_statement'),
current_time,
current_time
))
logger.info(f"新增产品信息: {product_info.get('name', '未知')}")
conn.commit()
conn.close()
return True
except Exception as e:
logger.error(f"保存产品信息失败: {e}")
return False
async def scrape_product_info(self, url):
"""使用playwright-get-data.py中的专业功能抓取产品信息"""
try:
logger.info(f"开始抓取: {url}")
# 创建ProductHuntScraper实例
scraper = ProductHuntScraper(debug_port=self.debug_port)
# 连接到已运行的Chrome实例
connected = await scraper.connect_to_existing_chrome()
if not connected:
logger.error("连接Chrome失败,跳过此URL")
return None
# 导航到ProductHunt页面
navigated = await scraper.navigate_to_producthunt(url)
if not navigated:
logger.error("导航到页面失败,跳过此URL")
await scraper.close()
return None
# 提取产品信息
product_info = await scraper.extract_product_info()
if product_info:
product_info['url'] = url
logger.success(f"成功提取产品信息: {product_info.get('name', '未知')}")
else:
logger.error("提取产品信息失败")
# 关闭连接
await scraper.close()
return product_info
except Exception as e:
logger.error(f"抓取产品信息失败: {e}")
return None
async def run_scraping(self, urls=None):
"""运行抓取任务"""
logger.info("=== 开始ProductHunt数据抓取 ===")
# 初始化数据库
self.init_product_database()
# 获取要抓取的URL列表
if urls is None:
self.product_urls = self.query_producthunt_urls()
else:
self.product_urls = urls
if not self.product_urls:
logger.error("未找到要抓取的ProductHunt链接")
return False
logger.info(f"找到 {len(self.product_urls)} 个ProductHunt链接")
# 统计抓取结果
success_count = 0
skip_count = 0
error_count = 0
# 使用进度条显示处理进度
with tqdm(total=len(self.product_urls), desc="抓取ProductHunt链接") as pbar:
for url in self.product_urls:
logger.info(f"处理URL: {url}")
# 检查是否已存在
if self.skip_duplicates and self.check_duplicate(url):
logger.info(f"URL已存在,跳过: {url}")
skip_count += 1
pbar.update(1)
continue
# 抓取产品信息
product_info = await self.scrape_product_info(url)
if product_info:
# 保存到数据库
success = self.save_product_info(product_info)
if success:
logger.success(f"成功保存产品信息: {product_info.get('name', '未知')}")
success_count += 1
else:
logger.error(f"保存产品信息失败: {url}")
error_count += 1
else:
logger.error(f"抓取产品信息失败: {url}")
error_count += 1
pbar.update(1)
# 显示抓取结果统计
self.show_scraping_results(success_count, skip_count, error_count)
logger.success("=== ProductHunt数据抓取完成 ===")
return True
def show_scraping_results(self, success_count, skip_count, error_count):
"""显示抓取结果统计"""
try:
conn = sqlite3.connect(self.product_db_path)
cursor = conn.cursor()
# 统计数据库中的产品数量
cursor.execute("SELECT COUNT(*) FROM products")
total_count = cursor.fetchone()[0]
# 获取最新抓取的产品信息
cursor.execute("SELECT name, url FROM products ORDER BY updated_at DESC LIMIT 10")
recent_products = cursor.fetchall()
conn.close()
logger.info("=== 抓取结果统计 ===")
logger.info(f"成功抓取: {success_count} 个产品")
logger.info(f"跳过重复: {skip_count} 个链接")
logger.info(f"抓取失败: {error_count} 个链接")
logger.info(f"数据库中的产品总数: {total_count}")
if recent_products:
logger.info("最新抓取的产品:")
for name, url in recent_products:
logger.info(f" - {name}: {url}")
else:
logger.info("数据库中暂无产品记录")
except Exception as e:
logger.error(f"显示抓取结果失败: {e}")
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description="全功能ProductHunt数据抓取器")
parser.add_argument("--tophub-db", help="tophub数据库路径", default=None)
parser.add_argument("--product-db", help="产品数据库路径", default=None)
parser.add_argument("--debug-port", type=int, help="Chrome调试端口", default=9222)
parser.add_argument("--limit", type=int, help="抓取链接数量限制", default=10)
parser.add_argument("--no-skip-duplicates", action="store_true", help="不跳过重复URL")
parser.add_argument("--urls", nargs="+", help="指定要抓取的URL列表")
parser.add_argument("--log-file", help="日志文件路径", default="producthunt_scraper.log")
return parser.parse_args()
async def main():
"""主函数"""
args = parse_arguments()
# 配置日志文件输出
logger.add(args.log_file, level="INFO", rotation="10 MB")
# 创建抓取器实例
scraper = ProductHuntScraperFull(
tophub_db_path=args.tophub_db,
product_db_path=args.product_db,
debug_port=args.debug_port,
limit=args.limit,
skip_duplicates=not args.no_skip_duplicates
)
# 运行抓取任务
if args.urls:
await scraper.run_scraping(urls=args.urls)
else:
await scraper.run_scraping()
if __name__ == "__main__":
# 运行异步主函数
asyncio.run(main())