# Files
# tophux_scrape/product/new_data_playwright_cloudflare.py
#
# 418 lines
# 17 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
import time
from datetime import datetime
from playwright.sync_api import sync_playwright
from loguru import logger
class ProductHuntScraper:
    """Scrape one Product Hunt product page with a Playwright-driven Chromium.

    Typical use is ``ProductHuntScraper().scrape_product()``, which launches
    the browser, opens ``product_url``, waits for the Cloudflare interstitial
    to clear, dumps a screenshot and the HTML for debugging, extracts a few
    fields and writes them to a JSON file.
    """

    # Placeholder stored when a text field cannot be located on the page.
    _NOT_FOUND = "未找到"

    # Candidate CSS selectors, tried in order; the first usable match wins.
    _NAME_SELECTORS = (
        "h1",
        "[data-test='product-name']",
        ".product-name",
        "[class*='product'][class*='name']",
        ".styles_productName__",
        "[class*='heading'][class*='xl']",
        "div[class*='text-2xl']",
        "div[class*='text-3xl']",
        "div[class*='text-4xl']",
        "div[class*='text-5xl']",
        "div[class*='text-6xl']",
        "div[class*='font-bold']",
        "div[class*='font-semibold']",
    )
    _DESCRIPTION_SELECTORS = (
        "div.relative.text-16.font-normal.text-gray-700",
        ".text-16.font-normal.text-gray-700",
        "[class*='text-16'][class*='font-normal'][class*='text-gray-700']",
        "div[class*='description']",
        ".product-description",
        "div[class*='tagline']",
        "[data-test='product-tagline']",
        ".styles_tagline__",
        "p[class*='text-gray']",
        "div[class*='mb-4']",
        "div[class*='text-base']",
        "div[class*='text-lg']",
        "div[class*='text-gray-600']",
        "div[class*='text-gray-700']",
        "div[class*='text-gray-800']",
    )
    _COMMENT_SELECTORS = (
        "div.flex.flex-1.flex-col.gap-2",
        ".flex.flex-1.flex-col.gap-2",
        "[class*='flex'][class*='flex-1'][class*='flex-col'][class*='gap-2']",
        "div[class*='comment']",
        ".comment-text",
        "div[class*='review']",
        "div[class*='feedback']",
        "blockquote",
        "div[class*='border']",
        "[data-test='comment']",
        "div[class*='text-sm']",
        "div[class*='text-xs']",
        "div[class*='mt-2']",
        "div[class*='mb-2']",
    )

    def __init__(self):
        # All handles start as None so close() is safe at any point,
        # including after a start-up that failed half-way.
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None
        # Page to scrape; callers may overwrite this before scrape_product().
        self.product_url = "https://www.producthunt.com/products/elsie-ai-beta"

    def connect_to_browser(self):
        """Start Playwright, launch Chromium and open a blank page.

        Returns:
            True when the page is ready, False if any step raised (the error
            is logged; handles created so far are left for close()).
        """
        try:
            logger.info("正在初始化Playwright浏览器...")
            self.playwright = sync_playwright().start()
            self.browser = self.playwright.chromium.launch(
                headless=False,  # headed, so the browser can be watched
                args=[
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                    "--window-size=1920,1080",
                    "--disable-blink-features=AutomationControlled",
                    "--disable-web-security",
                    "--disable-features=VizDisplayCompositor"
                ]
            )
            self.context = self.browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                viewport={"width": 1920, "height": 1080},
                ignore_https_errors=True,
                extra_http_headers={
                    "Accept-Language": "en-US,en;q=0.9",
                    "Accept-Encoding": "gzip, deflate, br",
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
                    "Cache-Control": "no-cache",
                    "Pragma": "no-cache"
                }
            )
            self.page = self.context.new_page()
            # Runs before any page script: hide navigator.webdriver and patch
            # the permissions API. The native query() must stay bound to its
            # Permissions object, otherwise calling it fails with
            # "Illegal invocation".
            self.page.add_init_script("""
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                });
                // 覆盖permissions API
                const originalQuery = window.navigator.permissions.query.bind(window.navigator.permissions);
                window.navigator.permissions.query = (parameters) => (
                    parameters.name === 'notifications' ?
                        Promise.resolve({ state: Notification.permission }) :
                        originalQuery(parameters)
                );
            """)
            logger.info("成功连接到Playwright浏览器")
            return True
        except Exception as e:
            logger.error(f"连接浏览器失败: {str(e)}")
            return False

    @staticmethod
    def _product_path(url):
        """Return the path component of *url* without leading/trailing slashes."""
        from urllib.parse import urlparse

        return urlparse(url).path.strip("/")

    def _title_matches_product(self, title):
        """Tell whether *title* looks like the real site rather than a challenge page.

        True when the title contains "Product Hunt" or the first hyphen-separated
        word of the product slug taken from ``product_url`` ("elsie" for the
        default URL). Words shorter than three characters are ignored because
        they would match almost any title.
        """
        if "Product Hunt" in title:
            return True
        slug = self._product_path(self.product_url).rsplit("/", 1)[-1]
        keyword = slug.split("-", 1)[0].lower()
        return len(keyword) >= 3 and keyword in title.lower()

    def _url_matches_product(self, current_url):
        """Tell whether *current_url* is the product page and not a challenge URL."""
        path = self._product_path(self.product_url)
        return bool(path) and path in current_url and "challenge" not in current_url

    def wait_for_cloudflare(self, timeout=120):
        """Poll the page until the Cloudflare check appears to be finished.

        Args:
            timeout: Maximum number of seconds to keep polling.

        Returns:
            True once the title or URL identifies the product page, False if
            *timeout* elapses first.
        """
        logger.info("等待Cloudflare验证完成...")
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                title = self.page.title()
                logger.info(f"当前页面标题: {title}")
                if self._title_matches_product(title):
                    logger.info("Cloudflare验证成功")
                    return True
                # Challenge form still present: keep waiting.
                if self.page.query_selector("#challenge-form"):
                    logger.info("检测到Cloudflare验证页面等待验证...")
                    time.sleep(5)
                    continue
                # Challenge passed, redirect to the real page still pending.
                if self.page.query_selector("#challenge-success-text"):
                    logger.info("Cloudflare验证成功等待页面跳转...")
                    time.sleep(5)
                    continue
                if self._url_matches_product(self.page.url):
                    logger.info("已成功跳转到产品页面")
                    return True
                time.sleep(2)
            except Exception as e:
                # The page may be mid-navigation; retry after a short pause.
                logger.debug(f"等待Cloudflare验证时出错: {str(e)}")
                time.sleep(2)
        logger.warning(f"等待Cloudflare验证超时 ({timeout}秒)")
        return False

    def navigate_to_product(self):
        """Open ``product_url`` and wait until the page seems to have rendered.

        Returns:
            True when navigation and the Cloudflare wait succeeded, False on a
            Cloudflare timeout or any navigation error.
        """
        try:
            logger.info(f"正在导航到产品页面: {self.product_url}")
            logger.info("直接访问产品页面")
            self.page.goto(self.product_url, wait_until="domcontentloaded", timeout=60000)
            if not self.wait_for_cloudflare():
                logger.error("Cloudflare验证失败或超时")
                return False
            logger.info("等待页面内容加载...")
            time.sleep(10)  # fixed pause for dynamically loaded content
            current_url = self.page.url
            page_title = self.page.title()
            logger.info(f"当前页面URL: {current_url}")
            logger.info(f"页面标题: {page_title}")
            # Best effort: a missing element is logged but does not fail navigation.
            try:
                logger.info("等待页面内容加载...")
                self.page.wait_for_selector("body", timeout=30000)
                possible_selectors = [
                    "h1",
                    "[data-test='product-name']",
                    ".product-name",
                    "div[class*='product']",
                    "div[class*='styles_']"
                ]
                for selector in possible_selectors:
                    try:
                        self.page.wait_for_selector(selector, timeout=5000)
                        logger.info(f"找到元素: {selector}")
                        break
                    except Exception:
                        # Selector did not appear in time; try the next one.
                        continue
            except Exception as e:
                logger.warning(f"等待页面元素时出错: {str(e)}")
            logger.info("页面加载完成")
            return True
        except Exception as e:
            logger.error(f"导航到产品页面失败: {str(e)}")
            return False

    def _first_text(self, selectors, label, min_length, rejected=()):
        """Return ``(selector, text)`` for the first selector yielding usable text.

        Only the first element matched by each selector is inspected. Text is
        usable when, after stripping, it is longer than *min_length* and not
        listed in *rejected*. Returns None when no selector qualifies.
        """
        for selector in selectors:
            try:
                element = self.page.query_selector(selector)
                if element is None:
                    continue
                # text_content() may return None; treat that as empty text.
                text = (element.text_content() or "").strip()
                if len(text) > min_length and text not in rejected:
                    return selector, text
            except Exception as e:
                logger.debug(f"使用选择器 {selector} 提取{label}时出错: {str(e)}")
        return None

    def _fill_text_field(self, info, key, label, selectors, min_length,
                         rejected=(), preview_chars=None):
        """Store the first usable text under ``info[key]``, or the placeholder."""
        found = self._first_text(selectors, label, min_length, rejected)
        if found is None:
            logger.warning(f"未找到{label}")
            info[key] = self._NOT_FOUND
            return
        selector, text = found
        info[key] = text
        shown = text if preview_chars is None else f"{text[:preview_chars]}..."
        logger.info(f"使用选择器 {selector} 找到{label}: {shown}")

    def extract_product_info(self):
        """Collect product details from the currently loaded page.

        Returns:
            A dict that always has ``url``, ``scraped_at``, ``name``,
            ``description`` and ``first_comment`` (the last three fall back to
            a placeholder string), plus ``tags``, ``likes``, ``comment_count``
            and ``image_url`` when matching elements exist. None if extraction
            failed as a whole.
        """
        try:
            logger.info("开始提取产品信息")
            product_info = {
                "url": self.page.url,
                "scraped_at": datetime.now().isoformat()
            }
            self._fill_text_field(
                product_info, "name", "产品名称", self._NAME_SELECTORS,
                min_length=2, rejected=("www.producthunt.com",),
            )
            # Longer minimum length so stray short labels are not taken as content.
            self._fill_text_field(
                product_info, "description", "产品简介", self._DESCRIPTION_SELECTORS,
                min_length=10, preview_chars=50,
            )
            self._fill_text_field(
                product_info, "first_comment", "第一个评论", self._COMMENT_SELECTORS,
                min_length=10, preview_chars=50,
            )
            # Optional extras below: each is best effort and independent.
            try:
                tag_elements = self.page.query_selector_all("[class*='tag'], [class*='category'], [class*='topic'], [class*='badge']")
                if tag_elements:
                    texts = ((tag.text_content() or "").strip() for tag in tag_elements)
                    tags = [text for text in texts if text]
                    product_info["tags"] = tags[:5]  # keep at most five tags
                    logger.info(f"找到标签: {tags[:3]}")
            except Exception as e:
                logger.debug(f"提取标签时出错: {str(e)}")
            try:
                like_elements = self.page.query_selector_all("[class*='vote'], [class*='like'], [class*='upvote'], [data-test='vote-count'], [class*='count']")
                if like_elements:
                    product_info["likes"] = (like_elements[0].text_content() or "").strip()
                    logger.info(f"点赞数: {product_info['likes']}")
            except Exception as e:
                logger.debug(f"提取点赞数时出错: {str(e)}")
            try:
                comment_count_elements = self.page.query_selector_all("[class*='comment-count'], [class*='comments'], [data-test='comment-count'], [class*='count']")
                if comment_count_elements:
                    product_info["comment_count"] = (comment_count_elements[0].text_content() or "").strip()
                    logger.info(f"评论数: {product_info['comment_count']}")
            except Exception as e:
                logger.debug(f"提取评论数时出错: {str(e)}")
            try:
                img_elements = self.page.query_selector_all("img[class*='product'], img[alt*='product'], img[alt*='logo'], img[src*='product']")
                if img_elements:
                    product_info["image_url"] = img_elements[0].get_attribute("src")
                    logger.info(f"产品图片URL: {product_info['image_url']}")
            except Exception as e:
                logger.debug(f"提取产品图片时出错: {str(e)}")
            return product_info
        except Exception as e:
            logger.error(f"提取产品信息失败: {str(e)}")
            return None

    def save_to_file(self, data, filename="product_info.json"):
        """Write *data* to *filename* as indented UTF-8 JSON.

        Returns:
            True on success, False if writing or serialising failed.
        """
        try:
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"数据已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存数据失败: {str(e)}")
            return False

    def save_screenshot(self, filename="product_screenshot.png"):
        """Save a full-page screenshot to *filename* for debugging.

        Returns:
            True on success, False otherwise.
        """
        try:
            self.page.screenshot(path=filename, full_page=True)
            logger.info(f"页面截图已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存页面截图失败: {str(e)}")
            return False

    def save_html(self, filename="product_page.html"):
        """Save the current page HTML to *filename* for debugging.

        Returns:
            True on success, False otherwise.
        """
        try:
            html_content = self.page.content()
            with open(filename, "w", encoding="utf-8") as f:
                f.write(html_content)
            logger.info(f"页面HTML已保存到 {filename}")
            return True
        except Exception as e:
            logger.error(f"保存页面HTML失败: {str(e)}")
            return False

    def close(self):
        """Release the context, the browser and the Playwright driver.

        Each step is attempted even if an earlier one fails, so one broken
        handle cannot keep the others alive. Safe to call repeatedly and
        after a failed start-up.
        """
        steps = (("context", "close"), ("browser", "close"), ("playwright", "stop"))
        failed = False
        for attr, method in steps:
            resource = getattr(self, attr, None)
            if resource is None:
                continue
            try:
                getattr(resource, method)()
            except Exception as e:
                failed = True
                logger.error(f"关闭浏览器时出错: {str(e)}")
            finally:
                setattr(self, attr, None)
        # The page belongs to the context that was just closed.
        self.page = None
        if not failed:
            logger.info("浏览器已关闭")

    def scrape_product(self):
        """Run the whole flow: connect, navigate, dump debug files, extract, save.

        Returns:
            True when product information was extracted, False otherwise.
            The browser is always closed before returning.
        """
        try:
            # Connecting happens inside the try so a half-started browser is
            # still cleaned up by the finally clause.
            if not self.connect_to_browser():
                logger.error("无法连接到浏览器")
                return False
            if not self.navigate_to_product():
                logger.error("无法导航到产品页面")
                return False
            # Debug artefacts; failures here are logged and ignored.
            self.save_screenshot()
            self.save_html()
            product_info = self.extract_product_info()
            if product_info:
                self.save_to_file(product_info)
                return True
            logger.error("未能提取产品信息")
            return False
        finally:
            self.close()
def main():
    """Scrape the default product page and log whether it succeeded.

    Returns:
        True when the scrape succeeded, False otherwise.
    """
    logger.info("开始ProductHunt产品信息抓取")
    scraper = ProductHuntScraper()
    # Set scraper.product_url here to scrape a different product, e.g.
    # scraper.product_url = "https://www.producthunt.com/products/your-product"
    success = scraper.scrape_product()
    if success:
        logger.info("产品信息抓取完成")
    else:
        logger.error("产品信息抓取失败")
    return success


if __name__ == "__main__":
    # Non-zero exit status lets shells and schedulers detect a failed run.
    raise SystemExit(0 if main() else 1)