"""Scrape a single ProductHunt product page with Playwright in stealth mode."""

import asyncio
import json

from loguru import logger
from playwright.async_api import async_playwright
from playwright_stealth.stealth import Stealth


class ProductHuntScraper:
    """Drive a headless Chromium session to collect data from one product page.

    Typical use is ``await ProductHuntScraper().scrape()``, which starts the
    browser, navigates, dumps debugging artefacts (HTML + screenshot), extracts
    the product fields, writes them to JSON and always shuts the browser down.
    """

    DEFAULT_PRODUCT_URL = "https://www.producthunt.com/products/elsie-ai-beta"

    # Candidate selectors, tried in order; the first one that yields text wins.
    _NAME_SELECTORS = (
        'h1[data-test="post-name"]',
        'h1[data-test="post-title"]',
        'h1[class*="styles_name"]',
        'h1',
        '[data-test="post-name"]',
        '[data-test="post-title"]',
        '.styles_name__',
        '.styles_title__',
        'h1[class*="name"]',
        'h1[class*="title"]',
    )
    _DESCRIPTION_SELECTORS = (
        '[data-test="post-description"]',
        '[data-test="post-tagline"]',
        '.styles_tagline__',
        '.styles_description__',
        'div[class*="tagline"]',
        'div[class*="description"]',
        'p[class*="tagline"]',
        'p[class*="description"]',
    )
    _COMMENT_SELECTORS = (
        '[data-test="comment-item"]',
        '.styles_comment__',
        'div[class*="comment"]',
        'article[class*="comment"]',
    )
    _TAG_SELECTORS = (
        '[data-test="post-topic"]',
        '.styles_topic__',
        'a[class*="topic"]',
        'span[class*="topic"]',
    )

    def __init__(self, product_url=DEFAULT_PRODUCT_URL):
        """Create an idle scraper.

        Args:
            product_url: Page to scrape. Defaults to the URL that was
                previously hard-coded, so ``ProductHuntScraper()`` behaves
                as before.
        """
        self.playwright = None  # Playwright driver handle; must be stopped on shutdown
        self.browser = None
        self.page = None
        self.product_url = product_url

    async def start_browser(self):
        """Launch Chromium, open a stealth-patched page and set request headers.

        Returns:
            True on success, False if any step raised (the error is logged).
            On failure, partially created resources stay referenced on ``self``
            so that ``close_browser`` can still release them.
        """
        try:
            logger.info("正在启动Playwright浏览器...")
            # Keep the driver handle on the instance: it owns a subprocess that
            # is only released by an explicit stop().
            self.playwright = await async_playwright().start()

            # Launch flags chosen to look less like an automated browser.
            self.browser = await self.playwright.chromium.launch(
                headless=True,  # no visible browser window
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--disable-web-security',
                    '--disable-features=VizDisplayCompositor',
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                    '--disable-infobars',
                    '--disable-dev-shm-usage',
                    '--disable-accelerated-2d-canvas',
                    '--no-first-run',
                    '--no-zygote',
                    '--disable-gpu',
                ],
            )

            # Browser context with a realistic user agent, viewport and locale.
            context = await self.browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
                viewport={'width': 1920, 'height': 1080},
                locale='en-US',
                timezone_id='America/New_York',
            )
            self.page = await context.new_page()

            # Apply the stealth patches to the page.
            stealth = Stealth()
            await stealth.apply_stealth_async(self.page)

            # Extra request headers sent with every request from this page.
            await self.page.set_extra_http_headers({
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept-Encoding': 'gzip, deflate, br',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Cache-Control': 'no-cache',
                'Pragma': 'no-cache',
            })

            logger.success("浏览器启动成功")
            return True
        except Exception as e:
            logger.error(f"启动浏览器失败: {str(e)}")
            return False

    async def wait_for_cloudflare(self, timeout=120000):
        """Block until the Cloudflare interstitial is gone.

        The page is considered clear once its title is no longer
        "Just a moment..." and neither the ``.lds-ring`` spinner nor the
        ``#challenge-error-text`` element is present.

        Args:
            timeout: Maximum wait in milliseconds.

        Returns:
            True when the condition was met, False on timeout or any error.
        """
        try:
            logger.info("等待Cloudflare验证完成...")
            await self.page.wait_for_function(
                """() => {
                    return document.title !== "Just a moment..."
                        && !document.querySelector('.lds-ring')
                        && !document.querySelector('#challenge-error-text');
                }""",
                timeout=timeout,
            )
            logger.success("Cloudflare验证完成")
            return True
        except Exception as e:
            logger.error(f"等待Cloudflare验证超时: {str(e)}")
            return False

    async def navigate_to_product(self):
        """Visit the ProductHunt home page, then the product page.

        Returns:
            True if both navigations and the Cloudflare wait succeeded,
            False otherwise (the error is logged).
        """
        try:
            logger.info(f"正在导航到产品页面: {self.product_url}")

            # Visit the home page first to establish a session.
            logger.info("先访问ProductHunt主页...")
            # Playwright's Python API takes keyword arguments here; a JS-style
            # options dict passed positionally raises TypeError.
            await self.page.goto(
                "https://www.producthunt.com",
                wait_until="networkidle",
                timeout=60000,
            )

            # Short pause to mimic a human visitor.
            await asyncio.sleep(3)

            logger.info("访问产品页面...")
            await self.page.goto(
                self.product_url,
                wait_until="networkidle",
                timeout=60000,
            )

            if not await self.wait_for_cloudflare():
                logger.error("Cloudflare验证失败")
                return False

            # Give the page extra time to finish rendering.
            await asyncio.sleep(5)

            logger.success("成功导航到产品页面")
            return True
        except Exception as e:
            logger.error(f"导航到产品页面失败: {str(e)}")
            return False

    async def _first_text(self, selectors, timeout=5000):
        """Return ``(selector, text)`` for the first selector yielding non-blank text.

        Each selector is awaited for up to ``timeout`` milliseconds. Selectors
        that time out, fail or produce only whitespace are skipped. Returns
        ``(None, None)`` when nothing matched.
        """
        for selector in selectors:
            try:
                element = await self.page.wait_for_selector(selector, timeout=timeout)
                text = await element.inner_text() if element else None
            except Exception as exc:
                # Best-effort probing: a miss on one selector is expected.
                logger.debug(f"选择器 {selector} 未匹配: {exc}")
                continue
            text = text.strip() if text else ""
            if text:
                return selector, text
        return None, None

    async def _collect_texts(self, selectors, limit=None):
        """Return ``(selector, texts)`` for the first selector with any non-blank text.

        Only the first ``limit`` matched elements are read (all when ``limit``
        is None). Returns ``(None, [])`` when no selector produced text.
        """
        for selector in selectors:
            try:
                elements = await self.page.query_selector_all(selector)
            except Exception as exc:
                logger.debug(f"选择器 {selector} 查询失败: {exc}")
                continue

            texts = []
            for element in elements[:limit]:
                try:
                    text = await element.inner_text()
                except Exception as exc:
                    # The element may have been detached; skip just this one.
                    logger.debug(f"读取元素文本失败 ({selector}): {exc}")
                    continue
                if text and text.strip():
                    texts.append(text.strip())

            if texts:
                return selector, texts
        return None, []

    async def _optional_text(self, selector, default):
        """Return the stripped text of the first match of ``selector``, else ``default``."""
        try:
            element = await self.page.query_selector(selector)
            if element:
                text = await element.inner_text()
                if text and text.strip():
                    return text.strip()
        except Exception as exc:
            logger.debug(f"选择器 {selector} 查询失败: {exc}")
        return default

    async def _image_url(self, default):
        """Return the product image's ``src`` (or ``data-src``), else ``default``."""
        try:
            element = await self.page.query_selector('img[data-test="product-image"]')
            if element:
                for attribute in ('src', 'data-src'):
                    value = await element.get_attribute(attribute)
                    if value:
                        return value
        except Exception as exc:
            logger.debug(f"获取产品图片失败: {exc}")
        return default

    async def extract_product_info(self):
        """Collect name, description, tags, counters, comments and image URL.

        Every field is best-effort: when nothing usable is found, the field
        keeps its "not found" placeholder instead of failing the whole
        extraction. Name and description are whitespace-stripped, like tags
        and comments.

        Returns:
            A dict with keys ``name``, ``description``, ``tags``, ``upvotes``,
            ``comments_count``, ``comments``, ``image_url`` and ``url``, or
            None if an unexpected error occurred (the error is logged).
        """
        try:
            logger.info("正在提取产品信息...")

            product_name = "未找到产品名称"
            selector, text = await self._first_text(self._NAME_SELECTORS)
            if text:
                product_name = text
                logger.info(f"使用选择器 {selector} 找到产品名称: {product_name}")

            product_description = "未找到产品简介"
            selector, text = await self._first_text(self._DESCRIPTION_SELECTORS)
            if text:
                product_description = text
                logger.info(f"使用选择器 {selector} 找到产品简介: {product_description}")

            # Only the first five matched comment elements are read.
            selector, comments = await self._collect_texts(self._COMMENT_SELECTORS, limit=5)
            if comments:
                logger.info(f"使用选择器 {selector} 找到 {len(comments)} 条评论")
            else:
                comments = ["未找到评论"]

            selector, tags = await self._collect_texts(self._TAG_SELECTORS)
            if tags:
                logger.info(f"使用选择器 {selector} 找到 {len(tags)} 个标签")
            else:
                tags = ["未找到标签"]

            upvotes = await self._optional_text('[data-test="vote-button"]', "未找到")
            comments_count = await self._optional_text('[data-test="comment-count"]', "未找到")
            image_url = await self._image_url("未找到图片")

            product_info = {
                "name": product_name,
                "description": product_description,
                "tags": tags,
                "upvotes": upvotes,
                "comments_count": comments_count,
                "comments": comments,
                "image_url": image_url,
                "url": self.product_url,
            }

            logger.success("产品信息提取完成")
            return product_info
        except Exception as e:
            logger.error(f"提取产品信息失败: {str(e)}")
            return None

    async def save_data(self, data):
        """Write ``data`` as UTF-8 JSON to ``product_info_stealth.json``.

        Returns:
            True on success, False if writing failed (the error is logged).
        """
        try:
            with open('product_info_stealth.json', 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.success("数据已保存到 product_info_stealth.json")
            return True
        except Exception as e:
            logger.error(f"保存数据失败: {str(e)}")
            return False

    async def take_screenshot(self):
        """Save a full-page screenshot to ``product_screenshot_stealth.png``.

        Returns:
            True on success, False if the screenshot failed (the error is logged).
        """
        try:
            await self.page.screenshot(path='product_screenshot_stealth.png', full_page=True)
            logger.success("页面截图已保存到 product_screenshot_stealth.png")
            return True
        except Exception as e:
            logger.error(f"保存截图失败: {str(e)}")
            return False

    async def save_html(self):
        """Save the current page HTML to ``product_page_stealth.html``.

        Returns:
            True on success, False if reading or writing failed (the error is logged).
        """
        try:
            html_content = await self.page.content()
            with open('product_page_stealth.html', 'w', encoding='utf-8') as f:
                f.write(html_content)
            logger.success("页面HTML已保存到 product_page_stealth.html")
            return True
        except Exception as e:
            logger.error(f"保存HTML失败: {str(e)}")
            return False

    async def close_browser(self):
        """Close the browser and stop the Playwright driver.

        Safe to call when the browser was never started or is already closed.
        The driver is stopped even if closing the browser raises.
        """
        try:
            if self.browser:
                await self.browser.close()
                logger.info("浏览器已关闭")
        finally:
            self.browser = None
            self.page = None
            if self.playwright:
                driver, self.playwright = self.playwright, None
                await driver.stop()

    async def scrape(self):
        """Run the full pipeline: start, navigate, dump artefacts, extract, save.

        Returns:
            True if product info was extracted, False otherwise. The browser
            is always closed before returning.
        """
        try:
            if not await self.start_browser():
                return False

            if not await self.navigate_to_product():
                return False

            # HTML and screenshot are debugging aids; their failure is non-fatal.
            await self.save_html()
            await self.take_screenshot()

            product_info = await self.extract_product_info()
            if product_info:
                await self.save_data(product_info)
                logger.info(f"抓取完成: {product_info['name']}")
                return True

            logger.error("未能提取产品信息")
            return False
        except Exception as e:
            logger.error(f"抓取过程中发生错误: {str(e)}")
            return False
        finally:
            await self.close_browser()


async def main():
    """Script entry point: run one scrape and log the outcome."""
    logger.info("开始ProductHunt产品信息抓取(使用Stealth模式)")

    scraper = ProductHuntScraper()
    success = await scraper.scrape()

    if success:
        logger.success("抓取成功完成")
    else:
        logger.error("抓取失败")


if __name__ == "__main__":
    asyncio.run(main())