Compare commits

...

2 Commits

Author SHA1 Message Date
74dfa978cf 更新playwright控制chrome远程端口 2025-11-17 22:10:40 +08:00
e851d0d5fb 增加对producthunt的中间结果 2025-11-17 07:40:50 +08:00
28 changed files with 60875 additions and 64670 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,104 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
调试脚本,详细诊断 new_data_stealth.py 的问题
"""
import asyncio
import sys
from loguru import logger
# Configure logging: drop the existing handlers, then log INFO and above to stderr
# with a colourised "time | level | name:function:line - message" layout.
logger.remove()
logger.add(sys.stderr, level="INFO", format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>")
async def debug_stealth():
    """Run a step-by-step smoke test of ``product/new_data_stealth.py``.

    The steps are: import the required modules, instantiate
    ``ProductHuntScraper``, start the browser, load a simple page, take a
    screenshot, save the page HTML and close the browser. Each browser step
    is isolated so that one failing step is logged without preventing the
    following steps from running.

    Returns:
        bool: True only when every step succeeded. False when an import
        failed, the browser did not start, any individual step raised, or
        an unexpected error occurred.
    """
    logger.info("=== 开始调试 new_data_stealth.py ===")
    try:
        # Step 1: the imports themselves are the check, so two of the names
        # are intentionally unused.
        logger.info("1. 测试导入模块...")
        from playwright.async_api import async_playwright  # noqa: F401
        from playwright_stealth.stealth import Stealth  # noqa: F401
        from product.new_data_stealth import ProductHuntScraper
        logger.success("✅ 所有模块导入成功")

        # Step 2: construction.
        logger.info("2. 测试类实例化...")
        scraper = ProductHuntScraper()
        logger.success("✅ ProductHuntScraper 实例化成功")

        # Step 3: browser start-up; nothing else can run without it.
        logger.info("3. 测试浏览器启动...")
        if not await scraper.start_browser():
            logger.error("❌ 浏览器启动失败")
            return False
        logger.success("✅ 浏览器启动成功")

        all_passed = True
        try:
            # Step 4: navigation against a simple, predictable page.
            logger.info("4. 测试页面导航...")
            try:
                # Playwright's Python API takes keyword arguments here, not
                # a JS-style options dict.
                await scraper.page.goto(
                    "https://httpbin.org/user-agent",
                    wait_until="networkidle",
                    timeout=30000,
                )
                content = await scraper.page.content()
                if "user-agent" in content.lower():
                    logger.success("✅ 页面导航成功")
                else:
                    logger.warning("⚠️ 页面内容异常")
            except Exception as e:
                all_passed = False
                logger.error(f"❌ 页面导航失败: {e}")

            # Step 5: screenshot.
            logger.info("5. 测试截图功能...")
            try:
                await scraper.take_screenshot()
                logger.success("✅ 截图功能正常")
            except Exception as e:
                all_passed = False
                logger.error(f"❌ 截图功能失败: {e}")

            # Step 6: HTML dump.
            logger.info("6. 测试HTML保存功能...")
            try:
                await scraper.save_html()
                logger.success("✅ HTML保存功能正常")
            except Exception as e:
                all_passed = False
                logger.error(f"❌ HTML保存功能失败: {e}")
        finally:
            # Step 7: always close the browser, even if the run is cancelled.
            logger.info("7. 关闭浏览器...")
            await scraper.close_browser()
            logger.success("✅ 浏览器关闭成功")

        # Only claim overall success when no step failed.
        if all_passed:
            logger.success("🎉 所有调试测试通过!")
        else:
            logger.error("❌ 部分调试步骤失败")
        return all_passed
    except ImportError as e:
        logger.error(f"❌ 导入失败: {e}")
        return False
    except Exception as e:
        logger.error(f"❌ 调试过程中发生错误: {e}")
        return False
async def main():
    """Entry point: run the debug routine and log a summary of its outcome."""
    logger.info("开始调试过程...")
    if await debug_stealth():
        logger.info("\n🎉 调试完成!脚本修复成功")
        logger.info("现在可以正常运行: python product/new_data_stealth.py")
        return
    # Reached only when the debug routine reported a failure.
    logger.error("\n💥 调试发现存在问题")
    logger.info("请检查错误信息并进一步调试")


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,238 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Product Hunt网站数据抓取脚本
使用Selenium连接到现有的Chrome实例抓取Product Hunt产品信息
"""
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
import time
import json
import re
from datetime import datetime
from loguru import logger
# Configure logging: write INFO-and-above records to a log file rotated at 10 MB.
logger.add("producthunt_scraper.log", rotation="10 MB", level="INFO")
class ProductHuntScraper:
    """Scrape Product Hunt product pages through an existing Chrome instance.

    Selenium is attached to a Chrome process reachable at a remote-debugging
    address; this class does not launch a browser of its own.
    """

    def __init__(self, debug_address="127.0.0.1:5003", wait_timeout=10, settle_seconds=3):
        """Store the connection settings; no connection is opened yet.

        Args:
            debug_address (str): Chrome debugging address as ``host:port``.
            wait_timeout (float): Seconds the explicit ``WebDriverWait`` waits.
            settle_seconds (float): Extra pause after a page load so that
                dynamically rendered content has time to appear.
        """
        self.debug_address = debug_address
        self.wait_timeout = wait_timeout
        self.settle_seconds = settle_seconds
        self.driver = None
        self.wait = None

    def connect_to_chrome(self):
        """Attach Selenium to the Chrome instance at ``self.debug_address``.

        Returns:
            bool: True when the connection succeeded, False otherwise.
        """
        try:
            logger.info(f"尝试连接到Chrome调试实例: {self.debug_address}")
            options = webdriver.ChromeOptions()
            options.add_experimental_option("debuggerAddress", self.debug_address)
            # webdriver-manager supplies the chromedriver binary path.
            service = Service(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=options)
            self.wait = WebDriverWait(self.driver, self.wait_timeout)
            logger.info(f"成功连接到Chrome实例当前页面标题: {self.driver.title}")
            return True
        except Exception as e:
            logger.error(f"连接Chrome实例失败: {e}")
            return False

    def navigate_to_product(self, product_url):
        """Open ``product_url`` and wait until the page body is present.

        Args:
            product_url (str): URL of the product page.

        Returns:
            bool: True when navigation succeeded, False on timeout or error.
        """
        try:
            logger.info(f"正在导航到产品页面: {product_url}")
            self.driver.get(product_url)
            self.wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
            # Extra pause so that dynamically loaded content can render.
            time.sleep(self.settle_seconds)
            logger.info(f"成功导航到产品页面,当前标题: {self.driver.title}")
            return True
        except TimeoutException:
            logger.error("页面加载超时")
            return False
        except Exception as e:
            logger.error(f"导航到产品页面失败: {e}")
            return False

    def _safe_current_url(self):
        """Return the driver's current URL, or "未知" when it cannot be read."""
        try:
            return self.driver.current_url if self.driver else "未知"
        except Exception:
            return "未知"

    def extract_product_info(self):
        """Extract name, description and first comment from the current page.

        Each field is extracted independently; a field that cannot be read
        gets a placeholder string instead of aborting the whole extraction.

        Returns:
            dict: Keys ``name``, ``description``, ``first_comment``, ``url``
            and ``scraped_at``; additionally ``error`` when the extraction
            failed as a whole.
        """
        product_info = {}
        try:
            # Product name: text of the first <h1>.
            try:
                product_name = self.driver.find_element(By.TAG_NAME, "h1").text.strip()
                logger.info(f"找到产品名称: {product_name}")
                product_info['name'] = product_name or "未找到产品名称"
            except NoSuchElementException:
                logger.error("未找到h1标签")
                product_info['name'] = "未找到产品名称"
            except Exception as e:
                logger.error(f"提取产品名称失败: {e}")
                product_info['name'] = "提取失败"

            # Product description: div with the description utility classes.
            try:
                description_element = self.driver.find_element(
                    By.CSS_SELECTOR, "div.relative.text-16.font-normal.text-gray-700"
                )
                product_description = description_element.text.strip()
                logger.info(f"找到产品简介: {product_description[:50]}...")
                product_info['description'] = product_description or "未找到产品简介"
            except NoSuchElementException:
                logger.error("未找到产品简介div")
                product_info['description'] = "未找到产品简介"
            except Exception as e:
                logger.error(f"提取产品简介失败: {e}")
                product_info['description'] = "提取失败"

            # First comment: full text of the first matching comment container.
            try:
                comment_elements = self.driver.find_elements(
                    By.CSS_SELECTOR, "div.flex.flex-1.flex-col.gap-2"
                )
                if comment_elements:
                    first_comment = comment_elements[0].text.strip()
                    logger.info(f"找到第一个评论: {first_comment[:50]}...")
                    product_info['first_comment'] = first_comment
                else:
                    logger.warning("未找到任何评论")
                    product_info['first_comment'] = "未找到评论"
            except Exception as e:
                logger.error(f"提取第一个评论失败: {e}")
                product_info['first_comment'] = "提取失败"

            # Provenance: where and when the data was scraped.
            product_info['url'] = self.driver.current_url
            product_info['scraped_at'] = datetime.now().isoformat()
            return product_info
        except Exception as e:
            logger.error(f"提取产品信息时出错: {e}")
            return {
                'name': "提取失败",
                'description': "提取失败",
                'first_comment': "提取失败",
                # Reading the URL may fail for the same reason that brought
                # us here, so it must not raise from inside the handler.
                'url': self._safe_current_url(),
                'scraped_at': datetime.now().isoformat(),
                'error': str(e),
            }

    def scrape_product(self, product_url):
        """Connect if necessary, open ``product_url`` and extract its data.

        Args:
            product_url (str): URL of the product page.

        Returns:
            dict | None: The product information, or None when connecting or
            navigating failed.
        """
        if not self.driver and not self.connect_to_chrome():
            logger.error("无法连接到Chrome实例")
            return None
        if not self.navigate_to_product(product_url):
            logger.error("无法导航到产品页面")
            return None
        return self.extract_product_info()

    def save_to_file(self, product_info, filename=None):
        """Write ``product_info`` to a UTF-8 JSON file.

        Args:
            product_info (dict): Data to serialise.
            filename (str, optional): Target file name; a timestamped
                ``product_YYYYmmdd_HHMMSS.json`` name is generated if omitted.

        Returns:
            str: The file name that was written.

        Raises:
            Exception: Whatever opening or serialising raised, after logging.
        """
        if not filename:
            filename = f"product_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(product_info, f, ensure_ascii=False, indent=2)
            logger.info(f"产品信息已保存到 {filename}")
            return filename
        except Exception as e:
            logger.error(f"保存文件失败: {e}")
            raise

    def close(self):
        """Quit the driver, if any, and forget it so a later call reconnects."""
        if self.driver:
            try:
                self.driver.quit()
            finally:
                # A quit driver is unusable; clearing it lets scrape_product
                # notice that a fresh connection is required.
                self.driver = None
                self.wait = None
            logger.info("已关闭Chrome连接")
if __name__ == "__main__":
    # Example usage: scrape a single product page and store the result as JSON.
    scraper = ProductHuntScraper()
    try:
        product_url = "https://www.producthunt.com/products/elsie-ai-beta"
        product_info = scraper.scrape_product(product_url)
        if not product_info:
            logger.error("未能获取产品信息")
        else:
            # Show the scraped data, then persist it.
            logger.info("抓取到的产品信息:")
            logger.info(json.dumps(product_info, ensure_ascii=False, indent=2))
            filename = scraper.save_to_file(product_info)
            logger.info(f"产品信息已保存到: {filename}")
    except Exception as e:
        logger.error(f"程序执行出错: {e}")
    # NOTE: the connection is deliberately left open so that the attached
    # Chrome instance can keep being used; call scraper.close() to release it.

View File

@@ -1,212 +0,0 @@
import os
import json
import time
from datetime import datetime
import requests
from bs4 import BeautifulSoup
from loguru import logger
class ProductHuntScraper:
    """Fetch a Product Hunt product page with ``requests`` and parse it."""

    DEFAULT_PRODUCT_URL = "https://www.producthunt.com/products/elsie-ai-beta"
    HOME_URL = "https://www.producthunt.com/"
    # Seconds before an HTTP request is abandoned; without a timeout a
    # stalled connection would block the script indefinitely.
    REQUEST_TIMEOUT = 30

    # Browser-like request headers.
    DEFAULT_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'en-US,en;q=0.9',
        # "br" is not advertised: requests only decodes Brotli when an
        # optional Brotli library is installed, otherwise the body is garbage.
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-User': '?1',
        'Cache-Control': 'max-age=0',
        'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
    }

    # Candidate CSS selectors, tried in order until one yields text.
    _DESCRIPTION_SELECTORS = (
        "div.relative.text-16.font-normal.text-gray-700",
        ".text-16.font-normal.text-gray-700",
        "[class*='text-16'][class*='font-normal'][class*='text-gray-700']",
        "div[class*='description']",
        ".product-description",
        "div[class*='tagline']",
        "div[class*='subtitle']",
        "p[class*='text-gray']",
        "div[class*='mb-4']",
    )
    _COMMENT_SELECTORS = (
        "div.flex.flex-1.flex-col.gap-2",
        ".flex.flex-1.flex-col.gap-2",
        "[class*='flex'][class*='flex-1'][class*='flex-col'][class*='gap-2']",
        "div[class*='comment']",
        ".comment-text",
        "div[class*='review']",
        "div[class*='feedback']",
        "blockquote",
        "div[class*='border']",
    )

    def __init__(self, product_url=None):
        """Create the HTTP session.

        Args:
            product_url (str, optional): Product page to scrape; defaults to
                ``DEFAULT_PRODUCT_URL``. It can also be reassigned later via
                the ``product_url`` attribute.
        """
        self.session = requests.Session()
        self.product_url = product_url or self.DEFAULT_PRODUCT_URL
        self.headers = dict(self.DEFAULT_HEADERS)

    def get_page_content(self):
        """Download the product page HTML.

        The home page is requested first and a short pause is inserted
        before the product page is requested.

        Returns:
            str | None: The page HTML, or None on a non-200 status or error.
        """
        try:
            logger.info(f"正在获取页面内容: {self.product_url}")
            logger.info("首先访问ProductHunt主页")
            main_page = self.session.get(
                self.HOME_URL, headers=self.headers, timeout=self.REQUEST_TIMEOUT
            )
            logger.info(f"主页状态码: {main_page.status_code}")
            # Short pause between the two requests.
            time.sleep(2)
            response = self.session.get(
                self.product_url, headers=self.headers, timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code == 200:
                logger.info("成功获取页面内容")
                return response.text
            logger.error(f"获取页面失败,状态码: {response.status_code}")
            logger.info(f"响应头: {response.headers}")
            return None
        except Exception as e:
            logger.error(f"获取页面内容失败: {str(e)}")
            return None

    @staticmethod
    def _extract_first(soup, product_info, key, label, selectors):
        """Store under ``key`` the text of the first selector that yields any.

        Falls back to the placeholder "未找到" when no selector matches.
        """
        for selector in selectors:
            try:
                element = soup.select_one(selector)
                text = element.get_text(strip=True) if element else ""
            except Exception as e:
                logger.debug(f"使用选择器 {selector} 提取{label}时出错: {str(e)}")
                continue
            if text:
                product_info[key] = text
                logger.info(f"使用选择器 {selector} 找到{label}: {text[:50]}...")
                return
        logger.warning(f"未找到{label}")
        product_info[key] = "未找到"

    def extract_product_info(self, html_content):
        """Parse ``html_content`` into a product-information dict.

        Args:
            html_content (str): HTML of the product page.

        Returns:
            dict | None: Keys ``url``, ``scraped_at``, ``name``,
            ``description``, ``first_comment`` and, when found, ``tags``;
            None when parsing failed as a whole.
        """
        try:
            logger.info("开始解析HTML内容")
            soup = BeautifulSoup(html_content, 'html.parser')
            product_info = {
                "url": self.product_url,
                "scraped_at": datetime.now().isoformat(),
            }

            # Product name: text of the first <h1>.
            try:
                name_element = soup.find('h1')
                if name_element:
                    product_info["name"] = name_element.get_text(strip=True)
                    logger.info(f"产品名称: {product_info['name']}")
                else:
                    logger.warning("未找到产品名称 (h1标签)")
                    product_info["name"] = "未找到"
            except Exception as e:
                logger.warning(f"提取产品名称时出错: {str(e)}")
                product_info["name"] = "未找到"

            self._extract_first(
                soup, product_info, "description", "产品简介", self._DESCRIPTION_SELECTORS
            )
            self._extract_first(
                soup, product_info, "first_comment", "第一个评论", self._COMMENT_SELECTORS
            )

            # Optional extra: up to five tag-like labels.
            try:
                tag_elements = soup.select("[class*='tag'], [class*='category'], [class*='topic']")
                if tag_elements:
                    texts = (tag.get_text(strip=True) for tag in tag_elements)
                    tags = [text for text in texts if text]
                    product_info["tags"] = tags[:5]
                    logger.info(f"找到标签: {tags[:3]}")
            except Exception as e:
                logger.debug(f"提取标签时出错: {str(e)}")
            return product_info
        except Exception as e:
            logger.error(f"解析HTML内容失败: {str(e)}")
            return None

    def save_to_file(self, data, filename="product_info.json"):
        """Write ``data`` as UTF-8 JSON to ``filename``; return success as bool."""
        try:
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"数据已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存数据失败: {str(e)}")
            return False

    def save_html(self, html_content, filename="product_page.html"):
        """Write the raw HTML to ``filename`` for debugging; return success."""
        try:
            with open(filename, "w", encoding="utf-8") as f:
                f.write(html_content)
            logger.info(f"HTML内容已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存HTML内容失败: {str(e)}")
            return False

    def scrape_product(self):
        """Run the whole pipeline: download, dump HTML, parse and save.

        Returns:
            bool: True only when the product information was extracted and
            written to disk.
        """
        html_content = self.get_page_content()
        if not html_content:
            logger.error("无法获取页面内容")
            return False
        # The HTML dump is a debugging aid; its failure is not fatal.
        self.save_html(html_content)
        product_info = self.extract_product_info(html_content)
        if not product_info:
            logger.error("未能提取产品信息")
            return False
        # Report failure when the result could not be persisted.
        return self.save_to_file(product_info)
def main():
    """Scrape the default product page once and log whether it worked."""
    logger.info("开始ProductHunt产品信息抓取")
    # To scrape a different product, assign another URL to the instance's
    # `product_url` attribute before calling scrape_product().
    job = ProductHuntScraper()
    if not job.scrape_product():
        logger.error("产品信息抓取失败")
        return
    logger.info("产品信息抓取完成")


if __name__ == "__main__":
    main()

View File

@@ -1,232 +0,0 @@
import os
import json
import time
from datetime import datetime
from playwright.sync_api import sync_playwright
from loguru import logger
class ProductHuntScraper:
    """Scrape one Product Hunt product page with a Playwright-driven Chromium."""

    DEFAULT_PRODUCT_URL = "https://www.producthunt.com/products/elsie-ai-beta"
    HOME_URL = "https://www.producthunt.com/"
    # Pauses, in seconds, that give dynamic content time to render.
    HOME_SETTLE_SECONDS = 3
    PAGE_SETTLE_SECONDS = 10

    def __init__(self, product_url=None):
        """Initialise an unconnected scraper.

        Args:
            product_url (str, optional): Product page to scrape; defaults to
                ``DEFAULT_PRODUCT_URL``. It can also be reassigned later via
                the ``product_url`` attribute.
        """
        # Every handle starts as None so close() is safe at any point.
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None
        self.product_url = product_url or self.DEFAULT_PRODUCT_URL

    def connect_to_browser(self):
        """Start Playwright, launch Chromium and open a new page.

        Returns:
            bool: True on success; False on failure, in which case anything
            that was already started is shut down again.
        """
        try:
            logger.info("正在初始化Playwright浏览器...")
            self.playwright = sync_playwright().start()
            self.browser = self.playwright.chromium.launch(
                headless=False,  # visible window, so the run can be observed
                args=[
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                    "--window-size=1920,1080",
                ],
            )
            self.context = self.browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                viewport={"width": 1920, "height": 1080},
            )
            self.page = self.context.new_page()
            logger.info("成功连接到Playwright浏览器")
            return True
        except Exception as e:
            logger.error(f"连接浏览器失败: {str(e)}")
            # Do not leave a half-started browser or driver process behind.
            self._release()
            return False

    def navigate_to_product(self):
        """Open the home page, then the product page, pausing after each.

        Returns:
            bool: True when both navigations completed, False otherwise.
        """
        try:
            logger.info(f"正在导航到产品页面: {self.product_url}")
            logger.info("首先访问ProductHunt主页")
            self.page.goto(self.HOME_URL, wait_until="domcontentloaded")
            time.sleep(self.HOME_SETTLE_SECONDS)
            self.page.goto(self.product_url, wait_until="domcontentloaded")
            logger.info("等待页面加载...")
            time.sleep(self.PAGE_SETTLE_SECONDS)
            logger.info("页面加载完成")
            return True
        except Exception as e:
            logger.error(f"导航到产品页面失败: {str(e)}")
            return False

    def _extract_field(self, product_info, key, label, selector, hint, preview=None):
        """Store the text of the first ``selector`` match under ``key``.

        ``hint`` is what the "not found" warning shows in parentheses and
        ``preview`` optionally truncates the logged value. The placeholder
        "未找到" is stored when nothing matches or the lookup fails.
        """
        try:
            element = self.page.query_selector(selector)
            if element:
                text = (element.text_content() or "").strip()
                product_info[key] = text
                shown = text if preview is None else f"{text[:preview]}..."
                logger.info(f"{label}: {shown}")
            else:
                logger.warning(f"未找到{label} ({hint})")
                product_info[key] = "未找到"
        except Exception as e:
            logger.warning(f"提取{label}时出错: {str(e)}")
            product_info[key] = "未找到"

    def _extract_optional(self, product_info, key, label, selector):
        """Store the first match's text under ``key``; skip silently if none."""
        try:
            elements = self.page.query_selector_all(selector)
            if elements:
                product_info[key] = (elements[0].text_content() or "").strip()
                logger.info(f"{label}: {product_info[key]}")
        except Exception as e:
            logger.debug(f"提取{label}时出错: {str(e)}")

    def extract_product_info(self):
        """Collect product data from the currently loaded page.

        Returns:
            dict | None: Keys ``url``, ``scraped_at``, ``name``,
            ``description``, ``first_comment`` and, when found, ``tags``,
            ``likes`` and ``comment_count``; None when extraction failed as
            a whole.
        """
        try:
            logger.info("开始提取产品信息")
            product_info = {
                "url": self.product_url,
                "scraped_at": datetime.now().isoformat(),
            }
            self._extract_field(product_info, "name", "产品名称", "h1", "h1标签")
            desc_selector = "div.relative.text-16.font-normal.text-gray-700"
            self._extract_field(
                product_info, "description", "产品简介", desc_selector, desc_selector, preview=50
            )
            comment_selector = "div.flex.flex-1.flex-col.gap-2"
            self._extract_field(
                product_info, "first_comment", "第一个评论", comment_selector, comment_selector, preview=50
            )

            # Optional extra: up to five tag-like labels.
            try:
                tag_elements = self.page.query_selector_all(
                    "[class*='tag'], [class*='category'], [class*='topic']"
                )
                if tag_elements:
                    texts = ((tag.text_content() or "").strip() for tag in tag_elements)
                    tags = [text for text in texts if text]
                    product_info["tags"] = tags[:5]
                    logger.info(f"找到标签: {tags[:3]}")
            except Exception as e:
                logger.debug(f"提取标签时出错: {str(e)}")

            self._extract_optional(
                product_info, "likes", "点赞数",
                "[class*='vote'], [class*='like'], [class*='upvote']",
            )
            self._extract_optional(
                product_info, "comment_count", "评论数",
                "[class*='comment-count'], [class*='comments']",
            )
            return product_info
        except Exception as e:
            logger.error(f"提取产品信息失败: {str(e)}")
            return None

    def save_to_file(self, data, filename="product_info.json"):
        """Write ``data`` as UTF-8 JSON to ``filename``; return success as bool."""
        try:
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"数据已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存数据失败: {str(e)}")
            return False

    def save_screenshot(self, filename="product_screenshot.png"):
        """Save a screenshot of the page for debugging; return success as bool."""
        try:
            self.page.screenshot(path=filename)
            logger.info(f"页面截图已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存页面截图失败: {str(e)}")
            return False

    def _release(self):
        """Shut down context, browser and Playwright; return True if all succeeded.

        Each resource is handled independently so that one failing shutdown
        cannot leave the remaining ones running.
        """
        clean = True
        for attribute, method in (("context", "close"), ("browser", "close"), ("playwright", "stop")):
            resource = getattr(self, attribute, None)
            setattr(self, attribute, None)
            if not resource:
                continue
            try:
                getattr(resource, method)()
            except Exception as e:
                clean = False
                logger.error(f"关闭浏览器时出错: {str(e)}")
        self.page = None
        return clean

    def close(self):
        """Close the browser and stop Playwright; safe to call repeatedly."""
        if self._release():
            logger.info("浏览器已关闭")

    def scrape_product(self):
        """Run the whole pipeline: connect, navigate, screenshot, extract, save.

        Returns:
            bool: True only when the product information was extracted and
            written to disk.
        """
        if not self.connect_to_browser():
            logger.error("无法连接到浏览器")
            return False
        try:
            if not self.navigate_to_product():
                logger.error("无法导航到产品页面")
                return False
            # The screenshot is a debugging aid; its failure is not fatal.
            self.save_screenshot()
            product_info = self.extract_product_info()
            if not product_info:
                logger.error("未能提取产品信息")
                return False
            # Report failure when the result could not be persisted.
            return self.save_to_file(product_info)
        finally:
            self.close()
def main():
    """Run a single scrape with default settings and log the result."""
    logger.info("开始ProductHunt产品信息抓取")
    scraper = ProductHuntScraper()
    # A different product can be scraped by assigning its URL to
    # scraper.product_url before the call below.
    outcome = scraper.scrape_product()
    if not outcome:
        logger.error("产品信息抓取失败")
        return
    logger.info("产品信息抓取完成")


if __name__ == "__main__":
    main()

View File

@@ -1,418 +0,0 @@
import os
import json
import time
from datetime import datetime
from playwright.sync_api import sync_playwright
from loguru import logger
class ProductHuntScraper:
    """Scrape one Product Hunt product page with a Playwright-driven Chromium.

    This variant polls until a Cloudflare challenge page is gone and tries
    several candidate selectors for every field.
    """

    DEFAULT_PRODUCT_URL = "https://www.producthunt.com/products/elsie-ai-beta"
    # Seconds to let dynamic content render after the challenge has cleared.
    PAGE_SETTLE_SECONDS = 10

    _USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    # NOTE(review): "--disable-web-security" and ignore_https_errors=True
    # weaken browser security; confirm they are really needed.
    _LAUNCH_ARGS = (
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--window-size=1920,1080",
        "--disable-blink-features=AutomationControlled",
        "--disable-web-security",
        "--disable-features=VizDisplayCompositor",
    )
    _EXTRA_HEADERS = {
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
    }
    # Injected into every page: hides navigator.webdriver and patches the
    # permissions query for notifications.
    _INIT_SCRIPT = """
        Object.defineProperty(navigator, 'webdriver', {
            get: () => undefined
        });
        // 覆盖permissions API
        const originalQuery = window.navigator.permissions.query;
        window.navigator.permissions.query = (parameters) => (
            parameters.name === 'notifications' ?
            Promise.resolve({ state: Notification.permission }) :
            originalQuery(parameters)
        );
    """

    # Candidate selectors, tried in order.
    _READY_SELECTORS = (
        "h1",
        "[data-test='product-name']",
        ".product-name",
        "div[class*='product']",
        "div[class*='styles_']",
    )
    _NAME_SELECTORS = (
        "h1",
        "[data-test='product-name']",
        ".product-name",
        "[class*='product'][class*='name']",
        ".styles_productName__",
        "[class*='heading'][class*='xl']",
        "div[class*='text-2xl']",
        "div[class*='text-3xl']",
        "div[class*='text-4xl']",
        "div[class*='text-5xl']",
        "div[class*='text-6xl']",
        "div[class*='font-bold']",
        "div[class*='font-semibold']",
    )
    _DESCRIPTION_SELECTORS = (
        "div.relative.text-16.font-normal.text-gray-700",
        ".text-16.font-normal.text-gray-700",
        "[class*='text-16'][class*='font-normal'][class*='text-gray-700']",
        "div[class*='description']",
        ".product-description",
        "div[class*='tagline']",
        "[data-test='product-tagline']",
        ".styles_tagline__",
        "p[class*='text-gray']",
        "div[class*='mb-4']",
        "div[class*='text-base']",
        "div[class*='text-lg']",
        "div[class*='text-gray-600']",
        "div[class*='text-gray-700']",
        "div[class*='text-gray-800']",
    )
    _COMMENT_SELECTORS = (
        "div.flex.flex-1.flex-col.gap-2",
        ".flex.flex-1.flex-col.gap-2",
        "[class*='flex'][class*='flex-1'][class*='flex-col'][class*='gap-2']",
        "div[class*='comment']",
        ".comment-text",
        "div[class*='review']",
        "div[class*='feedback']",
        "blockquote",
        "div[class*='border']",
        "[data-test='comment']",
        "div[class*='text-sm']",
        "div[class*='text-xs']",
        "div[class*='mt-2']",
        "div[class*='mb-2']",
    )

    def __init__(self, product_url=None):
        """Initialise an unconnected scraper.

        Args:
            product_url (str, optional): Product page to scrape; defaults to
                ``DEFAULT_PRODUCT_URL``. It can also be reassigned later via
                the ``product_url`` attribute.
        """
        # Every handle starts as None so close() is safe at any point.
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None
        self.product_url = product_url or self.DEFAULT_PRODUCT_URL

    @staticmethod
    def _product_slug(url):
        """Return the last path segment of ``url``, without query or fragment."""
        path = url.split("?", 1)[0].split("#", 1)[0]
        return path.rstrip("/").rsplit("/", 1)[-1]

    def connect_to_browser(self):
        """Start Playwright, launch Chromium and open a prepared page.

        Returns:
            bool: True on success; False on failure, in which case anything
            that was already started is shut down again.
        """
        try:
            logger.info("正在初始化Playwright浏览器...")
            self.playwright = sync_playwright().start()
            self.browser = self.playwright.chromium.launch(
                headless=False,  # visible window, so the run can be observed
                args=list(self._LAUNCH_ARGS),
            )
            self.context = self.browser.new_context(
                user_agent=self._USER_AGENT,
                viewport={"width": 1920, "height": 1080},
                ignore_https_errors=True,
                extra_http_headers=dict(self._EXTRA_HEADERS),
            )
            self.page = self.context.new_page()
            self.page.add_init_script(self._INIT_SCRIPT)
            logger.info("成功连接到Playwright浏览器")
            return True
        except Exception as e:
            logger.error(f"连接浏览器失败: {str(e)}")
            # Do not leave a half-started browser or driver process behind.
            self._release()
            return False

    def wait_for_cloudflare(self, timeout=120):
        """Poll the page until the Cloudflare challenge appears to be over.

        The challenge counts as cleared when the title mentions
        "Product Hunt" or the product's leading slug word, or when the URL
        is the product page and does not contain "challenge".

        Args:
            timeout (float): Maximum number of seconds to keep polling.

        Returns:
            bool: True when the product page was reached, False on timeout.
        """
        logger.info("等待Cloudflare验证完成...")
        # Derived from the configured URL so other products work as well.
        slug = self._product_slug(self.product_url)
        keyword = slug.split("-")[0].lower()
        product_path = f"products/{slug}"
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                title = self.page.title()
                logger.info(f"当前页面标题: {title}")
                if "Product Hunt" in title or (keyword and keyword in title.lower()):
                    logger.info("Cloudflare验证成功")
                    return True
                # Challenge form still displayed: keep waiting.
                if self.page.query_selector("#challenge-form"):
                    logger.info("检测到Cloudflare验证页面等待验证...")
                    time.sleep(5)
                    continue
                # Challenge passed, redirect pending.
                if self.page.query_selector("#challenge-success-text"):
                    logger.info("Cloudflare验证成功等待页面跳转...")
                    time.sleep(5)
                    continue
                current_url = self.page.url
                if slug and product_path in current_url and "challenge" not in current_url:
                    logger.info("已成功跳转到产品页面")
                    return True
                time.sleep(2)
            except Exception as e:
                logger.debug(f"等待Cloudflare验证时出错: {str(e)}")
                time.sleep(2)
        logger.warning(f"等待Cloudflare验证超时 ({timeout}秒)")
        return False

    def navigate_to_product(self):
        """Open the product page and wait until its content is likely ready.

        Returns:
            bool: True when the page loaded and the Cloudflare check passed,
            False otherwise.
        """
        try:
            logger.info(f"正在导航到产品页面: {self.product_url}")
            logger.info("直接访问产品页面")
            self.page.goto(self.product_url, wait_until="domcontentloaded", timeout=60000)
            if not self.wait_for_cloudflare():
                logger.error("Cloudflare验证失败或超时")
                return False
            logger.info("等待页面内容加载...")
            time.sleep(self.PAGE_SETTLE_SECONDS)
            logger.info(f"当前页面URL: {self.page.url}")
            logger.info(f"页面标题: {self.page.title()}")
            # Best effort: wait for the first recognisable element.
            try:
                logger.info("等待页面内容加载...")
                self.page.wait_for_selector("body", timeout=30000)
                for selector in self._READY_SELECTORS:
                    try:
                        self.page.wait_for_selector(selector, timeout=5000)
                    except Exception:
                        # This candidate did not show up in time; try the next.
                        continue
                    logger.info(f"找到元素: {selector}")
                    break
            except Exception as e:
                logger.warning(f"等待页面元素时出错: {str(e)}")
            logger.info("页面加载完成")
            return True
        except Exception as e:
            logger.error(f"导航到产品页面失败: {str(e)}")
            return False

    def _extract_first(self, product_info, key, label, selectors, min_length, preview=None, reject=()):
        """Store under ``key`` the first acceptable text found via ``selectors``.

        A text is acceptable when it is longer than ``min_length`` characters
        and not listed in ``reject``. ``preview`` optionally truncates the
        logged value. The placeholder "未找到" is stored when nothing fits.
        """
        for selector in selectors:
            try:
                element = self.page.query_selector(selector)
                if not element:
                    continue
                text = (element.text_content() or "").strip()
            except Exception as e:
                logger.debug(f"使用选择器 {selector} 提取{label}时出错: {str(e)}")
                continue
            if len(text) > min_length and text not in reject:
                product_info[key] = text
                shown = text if preview is None else f"{text[:preview]}..."
                logger.info(f"使用选择器 {selector} 找到{label}: {shown}")
                return
        logger.warning(f"未找到{label}")
        product_info[key] = "未找到"

    def _extract_optional(self, product_info, key, label, selector):
        """Store the first match's text under ``key``; skip silently if none."""
        try:
            elements = self.page.query_selector_all(selector)
            if elements:
                product_info[key] = (elements[0].text_content() or "").strip()
                logger.info(f"{label}: {product_info[key]}")
        except Exception as e:
            logger.debug(f"提取{label}时出错: {str(e)}")

    def extract_product_info(self):
        """Collect product data from the currently loaded page.

        Returns:
            dict | None: Keys ``url``, ``scraped_at``, ``name``,
            ``description``, ``first_comment`` and, when found, ``tags``,
            ``likes``, ``comment_count`` and ``image_url``; None when
            extraction failed as a whole.
        """
        try:
            logger.info("开始提取产品信息")
            product_info = {
                "url": self.page.url,
                "scraped_at": datetime.now().isoformat(),
            }
            # The bare host name is rejected as a product name.
            self._extract_first(
                product_info, "name", "产品名称", self._NAME_SELECTORS, 2,
                reject=("www.producthunt.com",),
            )
            # Very short texts are skipped for description and comment.
            self._extract_first(
                product_info, "description", "产品简介", self._DESCRIPTION_SELECTORS, 10, preview=50
            )
            self._extract_first(
                product_info, "first_comment", "第一个评论", self._COMMENT_SELECTORS, 10, preview=50
            )

            # Optional extra: up to five tag-like labels.
            try:
                tag_elements = self.page.query_selector_all(
                    "[class*='tag'], [class*='category'], [class*='topic'], [class*='badge']"
                )
                if tag_elements:
                    texts = ((tag.text_content() or "").strip() for tag in tag_elements)
                    tags = [text for text in texts if text]
                    product_info["tags"] = tags[:5]
                    logger.info(f"找到标签: {tags[:3]}")
            except Exception as e:
                logger.debug(f"提取标签时出错: {str(e)}")

            self._extract_optional(
                product_info, "likes", "点赞数",
                "[class*='vote'], [class*='like'], [class*='upvote'], [data-test='vote-count'], [class*='count']",
            )
            self._extract_optional(
                product_info, "comment_count", "评论数",
                "[class*='comment-count'], [class*='comments'], [data-test='comment-count'], [class*='count']",
            )

            # Optional extra: source URL of the first product-like image.
            try:
                img_elements = self.page.query_selector_all(
                    "img[class*='product'], img[alt*='product'], img[alt*='logo'], img[src*='product']"
                )
                if img_elements:
                    product_info["image_url"] = img_elements[0].get_attribute("src")
                    logger.info(f"产品图片URL: {product_info['image_url']}")
            except Exception as e:
                logger.debug(f"提取产品图片时出错: {str(e)}")
            return product_info
        except Exception as e:
            logger.error(f"提取产品信息失败: {str(e)}")
            return None

    def save_to_file(self, data, filename="product_info.json"):
        """Write ``data`` as UTF-8 JSON to ``filename``; return success as bool."""
        try:
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"数据已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存数据失败: {str(e)}")
            return False

    def save_screenshot(self, filename="product_screenshot.png"):
        """Save a full-page screenshot for debugging; return success as bool."""
        try:
            self.page.screenshot(path=filename, full_page=True)
            logger.info(f"页面截图已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存页面截图失败: {str(e)}")
            return False

    def save_html(self, filename="product_page.html"):
        """Save the page's current HTML for debugging; return success as bool."""
        try:
            html_content = self.page.content()
            with open(filename, "w", encoding="utf-8") as f:
                f.write(html_content)
            logger.info(f"页面HTML已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存页面HTML失败: {str(e)}")
            return False

    def _release(self):
        """Shut down context, browser and Playwright; return True if all succeeded.

        Each resource is handled independently so that one failing shutdown
        cannot leave the remaining ones running.
        """
        clean = True
        for attribute, method in (("context", "close"), ("browser", "close"), ("playwright", "stop")):
            resource = getattr(self, attribute, None)
            setattr(self, attribute, None)
            if not resource:
                continue
            try:
                getattr(resource, method)()
            except Exception as e:
                clean = False
                logger.error(f"关闭浏览器时出错: {str(e)}")
        self.page = None
        return clean

    def close(self):
        """Close the browser and stop Playwright; safe to call repeatedly."""
        if self._release():
            logger.info("浏览器已关闭")

    def scrape_product(self):
        """Run the whole pipeline: connect, navigate, dump, extract, save.

        Returns:
            bool: True only when the product information was extracted and
            written to disk.
        """
        if not self.connect_to_browser():
            logger.error("无法连接到浏览器")
            return False
        try:
            if not self.navigate_to_product():
                logger.error("无法导航到产品页面")
                return False
            # Screenshot and HTML dump are debugging aids; failures are not fatal.
            self.save_screenshot()
            self.save_html()
            product_info = self.extract_product_info()
            if not product_info:
                logger.error("未能提取产品信息")
                return False
            # Report failure when the result could not be persisted.
            return self.save_to_file(product_info)
        finally:
            self.close()
def main():
    """Scrape the configured product page once and report the outcome."""
    logger.info("开始ProductHunt产品信息抓取")
    scraper = ProductHuntScraper()
    # Set scraper.product_url to another product page here to scrape it instead.
    if scraper.scrape_product():
        logger.info("产品信息抓取完成")
        return
    logger.error("产品信息抓取失败")


if __name__ == "__main__":
    main()

View File

@@ -1,369 +0,0 @@
import os
import json
import time
from datetime import datetime
from playwright.sync_api import sync_playwright
from loguru import logger
class ProductHuntScraper:
def __init__(self):
self.browser = None
self.context = None
self.page = None
self.product_url = "https://www.producthunt.com/products/elsie-ai-beta"
def connect_to_browser(self):
    """Start Playwright, launch Chromium and open a prepared page.

    The page gets an init script that hides ``navigator.webdriver`` and
    patches the permissions query for notifications.

    Returns:
        bool: True on success; False on failure, in which case a browser or
        Playwright driver that was already started is shut down again.
    """
    try:
        logger.info("正在初始化Playwright浏览器...")
        self.playwright = sync_playwright().start()
        # NOTE(review): "--disable-web-security" and ignore_https_errors=True
        # weaken browser security; confirm they are really needed.
        self.browser = self.playwright.chromium.launch(
            headless=False,  # visible window, so the run can be observed
            args=[
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1920,1080",
                "--disable-blink-features=AutomationControlled",
                "--disable-web-security",
                "--disable-features=VizDisplayCompositor",
            ],
        )
        self.context = self.browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            viewport={"width": 1920, "height": 1080},
            ignore_https_errors=True,
            extra_http_headers={
                "Accept-Language": "en-US,en;q=0.9",
                "Accept-Encoding": "gzip, deflate, br",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
                "Cache-Control": "no-cache",
                "Pragma": "no-cache",
            },
        )
        self.page = self.context.new_page()
        # Runs in every document before the page's own scripts.
        self.page.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            // 覆盖permissions API
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                Promise.resolve({ state: Notification.permission }) :
                originalQuery(parameters)
            );
        """)
        logger.info("成功连接到Playwright浏览器")
        return True
    except Exception as e:
        logger.error(f"连接浏览器失败: {str(e)}")
        # Do not leave a half-started browser or driver process behind.
        for attribute, method in (("browser", "close"), ("playwright", "stop")):
            resource = getattr(self, attribute, None)
            setattr(self, attribute, None)
            if resource is None:
                continue
            try:
                getattr(resource, method)()
            except Exception as cleanup_error:
                logger.debug(f"清理浏览器资源时出错: {str(cleanup_error)}")
        self.context = None
        self.page = None
        return False
def navigate_to_product(self):
"""导航到产品页面"""
try:
logger.info(f"正在导航到产品页面: {self.product_url}")
# 直接访问产品页面,跳过主页
logger.info("直接访问产品页面")
self.page.goto(self.product_url, wait_until="domcontentloaded", timeout=60000)
# 等待页面加载
logger.info("等待页面加载...")
time.sleep(10) # 等待动态内容加载
# 检查页面URL和标题
current_url = self.page.url
page_title = self.page.title()
logger.info(f"当前页面URL: {current_url}")
logger.info(f"页面标题: {page_title}")
# 尝试等待特定元素加载
try:
logger.info("等待页面内容加载...")
# 等待可能的加载指示器消失
self.page.wait_for_selector("body", timeout=30000)
# 尝试等待一些可能存在的元素
possible_selectors = [
"h1",
"[data-test='product-name']",
".product-name",
"div[class*='product']",
"div[class*='styles_']"
]
for selector in possible_selectors:
try:
self.page.wait_for_selector(selector, timeout=5000)
logger.info(f"找到元素: {selector}")
break
except:
continue
except Exception as e:
logger.warning(f"等待页面元素时出错: {str(e)}")
logger.info("页面加载完成")
return True
except Exception as e:
logger.error(f"导航到产品页面失败: {str(e)}")
return False
def extract_product_info(self):
"""提取产品信息"""
try:
logger.info("开始提取产品信息")
product_info = {
"url": self.page.url,
"scraped_at": datetime.now().isoformat()
}
# 提取产品名称 - 尝试多种选择器
name_selectors = [
"h1",
"[data-test='product-name']",
".product-name",
"[class*='product'][class*='name']",
".styles_productName__",
"[class*='heading'][class*='xl']",
"div[class*='text-2xl']",
"div[class*='text-3xl']",
"div[class*='text-4xl']",
"div[class*='text-5xl']",
"div[class*='text-6xl']",
"div[class*='font-bold']",
"div[class*='font-semibold']"
]
for selector in name_selectors:
try:
name_element = self.page.query_selector(selector)
if name_element:
name_text = name_element.text_content().strip()
if name_text and name_text != "www.producthunt.com" and len(name_text) > 2:
product_info["name"] = name_text
logger.info(f"使用选择器 {selector} 找到产品名称: {product_info['name']}")
break
except Exception as e:
logger.debug(f"使用选择器 {selector} 提取产品名称时出错: {str(e)}")
if "name" not in product_info:
logger.warning("未找到产品名称")
product_info["name"] = "未找到"
# 提取产品简介 - 尝试多种选择器
desc_selectors = [
"div.relative.text-16.font-normal.text-gray-700",
".text-16.font-normal.text-gray-700",
"[class*='text-16'][class*='font-normal'][class*='text-gray-700']",
"div[class*='description']",
".product-description",
"div[class*='tagline']",
"[data-test='product-tagline']",
".styles_tagline__",
"p[class*='text-gray']",
"div[class*='mb-4']",
"div[class*='text-base']",
"div[class*='text-lg']",
"div[class*='text-gray-600']",
"div[class*='text-gray-700']",
"div[class*='text-gray-800']"
]
for selector in desc_selectors:
try:
desc_element = self.page.query_selector(selector)
if desc_element:
desc_text = desc_element.text_content().strip()
if desc_text and len(desc_text) > 10: # 确保是有意义的描述
product_info["description"] = desc_text
logger.info(f"使用选择器 {selector} 找到产品简介: {product_info['description'][:50]}...")
break
except Exception as e:
logger.debug(f"使用选择器 {selector} 提取产品简介时出错: {str(e)}")
if "description" not in product_info:
logger.warning("未找到产品简介")
product_info["description"] = "未找到"
# 提取第一个评论 - 尝试多种选择器
comment_selectors = [
"div.flex.flex-1.flex-col.gap-2",
".flex.flex-1.flex-col.gap-2",
"[class*='flex'][class*='flex-1'][class*='flex-col'][class*='gap-2']",
"div[class*='comment']",
".comment-text",
"div[class*='review']",
"div[class*='feedback']",
"blockquote",
"div[class*='border']",
"[data-test='comment']",
"div[class*='text-sm']",
"div[class*='text-xs']",
"div[class*='mt-2']",
"div[class*='mb-2']"
]
for selector in comment_selectors:
try:
comment_element = self.page.query_selector(selector)
if comment_element:
comment_text = comment_element.text_content().strip()
if comment_text and len(comment_text) > 10: # 确保是有意义的评论
product_info["first_comment"] = comment_text
logger.info(f"使用选择器 {selector} 找到第一个评论: {product_info['first_comment'][:50]}...")
break
except Exception as e:
logger.debug(f"使用选择器 {selector} 提取第一个评论时出错: {str(e)}")
if "first_comment" not in product_info:
logger.warning("未找到第一个评论")
product_info["first_comment"] = "未找到"
# 尝试提取其他有用信息
try:
# 尝试获取产品标签
tag_elements = self.page.query_selector_all("[class*='tag'], [class*='category'], [class*='topic'], [class*='badge']")
if tag_elements:
tags = [tag.text_content().strip() for tag in tag_elements if tag.text_content().strip()]
product_info["tags"] = tags[:5] # 最多取5个标签
logger.info(f"找到标签: {tags[:3]}")
except Exception as e:
logger.debug(f"提取标签时出错: {str(e)}")
# 尝试获取点赞数
try:
like_elements = self.page.query_selector_all("[class*='vote'], [class*='like'], [class*='upvote'], [data-test='vote-count'], [class*='count']")
if like_elements:
product_info["likes"] = like_elements[0].text_content().strip()
logger.info(f"点赞数: {product_info['likes']}")
except Exception as e:
logger.debug(f"提取点赞数时出错: {str(e)}")
# 尝试获取评论数
try:
comment_count_elements = self.page.query_selector_all("[class*='comment-count'], [class*='comments'], [data-test='comment-count'], [class*='count']")
if comment_count_elements:
product_info["comment_count"] = comment_count_elements[0].text_content().strip()
logger.info(f"评论数: {product_info['comment_count']}")
except Exception as e:
logger.debug(f"提取评论数时出错: {str(e)}")
# 尝试获取产品图片
try:
img_elements = self.page.query_selector_all("img[class*='product'], img[alt*='product'], img[alt*='logo'], img[src*='product']")
if img_elements:
product_info["image_url"] = img_elements[0].get_attribute("src")
logger.info(f"产品图片URL: {product_info['image_url']}")
except Exception as e:
logger.debug(f"提取产品图片时出错: {str(e)}")
return product_info
except Exception as e:
logger.error(f"提取产品信息失败: {str(e)}")
return None
def save_to_file(self, data, filename="product_info.json"):
"""保存数据到文件"""
try:
with open(filename, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"数据已保存到 {filename}")
return True
except Exception as e:
logger.error(f"保存数据失败: {str(e)}")
return False
def save_screenshot(self, filename="product_screenshot.png"):
"""保存页面截图,用于调试"""
try:
self.page.screenshot(path=filename, full_page=True)
logger.info(f"页面截图已保存到 {filename}")
return True
except Exception as e:
logger.error(f"保存页面截图失败: {str(e)}")
return False
def save_html(self, filename="product_page.html"):
"""保存页面HTML用于调试"""
try:
html_content = self.page.content()
with open(filename, "w", encoding="utf-8") as f:
f.write(html_content)
logger.info(f"页面HTML已保存到 {filename}")
return True
except Exception as e:
logger.error(f"保存页面HTML失败: {str(e)}")
return False
def close(self):
"""关闭浏览器"""
try:
if self.context:
self.context.close()
if self.browser:
self.browser.close()
if self.playwright:
self.playwright.stop()
logger.info("浏览器已关闭")
except Exception as e:
logger.error(f"关闭浏览器时出错: {str(e)}")
def scrape_product(self):
"""执行完整的抓取流程"""
if not self.connect_to_browser():
logger.error("无法连接到浏览器")
return False
try:
if not self.navigate_to_product():
logger.error("无法导航到产品页面")
return False
# 保存截图和HTML用于调试
self.save_screenshot()
self.save_html()
product_info = self.extract_product_info()
if product_info:
self.save_to_file(product_info)
return True
else:
logger.error("未能提取产品信息")
return False
finally:
self.close()
def main():
    """Scrape the default product once and log whether it succeeded."""
    logger.info("开始ProductHunt产品信息抓取")
    scraper = ProductHuntScraper()
    # To scrape another product, set scraper.product_url before this call,
    # e.g. "https://www.producthunt.com/products/your-product".
    if scraper.scrape_product():
        logger.info("产品信息抓取完成")
    else:
        logger.error("产品信息抓取失败")


if __name__ == "__main__":
    main()

View File

@@ -1,323 +0,0 @@
import os
import json
import time
from datetime import datetime
from playwright.sync_api import sync_playwright
from loguru import logger
class ProductHuntScraper:
    """Scrape one Product Hunt product page with Playwright's sync API.

    This variant visits the Product Hunt homepage before the product page.
    ``scrape_product`` runs the whole flow and always releases the browser.
    """

    # Candidate CSS selectors, tried in order; the first usable match wins.
    _NAME_SELECTORS = (
        "h1",
        "[data-test='product-name']",
        ".product-name",
        "[class*='product'][class*='name']",
        ".styles_productName__",
        "[class*='heading'][class*='xl']",
    )
    _DESC_SELECTORS = (
        "div.relative.text-16.font-normal.text-gray-700",
        ".text-16.font-normal.text-gray-700",
        "[class*='text-16'][class*='font-normal'][class*='text-gray-700']",
        "div[class*='description']",
        ".product-description",
        "div[class*='tagline']",
        "[data-test='product-tagline']",
        ".styles_tagline__",
        "p[class*='text-gray']",
        "div[class*='mb-4']",
    )
    _COMMENT_SELECTORS = (
        "div.flex.flex-1.flex-col.gap-2",
        ".flex.flex-1.flex-col.gap-2",
        "[class*='flex'][class*='flex-1'][class*='flex-col'][class*='gap-2']",
        "div[class*='comment']",
        ".comment-text",
        "div[class*='review']",
        "div[class*='feedback']",
        "blockquote",
        "div[class*='border']",
        "[data-test='comment']",
    )

    # Stored for a field when no selector produced usable text.
    _NOT_FOUND = "未找到"

    def __init__(self):
        # Playwright handles; every one stays None until connect_to_browser
        # creates it, so close() can always test them safely.
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None
        self.product_url = "https://www.producthunt.com/products/elsie-ai-beta"

    def connect_to_browser(self):
        """Start Playwright, launch Chromium and open a blank page.

        Returns:
            bool: True when ``self.page`` is ready; False if any step raised
            (the error is logged). Handles created before the failure remain
            on the instance so that ``close()`` can release them.
        """
        try:
            logger.info("正在初始化Playwright浏览器...")
            self.playwright = sync_playwright().start()
            self.browser = self.playwright.chromium.launch(
                headless=False,  # headed, so the browser can be watched
                args=[
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                    "--window-size=1920,1080",
                    "--disable-blink-features=AutomationControlled",
                ],
            )
            self.context = self.browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                viewport={"width": 1920, "height": 1080},
                ignore_https_errors=True,
            )
            self.page = self.context.new_page()
            # Hide navigator.webdriver before any page script can read it.
            self.page.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
""")
            logger.info("成功连接到Playwright浏览器")
            return True
        except Exception as e:
            logger.error(f"连接浏览器失败: {str(e)}")
            return False

    def navigate_to_product(self):
        """Visit the homepage, then ``product_url``, retrying once if redirected.

        Returns:
            bool: True when navigation completed without raising, else False.
        """
        from urllib.parse import urlparse

        try:
            logger.info(f"正在导航到产品页面: {self.product_url}")
            logger.info("首先访问ProductHunt主页")
            self.page.goto("https://www.producthunt.com/", wait_until="networkidle")
            time.sleep(5)
            logger.info("访问产品页面")
            self.page.goto(self.product_url, wait_until="networkidle")
            logger.info("等待页面加载...")
            time.sleep(10)  # give client-side rendering time to finish
            current_url = self.page.url
            page_title = self.page.title()
            logger.info(f"当前页面URL: {current_url}")
            logger.info(f"页面标题: {page_title}")
            # Derive the expected path from product_url so the redirect check
            # keeps working when the caller scrapes a different product.
            expected_path = urlparse(self.product_url).path.rstrip("/")
            if expected_path not in current_url:
                logger.warning("页面可能被重定向,尝试直接访问产品页面")
                self.page.goto(self.product_url, wait_until="networkidle")
                time.sleep(10)
                current_url = self.page.url
                logger.info(f"重试后当前页面URL: {current_url}")
            logger.info("页面加载完成")
            return True
        except Exception as e:
            logger.error(f"导航到产品页面失败: {str(e)}")
            return False

    def _first_text(self, selectors, label, min_len, rejected=(), preview=None):
        """Return the text of the first selector match that passes the filters.

        Args:
            selectors: CSS selectors to try in order.
            label: Field name used in the log messages.
            min_len: Text must be strictly longer than this to be accepted.
            rejected: Exact texts that must never be accepted.
            preview: When set, only this many characters are logged.

        Returns:
            str: The stripped text, or ``_NOT_FOUND`` when nothing qualified.
        """
        for selector in selectors:
            try:
                element = self.page.query_selector(selector)
                if not element:
                    continue
                # text_content() may return None for some nodes.
                text = (element.text_content() or "").strip()
                if len(text) > min_len and text not in rejected:
                    shown = text if preview is None else f"{text[:preview]}..."
                    logger.info(f"使用选择器 {selector} 找到{label}: {shown}")
                    return text
            except Exception as e:
                logger.debug(f"使用选择器 {selector} 提取{label}时出错: {str(e)}")
        logger.warning(f"未找到{label}")
        return self._NOT_FOUND

    def _add_optional_fields(self, product_info):
        """Best-effort extraction of tags, likes, comment count and image URL.

        Each field is attempted independently; a failure is logged at debug
        level and simply leaves that key out of ``product_info``.
        """
        try:
            tag_elements = self.page.query_selector_all("[class*='tag'], [class*='category'], [class*='topic']")
            if tag_elements:
                texts = ((el.text_content() or "").strip() for el in tag_elements)
                tags = [text for text in texts if text]
                product_info["tags"] = tags[:5]  # keep at most five tags
                logger.info(f"找到标签: {tags[:3]}")
        except Exception as e:
            logger.debug(f"提取标签时出错: {str(e)}")
        try:
            like_elements = self.page.query_selector_all("[class*='vote'], [class*='like'], [class*='upvote'], [data-test='vote-count']")
            if like_elements:
                product_info["likes"] = like_elements[0].text_content().strip()
                logger.info(f"点赞数: {product_info['likes']}")
        except Exception as e:
            logger.debug(f"提取点赞数时出错: {str(e)}")
        try:
            comment_count_elements = self.page.query_selector_all("[class*='comment-count'], [class*='comments'], [data-test='comment-count']")
            if comment_count_elements:
                product_info["comment_count"] = comment_count_elements[0].text_content().strip()
                logger.info(f"评论数: {product_info['comment_count']}")
        except Exception as e:
            logger.debug(f"提取评论数时出错: {str(e)}")
        try:
            img_elements = self.page.query_selector_all("img[class*='product'], img[alt*='product'], img[alt*='logo']")
            if img_elements:
                product_info["image_url"] = img_elements[0].get_attribute("src")
                logger.info(f"产品图片URL: {product_info['image_url']}")
        except Exception as e:
            logger.debug(f"提取产品图片时出错: {str(e)}")

    def extract_product_info(self):
        """Collect the product fields from the currently loaded page.

        Returns:
            dict | None: ``url``, ``scraped_at``, ``name``, ``description`` and
            ``first_comment`` are always present (missing values are stored as
            ``"未找到"``); ``tags``, ``likes``, ``comment_count`` and
            ``image_url`` only when found. None if extraction failed outright.
        """
        try:
            logger.info("开始提取产品信息")
            product_info = {
                "url": self.page.url,
                "scraped_at": datetime.now().isoformat(),
            }
            product_info["name"] = self._first_text(
                self._NAME_SELECTORS, "产品名称", 0, rejected=("www.producthunt.com",)
            )
            # Texts of 10 characters or fewer are treated as noise.
            product_info["description"] = self._first_text(
                self._DESC_SELECTORS, "产品简介", 10, preview=50
            )
            product_info["first_comment"] = self._first_text(
                self._COMMENT_SELECTORS, "第一个评论", 10, preview=50
            )
            self._add_optional_fields(product_info)
            return product_info
        except Exception as e:
            logger.error(f"提取产品信息失败: {str(e)}")
            return None

    def save_to_file(self, data, filename="product_info.json"):
        """Write ``data`` as UTF-8 JSON to ``filename``; return True on success."""
        try:
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"数据已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存数据失败: {str(e)}")
            return False

    def save_screenshot(self, filename="product_screenshot.png"):
        """Save a full-page screenshot for debugging; return True on success."""
        try:
            self.page.screenshot(path=filename, full_page=True)
            logger.info(f"页面截图已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存页面截图失败: {str(e)}")
            return False

    def save_html(self, filename="product_page.html"):
        """Save the page's current HTML for debugging; return True on success."""
        try:
            html_content = self.page.content()
            with open(filename, "w", encoding="utf-8") as f:
                f.write(html_content)
            logger.info(f"页面HTML已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存页面HTML失败: {str(e)}")
            return False

    def close(self):
        """Release the context, the browser and Playwright, in that order.

        Every resource is attempted even if an earlier one fails, and the
        handles are reset so that calling ``close()`` again is harmless.
        """
        clean = True
        for attr, method in (("context", "close"), ("browser", "close"), ("playwright", "stop")):
            resource = getattr(self, attr, None)
            if not resource:
                continue
            try:
                getattr(resource, method)()
            except Exception as e:
                clean = False
                logger.error(f"关闭浏览器时出错: {str(e)}")
            finally:
                setattr(self, attr, None)
        self.page = None
        if clean:
            logger.info("浏览器已关闭")

    def scrape_product(self):
        """Run the complete scrape and always release the browser afterwards.

        Returns:
            bool: True when product information was extracted, else False.
        """
        try:
            # Connecting happens inside the try so a half-started browser is
            # still cleaned up by the finally clause.
            if not self.connect_to_browser():
                logger.error("无法连接到浏览器")
                return False
            if not self.navigate_to_product():
                logger.error("无法导航到产品页面")
                return False
            # Debug artefacts: what the page looked like when it was scraped.
            self.save_screenshot()
            self.save_html()
            product_info = self.extract_product_info()
            if not product_info:
                logger.error("未能提取产品信息")
                return False
            self.save_to_file(product_info)
            return True
        finally:
            self.close()
def main():
    """Scrape the default product once and log whether it succeeded."""
    logger.info("开始ProductHunt产品信息抓取")
    scraper = ProductHuntScraper()
    # To scrape another product, set scraper.product_url before this call,
    # e.g. "https://www.producthunt.com/products/your-product".
    if scraper.scrape_product():
        logger.info("产品信息抓取完成")
    else:
        logger.error("产品信息抓取失败")


if __name__ == "__main__":
    main()

View File

@@ -1,176 +0,0 @@
import os
import json
import time
from datetime import datetime
import requests
from bs4 import BeautifulSoup
from loguru import logger
class ProductHuntScraper:
    """Scrape one Product Hunt product page with plain HTTP and BeautifulSoup.

    No browser is involved: the page is fetched with a ``requests`` session
    using browser-like headers and the returned HTML is parsed directly.
    """

    # Candidate CSS selectors, tried in order; the first non-empty match wins.
    _DESC_SELECTORS = (
        "div.relative.text-16.font-normal.text-gray-700",
        ".text-16.font-normal.text-gray-700",
        "[class*='text-16'][class*='font-normal'][class*='text-gray-700']",
        "div[class*='description']",
        ".product-description",
        "div[class*='tagline']",
    )
    _COMMENT_SELECTORS = (
        "div.flex.flex-1.flex-col.gap-2",
        ".flex.flex-1.flex-col.gap-2",
        "[class*='flex'][class*='flex-1'][class*='flex-col'][class*='gap-2']",
        "div[class*='comment']",
        ".comment-text",
        "div[class*='review']",
    )

    # Stored for a field when nothing usable was found.
    _NOT_FOUND = "未找到"

    def __init__(self):
        self.session = requests.Session()
        self.product_url = "https://www.producthunt.com/products/elsie-ai-beta"
        # Browser-like request headers sent with every fetch.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Referer': 'https://www.producthunt.com/',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    def get_page_content(self, timeout=30):
        """Fetch ``product_url`` and return its HTML.

        Args:
            timeout: Seconds to wait for the server before giving up. Without
                a timeout ``requests`` can block indefinitely.

        Returns:
            str | None: The response body on HTTP 200; None on any other
            status or on a network error (both are logged).
        """
        try:
            logger.info(f"正在获取页面内容: {self.product_url}")
            response = self.session.get(self.product_url, headers=self.headers, timeout=timeout)
            if response.status_code != 200:
                logger.error(f"获取页面失败,状态码: {response.status_code}")
                return None
            logger.info("成功获取页面内容")
            return response.text
        except Exception as e:
            logger.error(f"获取页面内容失败: {str(e)}")
            return None

    def _select_first_text(self, soup, selectors, label):
        """Return the text of the first selector match that has any text.

        Empty matches are skipped so that a blank element cannot pre-empt a
        later, more useful selector. Returns ``_NOT_FOUND`` if none qualifies.
        """
        for selector in selectors:
            try:
                element = soup.select_one(selector)
                if not element:
                    continue
                text = element.get_text(strip=True)
                if text:
                    logger.info(f"使用选择器 {selector} 找到{label}: {text[:50]}...")
                    return text
            except Exception as e:
                logger.debug(f"使用选择器 {selector} 提取{label}时出错: {str(e)}")
        logger.warning(f"未找到{label}")
        return self._NOT_FOUND

    def extract_product_info(self, html_content):
        """Parse ``html_content`` and pull out the product fields.

        Returns:
            dict | None: ``url``, ``scraped_at``, ``name``, ``description`` and
            ``first_comment`` (missing values are stored as ``"未找到"``), or
            None if parsing failed.
        """
        try:
            logger.info("开始解析HTML内容")
            soup = BeautifulSoup(html_content, 'html.parser')
            product_info = {
                "url": self.product_url,
                "scraped_at": datetime.now().isoformat(),
            }
            # The product name is taken from the first <h1> of the page.
            try:
                name_element = soup.find('h1')
                if name_element:
                    product_info["name"] = name_element.get_text(strip=True)
                    logger.info(f"产品名称: {product_info['name']}")
                else:
                    logger.warning("未找到产品名称 (h1标签)")
                    product_info["name"] = self._NOT_FOUND
            except Exception as e:
                logger.warning(f"提取产品名称时出错: {str(e)}")
                product_info["name"] = self._NOT_FOUND
            product_info["description"] = self._select_first_text(
                soup, self._DESC_SELECTORS, "产品简介"
            )
            product_info["first_comment"] = self._select_first_text(
                soup, self._COMMENT_SELECTORS, "第一个评论"
            )
            return product_info
        except Exception as e:
            logger.error(f"解析HTML内容失败: {str(e)}")
            return None

    def save_to_file(self, data, filename="product_info.json"):
        """Write ``data`` as UTF-8 JSON to ``filename``; return True on success."""
        try:
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"数据已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存数据失败: {str(e)}")
            return False

    def save_html(self, html_content, filename="product_page.html"):
        """Write the fetched HTML to ``filename`` for debugging; True on success."""
        try:
            with open(filename, "w", encoding="utf-8") as f:
                f.write(html_content)
            logger.info(f"HTML内容已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存HTML内容失败: {str(e)}")
            return False

    def scrape_product(self):
        """Fetch, dump, parse and save the product page.

        Returns:
            bool: True when product information was extracted, else False.
        """
        html_content = self.get_page_content()
        if not html_content:
            logger.error("无法获取页面内容")
            return False
        # Keep a copy of the raw HTML so selector problems can be debugged.
        self.save_html(html_content)
        product_info = self.extract_product_info(html_content)
        if not product_info:
            logger.error("未能提取产品信息")
            return False
        self.save_to_file(product_info)
        return True
def main():
    """Scrape the default product once and log whether it succeeded."""
    logger.info("开始ProductHunt产品信息抓取")
    scraper = ProductHuntScraper()
    # To scrape another product, set scraper.product_url before this call,
    # e.g. "https://www.producthunt.com/products/your-product".
    if scraper.scrape_product():
        logger.info("产品信息抓取完成")
    else:
        logger.error("产品信息抓取失败")


if __name__ == "__main__":
    main()

View File

@@ -1,172 +0,0 @@
import os
import json
import time
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from loguru import logger
class ProductHuntScraper:
    """Scrape one Product Hunt product page with Selenium and Chrome."""

    def __init__(self):
        self.product_url = "https://www.producthunt.com/products/elsie-ai-beta"
        self.driver = None  # set by connect_to_chrome

    def connect_to_chrome(self):
        """Start a Chrome WebDriver session.

        The default driver lookup is tried first; if that fails, a second
        attempt points Selenium at a fixed Chrome binary path.

        Returns:
            bool: True when ``self.driver`` is ready, else False.
        """
        try:
            logger.info("正在初始化Chrome驱动...")
            chrome_options = Options()
            for flag in (
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1920,1080",
                "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            ):
                chrome_options.add_argument(flag)
            # (binary override, success message, failure message prefix)
            attempts = (
                (None, "成功连接到Chrome实例", "使用ChromeDriver连接失败"),
                (
                    "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
                    "成功连接到系统Chrome实例",
                    "使用系统Chrome连接失败",
                ),
            )
            for binary_path, ok_message, fail_prefix in attempts:
                try:
                    if binary_path is not None:
                        chrome_options.binary_location = binary_path
                    self.driver = webdriver.Chrome(options=chrome_options)
                    logger.info(ok_message)
                    return True
                except Exception as err:
                    logger.error(f"{fail_prefix}: {str(err)}")
            return False
        except Exception as e:
            logger.error(f"连接Chrome实例失败: {str(e)}")
            return False

    def navigate_to_product(self):
        """Load ``product_url`` and wait up to 10 s for ``<body>`` to exist.

        Returns:
            bool: True when the page loaded, False on timeout or other error.
        """
        try:
            logger.info(f"正在导航到产品页面: {self.product_url}")
            self.driver.get(self.product_url)
            body_present = EC.presence_of_element_located((By.TAG_NAME, "body"))
            WebDriverWait(self.driver, 10).until(body_present)
            logger.info("页面加载完成")
            return True
        except TimeoutException:
            logger.error("页面加载超时")
            return False
        except Exception as e:
            logger.error(f"导航到产品页面失败: {str(e)}")
            return False

    def extract_product_info(self):
        """Read name, description and first comment from the loaded page.

        Returns:
            dict | None: ``url``, ``scraped_at``, ``name``, ``description`` and
            ``first_comment`` (a missing element is stored as ``"未找到"``), or
            None when an unexpected error occurred.
        """
        # (result key, locator strategy, locator, log label,
        #  warning when missing, truncate the logged text to 50 chars?)
        field_specs = (
            ("name", By.TAG_NAME, "h1", "产品名称",
             "未找到产品名称 (h1标签)", False),
            ("description", By.CSS_SELECTOR,
             "div.relative.text-16.font-normal.text-gray-700", "产品简介",
             "未找到产品简介 (div.relative.text-16.font-normal.text-gray-700)", True),
            ("first_comment", By.CSS_SELECTOR,
             "div.flex.flex-1.flex-col.gap-2", "第一个评论",
             "未找到第一个评论 (div.flex.flex-1.flex-col.gap-2)", True),
        )
        try:
            logger.info("开始提取产品信息")
            time.sleep(5)  # let the page finish rendering
            product_info = {
                "url": self.product_url,
                "scraped_at": datetime.now().isoformat(),
            }
            for key, by, locator, label, missing_message, truncate in field_specs:
                try:
                    product_info[key] = self.driver.find_element(by, locator).text.strip()
                    shown = f"{product_info[key][:50]}..." if truncate else product_info[key]
                    logger.info(f"{label}: {shown}")
                except NoSuchElementException:
                    logger.warning(missing_message)
                    product_info[key] = "未找到"
            return product_info
        except Exception as e:
            logger.error(f"提取产品信息失败: {str(e)}")
            return None

    def save_to_file(self, data, filename="product_info.json"):
        """Write ``data`` as UTF-8 JSON to ``filename``; return True on success."""
        try:
            with open(filename, "w", encoding="utf-8") as out:
                json.dump(data, out, ensure_ascii=False, indent=2)
            logger.info(f"数据已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存数据失败: {str(e)}")
            return False

    def close(self):
        """Quit the WebDriver session if one was started."""
        if not self.driver:
            return
        self.driver.quit()
        logger.info("浏览器已关闭")

    def scrape_product(self):
        """Run connect, navigate, extract and save; always quit the driver.

        Returns:
            bool: True when product information was extracted, else False.
        """
        if not self.connect_to_chrome():
            logger.error("无法连接到Chrome实例")
            return False
        try:
            if not self.navigate_to_product():
                logger.error("无法导航到产品页面")
                return False
            product_info = self.extract_product_info()
            if not product_info:
                logger.error("未能提取产品信息")
                return False
            self.save_to_file(product_info)
            return True
        finally:
            self.close()
def main():
    """Scrape the default product once and log whether it succeeded."""
    logger.info("开始ProductHunt产品信息抓取")
    scraper = ProductHuntScraper()
    # To scrape another product, set scraper.product_url before this call,
    # e.g. "https://www.producthunt.com/products/your-product".
    if scraper.scrape_product():
        logger.info("产品信息抓取完成")
    else:
        logger.error("产品信息抓取失败")


if __name__ == "__main__":
    main()

View File

@@ -1,252 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Product Hunt网站数据抓取脚本
使用Selenium启动新的Chrome实例抓取Product Hunt产品信息
"""
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
import time
import json
import re
from datetime import datetime
from loguru import logger
# Configure logging: additionally write records of level INFO and above to
# producthunt_scraper.log, with rotation set to "10 MB".
logger.add("producthunt_scraper.log", rotation="10 MB", level="INFO")
class ProductHuntScraper:
    """Selenium-based scraper for a Product Hunt product page."""

    def __init__(self, use_debug_address=False, debug_address="127.0.0.1:5003"):
        """Store the connection settings; no browser is started yet.

        Args:
            use_debug_address (bool): Attach to an already running Chrome via
                its debugger address instead of starting a new instance.
            debug_address (str): Chrome debugger address to attach to.
        """
        self.driver = None
        self.wait = None
        self.use_debug_address = use_debug_address
        self.debug_address = debug_address

    def connect_to_chrome(self):
        """Attach to an existing Chrome or start a new one.

        Returns:
            bool: True when the driver is ready, else False.
        """
        try:
            chrome_opts = webdriver.ChromeOptions()
            if not self.use_debug_address:
                logger.info("启动新的Chrome实例")
                for flag in (
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                    "--window-size=1920,1080",
                    # Present a regular desktop browser user agent.
                    "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
                ):
                    chrome_opts.add_argument(flag)
            else:
                logger.info(f"尝试连接到Chrome调试实例: {self.debug_address}")
                chrome_opts.add_experimental_option("debuggerAddress", self.debug_address)
            # webdriver-manager supplies the path of the chromedriver binary.
            driver_service = Service(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=driver_service, options=chrome_opts)
            self.wait = WebDriverWait(self.driver, 10)
            logger.info(f"成功连接到Chrome实例当前页面标题: {self.driver.title}")
            return True
        except Exception as e:
            logger.error(f"连接Chrome实例失败: {e}")
            return False

    def navigate_to_product(self, product_url):
        """Load ``product_url`` in the connected browser.

        Args:
            product_url (str): URL of the product page.

        Returns:
            bool: True when the page loaded, False on timeout or other error.
        """
        try:
            logger.info(f"正在导航到产品页面: {product_url}")
            self.driver.get(product_url)
            body_locator = (By.TAG_NAME, "body")
            self.wait.until(EC.presence_of_element_located(body_locator))
            time.sleep(5)  # extra pause so dynamic content can load
            logger.info(f"成功导航到产品页面,当前标题: {self.driver.title}")
            return True
        except TimeoutException:
            logger.error("页面加载超时")
            return False
        except Exception as e:
            logger.error(f"导航到产品页面失败: {e}")
            return False

    def extract_product_info(self):
        """Extract name, description and first comment from the current page.

        Returns:
            dict: ``name``, ``description``, ``first_comment``, ``url`` and
            ``scraped_at``. Fields that could not be read hold a placeholder
            string; on an unexpected error an ``error`` key is added.
        """
        details = {}
        try:
            # Product name: text of the first <h1>.
            try:
                title_text = self.driver.find_element(By.TAG_NAME, "h1").text.strip()
                logger.info(f"找到产品名称: {title_text}")
                details['name'] = title_text if title_text else "未找到产品名称"
            except NoSuchElementException:
                logger.error("未找到h1标签")
                details['name'] = "未找到产品名称"
            except Exception as e:
                logger.error(f"提取产品名称失败: {e}")
                details['name'] = "提取失败"

            # Description: div with classes "relative text-16 font-normal text-gray-700".
            try:
                summary_text = self.driver.find_element(
                    By.CSS_SELECTOR, "div.relative.text-16.font-normal.text-gray-700"
                ).text.strip()
                logger.info(f"找到产品简介: {summary_text[:50]}...")
                details['description'] = summary_text if summary_text else "未找到产品简介"
            except NoSuchElementException:
                logger.error("未找到产品简介div")
                details['description'] = "未找到产品简介"
            except Exception as e:
                logger.error(f"提取产品简介失败: {e}")
                details['description'] = "提取失败"

            # First comment: full text of the first "flex flex-1 flex-col gap-2" div.
            try:
                comment_nodes = self.driver.find_elements(By.CSS_SELECTOR, "div.flex.flex-1.flex-col.gap-2")
                if not comment_nodes:
                    logger.warning("未找到任何评论")
                    details['first_comment'] = "未找到评论"
                else:
                    leading_comment = comment_nodes[0].text.strip()
                    logger.info(f"找到第一个评论: {leading_comment[:50]}...")
                    details['first_comment'] = leading_comment
            except Exception as e:
                logger.error(f"提取第一个评论失败: {e}")
                details['first_comment'] = "提取失败"

            # Record where and when the data was captured.
            details.update(url=self.driver.current_url, scraped_at=datetime.now().isoformat())
            return details
        except Exception as e:
            logger.error(f"提取产品信息时出错: {e}")
            return {
                'name': "提取失败",
                'description': "提取失败",
                'first_comment': "提取失败",
                'url': self.driver.current_url if self.driver else "未知",
                'scraped_at': datetime.now().isoformat(),
                'error': str(e)
            }

    def scrape_product(self, product_url):
        """Connect if necessary, open ``product_url`` and extract its data.

        Args:
            product_url (str): URL of the product page.

        Returns:
            dict | None: The extracted information, or None when connecting or
            navigating failed. The browser is left open; call ``close()``.
        """
        if not self.driver and not self.connect_to_chrome():
            logger.error("无法连接到Chrome实例")
            return None
        if not self.navigate_to_product(product_url):
            logger.error("无法导航到产品页面")
            return None
        return self.extract_product_info()

    def save_to_file(self, product_info, filename=None):
        """Write ``product_info`` to a JSON file.

        Args:
            product_info (dict): Data to store.
            filename (str, optional): Target file; a timestamped
                ``product_YYYYmmdd_HHMMSS.json`` name is generated if omitted.

        Returns:
            str: The file name that was written.

        Raises:
            Exception: Whatever the write raised, after it has been logged.
        """
        if not filename:
            stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"product_{stamp}.json"
        try:
            with open(filename, 'w', encoding='utf-8') as out:
                json.dump(product_info, out, ensure_ascii=False, indent=2)
            logger.info(f"产品信息已保存到 {filename}")
            return filename
        except Exception as e:
            logger.error(f"保存文件失败: {e}")
            raise

    def close(self):
        """Quit the WebDriver session if one exists."""
        if not self.driver:
            return
        self.driver.quit()
        logger.info("已关闭Chrome连接")
if __name__ == "__main__":
    # Example usage. Pass use_debug_address=True to attach to a Chrome
    # instance that is already running with remote debugging enabled.
    scraper = ProductHuntScraper(use_debug_address=False)
    try:
        # Product page to scrape.
        product_url = "https://www.producthunt.com/products/elsie-ai-beta"
        product_info = scraper.scrape_product(product_url)
        if not product_info:
            logger.error("未能获取产品信息")
        else:
            # Show the result, then persist it.
            logger.info("抓取到的产品信息:")
            logger.info(json.dumps(product_info, ensure_ascii=False, indent=2))
            filename = scraper.save_to_file(product_info)
            logger.info(f"产品信息已保存到: {filename}")
    except Exception as e:
        logger.error(f"程序执行出错: {e}")
    finally:
        # Always release the Chrome session.
        scraper.close()

View File

@@ -1,363 +0,0 @@
import json
import asyncio
from loguru import logger
from playwright.async_api import async_playwright
from playwright_stealth.stealth import Stealth
class ProductHuntScraper:
def __init__(self):
self.browser = None
self.page = None
self.product_url = "https://www.producthunt.com/products/elsie-ai-beta"
async def start_browser(self):
    """Launch headless Chromium with stealth settings and open a page.

    The Playwright driver and the browser context are kept on the instance
    (``self.playwright``, ``self.context``) so they can be shut down later.
    If start-up fails part-way, the driver is stopped again instead of being
    left running.

    Returns:
        bool: True when ``self.page`` is ready, else False (error is logged).
    """
    playwright = None
    try:
        logger.info("正在启动Playwright浏览器...")
        playwright = await async_playwright().start()
        self.playwright = playwright
        self.browser = await playwright.chromium.launch(
            headless=True,  # no visible browser window
            args=[
                '--disable-blink-features=AutomationControlled',
                '--disable-web-security',
                '--disable-features=VizDisplayCompositor',
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-infobars',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu'
            ]
        )
        # Context with a desktop user agent, US locale and New York time zone.
        self.context = await self.browser.new_context(
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
            viewport={'width': 1920, 'height': 1080},
            locale='en-US',
            timezone_id='America/New_York'
        )
        self.page = await self.context.new_page()
        # Apply playwright-stealth so the page looks less like automation.
        stealth = Stealth()
        await stealth.apply_stealth_async(self.page)
        await self.page.set_extra_http_headers({
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Cache-Control': 'no-cache',
            'Pragma': 'no-cache'
        })
        logger.success("浏览器启动成功")
        return True
    except Exception as e:
        logger.error(f"启动浏览器失败: {str(e)}")
        if playwright is not None:
            # Stop the driver started above so a failed launch leaks nothing.
            try:
                await playwright.stop()
            except Exception as stop_error:
                logger.debug(f"停止Playwright失败: {str(stop_error)}")
            self.playwright = None
            self.context = None
            self.browser = None
            self.page = None
        return False
async def wait_for_cloudflare(self, timeout=120000):
"""等待Cloudflare验证完成"""
try:
logger.info("等待Cloudflare验证完成...")
# 等待页面标题不再是"Just a moment..."或者验证成功元素出现
await self.page.wait_for_function(
"""() => {
return document.title !== "Just a moment..." &&
!document.querySelector('.lds-ring') &&
!document.querySelector('#challenge-error-text');
}""",
timeout=timeout
)
logger.success("Cloudflare验证完成")
return True
except Exception as e:
logger.error(f"等待Cloudflare验证超时: {str(e)}")
return False
async def navigate_to_product(self):
"""导航到产品页面"""
try:
logger.info(f"正在导航到产品页面: {self.product_url}")
# 先访问主页建立会话
logger.info("先访问ProductHunt主页...")
await self.page.goto("https://www.producthunt.com", {"waitUntil": "networkidle", "timeout": 60000})
# 等待一下,模拟真实用户行为
await asyncio.sleep(3)
# 再访问产品页面
logger.info("访问产品页面...")
await self.page.goto(self.product_url, {"waitUntil": "networkidle", "timeout": 60000})
# 等待Cloudflare验证
if not await self.wait_for_cloudflare():
logger.error("Cloudflare验证失败")
return False
# 等待页面加载完成
await asyncio.sleep(5)
logger.success("成功导航到产品页面")
return True
except Exception as e:
logger.error(f"导航到产品页面失败: {str(e)}")
return False
async def extract_product_info(self):
"""提取产品信息"""
try:
logger.info("正在提取产品信息...")
# 尝试多种选择器来获取产品名称
name_selectors = [
'h1[data-test="post-name"]',
'h1[data-test="post-title"]',
'h1[class*="styles_name"]',
'h1',
'[data-test="post-name"]',
'[data-test="post-title"]',
'.styles_name__',
'.styles_title__',
'h1[class*="name"]',
'h1[class*="title"]'
]
product_name = "未找到产品名称"
for selector in name_selectors:
try:
element = await self.page.wait_for_selector(selector, {"timeout": 5000})
if element:
product_name = await element.inner_text()
if product_name and product_name.strip():
logger.info(f"使用选择器 {selector} 找到产品名称: {product_name}")
break
except:
continue
# 尝试多种选择器来获取产品简介
description_selectors = [
'[data-test="post-description"]',
'[data-test="post-tagline"]',
'.styles_tagline__',
'.styles_description__',
'div[class*="tagline"]',
'div[class*="description"]',
'p[class*="tagline"]',
'p[class*="description"]'
]
product_description = "未找到产品简介"
for selector in description_selectors:
try:
element = await self.page.wait_for_selector(selector, {"timeout": 5000})
if element:
product_description = await element.inner_text()
if product_description and product_description.strip():
logger.info(f"使用选择器 {selector} 找到产品简介: {product_description}")
break
except:
continue
# 尝试获取评论
comments_selectors = [
'[data-test="comment-item"]',
'.styles_comment__',
'div[class*="comment"]',
'article[class*="comment"]'
]
comments = []
for selector in comments_selectors:
try:
elements = await self.page.query_selector_all(selector)
if elements:
for element in elements[:5]: # 只获取前5条评论
try:
comment_text = await element.inner_text()
if comment_text and comment_text.strip():
comments.append(comment_text.strip())
except:
continue
if comments:
logger.info(f"使用选择器 {selector} 找到 {len(comments)} 条评论")
break
except:
continue
if not comments:
comments = ["未找到评论"]
# 尝试获取标签
tags_selectors = [
'[data-test="post-topic"]',
'.styles_topic__',
'a[class*="topic"]',
'span[class*="topic"]'
]
tags = []
for selector in tags_selectors:
try:
elements = await self.page.query_selector_all(selector)
if elements:
for element in elements:
try:
tag_text = await element.inner_text()
if tag_text and tag_text.strip():
tags.append(tag_text.strip())
except:
continue
if tags:
logger.info(f"使用选择器 {selector} 找到 {len(tags)} 个标签")
break
except:
continue
if not tags:
tags = ["未找到标签"]
# 尝试获取点赞数和评论数
upvotes = "未找到"
comments_count = "未找到"
try:
upvotes_element = await self.page.query_selector('[data-test="vote-button"]')
if upvotes_element:
upvotes_text = await upvotes_element.inner_text()
if upvotes_text and upvotes_text.strip():
upvotes = upvotes_text.strip()
except:
pass
try:
comments_count_element = await self.page.query_selector('[data-test="comment-count"]')
if comments_count_element:
comments_count_text = await comments_count_element.inner_text()
if comments_count_text and comments_count_text.strip():
comments_count = comments_count_text.strip()
except:
pass
# 尝试获取产品图片
image_url = "未找到图片"
try:
image_element = await self.page.query_selector('img[data-test="product-image"]')
if image_element:
image_url = await image_element.get_attribute('src')
if not image_url:
image_url = await image_element.get_attribute('data-src')
except:
pass
product_info = {
"name": product_name,
"description": product_description,
"tags": tags,
"upvotes": upvotes,
"comments_count": comments_count,
"comments": comments,
"image_url": image_url,
"url": self.product_url
}
logger.success("产品信息提取完成")
return product_info
except Exception as e:
logger.error(f"提取产品信息失败: {str(e)}")
return None
async def save_data(self, data):
"""保存数据到JSON文件"""
try:
with open('product_info_stealth.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.success("数据已保存到 product_info_stealth.json")
return True
except Exception as e:
logger.error(f"保存数据失败: {str(e)}")
return False
async def take_screenshot(self):
"""保存页面截图"""
try:
await self.page.screenshot(path='product_screenshot_stealth.png', full_page=True)
logger.success("页面截图已保存到 product_screenshot_stealth.png")
return True
except Exception as e:
logger.error(f"保存截图失败: {str(e)}")
return False
async def save_html(self):
"""保存页面HTML内容"""
try:
html_content = await self.page.content()
with open('product_page_stealth.html', 'w', encoding='utf-8') as f:
f.write(html_content)
logger.success("页面HTML已保存到 product_page_stealth.html")
return True
except Exception as e:
logger.error(f"保存HTML失败: {str(e)}")
return False
async def close_browser(self):
"""关闭浏览器"""
if self.browser:
await self.browser.close()
logger.info("浏览器已关闭")
async def scrape(self):
"""执行完整的抓取流程"""
try:
if not await self.start_browser():
return False
if not await self.navigate_to_product():
return False
# 保存HTML和截图用于调试
await self.save_html()
await self.take_screenshot()
product_info = await self.extract_product_info()
if product_info:
await self.save_data(product_info)
logger.info(f"抓取完成: {product_info['name']}")
return True
else:
logger.error("未能提取产品信息")
return False
except Exception as e:
logger.error(f"抓取过程中发生错误: {str(e)}")
return False
finally:
await self.close_browser()
async def main():
    """Run one stealth-mode scrape and log whether it succeeded."""
    logger.info("开始ProductHunt产品信息抓取使用Stealth模式")
    succeeded = await ProductHuntScraper().scrape()
    if not succeeded:
        logger.error("抓取失败")
        return
    logger.success("抓取成功完成")


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,203 +0,0 @@
import os
import json
import time
from datetime import datetime
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from loguru import logger
class ProductHuntScraper:
    """Scrape a Product Hunt product page through undetected-chromedriver.

    Args:
        product_url: Page to scrape.  Also exposed as the ``product_url``
            attribute so callers may change it after construction.
        chrome_version: Major Chrome version passed to ``uc.Chrome`` as
            ``version_main``.

    All steps report failure through their return value and a log line.
    """

    def __init__(
        self,
        product_url="https://www.producthunt.com/products/elsie-ai-beta",
        chrome_version=142,
    ):
        self.driver = None
        self.product_url = product_url
        self.chrome_version = chrome_version

    def connect_to_chrome(self):
        """Start an undetected-chromedriver Chrome instance.

        Returns:
            True when ``self.driver`` is ready, False on failure.
        """
        try:
            logger.info("正在初始化未检测的Chrome驱动...")
            options = uc.ChromeOptions()
            for flag in (
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1920,1080",
            ):
                options.add_argument(flag)
            self.driver = uc.Chrome(options=options, version_main=self.chrome_version)
            logger.info("成功连接到未检测的Chrome实例")
            return True
        except Exception as e:
            logger.error(f"连接Chrome实例失败: {str(e)}")
            return False

    def navigate_to_product(self):
        """Load ``self.product_url`` and wait for the page body plus a fixed delay.

        Returns:
            True when the page loaded, False on timeout or any other error.
        """
        try:
            logger.info(f"正在导航到产品页面: {self.product_url}")
            self.driver.get(self.product_url)
            logger.info("等待页面加载...")
            WebDriverWait(self.driver, 20).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            # Fixed extra wait so dynamically rendered content can appear.
            logger.info("等待动态内容加载...")
            time.sleep(10)
            logger.info("页面加载完成")
            return True
        except TimeoutException:
            logger.error("页面加载超时")
            return False
        except Exception as e:
            logger.error(f"导航到产品页面失败: {str(e)}")
            return False

    def extract_product_info(self):
        """Extract name, description, first comment and optional extras.

        ``name``, ``description`` and ``first_comment`` are always present
        (with a placeholder when the element is missing).  ``tags``,
        ``likes`` and ``comment_count`` are only added when matching elements
        exist.

        Returns:
            The product dict, or None when extraction failed unexpectedly.
        """
        try:
            logger.info("开始提取产品信息")
            product_info = {
                "url": self.product_url,
                "scraped_at": datetime.now().isoformat(),
            }

            # Product name: the first <h1> on the page.
            try:
                name_element = self.driver.find_element(By.TAG_NAME, "h1")
                product_info["name"] = name_element.text.strip()
                logger.info(f"产品名称: {product_info['name']}")
            except NoSuchElementException:
                logger.warning("未找到产品名称 (h1标签)")
                product_info["name"] = "未找到"

            # Description: div carrying the exact utility classes below.
            try:
                desc_selector = "div.relative.text-16.font-normal.text-gray-700"
                desc_element = self.driver.find_element(By.CSS_SELECTOR, desc_selector)
                product_info["description"] = desc_element.text.strip()
                logger.info(f"产品简介: {product_info['description'][:50]}...")
            except NoSuchElementException:
                logger.warning("未找到产品简介 (div.relative.text-16.font-normal.text-gray-700)")
                product_info["description"] = "未找到"

            # First comment: first div matching the flex-column layout classes.
            try:
                comment_selector = "div.flex.flex-1.flex-col.gap-2"
                comment_element = self.driver.find_element(By.CSS_SELECTOR, comment_selector)
                product_info["first_comment"] = comment_element.text.strip()
                logger.info(f"第一个评论: {product_info['first_comment'][:50]}...")
            except NoSuchElementException:
                logger.warning("未找到第一个评论 (div.flex.flex-1.flex-col.gap-2)")
                product_info["first_comment"] = "未找到"

            # Optional extras below are best-effort; failures are only logged.
            try:
                tag_elements = self.driver.find_elements(
                    By.CSS_SELECTOR, "[class*='tag'], [class*='category'], [class*='topic']"
                )
                if tag_elements:
                    # Read each element's text once; every .text access is a
                    # separate WebDriver call.
                    stripped = (tag.text.strip() for tag in tag_elements)
                    tags = [text for text in stripped if text]
                    product_info["tags"] = tags[:5]  # keep at most five tags
                    logger.info(f"找到标签: {tags[:3]}")
            except Exception as e:
                logger.debug(f"提取标签时出错: {str(e)}")

            try:
                like_elements = self.driver.find_elements(
                    By.CSS_SELECTOR, "[class*='vote'], [class*='like'], [class*='upvote']"
                )
                if like_elements:
                    product_info["likes"] = like_elements[0].text.strip()
                    logger.info(f"点赞数: {product_info['likes']}")
            except Exception as e:
                logger.debug(f"提取点赞数时出错: {str(e)}")

            try:
                comment_count_elements = self.driver.find_elements(
                    By.CSS_SELECTOR, "[class*='comment-count'], [class*='comments']"
                )
                if comment_count_elements:
                    product_info["comment_count"] = comment_count_elements[0].text.strip()
                    logger.info(f"评论数: {product_info['comment_count']}")
            except Exception as e:
                logger.debug(f"提取评论数时出错: {str(e)}")

            return product_info
        except Exception as e:
            logger.error(f"提取产品信息失败: {str(e)}")
            return None

    def save_to_file(self, data, filename="product_info.json"):
        """Write ``data`` as UTF-8 JSON to ``filename``; return success."""
        try:
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"数据已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存数据失败: {str(e)}")
            return False

    def save_screenshot(self, filename="product_screenshot.png"):
        """Save a screenshot of the current page for debugging; return success."""
        try:
            self.driver.save_screenshot(filename)
            logger.info(f"页面截图已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存页面截图失败: {str(e)}")
            return False

    def close(self):
        """Quit the browser if one was started; safe to call more than once."""
        if self.driver:
            try:
                self.driver.quit()
            finally:
                # Drop the handle so a second close() is a no-op.
                self.driver = None
            logger.info("浏览器已关闭")

    def scrape_product(self):
        """Run the full flow: connect, navigate, screenshot, extract, save.

        Returns:
            True when product data was extracted, False otherwise.  The
            browser is closed before returning once it has been started.
        """
        if not self.connect_to_chrome():
            logger.error("无法连接到Chrome实例")
            return False
        try:
            if not self.navigate_to_product():
                logger.error("无法导航到产品页面")
                return False
            # Screenshot is kept for debugging.
            self.save_screenshot()
            product_info = self.extract_product_info()
            if product_info:
                self.save_to_file(product_info)
                return True
            else:
                logger.error("未能提取产品信息")
                return False
        finally:
            self.close()
def main():
    """Scrape the default product once and log the outcome."""
    logger.info("开始ProductHunt产品信息抓取")
    scraper = ProductHuntScraper()
    # To scrape a different product, override the target before running:
    # scraper.product_url = "https://www.producthunt.com/products/your-product"
    if not scraper.scrape_product():
        logger.error("产品信息抓取失败")
        return
    logger.info("产品信息抓取完成")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,217 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
使用Playwright连接远程Chrome调试端口访问ProductHunt页面
"""
import asyncio
from playwright.async_api import async_playwright
from loguru import logger
import sys
# Configure logging: remove the existing loguru handlers, then log INFO and
# above to stderr using the colourised format below.
logger.remove()
logger.add(sys.stderr, level="INFO", format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>")
class ProductHuntScraper:
    """Scrape Product Hunt through an already-running Chrome (CDP attach).

    Args:
        debug_port: Remote-debugging port of the running Chrome instance.
        debug_host: Host the debugging endpoint listens on.
            NOTE(review): if ``localhost`` resolves to IPv6 first on the
            machine while Chrome listens on IPv4 only, pass ``"127.0.0.1"``
            -- confirm for the target environment.
    """

    def __init__(self, debug_port=9222, debug_host="localhost"):
        self.debug_port = debug_port
        self.debug_host = debug_host
        # Playwright driver handle; kept so close() can stop it.
        self.playwright = None
        self.browser = None
        self.page = None

    async def connect_to_existing_chrome(self):
        """Attach to the running Chrome and pick (or create) a page.

        The first existing context and its first page are reused when
        present; otherwise a new context and/or page is created.

        Returns:
            True when ``self.page`` is ready, False on failure.
        """
        logger.info(f"正在连接到Chrome远程调试端口 {self.debug_port}")
        try:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.connect_over_cdp(
                f"http://{self.debug_host}:{self.debug_port}"
            )
            contexts = self.browser.contexts
            context = contexts[0] if contexts else await self.browser.new_context()
            pages = context.pages
            self.page = pages[0] if pages else await context.new_page()
            logger.success("成功连接到Chrome浏览器")
            return True
        except Exception as e:
            logger.error(f"连接Chrome失败: {e}")
            return False

    async def navigate_to_producthunt(self, url, max_wait_time=300, wait_interval=5):
        """Open ``url`` and poll until the page title contains "Product Hunt".

        Args:
            url: Page to open.
            max_wait_time: Maximum time in seconds to poll the title.
            wait_interval: Seconds between title checks; must be positive.

        Returns:
            True when the title matched, or when polling timed out but the
            final title is non-empty and contains neither "Not Found" nor
            "Error".  False when the page is uninitialised, navigation
            raised, or the final title looks like an error page.
        """
        if not self.page:
            logger.error("页面未初始化")
            return False
        try:
            logger.info(f"正在访问: {url}")
            # Navigation and network-idle waits each get up to 300 seconds.
            await self.page.goto(url, wait_until="domcontentloaded", timeout=300000)
            await self.page.wait_for_load_state("networkidle", timeout=300000)

            logger.info("等待页面标题包含'Product Hunt'...")
            waited_time = 0
            while waited_time < max_wait_time:
                title = await self.page.title()
                logger.info(f"当前页面标题: {title}")
                if "Product Hunt" in title:
                    logger.success(f"页面标题已包含'Product Hunt'等待时间: {waited_time}秒")
                    logger.success("Product Hunt网站已成功打开")
                    return True
                await asyncio.sleep(wait_interval)
                waited_time += wait_interval
                logger.info(f"已等待 {waited_time} 秒继续等待...")

            # Polling timed out: read the title once and judge by it.
            logger.warning(f"等待超时{max_wait_time}秒页面标题仍未包含'Product Hunt'")
            final_title = await self.page.title()
            logger.info(f"最终页面标题: {final_title}")
            if final_title and "Not Found" not in final_title and "Error" not in final_title:
                logger.success("页面已正常加载但标题不符合预期")
                return True
            logger.error("页面加载失败")
            return False
        except Exception as e:
            logger.error(f"访问页面失败: {e}")
            return False

    async def extract_product_info(self):
        """Extract name, description, votes and a link, then take a screenshot.

        Each key is only present when its element was found.  The debugging
        screenshot is best-effort: a failure is logged and does not discard
        the extracted data.

        Returns:
            The product dict (possibly empty), or None when the page is
            uninitialised or extraction raised.
        """
        if not self.page:
            logger.error("页面未初始化")
            return None
        try:
            product_info = {}

            name_element = await self.page.query_selector("h1")
            if name_element:
                product_info["name"] = await name_element.text_content()
                logger.info(f"产品名称: {product_info['name']}")

            desc_element = await self.page.query_selector("[data-testid='product-description']")
            if not desc_element:
                desc_element = await self.page.query_selector(".styles_description__")
            if desc_element:
                description = await desc_element.text_content()
                product_info["description"] = description
                # text_content() may return None; guard the slice used for logging.
                logger.info(f"产品描述: {(description or '')[:100]}...")

            votes_element = await self.page.query_selector("[data-testid='vote-button']")
            if votes_element:
                votes_text = await votes_element.text_content()
                product_info["votes"] = votes_text
                logger.info(f"投票数: {votes_text}")

            # NOTE(review): this takes the first link whose href contains
            # '://', which is not necessarily the product's own site -- confirm.
            website_element = await self.page.query_selector("a[href*='://']")
            if website_element:
                product_info["website"] = await website_element.get_attribute("href")
                logger.info(f"产品网站: {product_info['website']}")
        except Exception as e:
            logger.error(f"提取产品信息失败: {e}")
            return None

        screenshot_path = "product_screenshot.png"
        try:
            await self.page.screenshot(path=screenshot_path, full_page=True)
            logger.info(f"页面截图已保存到: {screenshot_path}")
        except Exception as e:
            logger.warning(f"页面截图保存失败: {e}")
        return product_info

    async def close(self):
        """Disconnect from the browser and stop the Playwright driver."""
        try:
            if self.browser:
                await self.browser.close()
                logger.info("浏览器连接已关闭")
        finally:
            self.browser = None
            self.page = None
            # Stop the driver even if closing the browser raised.
            if self.playwright:
                await self.playwright.stop()
                self.playwright = None
                logger.info("Playwright实例已关闭")
async def main():
    """Attach to Chrome, open the target product page and save what was extracted."""
    logger.info("开始ProductHunt数据抓取任务")
    target_url = "https://www.producthunt.com/products/notion"
    scraper = ProductHuntScraper(debug_port=9222)
    try:
        connected = await scraper.connect_to_existing_chrome()
        if not connected:
            logger.error("无法连接到Chrome请确保Chrome已启动并启用远程调试")
            return

        opened = await scraper.navigate_to_producthunt(target_url)
        if not opened:
            logger.error("页面访问失败")
            return

        product_info = await scraper.extract_product_info()
        if not product_info:
            logger.warning("未能提取到产品信息")
            return

        logger.success("产品信息提取完成")
        # Persist the extracted fields as UTF-8 JSON.
        import json
        with open("product_info.json", "w", encoding="utf-8") as f:
            json.dump(product_info, f, ensure_ascii=False, indent=2)
        logger.info("产品信息已保存到 product_info.json")
    except Exception as e:
        logger.error(f"执行过程中发生错误: {e}")
    finally:
        # Always release the connection, whatever happened above.
        await scraper.close()
        logger.info("任务完成")


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,8 +0,0 @@
{
"url": "https://www.producthunt.com/products/elsie-ai-beta",
"scraped_at": "2025-11-16T22:47:18.351246",
"name": "未找到",
"description": "未找到",
"first_comment": "未找到",
"image_url": "/favicon.ico"
}

File diff suppressed because one or more lines are too long

Binary file not shown.

Before

Width:  |  Height:  |  Size: 37 KiB

View File

@@ -5,3 +5,4 @@ loguru>=0.5.3
zhipuai>=2.1.0
PySide6
selenium>=4.15.0
playwright>=1.40.0

42
start_chrome_debug.ps1 Normal file
View File

@@ -0,0 +1,42 @@
# Starts Chrome with remote debugging enabled so a Playwright script can
# attach to it. Exits with status 1 when Chrome is missing or fails to start.

# Path to the Chrome executable.
$chromePath = "C:\Program Files\Google\Chrome\Application\chrome.exe"

# Remote-debugging port; used for both the launch argument and the message.
$debugPort = 9222

# Abort early when Chrome is not installed at the expected path.
if (-not (Test-Path $chromePath)) {
    Write-Host "错误Chrome浏览器未找到请检查安装路径" -ForegroundColor Red
    exit 1
}

# Dedicated user-data directory for the debugging session.
$userDataDir = "C:\temp\chrome_debug"

# Create the user-data directory if it does not exist yet.
if (-not (Test-Path $userDataDir)) {
    New-Item -ItemType Directory -Path $userDataDir -Force | Out-Null
    Write-Host "已创建用户数据目录: $userDataDir" -ForegroundColor Green
}

# Chrome command-line arguments.
$arguments = @(
    "--remote-debugging-port=$debugPort",
    "--start-maximized",
    "--user-data-dir=`"$userDataDir`""
)

Write-Host "正在启动Chrome浏览器..." -ForegroundColor Yellow
Write-Host "命令: $chromePath $arguments" -ForegroundColor Cyan

# Launch Chrome and keep the process object to report its id.
$process = Start-Process -FilePath $chromePath -ArgumentList $arguments -PassThru

if ($process) {
    Write-Host "Chrome浏览器已启动进程ID: $($process.Id)" -ForegroundColor Green
    Write-Host "远程调试端口: $debugPort" -ForegroundColor Green
    Write-Host "用户数据目录: $userDataDir" -ForegroundColor Green
    Write-Host ""
    Write-Host "现在可以运行Playwright脚本来连接此Chrome实例" -ForegroundColor Yellow
} else {
    Write-Host "启动Chrome浏览器失败" -ForegroundColor Red
    # Report the failure to the caller, matching the missing-Chrome path above.
    exit 1
}

View File

@@ -1,76 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
简化版测试脚本,验证 new_data_stealth.py 的基本功能
"""
import asyncio
from loguru import logger
from playwright.async_api import async_playwright
from playwright_stealth.stealth import Stealth
async def test_basic_functionality():
    """Smoke-test ProductHuntScraper: import, start, navigate, screenshot, save HTML.

    Returns:
        True only when every step succeeded; False when the import, the
        browser start, the navigation or either save step failed.
    """
    logger.info("=== 测试 new_data_stealth.py 基本功能 ===")
    try:
        # Imported here so an import failure is reported as a test failure.
        from product.new_data_stealth import ProductHuntScraper
        logger.success("ProductHuntScraper 类导入成功")

        scraper = ProductHuntScraper()
        logger.success("ProductHuntScraper 实例创建成功")

        logger.info("测试浏览器启动...")
        browser_started = await scraper.start_browser()
        if not browser_started:
            logger.error("❌ 浏览器启动失败")
            return False
        logger.success("浏览器启动测试通过")

        all_passed = True
        logger.info("测试页面导航...")
        try:
            # Playwright's Python API takes keyword options; a positional
            # options dict raises TypeError.
            await scraper.page.goto(
                "https://www.google.com",
                wait_until="networkidle",
                timeout=30000,
            )
            logger.success("页面导航测试通过")

            # The save helpers signal failure through their return value.
            if await scraper.take_screenshot():
                logger.success("截图功能测试通过")
            else:
                logger.error("截图功能测试失败")
                all_passed = False

            if await scraper.save_html():
                logger.success("HTML保存功能测试通过")
            else:
                logger.error("HTML保存功能测试失败")
                all_passed = False
        except Exception as e:
            logger.error(f"页面导航测试失败: {e}")
            all_passed = False
        finally:
            await scraper.close_browser()
            logger.info("浏览器已关闭")

        # Only claim success when no step above failed.
        if all_passed:
            logger.success("✅ 所有基本功能测试通过！")
        return all_passed
    except Exception as e:
        logger.error(f"❌ 测试失败: {e}")
        return False
async def main():
    """Run the basic smoke test and log a summary of the result."""
    if not await test_basic_functionality():
        logger.error("💥 脚本仍有问题，需要进一步调试")
        return
    logger.info("🎉 脚本修复成功！现在可以正常运行 new_data_stealth.py")
    logger.info("运行命令: python product/new_data_stealth.py")


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,68 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试 playwright_stealth 修复
"""
import asyncio
from loguru import logger
from playwright.async_api import async_playwright
from playwright_stealth.stealth import Stealth
async def test_stealth():
    """Check that playwright_stealth can be applied to a fresh Chromium page.

    The browser and the Playwright driver are released even when a step in
    between raises.

    Returns:
        True when every step succeeded, False otherwise (the error is logged).
    """
    logger.info("开始测试 playwright_stealth 功能")
    try:
        stealth = Stealth()
        logger.success("Stealth 类实例化成功")

        # List the "apply*" attributes to show which entry points exist.
        methods = [m for m in dir(stealth) if 'apply' in m.lower()]
        logger.info(f"Stealth 类包含的方法: {methods}")

        logger.info("测试 Playwright 浏览器启动...")
        playwright = await async_playwright().start()
        try:
            browser = await playwright.chromium.launch(headless=True)
            try:
                logger.success("Playwright 浏览器启动成功")

                context = await browser.new_context()
                page = await context.new_page()
                logger.success("页面创建成功")

                logger.info("测试应用 stealth 设置...")
                await stealth.apply_stealth_async(page)
                logger.success("stealth 设置应用成功")
            finally:
                await browser.close()
        finally:
            await playwright.stop()
        logger.success("浏览器关闭成功")
        logger.success("所有测试通过playwright_stealth 修复成功")
        return True
    except Exception as e:
        logger.error(f"测试失败: {e}")
        return False
async def main():
    """Run the playwright_stealth check and log a summary of the result."""
    logger.info("=== playwright_stealth 修复测试 ===")
    passed = await test_stealth()
    if not passed:
        logger.error("❌ 修复失败请检查错误信息")
        return
    logger.info("✅ 修复成功！您现在可以正常运行 new_data_stealth.py")
    logger.info("运行命令: python product/new_data_stealth.py")


if __name__ == "__main__":
    asyncio.run(main())

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

Binary file not shown.

File diff suppressed because it is too large Load Diff