Files
tophux_scrape/product/playwright-get-data.py

656 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
使用Playwright连接远程Chrome调试端口访问ProductHunt页面
"""
import asyncio
from playwright.async_api import async_playwright
from loguru import logger
import sys
from datetime import datetime
# Logging setup: drop loguru's default sink and install a single stderr sink
# at INFO level with a colourised "time | level | location - message" layout.
logger.remove()
logger.add(
    sys.stderr,
    level="INFO",
    format=(
        "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
        "<level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
        "<level>{message}</level>"
    ),
)
class ProductHuntScraper:
    """Scrape product data from ProductHunt through an already-running Chrome.

    The scraper attaches to a Chrome instance over its remote-debugging (CDP)
    port, drives one page, and keeps an in-memory audit trail of the clicks
    and DOM selections it performs so they can be dumped to JSON afterwards.
    """

    # Origin used to turn relative links found on product pages into full URLs.
    BASE_URL = "https://www.producthunt.com"

    # Selectors that may identify a modal/overlay covering the maker page.
    _MODAL_SELECTORS = (
        "[role='dialog']",
        ".modal",
        ".modal-dialog",
        "[data-testid='modal']",
        "[class*='modal']",
        "[class*='overlay']",
        "[class*='dialog']",
        "[class*='popup']",
    )

    # Selectors tried, in order, to find a close button inside a modal.
    _MODAL_CLOSE_SELECTORS = (
        "[aria-label='Close']",
        ".close",
        ".modal-close",
        "[data-testid='close']",
        "button:has-text('Close')",
        "button:has-text('关闭')",
        "button:has-text('X')",
    )

    # XPath candidates for the maker's comment, most specific first.
    _STATEMENT_XPATHS = (
        # Primary: div carrying the prose, prose-format and richText classes.
        "//div[contains(@class, 'prose') and contains(@class, 'prose-format') and contains(@class, 'richText')]",
        # Fallbacks.
        '//*[@id="comment-4597755"]/div/div[2]/div/div/div',  # original selector
        '//div[contains(@class, "comment")]//div[contains(@class, "text")]',  # generic comment
        '//div[contains(@class, "modal")]//div[contains(@class, "content")]',  # modal content
        '//div[contains(@class, "dialog")]//div[contains(@class, "body")]',  # dialog content
        '//section//div[contains(@class, "text")]',  # text inside a section
        '//div[contains(@class, "launch")]//div[contains(@class, "description")]',  # launch description
        '//article//div[contains(@class, "content")]',  # article content
        '//main//div[contains(@class, "text")]',  # main content text
        # Further fallbacks.
        "//div[contains(@class, 'styles_commentsContainer')]//div[contains(@class, 'styles_comment')]//div[contains(@class, 'styles_commentBody')]//p",
        "//div[contains(@class, 'comment')]//p",
        "//div[contains(@class, 'comments')]//p",
    )

    def __init__(self, debug_port=9222):
        """Create a scraper that will attach to Chrome on ``debug_port``.

        Args:
            debug_port: Local port on which Chrome exposes remote debugging.
        """
        self.debug_port = debug_port
        # Set here (not only in connect) so close() can always inspect it.
        self.playwright = None
        self.browser = None
        self.page = None
        self.click_records = []  # audit trail of click actions
        self.dom_selection_records = []  # audit trail of DOM selections

    async def connect_to_existing_chrome(self):
        """Attach to the running Chrome instance and pick a page to drive.

        Reuses the first page of the first browser context when one exists,
        otherwise creates the missing context and/or page.

        Returns:
            True on success, False if any step raised (the error is logged).
        """
        logger.info(f"正在连接到Chrome远程调试端口 {self.debug_port}")
        try:
            # Keep a reference to the Playwright driver so close() can stop it.
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.connect_over_cdp(
                f"http://localhost:{self.debug_port}"
            )
            contexts = self.browser.contexts
            if contexts:
                context = contexts[0]
                pages = context.pages
                if pages:
                    self.page = pages[0]
                else:
                    self.page = await context.new_page()
            else:
                context = await self.browser.new_context()
                self.page = await context.new_page()
            logger.success("成功连接到Chrome浏览器")
            return True
        except Exception as e:
            logger.error(f"连接Chrome失败: {e}")
            return False

    async def record_click(self, x, y, selector="", description=""):
        """Append a click event to ``click_records`` and log it.

        Args:
            x: Horizontal coordinate of the click (``None`` if not applicable).
            y: Vertical coordinate of the click (``None`` if not applicable).
            selector: Selector or label of the clicked element.
            description: Free-text description of the action.
        """
        click_record = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "type": "click",
            "x": x,
            "y": y,
            "selector": selector,
            "description": description,
        }
        self.click_records.append(click_record)
        logger.info(f"记录点击: {description} - 坐标({x}, {y}) - 选择器: {selector}")

    async def record_dom_selection(self, selector, description=""):
        """Append a DOM-selection event to ``dom_selection_records`` and log it.

        Args:
            selector: Selector (CSS or XPath) that was queried.
            description: Free-text description of what was being looked up.
        """
        dom_record = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "type": "dom_selection",
            "selector": selector,
            "description": description,
        }
        self.dom_selection_records.append(dom_record)
        logger.info(f"记录DOM选取: {description} - 选择器: {selector}")

    async def save_behavior_records(self):
        """Write both audit trails to a timestamped JSON file in the CWD.

        Returns:
            The name of the file that was written.
        """
        import json

        records = {
            "click_records": self.click_records,
            "dom_selection_records": self.dom_selection_records,
        }
        filename = f"playwright_behavior_records_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(records, f, ensure_ascii=False, indent=2)
        logger.success(f"行为记录已保存到: {filename}")
        return filename

    async def navigate_to_producthunt(self, url):
        """Open ``url`` and wait until it looks like a loaded ProductHunt page.

        After navigation (300 s timeout) the title is polled for up to 60 s.
        The page counts as loaded when the title contains "Product Hunt" or an
        ``h1`` element is present. Cloudflare interstitial titles trigger a
        longer 10 s pause. On timeout the page is still accepted unless its
        title is empty or contains "Not Found" / "Error".

        Args:
            url: Address of the page to open.

        Returns:
            True if the page is considered loaded, False otherwise.
        """
        if not self.page:
            logger.error("页面未初始化")
            return False
        try:
            logger.info(f"正在访问: {url}")
            # Generous navigation timeout: 300 seconds.
            await self.page.goto(url, wait_until="domcontentloaded", timeout=300000)
            logger.info("等待页面标题包含'Product Hunt'...")
            max_wait_time = 60  # total polling budget, seconds
            wait_interval = 5  # pause between polls, seconds
            waited_time = 0
            while waited_time < max_wait_time:
                title = await self.page.title()
                logger.info(f"当前页面标题: {title}")
                if "Product Hunt" in title:
                    logger.success(f"页面标题已包含'Product Hunt',等待时间: {waited_time}秒")
                    logger.success("Product Hunt网站已成功打开")
                    return True
                # Cloudflare challenge page: give it time to resolve itself.
                if "Just a moment" in title or "请稍候" in title or "Checking your browser" in title:
                    logger.info("遇到Cloudflare验证等待验证完成...")
                    await asyncio.sleep(10)
                    waited_time += 10
                    continue
                # Any h1 is taken as evidence that real content has rendered.
                try:
                    h1_element = await self.page.query_selector("h1")
                    if h1_element:
                        logger.success("检测到页面内容已加载")
                        return True
                except Exception as probe_error:
                    # Best-effort probe (page may be mid-navigation); keep polling.
                    logger.debug(f"检查h1元素失败: {probe_error}")
                await asyncio.sleep(wait_interval)
                waited_time += wait_interval
                logger.info(f"已等待 {waited_time} 秒,继续等待...")
            logger.warning(f"等待超时({max_wait_time}秒),页面标题仍未包含'Product Hunt'")
            final_title = await self.page.title()
            logger.info(f"最终页面标题: {final_title}")
            # Even after a timeout, accept a page that does not look like an error page.
            if final_title and "Not Found" not in final_title and "Error" not in final_title:
                logger.success("页面已正常加载,但标题不符合预期")
                return True
            logger.error("页面加载失败")
            return False
        except Exception as e:
            logger.error(f"访问页面失败: {e}")
            return False

    @classmethod
    def _absolute_url(cls, href):
        """Resolve ``href`` against ``BASE_URL`` and return an absolute URL."""
        from urllib.parse import urljoin

        return urljoin(cls.BASE_URL, href)

    async def _return_to_page(self, url):
        """Navigate the driven page back to ``url``; log instead of raising."""
        try:
            await self.page.goto(url, wait_until="domcontentloaded")
            logger.success(f"已成功返回原始页面: {url}")
            return True
        except Exception as return_error:
            logger.error(f"返回原始页面失败: {return_error}")
            return False

    async def _save_debug_screenshot(self, path, label):
        """Save a full-page screenshot to ``path``; log instead of raising."""
        try:
            await self.page.screenshot(path=path, full_page=True)
            logger.info(f"{label}已保存到: {path}")
        except Exception as e:
            logger.error(f"保存截图失败: {e}")

    async def _dismiss_modal(self):
        """Try to close the first modal/overlay found on the current page."""
        try:
            logger.info("检查是否存在模态窗口...")
            for selector in self._MODAL_SELECTORS:
                try:
                    modal_element = await self.page.query_selector(selector)
                    if not modal_element:
                        continue
                    logger.info(f"检测到模态窗口,选择器: {selector}")
                    for close_selector in self._MODAL_CLOSE_SELECTORS:
                        try:
                            close_button = await modal_element.query_selector(close_selector)
                            if close_button:
                                await close_button.click()
                                logger.success(f"已关闭模态窗口,使用选择器: {close_selector}")
                                await self.page.wait_for_timeout(1000)  # let the close animation finish
                                break
                        except Exception:
                            continue
                    # Only if the modal is still present, click outside it.
                    try:
                        if await self.page.query_selector(selector):
                            await self.page.mouse.click(10, 10)  # top-left corner of the page
                            logger.info("尝试点击页面外部关闭模态窗口")
                            await self.page.wait_for_timeout(1000)
                    except Exception as click_error:
                        logger.debug(f"点击页面外部失败: {click_error}")
                    break
                except Exception:
                    continue
        except Exception as e:
            logger.warning(f"检查模态窗口时出错: {e}")

    async def _log_page_diagnostics(self):
        """Log title, body size and keyword hits for the current page."""
        logger.info("快速检查页面加载状态...")
        try:
            title_text = await self.page.title()
            logger.info(f"页面标题: {title_text}")
        except Exception as e:
            logger.warning(f"获取页面标题失败: {e}")
        try:
            body_element = await self.page.query_selector("body")
            if body_element:
                # text_content() may return None; treat that as empty text.
                body_text = await body_element.text_content() or ""
                if len(body_text.strip()) > 10:
                    logger.success("页面内容已加载")
                else:
                    logger.warning("页面内容为空或过短")
        except Exception as e:
            logger.warning(f"检查页面内容失败: {e}")
        logger.info("等待DOM稳定...")
        await self.page.wait_for_timeout(2000)
        await self._save_debug_screenshot("modal_window_debug.png", "模态窗口调试截图")
        try:
            page_content = await self.page.content()
            logger.info("页面内容已获取")
            keywords = ['comment', 'discussion', 'maker', 'creator', 'author', 'statement', 'description']
            lowered = page_content.lower()
            found_keywords = [kw for kw in keywords if kw in lowered]
            if found_keywords:
                logger.info(f"页面包含关键词: {found_keywords}")
            else:
                logger.warning("页面未检测到常见关键词")
        except Exception as e:
            logger.error(f"获取页面内容失败: {e}")

    async def _find_maker_statement(self):
        """Return the maker's comment text from the current page, or ""."""
        logger.info("正在提取制作人评论内容...")
        total = len(self._STATEMENT_XPATHS)
        # Strategy 1: first XPath candidate that yields non-empty text.
        for i, xpath in enumerate(self._STATEMENT_XPATHS, 1):
            try:
                logger.info(f"尝试选择器 {i}/{total}: {xpath}")
                comment_element = await self.page.query_selector(f"xpath={xpath}")
                if not comment_element:
                    continue
                statement = (await comment_element.text_content() or "").strip()
                if statement:
                    logger.success(f"使用选择器 {i} 成功提取制作人评论内容: {statement[:200]}...")
                    return statement
                logger.warning(f"选择器 {i} 提取的内容为空")
            except Exception as e:
                logger.warning(f"选择器 {i} 失败: {e}")
        # Strategy 2: fall back to the first 500 characters of the body text.
        logger.info("所有选择器失败,尝试提取页面主要文本内容...")
        try:
            body_element = await self.page.query_selector("body")
            if body_element:
                full_text = (await body_element.text_content() or "").strip()
                if len(full_text) > 100:
                    statement = full_text[:500]
                    logger.info(f"提取页面主要文本内容: {statement[:200]}...")
                    return statement
        except Exception as e:
            logger.error(f"提取页面主要文本内容失败: {e}")
        return ""

    async def extract_maker_statement_from_current_window(self, maker_link, maker_text):
        """Open the maker link in the current page and extract the statement.

        The page always navigates back to where it started, and a failure to
        navigate back never discards a statement that was already extracted.

        Args:
            maker_link: Absolute URL of the maker's comment page.
            maker_text: Text of the originating link (not used by this method;
                kept for interface compatibility).

        Returns:
            The extracted statement, or "" when nothing could be extracted.
        """
        if not maker_link:
            logger.warning("制作人链接为空")
            return ""
        if not self.page:
            logger.error("当前页面未初始化")
            return ""
        # Captured before the try block so every handler can rely on it.
        original_url = self.page.url
        logger.info(f"保存当前页面URL: {original_url}")
        try:
            # Navigation is done via goto, so there are no click coordinates.
            await self.record_click(
                None,
                None,
                selector="制作人链接",
                description="点击制作人链接在当前窗口打开",
            )
            logger.info(f"正在在当前窗口打开制作人链接: {maker_link}")
            try:
                await self.page.goto(maker_link, wait_until="domcontentloaded", timeout=60000)
            except Exception as e:
                logger.error(f"页面导航失败: {e}")
                return ""
            logger.success("页面导航成功")
            try:
                await self.page.wait_for_load_state("networkidle")
            except Exception as e:
                # Pages with continuous traffic never go idle; extract anyway.
                logger.warning(f"等待网络空闲超时,继续提取: {e}")
            await self._dismiss_modal()
            await self._log_page_diagnostics()
            maker_statement = await self._find_maker_statement()
            if maker_statement:
                logger.info("提取完成,正在返回原始产品页面...")
            else:
                # Strategy 3: nothing worked, keep a screenshot for debugging.
                logger.warning("所有提取策略都失败,保存截图用于调试...")
                await self._save_debug_screenshot("modal_debug_screenshot.png", "模态窗口截图")
                logger.info("正在返回原始产品页面...")
            return maker_statement
        except Exception as e:
            logger.error(f"在当前窗口打开制作人链接失败: {e}")
            await self._save_debug_screenshot("debug_maker_link_failure.png", "错误调试截图")
            logger.info("发生异常,尝试返回原始产品页面...")
            return ""
        finally:
            # Best-effort and non-raising, so the value returned above survives.
            await self._return_to_page(original_url)

    async def _extract_maker_statement_direct_open(self, maker_link, maker_text):
        """Fallback: open the maker link in a new page and read its text.

        Reads the first class-less ``div`` inside the first ``section``,
        falling back to the section text and then to ``maker_text``. The new
        page is closed on every path, including failures.

        Args:
            maker_link: URL of the maker's comment page.
            maker_text: Text of the originating link, used as last resort.

        Returns:
            The extracted text, or ``maker_text`` when extraction fails.
        """
        new_page = None
        try:
            logger.info("使用备用方法:直接在新窗口中打开链接...")
            new_page = await self.browser.new_page()
            await new_page.goto(maker_link, wait_until="domcontentloaded", timeout=30000)
            await new_page.wait_for_timeout(15000)
            logger.info("页面加载等待完成,开始提取内容...")
            await self.record_dom_selection('section', "备用方法-新窗口第一个section标签")
            first_section = await new_page.query_selector('section')
            if first_section:
                logger.success("找到第一个section标签")
                await self.record_dom_selection('div:not([class])', "备用方法-section下无class的div标签")
                div_without_class = await first_section.query_selector('div:not([class])')
                if div_without_class:
                    logger.success("找到无class的div标签")
                    maker_statement = await div_without_class.inner_text()
                    result = maker_statement.strip()
                    logger.info(f"制作人发言(新窗口): {result[:2000]}...")
                else:
                    logger.warning("未找到无class的div标签")
                    section_text = await first_section.inner_text()
                    result = section_text.strip()
                    logger.info(f"制作人发言(回退section): {result[:200]}...")
            else:
                logger.warning("未找到section标签")
                result = maker_text
                logger.info(f"制作人发言(回退a标签): {(maker_text or '')[:200]}...")
            logger.info("等待内容完全稳定...")
            await new_page.wait_for_timeout(3000)
            return result
        except Exception as e:
            logger.error(f"备用方法也失败: {e}")
            return maker_text
        finally:
            # Close the extra page on every path so it is never leaked.
            if new_page is not None:
                try:
                    await new_page.close()
                    logger.info("新窗口已关闭")
                except Exception as close_error:
                    logger.warning(f"关闭新窗口失败: {close_error}")

    async def _extract_text_by_xpath(self, xpath, label, preview_chars=None):
        """Return the stripped text of the first ``xpath`` match, else None.

        Args:
            xpath: XPath expression to query on the current page.
            label: Human-readable field name used in log messages.
            preview_chars: When set, log only this many leading characters.
        """
        logger.info(f"正在提取{label}...")
        try:
            await self.record_dom_selection(xpath, label)
            element = await self.page.query_selector(f"xpath={xpath}")
            if not element:
                logger.warning(f"未找到XPath为{xpath}的元素")
                return None
            text = (await element.text_content() or "").strip()
            if preview_chars is None:
                logger.info(f"{label}: {text}")
            else:
                logger.info(f"{label}: {text[:preview_chars]}...")
            return text
        except Exception as e:
            logger.error(f"提取{label}失败: {e}")
            return None

    async def _extract_maker_info(self):
        """Locate the maker link and fetch the statement behind it.

        Returns:
            A dict with ``maker_link`` and ``maker_statement`` when a link
            element was found (both "" if the link is missing or points at
            the site root), or an empty dict when it could not be located.
        """
        logger.info("正在提取制作人发言链接...")
        try:
            logger.info("等待页面元素加载...")
            await self.page.wait_for_timeout(20000)  # fixed 20 s settle time
            span_xpath = '//span[contains(@class, "absolute")]'
            await self.record_dom_selection(span_xpath, "制作人span标签")
            span_element = await self.page.query_selector(f"xpath={span_xpath}")
            if not span_element:
                logger.warning("未找到XPath为//span[contains(@class, \"absolute\")]的元素")
                return {}
            await self.record_dom_selection('//span[contains(@class, "absolute")]/parent::a', "制作人链接")
            anchor_handle = await span_element.evaluate_handle('(element) => element.closest("a")')
            # A handle wrapping JS null is still truthy; as_element() gives
            # None unless the handle really is a DOM element.
            a_element = anchor_handle.as_element()
            if not a_element:
                logger.warning("未找到制作人链接的a标签")
                return {}
            maker_text = (await a_element.text_content() or "").strip()
            href = await a_element.get_attribute('href')
            if not href:
                logger.warning("未获取到制作人链接")
                return {"maker_link": "", "maker_statement": ""}
            maker_link = self._absolute_url(href)
            # A link to the bare site root cannot be a maker comment.
            if maker_link.rstrip("/") == self.BASE_URL:
                logger.warning(f"制作人链接无效,跳过提取: {maker_link}")
                return {"maker_link": "", "maker_statement": ""}
            logger.info(f"制作人链接: {maker_link}")
            maker_statement = await self.extract_maker_statement_from_current_window(maker_link, maker_text)
            return {"maker_link": maker_link, "maker_statement": maker_statement}
        except Exception as e:
            logger.error(f"提取制作人发言链接失败: {e}")
            return {}

    async def extract_product_info(self):
        """Collect name, introduction, user count and maker statement.

        Each field is extracted independently, so a missing element leaves
        only that key out. The result is also written to
        ``temp_product_info.txt`` and a screenshot is saved as
        ``product_screenshot.png`` (screenshot failure is only logged).

        Returns:
            A dict of the extracted fields, or None if the page is not
            initialised or an unexpected error occurred.
        """
        if not self.page:
            logger.error("页面未初始化")
            return None
        try:
            product_info = {}
            # (result key, log label, XPath, log preview length)
            text_fields = (
                ("name", "产品名称", "//h1", None),
                ("introduction", "产品简介", '//*[@class="relative text-16 font-normal text-gray-700"]//div', 200),
                ("user_count", "用户数", '//*[@class="flex flex-row gap-2"]//div/div[2]/span/p', None),
            )
            for key, label, xpath, preview_chars in text_fields:
                text = await self._extract_text_by_xpath(xpath, label, preview_chars)
                if text is not None:
                    product_info[key] = text
            product_info.update(await self._extract_maker_info())
            # Plain-text summary for quick inspection.
            temp_file_path = "temp_product_info.txt"
            with open(temp_file_path, "w", encoding="utf-8") as f:
                f.write("=== Product Hunt 产品信息 ===\n\n")
                f.write(f"产品名称: {product_info.get('name', '未获取')}\n\n")
                f.write(f"产品简介: {product_info.get('introduction', '未获取')}\n\n")
                f.write(f"制作人发言: {product_info.get('maker_statement', '未获取')}\n\n")
                f.write(f"用户数: {product_info.get('user_count', '未获取')}\n\n")
                f.write(f"提取时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            logger.info(f"产品信息已保存到临时文件: {temp_file_path}")
            # A failed screenshot must not throw away the extracted data.
            await self._save_debug_screenshot("product_screenshot.png", "页面截图")
            return product_info
        except Exception as e:
            logger.error(f"提取产品信息失败: {e}")
            return None

    async def close(self):
        """Disconnect from the browser and stop the Playwright driver.

        The driver is stopped even if disconnecting raises. Calling this on
        a scraper that never connected, or calling it twice, is a no-op.
        """
        try:
            if self.browser:
                await self.browser.close()
                logger.info("浏览器连接已关闭")
        finally:
            self.browser = None
            self.page = None
            # getattr tolerates instances created without running __init__.
            playwright = getattr(self, "playwright", None)
            if playwright:
                self.playwright = None
                await playwright.stop()
                logger.info("Playwright实例已关闭")
async def main():
    """Run one scraping job against a fixed ProductHunt product page.

    Connects to Chrome on debug port 9222, opens the target page, extracts
    the product information and writes it to ``product_info.json``, then
    dumps the recorded click/DOM-selection history. The browser connection
    is closed on every path, including early exits and errors.
    """
    import json

    logger.info("开始ProductHunt数据抓取任务")
    target_url = "https://www.producthunt.com/products/palettebrain"
    scraper = ProductHuntScraper(debug_port=9222)
    try:
        if not await scraper.connect_to_existing_chrome():
            logger.error("无法连接到Chrome请确保Chrome已启动并启用远程调试")
            return
        if not await scraper.navigate_to_producthunt(target_url):
            logger.error("页面访问失败")
            return
        product_info = await scraper.extract_product_info()
        if product_info:
            logger.success("产品信息提取完成")
            with open("product_info.json", "w", encoding="utf-8") as f:
                json.dump(product_info, f, ensure_ascii=False, indent=2)
            logger.info("产品信息已保存到 product_info.json")
            # save_behavior_records() logs the real, timestamped file name
            # itself, so no (previously inaccurate) file name is logged here.
            await scraper.save_behavior_records()
        else:
            logger.warning("未能提取到产品信息")
    except Exception as e:
        logger.error(f"执行过程中发生错误: {e}")
    finally:
        # Always release the CDP connection and the Playwright driver.
        await scraper.close()
        logger.info("任务完成")
# Script entry point: run the async scraping job only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())