Files
tophux_scrape/product/api_scraper.py

245 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ProductHunt API抓取器 - 通过API获取产品信息
"""
import asyncio
import sqlite3
import requests
from loguru import logger
import os
import json
from urllib.parse import urlparse
class ProductHuntAPIScraper:
    """Derive basic ProductHunt product records from URLs and store them in SQLite.

    No remote API is actually called by this class: the product name and
    introduction are built from the URL itself, and the fields that would
    require API access (``user_count``, ``maker_link``, ``maker_statement``)
    are stored as ``None``.
    """

    def __init__(self, db_path="test_product.db"):
        """Remember ``db_path`` and make sure the ``products`` table exists."""
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the ``products`` table in ``self.db_path`` if it is missing."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # ``url`` is UNIQUE; save_product_info() relies on it as the lookup key.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT,
                    url TEXT UNIQUE,
                    introduction TEXT,
                    user_count INTEGER,
                    maker_link TEXT,
                    maker_statement TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.commit()
        finally:
            # Close even when the DDL fails so the connection is never leaked.
            conn.close()
        logger.info(f"数据库已初始化: {self.db_path}")

    def save_product_info(self, product_info):
        """Insert ``product_info`` or update the existing row with the same URL.

        Args:
            product_info: mapping with the keys ``name``, ``url``,
                ``introduction``, ``user_count``, ``maker_link`` and
                ``maker_statement``.

        Raises:
            KeyError: if one of the keys above is missing.
            sqlite3.Error: if a statement fails; nothing is committed then.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # Look the row up by URL to decide between UPDATE and INSERT.
            cursor.execute("SELECT id FROM products WHERE url = ?", (product_info['url'],))
            existing = cursor.fetchone()
            if existing:
                cursor.execute("""
                    UPDATE products SET
                        name = ?, introduction = ?, user_count = ?,
                        maker_link = ?, maker_statement = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE url = ?
                """, (
                    product_info['name'], product_info['introduction'],
                    product_info['user_count'], product_info['maker_link'],
                    product_info['maker_statement'], product_info['url'],
                ))
                logger.info(f"更新产品信息: {product_info['name']}")
            else:
                cursor.execute("""
                    INSERT INTO products (name, url, introduction, user_count, maker_link, maker_statement)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (
                    product_info['name'], product_info['url'], product_info['introduction'],
                    product_info['user_count'], product_info['maker_link'],
                    product_info['maker_statement'],
                ))
                logger.info(f"保存产品信息: {product_info['name']}")
            conn.commit()
        finally:
            # Closing without a commit discards a half-finished write.
            conn.close()

    @staticmethod
    def _path_segments(url):
        """Return the path of ``url`` split on ``/`` (may contain empty strings)."""
        return urlparse(url).path.split('/')

    @staticmethod
    def _find_product_slug(segments):
        """Return the first non-empty segment that follows ``products``, else None."""
        for index, segment in enumerate(segments[:-1]):
            if segment == 'products' and segments[index + 1]:
                return segments[index + 1]
        return None

    def extract_product_name_from_url(self, url):
        """Build a human-readable product name from ``url``.

        The segment after ``products`` is preferred; otherwise the last path
        segment is used. Hyphens become spaces and the result is title-cased.

        Returns:
            The derived name, or ``"Unknown Product"`` when no usable segment
            exists (including an empty slug such as ``.../products/``) or when
            parsing fails.
        """
        try:
            segments = self._path_segments(url)
            # An empty slug is skipped by the helper, so "" is never returned.
            slug = self._find_product_slug(segments) or segments[-1]
            if slug:
                return slug.replace('-', ' ').title()
            return "Unknown Product"
        except Exception as e:
            # Best effort: a malformed URL must not abort the whole run.
            logger.error(f"从URL提取产品名称失败: {e}")
            return "Unknown Product"

    def get_product_info_from_api(self, url):
        """Build a product record for a URL that contains a ``products`` slug.

        The real ProductHunt GraphQL API needs an API key, so this simplified
        version only fills in what the URL provides.

        Returns:
            A product dict, or ``None`` when the URL has no product slug or an
            error occurs.
        """
        try:
            product_slug = self._find_product_slug(self._path_segments(url))
            if not product_slug:
                logger.warning(f"无法从URL中提取产品slug: {url}")
                return None
            product_info = {
                'url': url,
                'name': self.extract_product_name_from_url(url),
                'introduction': f"Product from ProductHunt: {product_slug}",
                'user_count': None,  # would require API access
                'maker_link': None,  # would require API access
                'maker_statement': None,  # would require API access
            }
            logger.info(f"通过API获取产品信息: {product_info['name']}")
            return product_info
        except Exception as e:
            logger.error(f"API获取产品信息失败: {e}")
            return None

    def get_product_info_fallback(self, url):
        """Build a product record from the URL alone.

        Returns:
            A product dict, or ``None`` if an unexpected error occurs.
        """
        try:
            product_name = self.extract_product_name_from_url(url)
            product_info = {
                'url': url,
                'name': product_name,
                'introduction': f"Product from ProductHunt: {product_name}",
                'user_count': None,
                'maker_link': None,
                'maker_statement': None,
            }
            logger.info(f"使用备用方法获取产品信息: {product_info['name']}")
            return product_info
        except Exception as e:
            logger.error(f"备用方法获取产品信息失败: {e}")
            return None

    def run_test(self):
        """Process up to three ProductHunt URLs and print a summary.

        URLs are read from the ``articles`` table of ``tophub_data.db``, which
        is looked up one directory above the directory of ``self.db_path``.
        Each URL is converted into a product record, saved, and finally the
        content of the ``products`` table is printed.

        Raises:
            sqlite3.Error: if either database cannot be queried.
        """
        tophub_db_path = os.path.join(os.path.dirname(self.db_path), "..", "tophub_data.db")
        conn = sqlite3.connect(tophub_db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT url FROM articles
                WHERE url LIKE '%producthunt.com%'
                LIMIT 3
            """)
            urls = [row[0] for row in cursor.fetchall()]
        finally:
            conn.close()
        logger.info(f"找到 {len(urls)} 个ProductHunt链接")

        for url in urls:
            logger.info(f"处理URL: {url}")
            # Try the slug-based method, then the URL fallback, then a placeholder.
            product_info = (
                self.get_product_info_from_api(url)
                or self.get_product_info_fallback(url)
                or {
                    'url': url,
                    'name': 'Unknown Product',
                    'introduction': 'Unable to fetch product information',
                    'user_count': None,
                    'maker_link': None,
                    'maker_statement': None,
                }
            )
            self.save_product_info(product_info)

        # Collect the summary figures from the products table.
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM products")
            count = cursor.fetchone()[0]
            cursor.execute("SELECT name, url FROM products")
            products = cursor.fetchall()
        finally:
            conn.close()

        logger.success("测试任务完成")
        print("\n=== 测试结果统计 ===")
        print(f"数据库中的产品数量: {count}")
        print("已抓取的产品:")
        for name, url in products:
            print(f" - {name}: {url}")
def main():
    """Set up file logging, then run the scraper's test routine."""
    # Clear loguru's registered handlers so that the file sink added below is
    # the only destination for log records.
    logger.remove()
    sink_options = {
        "level": "DEBUG",
        "format": "{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {name}:{function}:{line} - {message}",
        "rotation": "10 MB",
        "retention": "7 days",
    }
    logger.add("api_scraper.log", **sink_options)

    # Build a scraper with the default database path and run the test routine.
    ProductHuntAPIScraper().run_test()


if __name__ == "__main__":
    main()