#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ProductHunt API scraper - build product records for ProductHunt URLs.

The current implementation does not perform any network request: product
names and introductions are derived from the URL itself, and the remaining
fields are stored as NULL. Records are persisted in a local SQLite database.
"""

# NOTE(review): asyncio, json and requests are not referenced by the code in
# this file; they are kept because other tooling may rely on them.
import asyncio
import json
import os
import sqlite3
from contextlib import closing
from urllib.parse import urlparse

import requests
from loguru import logger


class ProductHuntAPIScraper:
    """Derive basic product records from ProductHunt URLs and store them.

    Records are written to the ``products`` table of the SQLite database at
    ``db_path``; the table is created on construction if it is missing.
    """

    def __init__(self, db_path="test_product.db"):
        """Remember the database path and make sure the schema exists.

        Args:
            db_path: Path of the SQLite file that holds the ``products`` table.
        """
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the ``products`` table if it does not exist yet."""
        # closing() guarantees the connection is released even if the DDL
        # fails; the inner ``with conn`` commits on success and rolls back on
        # error (it does not close the connection by itself).
        with closing(sqlite3.connect(self.db_path)) as conn:
            with conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS products (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        name TEXT,
                        url TEXT UNIQUE,
                        introduction TEXT,
                        user_count INTEGER,
                        maker_link TEXT,
                        maker_statement TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)

        logger.info(f"数据库已初始化: {self.db_path}")

    def save_product_info(self, product_info):
        """Insert a product record, or update the existing row with the same URL.

        Args:
            product_info: Mapping with the keys ``url``, ``name``,
                ``introduction``, ``user_count``, ``maker_link`` and
                ``maker_statement``.

        Raises:
            KeyError: If one of the required keys is missing.
            sqlite3.Error: If the database operation fails; the transaction
                is rolled back and the connection is closed in that case.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            with conn:
                # The URL is the natural key (declared UNIQUE in the schema).
                existing = conn.execute(
                    "SELECT id FROM products WHERE url = ?",
                    (product_info['url'],),
                ).fetchone()

                if existing:
                    conn.execute("""
                        UPDATE products
                        SET name = ?, introduction = ?, user_count = ?,
                            maker_link = ?, maker_statement = ?,
                            updated_at = CURRENT_TIMESTAMP
                        WHERE url = ?
                    """, (
                        product_info['name'],
                        product_info['introduction'],
                        product_info['user_count'],
                        product_info['maker_link'],
                        product_info['maker_statement'],
                        product_info['url'],
                    ))
                    logger.info(f"更新产品信息: {product_info['name']}")
                else:
                    conn.execute("""
                        INSERT INTO products
                        (name, url, introduction, user_count, maker_link, maker_statement)
                        VALUES (?, ?, ?, ?, ?, ?)
                    """, (
                        product_info['name'],
                        product_info['url'],
                        product_info['introduction'],
                        product_info['user_count'],
                        product_info['maker_link'],
                        product_info['maker_statement'],
                    ))
                    logger.info(f"保存产品信息: {product_info['name']}")

    @staticmethod
    def _path_segments(url):
        """Return the non-empty path segments of *url*.

        Empty segments (leading slash, trailing slash, doubled slashes) are
        dropped so they can never be mistaken for a slug.
        """
        return [segment for segment in urlparse(url).path.split('/') if segment]

    @staticmethod
    def _slug_from_segments(segments):
        """Return the segment following the first usable ``products`` segment.

        Returns ``None`` when no ``products`` segment is followed by another
        segment.
        """
        # Iterating over segments[:-1] guarantees index + 1 is always valid.
        for index, segment in enumerate(segments[:-1]):
            if segment == 'products':
                return segments[index + 1]
        return None

    @staticmethod
    def _build_product_info(url, name, introduction):
        """Build a product record; fields that need real API access stay None."""
        return {
            'url': url,
            'name': name,
            'introduction': introduction,
            'user_count': None,
            'maker_link': None,
            'maker_statement': None,
        }

    def extract_product_name_from_url(self, url):
        """Derive a human-readable product name from a URL.

        The slug after a ``products`` path segment is preferred; otherwise the
        last non-empty path segment is used. Hyphens become spaces and the
        result is title-cased, e.g. ``.../products/my-cool-app`` gives
        ``"My Cool App"``.

        Args:
            url: The product URL.

        Returns:
            The derived name, or ``"Unknown Product"`` when the URL has no
            usable path segment or cannot be parsed. Never raises.
        """
        try:
            segments = self._path_segments(url)

            slug = self._slug_from_segments(segments)
            if slug is None and segments:
                # No products/<slug> pair: fall back to the last real segment.
                slug = segments[-1]

            if slug:
                return slug.replace('-', ' ').title()
            return "Unknown Product"

        except Exception as e:
            # Best-effort helper: any parsing problem degrades to the default.
            logger.error(f"从URL提取产品名称失败: {e}")
            return "Unknown Product"

    def get_product_info_from_api(self, url):
        """Build a product record for a ``.../products/<slug>`` URL.

        Despite the name, no API request is made here: a real lookup through
        the ProductHunt GraphQL API would need an API key, so only the fields
        derivable from the URL are filled in.

        Args:
            url: The product URL.

        Returns:
            A product record dict, or ``None`` when the URL contains no
            product slug or processing fails. Never raises.
        """
        try:
            product_slug = self._slug_from_segments(self._path_segments(url))

            if not product_slug:
                logger.warning(f"无法从URL中提取产品slug: {url}")
                return None

            product_info = self._build_product_info(
                url,
                self.extract_product_name_from_url(url),
                f"Product from ProductHunt: {product_slug}",
            )

            logger.info(f"通过API获取产品信息: {product_info['name']}")
            return product_info

        except Exception as e:
            logger.error(f"API获取产品信息失败: {e}")
            return None

    def get_product_info_fallback(self, url):
        """Fallback: build a product record from the URL-derived name alone.

        Args:
            url: The product URL.

        Returns:
            A product record dict, or ``None`` if building it fails.
        """
        try:
            product_name = self.extract_product_name_from_url(url)

            product_info = self._build_product_info(
                url,
                product_name,
                f"Product from ProductHunt: {product_name}",
            )

            logger.info(f"使用备用方法获取产品信息: {product_info['name']}")
            return product_info

        except Exception as e:
            logger.error(f"备用方法获取产品信息失败: {e}")
            return None

    def run_test(self):
        """Process up to three ProductHunt URLs from ``tophub_data.db``.

        Reads URLs from the ``articles`` table of ``tophub_data.db`` (looked
        up one directory above the directory of ``db_path``), builds a record
        for each one, saves it, and prints a summary of the ``products`` table.

        Raises:
            sqlite3.Error: If either database cannot be queried.
        """
        tophub_db_path = os.path.join(os.path.dirname(self.db_path), "..", "tophub_data.db")

        # NOTE: sqlite3.connect() creates the file when it does not exist; the
        # query below then fails because the ``articles`` table is missing.
        with closing(sqlite3.connect(tophub_db_path)) as conn:
            rows = conn.execute("""
                SELECT url FROM articles
                WHERE url LIKE '%producthunt.com%'
                LIMIT 3
            """).fetchall()
        urls = [row[0] for row in rows]

        logger.info(f"找到 {len(urls)} 个ProductHunt链接")

        for url in urls:
            logger.info(f"处理URL: {url}")

            # Try the "API" path first, then the URL-only fallback.
            product_info = self.get_product_info_from_api(url)

            if not product_info:
                product_info = self.get_product_info_fallback(url)

            # Both strategies failed: store a placeholder so the URL is kept.
            if not product_info:
                product_info = self._build_product_info(
                    url,
                    'Unknown Product',
                    'Unable to fetch product information',
                )

            self.save_product_info(product_info)

        # Summarize what is now stored.
        with closing(sqlite3.connect(self.db_path)) as conn:
            count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
            products = conn.execute("SELECT name, url FROM products").fetchall()

        logger.success("测试任务完成")
        print("\n=== 测试结果统计 ===")
        print(f"数据库中的产品数量: {count}")
        print("已抓取的产品:")
        for name, url in products:
            print(f" - {name}: {url}")


def main():
    """Configure file logging, then run the scraper's test routine."""
    # Drop previously registered loguru sinks so output goes only to the
    # rotating log file configured below.
    logger.remove()
    logger.add(
        "api_scraper.log",
        level="DEBUG",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {name}:{function}:{line} - {message}",
        rotation="10 MB",
        retention="7 days"
    )

    scraper = ProductHuntAPIScraper()
    scraper.run_test()


if __name__ == "__main__":
    main()