Files
tophux_scrape/product/playwright-get-data.py

217 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
使用Playwright连接远程Chrome调试端口访问ProductHunt页面
"""
import asyncio
from playwright.async_api import async_playwright
from loguru import logger
import sys
# 配置日志
logger.remove()
logger.add(sys.stderr, level="INFO", format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>")
class ProductHuntScraper:
"""ProductHunt数据抓取器"""
def __init__(self, debug_port=9222):
self.debug_port = debug_port
self.browser = None
self.page = None
async def connect_to_existing_chrome(self):
"""连接到已运行的Chrome实例"""
logger.info(f"正在连接到Chrome远程调试端口 {self.debug_port}")
try:
# 创建Playwright实例并保持引用
self.playwright = await async_playwright().start()
# 连接到已运行的Chrome实例
self.browser = await self.playwright.chromium.connect_over_cdp(
f"http://localhost:{self.debug_port}"
)
# 获取第一个上下文(通常是默认的)
contexts = self.browser.contexts
if contexts:
context = contexts[0]
# 获取第一个页面
pages = context.pages
if pages:
self.page = pages[0]
else:
# 如果没有页面,创建新页面
self.page = await context.new_page()
else:
# 如果没有上下文,创建新上下文
context = await self.browser.new_context()
self.page = await context.new_page()
logger.success("成功连接到Chrome浏览器")
return True
except Exception as e:
logger.error(f"连接Chrome失败: {e}")
return False
async def navigate_to_producthunt(self, url):
"""导航到ProductHunt页面"""
if not self.page:
logger.error("页面未初始化")
return False
try:
logger.info(f"正在访问: {url}")
# 增加页面导航超时时间到300秒
await self.page.goto(url, wait_until="domcontentloaded", timeout=300000)
# 等待页面加载完成,增加超时时间
await self.page.wait_for_load_state("networkidle", timeout=300000)
# 等待页面标题包含"Product Hunt"最长等待300秒
logger.info("等待页面标题包含'Product Hunt'...")
max_wait_time = 300 # 最大等待时间(秒)
wait_interval = 5 # 检查间隔(秒)
waited_time = 0
while waited_time < max_wait_time:
# 获取页面标题
title = await self.page.title()
logger.info(f"当前页面标题: {title}")
# 检查标题是否包含"Product Hunt"
if "Product Hunt" in title:
logger.success(f"页面标题已包含'Product Hunt',等待时间: {waited_time}")
logger.success("Product Hunt网站已成功打开")
return True
# 等待一段时间后再次检查
await asyncio.sleep(wait_interval)
waited_time += wait_interval
logger.info(f"已等待 {waited_time} 秒,继续等待...")
# 如果超时仍未找到目标标题
logger.warning(f"等待超时({max_wait_time}秒),页面标题仍未包含'Product Hunt'")
logger.info(f"最终页面标题: {await self.page.title()}")
# 即使超时如果页面正常加载也返回True
final_title = await self.page.title()
if final_title and "Not Found" not in final_title and "Error" not in final_title:
logger.success("页面已正常加载,但标题不符合预期")
return True
else:
logger.error("页面加载失败")
return False
except Exception as e:
logger.error(f"访问页面失败: {e}")
return False
async def extract_product_info(self):
"""提取产品信息"""
if not self.page:
logger.error("页面未初始化")
return None
try:
product_info = {}
# 提取产品名称
name_element = await self.page.query_selector("h1")
if name_element:
product_info["name"] = await name_element.text_content()
logger.info(f"产品名称: {product_info['name']}")
# 提取产品描述
desc_element = await self.page.query_selector("[data-testid='product-description']")
if not desc_element:
desc_element = await self.page.query_selector(".styles_description__")
if desc_element:
product_info["description"] = await desc_element.text_content()
logger.info(f"产品描述: {product_info['description'][:100]}...")
# 提取投票数
votes_element = await self.page.query_selector("[data-testid='vote-button']")
if votes_element:
votes_text = await votes_element.text_content()
product_info["votes"] = votes_text
logger.info(f"投票数: {votes_text}")
# 提取产品链接
website_element = await self.page.query_selector("a[href*='://']")
if website_element:
product_info["website"] = await website_element.get_attribute("href")
logger.info(f"产品网站: {product_info['website']}")
# 截取页面截图
screenshot_path = "product_screenshot.png"
await self.page.screenshot(path=screenshot_path, full_page=True)
logger.info(f"页面截图已保存到: {screenshot_path}")
return product_info
except Exception as e:
logger.error(f"提取产品信息失败: {e}")
return None
async def close(self):
"""关闭连接"""
if self.browser:
await self.browser.close()
logger.info("浏览器连接已关闭")
if hasattr(self, 'playwright') and self.playwright:
await self.playwright.stop()
logger.info("Playwright实例已关闭")
async def main():
"""主函数"""
logger.info("开始ProductHunt数据抓取任务")
# 目标URL
target_url = "https://www.producthunt.com/products/notion"
# 创建抓取器实例
scraper = ProductHuntScraper(debug_port=9222)
try:
# 连接到Chrome
if not await scraper.connect_to_existing_chrome():
logger.error("无法连接到Chrome请确保Chrome已启动并启用远程调试")
return
# 导航到目标页面
if not await scraper.navigate_to_producthunt(target_url):
logger.error("页面访问失败")
return
# 提取产品信息
product_info = await scraper.extract_product_info()
if product_info:
logger.success("产品信息提取完成")
# 保存产品信息到JSON文件
import json
with open("product_info.json", "w", encoding="utf-8") as f:
json.dump(product_info, f, ensure_ascii=False, indent=2)
logger.info("产品信息已保存到 product_info.json")
else:
logger.warning("未能提取到产品信息")
except Exception as e:
logger.error(f"执行过程中发生错误: {e}")
finally:
# 关闭连接
await scraper.close()
logger.info("任务完成")
if __name__ == "__main__":
asyncio.run(main())