#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ProductHunt API scraper - fetch product information through the API.
"""
|
|||
|
|
|
|||
|
|
import asyncio
import json
import os
import sqlite3
from contextlib import closing
from urllib.parse import urlparse

import requests
from loguru import logger
|
|||
|
|
|
|||
|
|
class ProductHuntAPIScraper:
    """Build basic ProductHunt product records and persist them in SQLite.

    No network request is made by this class: every field of a product
    record is derived from the product URL itself, and the fields that would
    require real API access are stored as ``None``.
    """

    def __init__(self, db_path="test_product.db"):
        """Remember the database path and make sure the schema exists.

        Args:
            db_path: Path of the SQLite file that stores the ``products`` table.
        """
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the ``products`` table if it does not exist yet."""
        # ``closing`` guarantees the connection is closed even when a
        # statement raises; the inner ``conn`` context commits on success and
        # rolls back on error.
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT,
                    url TEXT UNIQUE,
                    introduction TEXT,
                    user_count INTEGER,
                    maker_link TEXT,
                    maker_statement TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
        logger.info(f"数据库已初始化: {self.db_path}")

    def save_product_info(self, product_info):
        """Insert a product record, or update the row with the same URL.

        Args:
            product_info: Mapping with the keys ``url``, ``name``,
                ``introduction``, ``user_count``, ``maker_link`` and
                ``maker_statement``.

        Raises:
            KeyError: If one of the keys above is missing.
            sqlite3.Error: If a statement fails; the transaction is rolled
                back and the connection is still closed.
        """
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            existing = conn.execute(
                "SELECT id FROM products WHERE url = ?",
                (product_info['url'],),
            ).fetchone()

            if existing:
                # The URL is the unique key, so refresh the other columns.
                conn.execute("""
                    UPDATE products SET
                        name = ?, introduction = ?, user_count = ?,
                        maker_link = ?, maker_statement = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE url = ?
                """, (
                    product_info['name'], product_info['introduction'],
                    product_info['user_count'], product_info['maker_link'],
                    product_info['maker_statement'], product_info['url'],
                ))
                logger.info(f"更新产品信息: {product_info['name']}")
            else:
                conn.execute("""
                    INSERT INTO products (name, url, introduction, user_count, maker_link, maker_statement)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (
                    product_info['name'], product_info['url'], product_info['introduction'],
                    product_info['user_count'], product_info['maker_link'], product_info['maker_statement'],
                ))
                logger.info(f"保存产品信息: {product_info['name']}")

    @staticmethod
    def _path_segments(url):
        """Return the non-empty path segments of *url*.

        Empty segments (leading slash, trailing slash, doubled slashes) are
        dropped so that ``/posts/foo/`` and ``/posts/foo`` behave the same.
        """
        return [segment for segment in urlparse(url).path.split('/') if segment]

    @staticmethod
    def _find_product_slug(segments):
        """Return the segment that follows the first ``products`` segment, or None."""
        for index, segment in enumerate(segments[:-1]):
            if segment == 'products':
                return segments[index + 1]
        return None

    @staticmethod
    def _build_product_info(url, name, introduction):
        """Assemble a product record; fields that need API access stay ``None``."""
        return {
            'url': url,
            'name': name,
            'introduction': introduction,
            'user_count': None,
            'maker_link': None,
            'maker_statement': None,
        }

    def extract_product_name_from_url(self, url):
        """Derive a human-readable product name from a product URL.

        The slug following a ``products`` path segment is preferred; otherwise
        the last non-empty path segment is used. Hyphens become spaces and the
        result is title-cased.

        Args:
            url: The product page URL.

        Returns:
            The derived name, or ``"Unknown Product"`` when the URL has no
            usable path segment or cannot be parsed.
        """
        try:
            segments = self._path_segments(url)
        except Exception as e:
            # Best-effort helper: a malformed URL must not abort the caller.
            logger.error(f"从URL提取产品名称失败: {e}")
            return "Unknown Product"

        if not segments:
            return "Unknown Product"

        slug = self._find_product_slug(segments) or segments[-1]
        return slug.replace('-', ' ').title()

    def get_product_info_from_api(self, url):
        """Build a product record keyed on the ``products`` slug of *url*.

        This is a simplified stand-in for a real API call: only the fields
        that can be derived from the URL are filled in.

        Args:
            url: The product page URL.

        Returns:
            A product record dict, or ``None`` when the URL contains no
            ``products/<slug>`` path or processing fails.
        """
        try:
            product_slug = self._find_product_slug(self._path_segments(url))

            if not product_slug:
                logger.warning(f"无法从URL中提取产品slug: {url}")
                return None

            product_info = self._build_product_info(
                url,
                self.extract_product_name_from_url(url),
                f"Product from ProductHunt: {product_slug}",
            )

            logger.info(f"通过API获取产品信息: {product_info['name']}")
            return product_info

        except Exception as e:
            # Best-effort: the caller falls back to another strategy on None.
            logger.error(f"API获取产品信息失败: {e}")
            return None

    def get_product_info_fallback(self, url):
        """Fallback: build a product record from the URL-derived name only.

        Args:
            url: The product page URL.

        Returns:
            A product record dict, or ``None`` if building it fails.
        """
        try:
            product_name = self.extract_product_name_from_url(url)
            product_info = self._build_product_info(
                url,
                product_name,
                f"Product from ProductHunt: {product_name}",
            )

            logger.info(f"使用备用方法获取产品信息: {product_info['name']}")
            return product_info

        except Exception as e:
            logger.error(f"备用方法获取产品信息失败: {e}")
            return None

    def _load_producthunt_urls(self, tophub_db_path):
        """Read up to three producthunt.com URLs from the ``articles`` table."""
        with closing(sqlite3.connect(tophub_db_path)) as conn:
            rows = conn.execute("""
                SELECT url FROM articles
                WHERE url LIKE '%producthunt.com%'
                LIMIT 3
            """).fetchall()
        return [row[0] for row in rows]

    def _resolve_product_info(self, url):
        """Return a product record for *url*, trying each strategy in order."""
        product_info = self.get_product_info_from_api(url)

        if not product_info:
            product_info = self.get_product_info_fallback(url)

        if not product_info:
            # Both strategies failed: store a placeholder so the URL is still recorded.
            product_info = self._build_product_info(
                url,
                'Unknown Product',
                'Unable to fetch product information',
            )

        return product_info

    def _load_saved_products(self):
        """Return all ``(name, url)`` rows currently in the products table."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            return conn.execute("SELECT name, url FROM products").fetchall()

    def run_test(self):
        """Process a few ProductHunt links from ``tophub_data.db`` and print a summary.

        The source database is looked up one directory above the directory of
        ``self.db_path``. If that file does not exist, an error is logged and
        the method returns without doing anything; connecting to a missing
        path would otherwise create an empty database file as a side effect.

        Raises:
            sqlite3.Error: If the source database lacks the ``articles`` table
                or another statement fails.
        """
        tophub_db_path = os.path.join(os.path.dirname(self.db_path), "..", "tophub_data.db")

        if not os.path.exists(tophub_db_path):
            logger.error(f"未找到tophub数据库: {tophub_db_path}")
            return

        urls = self._load_producthunt_urls(tophub_db_path)
        logger.info(f"找到 {len(urls)} 个ProductHunt链接")

        for url in urls:
            logger.info(f"处理URL: {url}")
            self.save_product_info(self._resolve_product_info(url))

        products = self._load_saved_products()
        logger.success("测试任务完成")

        print("\n=== 测试结果统计 ===")
        print(f"数据库中的产品数量: {len(products)}")
        print("已抓取的产品:")
        for name, url in products:
            print(f" - {name}: {url}")
|
|||
|
|
|
|||
|
|
def main():
    """Configure file logging, then run the scraper's test routine."""
    # Remove the handlers configured so far so that only the sink added
    # below receives log records.
    logger.remove()

    sink_options = {
        "level": "DEBUG",
        "format": "{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {name}:{function}:{line} - {message}",
        "rotation": "10 MB",
        "retention": "7 days",
    }
    logger.add("api_scraper.log", **sink_options)

    # Instantiating the scraper initializes its database; run_test does the work.
    ProductHuntAPIScraper().run_test()
|
|||
|
|
|
|||
|
|
# Run the scraper only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|