Files
tophux_scrape/product/playwright-get-data.py
2025-11-18 08:07:31 +08:00

259 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
使用Playwright连接远程Chrome调试端口访问ProductHunt页面
"""
import asyncio
from playwright.async_api import async_playwright
from loguru import logger
import sys
from datetime import datetime
# Configure logging: remove the handlers loguru has registered so far, then
# send INFO-and-above records to stderr using a colorized
# "time | level | name:function:line - message" layout.
logger.remove()
logger.add(sys.stderr, level="INFO", format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>")
class ProductHuntScraper:
    """Scrape a Product Hunt product page through an already-running Chrome.

    The scraper does not launch a browser of its own. It attaches to a Chrome
    instance over its remote-debugging endpoint at
    ``http://localhost:<debug_port>`` and drives a single page in it.

    Attributes:
        debug_port: Remote-debugging port to connect to.
        playwright: Started Playwright driver, or None while disconnected.
        browser: Handle returned by ``connect_over_cdp``, or None.
        page: Page used for navigation and extraction, or None.
    """

    def __init__(self, debug_port=9222):
        """Store the port; no connection is made until
        ``connect_to_existing_chrome`` is awaited."""
        self.debug_port = debug_port
        # Declared up front so close() can test it without hasattr().
        self.playwright = None
        self.browser = None
        self.page = None

    @staticmethod
    def _clean_text(raw):
        """Return *raw* stripped of surrounding whitespace; None becomes ""."""
        # text_content() may yield None, and None has no .strip().
        return (raw or "").strip()

    async def _read_text(self, root, selector):
        """Return the cleaned text of the first *selector* match under *root*.

        Returns None when nothing matches, so callers can tell "element
        missing" apart from "element present but empty".
        """
        element = await root.query_selector(selector)
        if element is None:
            return None
        return self._clean_text(await element.text_content())

    async def connect_to_existing_chrome(self):
        """Attach to the Chrome instance listening on ``self.debug_port``.

        Reuses the first existing browser context and its first page, creating
        a context and/or page only when none exist.

        Returns:
            True when ``self.page`` is ready for use, False when anything
            raised (the error is logged, not propagated).
        """
        logger.info(f"正在连接到Chrome远程调试端口 {self.debug_port}")
        try:
            # Keep a reference to the driver so close() can stop it later.
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.connect_over_cdp(
                f"http://localhost:{self.debug_port}"
            )

            contexts = self.browser.contexts
            if contexts:
                context = contexts[0]
            else:
                context = await self.browser.new_context()

            # A freshly created context has no pages, so this single check
            # covers both the "reuse" and the "create" paths.
            pages = context.pages
            self.page = pages[0] if pages else await context.new_page()

            logger.success("成功连接到Chrome浏览器")
            return True
        except Exception as e:
            logger.error(f"连接Chrome失败: {e}")
            return False

    async def navigate_to_producthunt(self, url):
        """Open *url* and wait until the page looks like Product Hunt.

        After navigation the page title is polled every 5 seconds for up to
        300 seconds until it contains "Product Hunt". If that never happens,
        the page is still accepted as long as its title is non-empty and
        contains neither "Not Found" nor "Error".

        Args:
            url: Address to navigate to.

        Returns:
            True when the page is considered loaded, False otherwise
            (including when no page is attached or any step raised).
        """
        if not self.page:
            logger.error("页面未初始化")
            return False

        max_wait_time = 300  # total polling budget, in seconds
        wait_interval = 5  # delay between title checks, in seconds

        try:
            logger.info(f"正在访问: {url}")
            # Both timeouts are 300 s, expressed in milliseconds.
            await self.page.goto(url, wait_until="domcontentloaded", timeout=300000)
            await self.page.wait_for_load_state("networkidle", timeout=300000)

            logger.info("等待页面标题包含'Product Hunt'...")
            waited_time = 0
            while waited_time < max_wait_time:
                title = await self.page.title()
                logger.info(f"当前页面标题: {title}")
                if "Product Hunt" in title:
                    logger.success(f"页面标题已包含'Product Hunt',等待时间: {waited_time}")
                    logger.success("Product Hunt网站已成功打开")
                    return True
                await asyncio.sleep(wait_interval)
                waited_time += wait_interval
                logger.info(f"已等待 {waited_time} 秒,继续等待...")

            logger.warning(f"等待超时({max_wait_time}秒),页面标题仍未包含'Product Hunt'")
            # Fetch the title once and use the same value for the log line and
            # for the fallback decision.
            final_title = await self.page.title()
            logger.info(f"最终页面标题: {final_title}")
            if final_title and "Not Found" not in final_title and "Error" not in final_title:
                logger.success("页面已正常加载,但标题不符合预期")
                return True
            logger.error("页面加载失败")
            return False
        except Exception as e:
            logger.error(f"访问页面失败: {e}")
            return False

    async def _extract_maker_statement(self, product_info):
        """Wait for the maker-comment section and copy its text into *product_info*."""
        # The section is awaited for up to 60 s (timeout is in milliseconds).
        section_element = await self.page.wait_for_selector(
            'section.flex.flex-col.gap-2',
            timeout=60000
        )
        if not section_element:
            logger.warning("制作人发言区域未加载")
            return
        logger.success("制作人发言区域已加载")

        maker_div = await section_element.query_selector('div.flex.flex-col.gap-1')
        if not maker_div:
            logger.warning("未找到class为'flex flex-col gap-1'的div")
            return

        statement = await self._read_text(maker_div, 'span')
        if statement is None:
            logger.warning("在div中未找到span标签")
            return
        product_info["maker_statement"] = statement
        logger.info(f"制作人发言: {product_info['maker_statement'][:200]}...")

    async def extract_product_info(self):
        """Collect product details from the current page.

        Each field is looked up independently; a field that cannot be found or
        read is logged and left out of the result. The collected data is also
        written to ``temp_product_info.txt`` and a full-page screenshot is
        saved to ``product_screenshot.png``. Both artifacts are best-effort:
        a failure while writing them is logged and does not discard the data.

        Returns:
            A dict with any of the keys ``name``, ``introduction``,
            ``maker_statement`` and ``user_count`` (all strings), or None when
            no page is attached or the extraction failed as a whole.
        """
        if not self.page:
            logger.error("页面未初始化")
            return None

        try:
            product_info = {}

            # Product name: the first <h1> on the page.
            name = await self._read_text(self.page, "h1")
            if name is not None:
                product_info["name"] = name
                logger.info(f"产品名称: {product_info['name']}")

            # Product introduction.
            logger.info("正在提取产品简介...")
            try:
                introduction = await self._read_text(
                    self.page, 'div.relative.text-16.font-normal.text-gray-700'
                )
                if introduction is not None:
                    product_info["introduction"] = introduction
                    logger.info(f"产品简介: {product_info['introduction'][:200]}...")
                else:
                    logger.warning("未找到class为'relative text-16 font-normal text-gray-700'的div")
            except Exception as e:
                logger.error(f"提取产品简介失败: {e}")

            # Maker statement, awaited explicitly before it is read.
            logger.info("等待制作人发言动态加载...")
            try:
                await self._extract_maker_statement(product_info)
            except Exception as e:
                logger.error(f"等待制作人发言加载失败: {e}")

            # User count.
            logger.info("正在提取用户数...")
            try:
                user_count = await self._read_text(
                    self.page, 'p.text-14.font-medium.text-gray-700'
                )
                if user_count is not None:
                    product_info["user_count"] = user_count
                    logger.info(f"用户数: {product_info['user_count']}")
                else:
                    logger.warning("未找到class为'text-14 font-medium text-gray-700'的p标签")
            except Exception as e:
                logger.error(f"提取用户数失败: {e}")

            # Plain-text dump of the collected fields (best-effort).
            temp_file_path = "temp_product_info.txt"
            report = "".join([
                "=== Product Hunt 产品信息 ===\n\n",
                f"产品名称: {product_info.get('name', '未获取')}\n\n",
                f"产品简介: {product_info.get('introduction', '未获取')}\n\n",
                f"制作人发言: {product_info.get('maker_statement', '未获取')}\n\n",
                f"用户数: {product_info.get('user_count', '未获取')}\n\n",
                f"提取时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
            ])
            try:
                with open(temp_file_path, "w", encoding="utf-8") as f:
                    f.write(report)
                logger.info(f"产品信息已保存到临时文件: {temp_file_path}")
            except OSError as e:
                logger.error(f"保存临时文件失败: {e}")

            # Full-page screenshot (best-effort).
            screenshot_path = "product_screenshot.png"
            try:
                await self.page.screenshot(path=screenshot_path, full_page=True)
                logger.info(f"页面截图已保存到: {screenshot_path}")
            except Exception as e:
                logger.error(f"保存页面截图失败: {e}")

            return product_info
        except Exception as e:
            logger.error(f"提取产品信息失败: {e}")
            return None

    async def close(self):
        """Release the browser connection and stop the Playwright driver.

        Safe to call when nothing was ever connected and safe to call more
        than once: references are cleared before anything is awaited. The
        driver is stopped even if closing the browser raises; that exception
        still propagates to the caller.
        """
        browser, self.browser = self.browser, None
        playwright, self.playwright = self.playwright, None
        self.page = None
        try:
            if browser:
                await browser.close()
                logger.info("浏览器连接已关闭")
        finally:
            if playwright:
                await playwright.stop()
                logger.info("Playwright实例已关闭")
async def main():
    """Run one end-to-end scrape of a fixed Product Hunt product page.

    Connects to Chrome on debug port 9222, opens the target URL, extracts the
    product details and writes them to ``product_info.json``. Every failure is
    logged rather than raised, and the scraper is always closed at the end.
    """
    logger.info("开始ProductHunt数据抓取任务")

    target_url = "https://www.producthunt.com/products/notion"
    scraper = ProductHuntScraper(debug_port=9222)

    try:
        # Step 1: attach to the running browser.
        connected = await scraper.connect_to_existing_chrome()
        if not connected:
            logger.error("无法连接到Chrome请确保Chrome已启动并启用远程调试")
            return

        # Step 2: load the product page.
        loaded = await scraper.navigate_to_producthunt(target_url)
        if not loaded:
            logger.error("页面访问失败")
            return

        # Step 3: scrape; a None or empty result counts as "nothing extracted".
        product_info = await scraper.extract_product_info()
        if not product_info:
            logger.warning("未能提取到产品信息")
            return

        logger.success("产品信息提取完成")

        # Step 4: persist the result as human-readable UTF-8 JSON.
        import json
        with open("product_info.json", "w", encoding="utf-8") as out_file:
            json.dump(product_info, out_file, ensure_ascii=False, indent=2)
        logger.info("产品信息已保存到 product_info.json")
    except Exception as exc:
        logger.error(f"执行过程中发生错误: {exc}")
    finally:
        # Runs on every exit path above, including the early returns.
        await scraper.close()
        logger.info("任务完成")
# Script entry point: when executed directly (not imported), run the async
# main() coroutine to completion.
if __name__ == "__main__":
    asyncio.run(main())