#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
使用Playwright连接远程Chrome调试端口访问ProductHunt页面
"""
import asyncio
from playwright.async_api import async_playwright
from loguru import logger
import sys
from datetime import datetime
# 配置日志
logger.remove()
logger.add(sys.stderr, level="INFO", format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}")
class ProductHuntScraper:
"""ProductHunt数据抓取器"""
def __init__(self, debug_port=9222):
self.debug_port = debug_port
self.browser = None
self.page = None
self.click_records = [] # 记录点击行为
self.dom_selection_records = [] # 记录DOM选取行为
async def connect_to_existing_chrome(self):
"""连接到已运行的Chrome实例"""
logger.info(f"正在连接到Chrome远程调试端口 {self.debug_port}")
try:
# 创建Playwright实例并保持引用
self.playwright = await async_playwright().start()
# 连接到已运行的Chrome实例
self.browser = await self.playwright.chromium.connect_over_cdp(
f"http://localhost:{self.debug_port}"
)
# 获取第一个上下文(通常是默认的)
contexts = self.browser.contexts
if contexts:
context = contexts[0]
# 获取第一个页面
pages = context.pages
if pages:
self.page = pages[0]
else:
# 如果没有页面,创建新页面
self.page = await context.new_page()
else:
# 如果没有上下文,创建新上下文
context = await self.browser.new_context()
self.page = await context.new_page()
logger.success("成功连接到Chrome浏览器")
return True
except Exception as e:
logger.error(f"连接Chrome失败: {e}")
return False
async def record_click(self, x, y, selector="", description=""):
"""记录点击行为"""
click_record = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"type": "click",
"x": x,
"y": y,
"selector": selector,
"description": description
}
self.click_records.append(click_record)
logger.info(f"记录点击: {description} - 坐标({x}, {y}) - 选择器: {selector}")
async def record_dom_selection(self, selector, description=""):
"""记录DOM选取行为"""
dom_record = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"type": "dom_selection",
"selector": selector,
"description": description
}
self.dom_selection_records.append(dom_record)
logger.info(f"记录DOM选取: {description} - 选择器: {selector}")
async def save_behavior_records(self):
"""保存行为记录到文件"""
import json
records = {
"click_records": self.click_records,
"dom_selection_records": self.dom_selection_records
}
filename = f"playwright_behavior_records_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, "w", encoding="utf-8") as f:
json.dump(records, f, ensure_ascii=False, indent=2)
logger.success(f"行为记录已保存到: {filename}")
async def navigate_to_producthunt(self, url):
"""导航到ProductHunt页面"""
if not self.page:
logger.error("页面未初始化")
return False
try:
logger.info(f"正在访问: {url}")
# 增加页面导航超时时间到300秒
await self.page.goto(url, wait_until="domcontentloaded", timeout=300000)
# 等待页面标题包含"Product Hunt",最长等待300秒
logger.info("等待页面标题包含'Product Hunt'...")
max_wait_time = 60 # 最大等待时间(秒)
wait_interval = 5 # 检查间隔(秒)
waited_time = 0
while waited_time < max_wait_time:
# 获取页面标题
title = await self.page.title()
logger.info(f"当前页面标题: {title}")
# 检查标题是否包含"Product Hunt"
if "Product Hunt" in title:
logger.success(f"页面标题已包含'Product Hunt',等待时间: {waited_time}秒")
logger.success("Product Hunt网站已成功打开")
return True
# 检查是否遇到Cloudflare验证
if "Just a moment" in title or "请稍候" in title or "Checking your browser" in title:
logger.info("遇到Cloudflare验证,等待验证完成...")
await asyncio.sleep(10) # 等待10秒
waited_time += 10
continue
# 检查是否已成功加载页面内容
try:
# 尝试查找页面中的关键元素
h1_element = await self.page.query_selector("h1")
if h1_element:
logger.success("检测到页面内容已加载")
return True
except Exception:
pass
# 等待一段时间后再次检查
await asyncio.sleep(wait_interval)
waited_time += wait_interval
logger.info(f"已等待 {waited_time} 秒,继续等待...")
# 如果超时仍未找到目标标题
logger.warning(f"等待超时({max_wait_time}秒),页面标题仍未包含'Product Hunt'")
logger.info(f"最终页面标题: {await self.page.title()}")
# 即使超时,如果页面正常加载也返回True
final_title = await self.page.title()
if final_title and "Not Found" not in final_title and "Error" not in final_title:
logger.success("页面已正常加载,但标题不符合预期")
return True
else:
logger.error("页面加载失败")
return False
except Exception as e:
logger.error(f"访问页面失败: {e}")
return False
async def extract_maker_statement_from_current_window(self, maker_link, maker_text):
"""在当前窗口中提取制作人发言"""
if not maker_link:
logger.warning("制作人链接为空")
return ""
if not self.page:
logger.error("当前页面未初始化")
return ""
try:
# 记录点击制作人链接的行为
await self.record_click("制作人链接", "点击制作人链接在当前窗口打开")
# 保存当前页面的URL,以便后续返回
original_url = self.page.url
logger.info(f"保存当前页面URL: {original_url}")
# 在当前页面导航到制作人链接
logger.info(f"正在在当前窗口打开制作人链接: {maker_link}")
# 设置更长的超时时间来处理模态窗口
try:
await self.page.goto(maker_link, wait_until="domcontentloaded", timeout=60000)
logger.success("页面导航成功")
except Exception as e:
logger.error(f"页面导航失败: {e}")
# 尝试返回原始页面
try:
await self.page.goto(original_url, wait_until="domcontentloaded")
logger.success(f"已返回原始页面: {original_url}")
except Exception as return_error:
logger.error(f"返回原始页面失败: {return_error}")
return ""
# 等待页面加载
await self.page.wait_for_load_state("networkidle")
# 检查并处理可能的模态窗口
try:
logger.info("检查是否存在模态窗口...")
modal_selectors = [
"[role='dialog']",
".modal",
".modal-dialog",
"[data-testid='modal']",
"[class*='modal']",
"[class*='overlay']",
"[class*='dialog']",
"[class*='popup']"
]
for selector in modal_selectors:
try:
modal_element = await self.page.query_selector(selector)
if modal_element:
logger.info(f"检测到模态窗口,选择器: {selector}")
# 尝试关闭模态窗口
close_selectors = [
"[aria-label='Close']",
".close",
".modal-close",
"[data-testid='close']",
"button:has-text('Close')",
"button:has-text('关闭')",
"button:has-text('X')"
]
for close_selector in close_selectors:
try:
close_button = await modal_element.query_selector(close_selector)
if close_button:
await close_button.click()
logger.success(f"已关闭模态窗口,使用选择器: {close_selector}")
await self.page.wait_for_timeout(1000) # 等待关闭动画
break
except Exception:
continue
# 如果模态窗口仍然存在,尝试点击模态窗口外部关闭
try:
await self.page.mouse.click(10, 10) # 点击页面左上角
logger.info("尝试点击页面外部关闭模态窗口")
await self.page.wait_for_timeout(1000)
except Exception:
pass
break
except Exception:
continue
except Exception as e:
logger.warning(f"检查模态窗口时出错: {e}")
# 快速检查页面是否已加载
logger.info("快速检查页面加载状态...")
# 立即尝试获取页面内容,不等待特定元素
try:
title_text = await self.page.title()
logger.info(f"页面标题: {title_text}")
except Exception as e:
logger.warning(f"获取页面标题失败: {e}")
# 快速检查页面是否有内容
try:
body_element = await self.page.query_selector("body")
if body_element:
body_text = await body_element.text_content()
if len(body_text.strip()) > 10:
logger.success("页面内容已加载")
else:
logger.warning("页面内容为空或过短")
except Exception as e:
logger.warning(f"检查页面内容失败: {e}")
# 短暂等待确保DOM稳定
logger.info("等待DOM稳定...")
await self.page.wait_for_timeout(2000) # 等待2秒
# 保存模态窗口截图用于调试
modal_screenshot = "modal_window_debug.png"
await self.page.screenshot(path=modal_screenshot, full_page=True)
logger.info(f"模态窗口调试截图已保存到: {modal_screenshot}")
# 首先检查页面内容,获取页面主要文本
try:
page_content = await self.page.content()
logger.info("页面内容已获取")
# 检查页面是否包含常见的关键词
keywords = ['comment', 'discussion', 'maker', 'creator', 'author', 'statement', 'description']
found_keywords = [kw for kw in keywords if kw in page_content.lower()]
if found_keywords:
logger.info(f"页面包含关键词: {found_keywords}")
else:
logger.warning("页面未检测到常见关键词")
except Exception as e:
logger.error(f"获取页面内容失败: {e}")
# 提取制作人评论内容 - 针对模态窗口的多种选择器策略
logger.info("正在提取制作人评论内容...")
# 策略1:尝试多种XPath选择器
xpath_selectors = [
# 新的主要选择器:包含prose、prose-format和richText类的div
"//div[contains(@class, 'prose') and contains(@class, 'prose-format') and contains(@class, 'richText')]",
# 备用选择器
'//*[@id="comment-4597755"]/div/div[2]/div/div/div', # 原始选择器
'//div[contains(@class, "comment")]//div[contains(@class, "text")]', # 通用评论选择器
'//div[contains(@class, "modal")]//div[contains(@class, "content")]', # 模态窗口内容
'//div[contains(@class, "dialog")]//div[contains(@class, "body")]', # 对话框内容
'//section//div[contains(@class, "text")]', # section内的文本内容
'//div[contains(@class, "launch")]//div[contains(@class, "description")]', # 发布描述
'//article//div[contains(@class, "content")]', # 文章内容
'//main//div[contains(@class, "text")]', # 主要内容区文本
# 其他备用选择器
"//div[contains(@class, 'styles_commentsContainer')]//div[contains(@class, 'styles_comment')]//div[contains(@class, 'styles_commentBody')]//p",
"//div[contains(@class, 'comment')]//p",
"//div[contains(@class, 'comments')]//p",
]
for i, xpath in enumerate(xpath_selectors, 1):
try:
logger.info(f"尝试选择器 {i}/{len(xpath_selectors)}: {xpath}")
comment_element = await self.page.query_selector(f'xpath={xpath}')
if comment_element:
maker_statement = (await comment_element.text_content()).strip()
if maker_statement: # 确保有内容
logger.success(f"使用选择器 {i} 成功提取制作人评论内容: {maker_statement[:200]}...")
# 提取完成后返回原始页面
logger.info("提取完成,正在返回原始产品页面...")
await self.page.goto(original_url, wait_until="domcontentloaded")
logger.success(f"已成功返回原始页面: {original_url}")
return maker_statement
else:
logger.warning(f"选择器 {i} 提取的内容为空")
except Exception as e:
logger.warning(f"选择器 {i} 失败: {e}")
# 策略2:如果所有选择器都失败,尝试提取页面主要文本内容
logger.info("所有选择器失败,尝试提取页面主要文本内容...")
try:
# 获取页面body文本
body_element = await self.page.query_selector('body')
if body_element:
full_text = (await body_element.text_content()).strip()
# 提取前500个字符作为制作人发言
if len(full_text) > 100:
maker_statement = full_text[:500]
logger.info(f"提取页面主要文本内容: {maker_statement[:200]}...")
# 提取完成后返回原始页面
logger.info("提取完成,正在返回原始产品页面...")
await self.page.goto(original_url, wait_until="domcontentloaded")
logger.success(f"已成功返回原始页面: {original_url}")
return maker_statement
except Exception as e:
logger.error(f"提取页面主要文本内容失败: {e}")
# 策略3:如果仍然失败,记录页面截图以便调试
logger.warning("所有提取策略都失败,保存截图用于调试...")
try:
screenshot_path = "modal_debug_screenshot.png"
await self.page.screenshot(path=screenshot_path, full_page=True)
logger.info(f"模态窗口截图已保存到: {screenshot_path}")
except Exception as e:
logger.error(f"保存截图失败: {e}")
# 即使未找到元素,也返回原始页面
logger.info("正在返回原始产品页面...")
await self.page.goto(original_url, wait_until="domcontentloaded")
logger.success(f"已成功返回原始页面: {original_url}")
return ""
except Exception as e:
logger.error(f"在当前窗口打开制作人链接失败: {e}")
# 保存当前页面截图用于调试
try:
debug_screenshot = "debug_maker_link_failure.png"
await self.page.screenshot(path=debug_screenshot, full_page=True)
logger.info(f"错误调试截图已保存到: {debug_screenshot}")
except Exception as screenshot_error:
logger.error(f"保存调试截图失败: {screenshot_error}")
# 发生异常时也尝试返回原始页面
try:
logger.info("发生异常,尝试返回原始产品页面...")
await self.page.goto(original_url, wait_until="domcontentloaded")
logger.success(f"已成功返回原始页面: {original_url}")
except Exception as return_error:
logger.error(f"返回原始页面失败: {return_error}")
return ""
async def _extract_maker_statement_direct_open(self, maker_link, maker_text):
"""备用方法:直接在新窗口中打开链接"""
try:
logger.info("使用备用方法:直接在新窗口中打开链接...")
# 创建新页面
new_page = await self.browser.new_page()
# 导航到制作人页面
await new_page.goto(maker_link, wait_until="domcontentloaded", timeout=30000)
# 等待页面加载
await new_page.wait_for_timeout(15000)
logger.info("页面加载等待完成,开始提取内容...")
# 抓取第一个section的tag
await self.record_dom_selection('section', "备用方法-新窗口第一个section标签")
first_section = await new_page.query_selector('section')
if first_section:
logger.success("找到第一个section标签")
# 在section下面找一个没有任何class的div标签
await self.record_dom_selection('div:not([class])', "备用方法-section下无class的div标签")
div_without_class = await first_section.query_selector('div:not([class])')
if div_without_class:
logger.success("找到无class的div标签")
# 提取div及其子标签的所有文本内容
maker_statement = await div_without_class.inner_text()
result = maker_statement.strip()
logger.info(f"制作人发言(新窗口): {result[:2000]}...")
else:
logger.warning("未找到无class的div标签")
# 回退到提取section的文本内容
section_text = await first_section.inner_text()
result = section_text.strip()
logger.info(f"制作人发言(回退section): {result[:200]}...")
else:
logger.warning("未找到section标签")
# 回退到原始a标签文本
result = maker_text
logger.info(f"制作人发言(回退a标签): {maker_text[:200]}...")
# 添加充分延迟,确保内容完全加载
logger.info("等待内容完全稳定...")
await new_page.wait_for_timeout(3000)
# 关闭新页面
await new_page.close()
logger.info("新窗口已关闭")
return result
except Exception as e:
logger.error(f"备用方法也失败: {e}")
# 如果备用方法也失败,回退到原始a标签文本
return maker_text
async def extract_product_info(self):
"""提取产品信息"""
if not self.page:
logger.error("页面未初始化")
return None
try:
product_info = {}
# 提取产品名称(XPath: //h1)
logger.info("正在提取产品名称...")
try:
await self.record_dom_selection("//h1", "产品名称")
name_element = await self.page.query_selector("xpath=//h1")
if name_element:
product_info["name"] = (await name_element.text_content()).strip()
logger.info(f"产品名称: {product_info['name']}")
else:
logger.warning("未找到XPath为//h1的元素")
except Exception as e:
logger.error(f"提取产品名称失败: {e}")
# 提取产品简介(XPath: //*[@class=\"relative text-16 font-normal text-gray-700\"]//div)
logger.info("正在提取产品简介...")
try:
await self.record_dom_selection('//*[@class="relative text-16 font-normal text-gray-700"]//div', "产品简介")
intro_element = await self.page.query_selector('xpath=//*[@class="relative text-16 font-normal text-gray-700"]//div')
if intro_element:
product_info["introduction"] = (await intro_element.text_content()).strip()
logger.info(f"产品简介: {product_info['introduction'][:200]}...")
else:
logger.warning("未找到XPath为//*[@class=\"relative text-16 font-normal text-gray-700\"]//div的元素")
except Exception as e:
logger.error(f"提取产品简介失败: {e}")
# 提取用户数(XPath: //*[@class=\"flex flex-row gap-2\"]//div/div[2]/span/p)
logger.info("正在提取用户数...")
try:
await self.record_dom_selection('//*[@class="flex flex-row gap-2"]//div/div[2]/span/p', "用户数")
user_count_element = await self.page.query_selector('xpath=//*[@class="flex flex-row gap-2"]//div/div[2]/span/p')
if user_count_element:
product_info["user_count"] = (await user_count_element.text_content()).strip()
logger.info(f"用户数: {product_info['user_count']}")
else:
logger.warning("未找到XPath为//*[@class=\"flex flex-row gap-2\"]//div/div[2]/span/p的元素")
except Exception as e:
logger.error(f"提取用户数失败: {e}")
# 提取制作人发言链接(XPath: //span[contains(@class, \"absolute\")]的父级a标签)
logger.info("正在提取制作人发言链接...")
try:
# 增加显性等待,等待页面元素加载完成
logger.info("等待页面元素加载...")
await self.page.wait_for_timeout(20000) # 等待20秒
# 先找到包含class="absolute"的span元素
await self.record_dom_selection('//span[contains(@class, "absolute")]', "制作人span标签")
span_element = await self.page.query_selector('xpath=//span[contains(@class, "absolute")]')
if span_element:
# 找到span元素的父级a标签
await self.record_dom_selection('//span[contains(@class, "absolute")]/parent::a', "制作人链接")
# 使用更可靠的方法获取父级a标签
a_element = await span_element.evaluate_handle('(element) => element.closest("a")')
# 检查a_element是否为有效的元素句柄
if a_element:
# 提取a标签的文本内容
maker_text = (await a_element.text_content()).strip()
# 提取a标签的href属性(超链接)
maker_link = await a_element.get_attribute('href')
# 拼凑完整的URL
if maker_link:
if not maker_link.startswith('http'):
# 如果是相对路径,拼凑为完整URL
base_url = "https://www.producthunt.com"
if maker_link.startswith('/'):
maker_link = base_url + maker_link
else:
maker_link = base_url + '/' + maker_link
# 验证URL是否有效(不能只是根路径)
if maker_link == "https://www.producthunt.com/" or maker_link == "https://www.producthunt.com":
logger.warning(f"制作人链接无效,跳过提取: {maker_link}")
product_info["maker_link"] = ""
product_info["maker_statement"] = ""
else:
product_info["maker_link"] = maker_link
logger.info(f"制作人链接: {maker_link}")
# 调用子函数在当前窗口中提取制作人发言
product_info["maker_statement"] = await self.extract_maker_statement_from_current_window(maker_link, maker_text)
else:
logger.warning("未获取到制作人链接")
product_info["maker_link"] = ""
product_info["maker_statement"] = ""
else:
logger.warning("未找到制作人链接的a标签")
else:
logger.warning("未找到XPath为//span[contains(@class, \"absolute\")]的元素")
except Exception as e:
logger.error(f"提取制作人发言链接失败: {e}")
# 保存到临时文件
temp_file_path = "temp_product_info.txt"
with open(temp_file_path, "w", encoding="utf-8") as f:
f.write("=== Product Hunt 产品信息 ===\n\n")
f.write(f"产品名称: {product_info.get('name', '未获取')}\n\n")
f.write(f"产品简介: {product_info.get('introduction', '未获取')}\n\n")
f.write(f"制作人发言: {product_info.get('maker_statement', '未获取')}\n\n")
f.write(f"用户数: {product_info.get('user_count', '未获取')}\n\n")
f.write(f"提取时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
logger.info(f"产品信息已保存到临时文件: {temp_file_path}")
# 截取页面截图
screenshot_path = "product_screenshot.png"
await self.page.screenshot(path=screenshot_path, full_page=True)
logger.info(f"页面截图已保存到: {screenshot_path}")
return product_info
except Exception as e:
logger.error(f"提取产品信息失败: {e}")
return None
async def close(self):
"""关闭连接"""
if self.browser:
await self.browser.close()
logger.info("浏览器连接已关闭")
if hasattr(self, 'playwright') and self.playwright:
await self.playwright.stop()
logger.info("Playwright实例已关闭")
async def main():
"""主函数"""
logger.info("开始ProductHunt数据抓取任务")
# 目标URL
target_url = "https://www.producthunt.com/products/palettebrain"
# 创建抓取器实例
scraper = ProductHuntScraper(debug_port=9222)
try:
# 连接到Chrome
if not await scraper.connect_to_existing_chrome():
logger.error("无法连接到Chrome,请确保Chrome已启动并启用远程调试")
return
# 导航到目标页面
if not await scraper.navigate_to_producthunt(target_url):
logger.error("页面访问失败")
return
# 提取产品信息
product_info = await scraper.extract_product_info()
if product_info:
logger.success("产品信息提取完成")
# 保存产品信息到JSON文件
import json
with open("product_info.json", "w", encoding="utf-8") as f:
json.dump(product_info, f, ensure_ascii=False, indent=2)
logger.info("产品信息已保存到 product_info.json")
# 保存点击和DOM选取行为记录
await scraper.save_behavior_records()
logger.info("行为记录已保存到 behavior_records.json")
else:
logger.warning("未能提取到产品信息")
except Exception as e:
logger.error(f"执行过程中发生错误: {e}")
finally:
# 关闭连接
await scraper.close()
logger.info("任务完成")
if __name__ == "__main__":
asyncio.run(main())