Files
tophux_scrape/product/playwright-get-data.py
2025-11-22 17:31:44 +08:00

456 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
使用Playwright连接远程Chrome调试端口访问ProductHunt页面
"""
import asyncio
from playwright.async_api import async_playwright
from loguru import logger
import sys
from datetime import datetime
# Configure logging: remove loguru's pre-installed sink and install a single
# stderr sink at INFO level with a colourised "time | level | location - msg"
# layout.
_LOG_FORMAT = (
    "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
    "<level>{level: <8}</level> | "
    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
    "<level>{message}</level>"
)
logger.remove()
logger.add(sys.stderr, level="INFO", format=_LOG_FORMAT)
class ProductHuntScraper:
    """Scrape a Product Hunt product page through an already-running Chrome.

    The scraper attaches to Chrome over its remote-debugging (CDP) port,
    drives the first available page, and keeps an in-memory journal of the
    clicks and DOM selections it performs so they can be dumped to JSON.
    """

    # Origin prepended to relative hrefs found on the page.
    BASE_URL = "https://www.producthunt.com"

    # XPath of the maker comment body.
    # NOTE(review): the comment id is hard-coded, so this presumably matches
    # only one specific product's maker comment -- confirm before reusing the
    # scraper on other products.
    MAKER_COMMENT_XPATH = '//*[@id="comment-4597755"]/div/div[2]/div/div/div'

    def __init__(self, debug_port=9222):
        """Store the CDP port and initialise empty connection/journal state."""
        self.debug_port = debug_port
        # Set by connect_to_existing_chrome(); initialised here so close()
        # can test it without hasattr().
        self.playwright = None
        self.browser = None
        self.page = None
        self.click_records = []  # journal of click actions
        self.dom_selection_records = []  # journal of DOM selections

    async def connect_to_existing_chrome(self):
        """Attach to Chrome on ``localhost:<debug_port>`` and pick a page.

        Reuses the first context and its first page when they exist and
        creates them otherwise.

        Returns:
            True on success, False if anything raised (the error is logged).
        """
        logger.info(f"正在连接到Chrome远程调试端口 {self.debug_port}")
        try:
            # Keep a reference to the driver so close() can stop it later.
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.connect_over_cdp(
                f"http://localhost:{self.debug_port}"
            )

            contexts = self.browser.contexts
            context = contexts[0] if contexts else await self.browser.new_context()
            pages = context.pages
            self.page = pages[0] if pages else await context.new_page()

            logger.success("成功连接到Chrome浏览器")
            return True
        except Exception as e:
            logger.error(f"连接Chrome失败: {e}")
            return False

    async def record_click(self, x, y, selector="", description=""):
        """Append a click entry to ``click_records`` and log it.

        Args:
            x, y: Click coordinates; stored and logged as given.
            selector: Selector or label of the clicked element.
            description: Free-text description of the action.
        """
        self.click_records.append(
            {
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "type": "click",
                "x": x,
                "y": y,
                "selector": selector,
                "description": description,
            }
        )
        logger.info(f"记录点击: {description} - 坐标({x}, {y}) - 选择器: {selector}")

    async def record_dom_selection(self, selector, description=""):
        """Append a DOM-selection entry to ``dom_selection_records`` and log it."""
        self.dom_selection_records.append(
            {
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "type": "dom_selection",
                "selector": selector,
                "description": description,
            }
        )
        logger.info(f"记录DOM选取: {description} - 选择器: {selector}")

    async def save_behavior_records(self):
        """Dump both journals to a timestamped JSON file in the working dir.

        Returns:
            The name of the file that was written.
        """
        import json

        filename = (
            f"playwright_behavior_records_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        )
        records = {
            "click_records": self.click_records,
            "dom_selection_records": self.dom_selection_records,
        }
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(records, f, ensure_ascii=False, indent=2)
        logger.success(f"行为记录已保存到: {filename}")
        return filename

    async def navigate_to_producthunt(self, url):
        """Open ``url`` and poll the page title until it mentions Product Hunt.

        The title is checked every 5 seconds for up to 300 seconds. If the
        deadline passes, the page is still accepted as long as its title is
        non-empty and contains neither "Not Found" nor "Error".

        Returns:
            True if the page is considered loaded, False otherwise (including
            when the page is not initialised or navigation raises).
        """
        if not self.page:
            logger.error("页面未初始化")
            return False

        max_wait_time = 300  # total polling budget, seconds
        wait_interval = 5  # delay between title checks, seconds

        try:
            logger.info(f"正在访问: {url}")
            await self.page.goto(url, wait_until="domcontentloaded", timeout=300000)

            logger.info("等待页面标题包含'Product Hunt'...")
            waited_time = 0
            while waited_time < max_wait_time:
                title = await self.page.title()
                logger.info(f"当前页面标题: {title}")
                if "Product Hunt" in title:
                    logger.success(f"页面标题已包含'Product Hunt',等待时间: {waited_time}")
                    logger.success("Product Hunt网站已成功打开")
                    return True
                await asyncio.sleep(wait_interval)
                waited_time += wait_interval
                logger.info(f"已等待 {waited_time} 秒,继续等待...")

            logger.warning(f"等待超时({max_wait_time}秒),页面标题仍未包含'Product Hunt'")
            # Fetch the title once and use the same value for logging and
            # for the fallback decision.
            final_title = await self.page.title()
            logger.info(f"最终页面标题: {final_title}")

            if final_title and "Not Found" not in final_title and "Error" not in final_title:
                logger.success("页面已正常加载,但标题不符合预期")
                return True
            logger.error("页面加载失败")
            return False
        except Exception as e:
            logger.error(f"访问页面失败: {e}")
            return False

    async def extract_maker_statement_from_current_window(self, maker_link, maker_text):
        """Navigate the current page to ``maker_link`` and read the maker comment.

        Args:
            maker_link: Absolute URL to open in the current page.
            maker_text: Text of the link; compared (case-insensitively)
                against the page title purely for logging.

        Returns:
            The stripped comment text, or "" when the link is empty, the page
            is not initialised, the comment element is missing, or any step
            raises.
        """
        if not maker_link:
            logger.warning("制作人链接为空")
            return ""
        if not self.page:
            logger.error("当前页面未初始化")
            return ""

        try:
            # Navigation is done with goto(), so there are no coordinates;
            # the label and description go into their own fields.
            await self.record_click(
                None,
                None,
                selector="制作人链接",
                description="点击制作人链接在当前窗口打开",
            )

            logger.info(f"正在在当前窗口打开制作人链接: {maker_link}")
            await self.page.goto(maker_link, wait_until="domcontentloaded")
            await self.page.wait_for_load_state("networkidle")

            logger.info("等待title元素出现并包含产品名称最长等待2分钟...")
            try:
                # <title> is never rendered, so the default "visible" state
                # could only time out; wait for it to be attached instead.
                await self.page.wait_for_selector(
                    "title", state="attached", timeout=120000
                )
                title_text = await self.page.title()
                logger.info(f"页面标题: {title_text}")

                product_name = maker_text.strip() if maker_text else ""
                if product_name and product_name.lower() in title_text.lower():
                    logger.success(f"标题包含产品名称: {product_name}")
                else:
                    logger.warning(f"标题不包含产品名称,产品名称: {product_name}")
            except Exception as e:
                # Best effort: a failed title check does not abort extraction.
                logger.error(f"等待title元素失败: {e}")

            logger.info("再等待30秒确保页面完全加载...")
            await self.page.wait_for_timeout(30000)

            logger.info("正在提取制作人评论内容...")
            try:
                comment_element = await self.page.query_selector(
                    f"xpath={self.MAKER_COMMENT_XPATH}"
                )
                if comment_element:
                    # text_content() may yield None; treat that as empty.
                    maker_statement = ((await comment_element.text_content()) or "").strip()
                    logger.info(f"制作人评论内容: {maker_statement[:200]}...")
                    return maker_statement
                logger.warning(f"未找到XPath为{self.MAKER_COMMENT_XPATH}的元素")
            except Exception as e:
                logger.error(f"提取制作人评论内容失败: {e}")
            return ""
        except Exception as e:
            logger.error(f"在当前窗口打开制作人链接失败: {e}")
            return ""

    async def _extract_maker_statement_direct_open(self, maker_link, maker_text):
        """Fallback: open ``maker_link`` in a new page and scrape its first section.

        Preference order for the returned text: the first class-less ``div``
        inside the first ``section``, then the whole ``section``, then
        ``maker_text``. ``maker_text`` is also returned if anything raises.
        The new page is always closed, even on failure.
        """
        new_page = None
        try:
            logger.info("使用备用方法:直接在新窗口中打开链接...")
            new_page = await self.browser.new_page()
            await new_page.goto(maker_link, wait_until="domcontentloaded", timeout=30000)
            await new_page.wait_for_timeout(15000)
            logger.info("页面加载等待完成,开始提取内容...")

            await self.record_dom_selection('section', "备用方法-新窗口第一个section标签")
            first_section = await new_page.query_selector('section')
            if first_section:
                logger.success("找到第一个section标签")
                await self.record_dom_selection(
                    'div:not([class])', "备用方法-section下无class的div标签"
                )
                div_without_class = await first_section.query_selector('div:not([class])')
                if div_without_class:
                    logger.success("找到无class的div标签")
                    result = (await div_without_class.inner_text()).strip()
                    logger.info(f"制作人发言(新窗口): {result[:2000]}...")
                else:
                    logger.warning("未找到无class的div标签")
                    result = (await first_section.inner_text()).strip()
                    logger.info(f"制作人发言(回退section): {result[:200]}...")
            else:
                logger.warning("未找到section标签")
                result = maker_text
                logger.info(f"制作人发言(回退a标签): {(maker_text or '')[:200]}...")

            logger.info("等待内容完全稳定...")
            await new_page.wait_for_timeout(3000)
            return result
        except Exception as e:
            logger.error(f"备用方法也失败: {e}")
            return maker_text
        finally:
            # Close the page on every path so a failure cannot leak a tab.
            if new_page is not None:
                try:
                    await new_page.close()
                    logger.info("新窗口已关闭")
                except Exception as close_error:
                    logger.warning(f"关闭新窗口失败: {close_error}")

    @staticmethod
    def _absolute_url(href):
        """Return ``href`` prefixed with ``BASE_URL`` unless it is empty or starts with "http"."""
        if not href or href.startswith('http'):
            return href
        base_url = ProductHuntScraper.BASE_URL
        if href.startswith('/'):
            return base_url + href
        return base_url + '/' + href

    async def _extract_text_field(self, product_info, key, label, xpath, preview_len=None):
        """Store the stripped text of the first ``xpath`` match under ``product_info[key]``.

        ``label`` is interpolated into the log messages. When ``preview_len``
        is given, only that many characters (followed by "...") are logged.
        Failures are logged and leave ``product_info`` untouched.
        """
        logger.info(f"正在提取{label}...")
        try:
            await self.record_dom_selection(xpath, label)
            element = await self.page.query_selector(f"xpath={xpath}")
            if not element:
                logger.warning(f"未找到XPath为{xpath}的元素")
                return
            value = ((await element.text_content()) or "").strip()
            product_info[key] = value
            if preview_len is None:
                logger.info(f"{label}: {value}")
            else:
                logger.info(f"{label}: {value[:preview_len]}...")
        except Exception as e:
            logger.error(f"提取{label}失败: {e}")

    async def _extract_maker_link_and_statement(self, product_info):
        """Locate the maker link and fill ``maker_link`` / ``maker_statement``.

        The link is the closest ``<a>`` ancestor of the first ``span`` whose
        class contains "absolute". Failures are logged, not raised.
        """
        logger.info("正在提取制作人发言链接...")
        try:
            logger.info("等待页面元素加载...")
            await self.page.wait_for_timeout(20000)

            span_xpath = '//span[contains(@class, "absolute")]'
            await self.record_dom_selection(span_xpath, "制作人span标签")
            span_element = await self.page.query_selector(f"xpath={span_xpath}")
            if not span_element:
                logger.warning(f"未找到XPath为{span_xpath}的元素")
                return

            await self.record_dom_selection(f"{span_xpath}/parent::a", "制作人链接")
            a_handle = await span_element.evaluate_handle('(element) => element.closest("a")')
            # A JSHandle wrapping null is still truthy; as_element() returns
            # None unless the handle really is a DOM element.
            a_element = a_handle.as_element()
            if a_element is None:
                logger.warning("未找到制作人链接的a标签")
                return

            maker_text = ((await a_element.text_content()) or "").strip()
            maker_link = self._absolute_url(await a_element.get_attribute('href'))
            product_info["maker_link"] = maker_link
            logger.info(f"制作人链接: {maker_link}")

            product_info["maker_statement"] = (
                await self.extract_maker_statement_from_current_window(maker_link, maker_text)
            )
        except Exception as e:
            logger.error(f"提取制作人发言链接失败: {e}")

    async def extract_product_info(self):
        """Collect name, introduction, user count and maker statement.

        Each field is extracted independently; a field that cannot be found
        is simply absent from the result. The result is also written to
        ``temp_product_info.txt`` and a full-page screenshot is saved to
        ``product_screenshot.png``.

        Returns:
            A dict of the extracted fields, or None if the page is not
            initialised or an unexpected error occurs.
        """
        if not self.page:
            logger.error("页面未初始化")
            return None

        try:
            product_info = {}

            await self._extract_text_field(product_info, "name", "产品名称", "//h1")
            await self._extract_text_field(
                product_info,
                "introduction",
                "产品简介",
                '//*[@class="relative text-16 font-normal text-gray-700"]//div',
                preview_len=200,
            )
            await self._extract_text_field(
                product_info,
                "user_count",
                "用户数",
                '//*[@class="flex flex-row gap-2"]//div/div[2]/span/p',
            )
            await self._extract_maker_link_and_statement(product_info)

            temp_file_path = "temp_product_info.txt"
            with open(temp_file_path, "w", encoding="utf-8") as f:
                f.writelines(
                    [
                        "=== Product Hunt 产品信息 ===\n\n",
                        f"产品名称: {product_info.get('name', '未获取')}\n\n",
                        f"产品简介: {product_info.get('introduction', '未获取')}\n\n",
                        f"制作人发言: {product_info.get('maker_statement', '未获取')}\n\n",
                        f"用户数: {product_info.get('user_count', '未获取')}\n\n",
                        f"提取时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
                    ]
                )
            logger.info(f"产品信息已保存到临时文件: {temp_file_path}")

            # NOTE: if the maker link was followed above, the current page is
            # the maker page, so that is what gets captured here.
            screenshot_path = "product_screenshot.png"
            await self.page.screenshot(path=screenshot_path, full_page=True)
            logger.info(f"页面截图已保存到: {screenshot_path}")

            return product_info
        except Exception as e:
            logger.error(f"提取产品信息失败: {e}")
            return None

    async def close(self):
        """Close the browser connection and stop the Playwright driver.

        Safe to call when nothing was ever connected. The driver is stopped
        even if closing the browser raises.
        """
        try:
            if self.browser:
                await self.browser.close()
                self.browser = None
                logger.info("浏览器连接已关闭")
        finally:
            if self.playwright:
                await self.playwright.stop()
                self.playwright = None
                logger.info("Playwright实例已关闭")
async def main():
    """Run one scrape of the hard-coded Product Hunt product page.

    Connects to Chrome on debug port 9222, opens the target page, extracts
    the product info, writes it to ``product_info.json`` and dumps the
    behaviour journal. The scraper is always closed on exit; errors are
    logged rather than raised.
    """
    logger.info("开始ProductHunt数据抓取任务")

    target_url = "https://www.producthunt.com/products/notion"
    scraper = ProductHuntScraper(debug_port=9222)

    try:
        if not await scraper.connect_to_existing_chrome():
            logger.error("无法连接到Chrome请确保Chrome已启动并启用远程调试")
            return

        if not await scraper.navigate_to_producthunt(target_url):
            logger.error("页面访问失败")
            return

        product_info = await scraper.extract_product_info()
        if product_info:
            logger.success("产品信息提取完成")

            import json

            with open("product_info.json", "w", encoding="utf-8") as f:
                json.dump(product_info, f, ensure_ascii=False, indent=2)
            logger.info("产品信息已保存到 product_info.json")

            await scraper.save_behavior_records()
            # The journal goes to a timestamped file, so no fixed filename
            # is claimed here.
            logger.info("行为记录已保存")
        else:
            logger.warning("未能提取到产品信息")
    except Exception as e:
        logger.error(f"执行过程中发生错误: {e}")
    finally:
        # Runs on early returns too, so the connection is always released.
        await scraper.close()
        logger.info("任务完成")
# Script entry point: run the async main() only when executed directly,
# not when imported.
if __name__ == "__main__":
    asyncio.run(main())