#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Scrape a Product Hunt product page through an already-running Chrome.

The script attaches Playwright to Chrome's remote-debugging (CDP) port,
navigates to a product page, extracts a few fields and writes them to
``temp_product_info.txt``, ``product_screenshot.png`` and
``product_info.json`` in the current working directory.
"""

import asyncio
import json
import sys
from datetime import datetime
from typing import Optional
from urllib.parse import urljoin

from loguru import logger
from playwright.async_api import async_playwright

# Logging configuration: replace loguru's default sink with a stderr sink.
logger.remove()
logger.add(
    sys.stderr,
    level="INFO",
    format=(
        "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | "
        "{name}:{function}:{line} - {message}"
    ),
)

# Origin used to resolve relative hrefs found on the page.
PRODUCTHUNT_BASE_URL = "https://www.producthunt.com"

# Navigation timeout (milliseconds) shared by every ``page.goto`` call.
NAV_TIMEOUT_MS = 300000


class ProductHuntScraper:
    """Product Hunt scraper driven through a CDP-attached Chrome."""

    def __init__(self, debug_port=9222):
        """Store the CDP port; no connection is made until
        :meth:`connect_to_existing_chrome` is awaited.

        Args:
            debug_port: Port of Chrome's remote-debugging endpoint on
                localhost.
        """
        self.debug_port = debug_port
        self.playwright = None  # set by connect_to_existing_chrome()
        self.browser = None
        self.page = None

    # ------------------------------------------------------------------
    # Connection / navigation
    # ------------------------------------------------------------------
    async def connect_to_existing_chrome(self) -> bool:
        """Attach to the running Chrome instance and pick a page to drive.

        The first page of the first existing context is reused; a page (and,
        if necessary, a context) is created when none exists.

        Returns:
            True on success, False if any step raised (the error is logged).
        """
        logger.info(f"正在连接到Chrome远程调试端口 {self.debug_port}")

        try:
            # Keep a reference so close() can stop the driver later.
            self.playwright = await async_playwright().start()

            self.browser = await self.playwright.chromium.connect_over_cdp(
                f"http://localhost:{self.debug_port}"
            )

            contexts = self.browser.contexts
            if contexts:
                context = contexts[0]
                pages = context.pages
                if pages:
                    self.page = pages[0]
                else:
                    self.page = await context.new_page()
            else:
                context = await self.browser.new_context()
                self.page = await context.new_page()

            logger.success("成功连接到Chrome浏览器")
            return True

        except Exception as e:
            logger.error(f"连接Chrome失败: {e}")
            return False

    async def navigate_to_producthunt(self, url) -> bool:
        """Open *url* and wait until the page title mentions "Product Hunt".

        The title is polled every 5 seconds for up to 300 seconds. If the
        expected title never appears, the page is still accepted as long as
        its title is non-empty and contains neither "Not Found" nor "Error".

        Args:
            url: Address to navigate to.

        Returns:
            True if the page is considered loaded, False otherwise
            (including when the page is not initialised or an error occurs).
        """
        if not self.page:
            logger.error("页面未初始化")
            return False

        try:
            logger.info(f"正在访问: {url}")

            await self.page.goto(
                url, wait_until="domcontentloaded", timeout=NAV_TIMEOUT_MS
            )

            logger.info("等待页面标题包含'Product Hunt'...")
            max_wait_time = 300  # maximum total wait, seconds
            wait_interval = 5  # polling interval, seconds
            waited_time = 0

            while waited_time < max_wait_time:
                title = await self.page.title()
                logger.info(f"当前页面标题: {title}")

                if "Product Hunt" in title:
                    logger.success(f"页面标题已包含'Product Hunt',等待时间: {waited_time}秒")
                    logger.success("Product Hunt网站已成功打开")
                    return True

                await asyncio.sleep(wait_interval)
                waited_time += wait_interval
                logger.info(f"已等待 {waited_time} 秒,继续等待...")

            # Timed out without seeing the expected title.
            logger.warning(f"等待超时({max_wait_time}秒),页面标题仍未包含'Product Hunt'")
            final_title = await self.page.title()
            logger.info(f"最终页面标题: {final_title}")

            # Accept any page that does not look like an error page.
            if (
                final_title
                and "Not Found" not in final_title
                and "Error" not in final_title
            ):
                logger.success("页面已正常加载,但标题不符合预期")
                return True

            logger.error("页面加载失败")
            return False

        except Exception as e:
            logger.error(f"访问页面失败: {e}")
            return False

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    async def _stripped_text(element) -> str:
        """Return the element's text content, stripped; '' if it is None."""
        text = await element.text_content()
        return (text or "").strip()

    @staticmethod
    async def _close_quietly(page) -> None:
        """Close *page* if given; log (never raise) when closing fails."""
        if page is None:
            return
        try:
            await page.close()
            logger.info("新窗口已关闭")
        except Exception as e:
            # A failed close must not discard an already extracted result.
            logger.warning(f"关闭新窗口失败: {e}")

    async def _read_maker_statement(self, page, fallback_text):
        """Extract the maker statement from an already loaded *page*.

        Lookup order: the first class-less ``div`` inside the first
        ``section``; otherwise the whole section's text; otherwise
        *fallback_text* unchanged.
        """
        first_section = await page.query_selector('section')
        if not first_section:
            logger.warning("未找到section标签")
            logger.info(f"制作人发言(回退a标签): {fallback_text[:200]}...")
            return fallback_text

        logger.success("找到第一个section标签")

        div_without_class = await first_section.query_selector('div:not([class])')
        if div_without_class:
            logger.success("找到无class的div标签")
            result = (await div_without_class.inner_text()).strip()
            logger.info(f"制作人发言(新窗口): {result[:2000]}...")
            return result

        logger.warning("未找到无class的div标签")
        result = (await first_section.inner_text()).strip()
        logger.info(f"制作人发言(回退section): {result[:200]}...")
        return result

    # ------------------------------------------------------------------
    # Maker statement extraction
    # ------------------------------------------------------------------
    async def extract_maker_statement_from_new_window(self, maker_link, maker_text):
        """Click into the maker area and read the statement from the popup.

        A mouse click is simulated inside the first
        ``div.flex.flex-col.gap-1`` container and the page that opens as a
        result is scraped. Whenever the click route is unavailable or fails,
        :meth:`_extract_maker_statement_direct_open` is used instead.

        Args:
            maker_link: URL used by the fallback route.
            maker_text: Text of the original link; last-resort return value.

        Returns:
            The extracted statement text, or *maker_text* as a final
            fallback.
        """
        new_page = None
        try:
            logger.info("模拟点击制作人链接...")

            div_container = await self.page.query_selector('div.flex.flex-col.gap-1')
            if not div_container:
                logger.warning("未找到class='flex flex-col gap-1'的div容器,使用备用方法")
                return await self._extract_maker_statement_direct_open(maker_link, maker_text)

            bbox = await div_container.bounding_box()
            if not bbox:
                logger.warning("无法获取div容器边界框,使用备用方法")
                return await self._extract_maker_statement_direct_open(maker_link, maker_text)

            # NOTE(review): '*' matches every descendant, not only direct
            # children, so the first three entries may be nested in each
            # other. Left unchanged because the click offset may have been
            # tuned against this behaviour -- confirm before switching to
            # ':scope > *'.
            child_elements = await div_container.query_selector_all('*')

            # Sum the heights of the first few elements to find the click row.
            total_height = 0
            element_count = 0
            max_elements = 3
            for child in child_elements[:max_elements]:
                child_bbox = await child.bounding_box()
                if child_bbox:
                    total_height += child_bbox['height']
                    element_count += 1
                    logger.debug(f"元素{element_count}高度: {child_bbox['height']:.1f}px")

            if total_height == 0:
                # No measurable elements: click the vertical middle instead.
                center_y = bbox['y'] + bbox['height'] / 2
                logger.info("使用div容器高度的一半作为点击位置")
            else:
                center_y = bbox['y'] + total_height
                logger.info(f"使用前面{element_count}个元素高度总和作为点击位置")

            center_x = bbox['x'] + bbox['width'] / 2
            logger.info(f"点击位置: ({center_x:.1f}, {center_y:.1f})")

            # Wait for the page that the click is expected to open.
            async with self.page.context.expect_page() as new_page_info:
                await self.page.mouse.click(center_x, center_y)
            new_page = await new_page_info.value

            await new_page.wait_for_load_state("domcontentloaded")
            # Extra 5 seconds so dynamically rendered content can appear.
            await new_page.wait_for_timeout(5000)
            logger.success("新窗口已加载完成")

            return await self._read_maker_statement(new_page, maker_text)

        except Exception as new_page_error:
            logger.error(f"模拟点击操作失败: {new_page_error}")
        finally:
            # Always close the popup, including on failure, so tabs do not
            # accumulate in the attached browser.
            await self._close_quietly(new_page)

        # Only reached when the click route raised.
        return await self._extract_maker_statement_direct_open(maker_link, maker_text)

    async def _extract_maker_statement_direct_open(self, maker_link, maker_text):
        """Fallback route: open *maker_link* in a new tab and scrape it.

        The tab is created in the context of the page being driven, so it
        shares that context's cookies and session; ``Browser.new_page()``
        would instead create a separate context that is never closed.

        Returns:
            The extracted statement text, or *maker_text* if anything fails.
        """
        new_page = None
        try:
            logger.info("使用备用方法:直接在新窗口中打开链接...")

            if self.page is not None:
                new_page = await self.page.context.new_page()
            else:
                new_page = await self.browser.new_page()

            await new_page.goto(
                maker_link, wait_until="domcontentloaded", timeout=NAV_TIMEOUT_MS
            )
            # Give dynamically rendered content a moment to appear.
            await new_page.wait_for_timeout(3000)

            return await self._read_maker_statement(new_page, maker_text)

        except Exception as e:
            logger.error(f"备用方法也失败: {e}")
            return maker_text
        finally:
            await self._close_quietly(new_page)

    # ------------------------------------------------------------------
    # Product information
    # ------------------------------------------------------------------
    async def _collect_maker_info(self, product_info: dict) -> None:
        """Fill ``maker_link`` / ``maker_statement`` in *product_info*.

        Waits up to 60 s for ``section.flex.flex-col.gap-2``. Uses the first
        ``a`` inside it (following the link for the statement) or, when there
        is no link, the first ``span``'s text.
        """
        section_element = await self.page.wait_for_selector(
            'section.flex.flex-col.gap-2',
            timeout=60000
        )
        if not section_element:
            logger.warning("制作人发言区域未加载")
            return

        logger.success("制作人发言区域已加载")

        a_element = await section_element.query_selector('a')
        if a_element:
            maker_text = await self._stripped_text(a_element)
            maker_link = await a_element.get_attribute('href')

            if maker_link:
                # Resolve relative hrefs against the site origin; absolute
                # URLs pass through unchanged.
                maker_link = urljoin(PRODUCTHUNT_BASE_URL + "/", maker_link)

            product_info["maker_link"] = maker_link
            logger.info(f"制作人链接: {maker_link}")

            product_info["maker_statement"] = (
                await self.extract_maker_statement_from_new_window(maker_link, maker_text)
            )
            return

        logger.warning("在section中未找到a标签")
        span_element = await section_element.query_selector('span')
        if span_element:
            product_info["maker_statement"] = await self._stripped_text(span_element)
            logger.info(f"制作人发言(回退span): {product_info['maker_statement'][:200]}...")
        else:
            logger.warning("未找到span标签")

    @staticmethod
    def _write_text_report(product_info: dict, path: str) -> None:
        """Write a human-readable summary of *product_info* to *path*."""
        with open(path, "w", encoding="utf-8") as f:
            f.write("=== Product Hunt 产品信息 ===\n\n")
            f.write(f"产品名称: {product_info.get('name', '未获取')}\n\n")
            f.write(f"产品简介: {product_info.get('introduction', '未获取')}\n\n")
            f.write(f"制作人发言: {product_info.get('maker_statement', '未获取')}\n\n")
            f.write(f"用户数: {product_info.get('user_count', '未获取')}\n\n")
            f.write(f"提取时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

    async def extract_product_info(self) -> Optional[dict]:
        """Extract product fields from the current page and persist them.

        Collected keys (each present only when found): ``name``,
        ``introduction``, ``maker_link``, ``maker_statement``,
        ``user_count``. A text report and a full-page screenshot are written
        to the working directory as side effects.

        Returns:
            The dict of collected fields, or None if the page is not
            initialised or an unexpected error occurs.
        """
        if not self.page:
            logger.error("页面未初始化")
            return None

        try:
            product_info = {}

            # Product name: first <h1>.
            name_element = await self.page.query_selector("h1")
            if name_element:
                product_info["name"] = await self._stripped_text(name_element)
                logger.info(f"产品名称: {product_info['name']}")

            # Product introduction.
            logger.info("正在提取产品简介...")
            try:
                intro_div = await self.page.query_selector('div.relative.text-16.font-normal.text-gray-700')
                if intro_div:
                    product_info["introduction"] = await self._stripped_text(intro_div)
                    logger.info(f"产品简介: {product_info['introduction'][:200]}...")
                else:
                    logger.warning("未找到class为'relative text-16 font-normal text-gray-700'的div")
            except Exception as e:
                logger.error(f"提取产品简介失败: {e}")

            # Maker statement (loaded dynamically).
            logger.info("等待制作人发言动态加载...")
            try:
                await self._collect_maker_info(product_info)
            except Exception as e:
                logger.error(f"等待制作人发言加载失败: {e}")

            # User count.
            logger.info("正在提取用户数...")
            try:
                user_count_element = await self.page.query_selector('p.text-14.font-medium.text-gray-700')
                if user_count_element:
                    product_info["user_count"] = await self._stripped_text(user_count_element)
                    logger.info(f"用户数: {product_info['user_count']}")
                else:
                    logger.warning("未找到class为'text-14 font-medium text-gray-700'的p标签")
            except Exception as e:
                logger.error(f"提取用户数失败: {e}")

            # Persist a text report.
            temp_file_path = "temp_product_info.txt"
            self._write_text_report(product_info, temp_file_path)
            logger.info(f"产品信息已保存到临时文件: {temp_file_path}")

            # Full-page screenshot.
            screenshot_path = "product_screenshot.png"
            await self.page.screenshot(path=screenshot_path, full_page=True)
            logger.info(f"页面截图已保存到: {screenshot_path}")

            return product_info

        except Exception as e:
            logger.error(f"提取产品信息失败: {e}")
            return None

    async def close(self) -> None:
        """Close the browser connection and stop the Playwright driver.

        The driver is stopped even if closing the browser raises. Calling
        this more than once is harmless.
        """
        try:
            if self.browser:
                await self.browser.close()
                self.browser = None
                logger.info("浏览器连接已关闭")
        finally:
            if self.playwright:
                await self.playwright.stop()
                self.playwright = None
                logger.info("Playwright实例已关闭")


async def main():
    """Run one scrape of the hard-coded target URL and save it as JSON."""
    logger.info("开始ProductHunt数据抓取任务")

    target_url = "https://www.producthunt.com/products/notion"

    scraper = ProductHuntScraper(debug_port=9222)

    try:
        if not await scraper.connect_to_existing_chrome():
            logger.error("无法连接到Chrome,请确保Chrome已启动并启用远程调试")
            return

        if not await scraper.navigate_to_producthunt(target_url):
            logger.error("页面访问失败")
            return

        product_info = await scraper.extract_product_info()

        if product_info:
            logger.success("产品信息提取完成")

            with open("product_info.json", "w", encoding="utf-8") as f:
                json.dump(product_info, f, ensure_ascii=False, indent=2)
            logger.info("产品信息已保存到 product_info.json")
        else:
            logger.warning("未能提取到产品信息")

    except Exception as e:
        logger.error(f"执行过程中发生错误: {e}")

    finally:
        # Always release the browser connection and the driver.
        await scraper.close()
        logger.info("任务完成")


if __name__ == "__main__":
    asyncio.run(main())