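# Source file: guba-indicator/spider.py (Python, 360 lines, 13 KiB)
# NOTE: the lines originally here ("Files", "Raw Normal View History") were
# page chrome copied from a repository web viewer, not part of the module.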

"""
爬虫模块 - 网站评论抓取
"""
import requests
from lxml import etree
import time
from typing import List, Dict, Optional
from urllib.parse import urljoin
from bs4 import BeautifulSoup
import random
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from loguru import logger
class SpiderManager:
    """Crawler manager.

    Wraps a ``requests.Session`` to download pages with retries, extracts
    comment text with an XPath (falling back to BeautifulSoup), and offers
    helpers for the Shanghai Stock Exchange (SSE) index quote and a page
    screenshot.
    """

    def __init__(self, config: Dict):
        """Create the manager from a configuration dictionary.

        Keys read here: ``user_agent`` (default ``'Mozilla/5.0'``),
        ``retry_times`` (default 3), ``retry_interval`` (default 5) and
        ``target_url`` (only logged). The dictionary is stored by reference,
        so ``update_config`` mutates the caller's object.
        """
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': config.get('user_agent', 'Mozilla/5.0')
        })
        self.retry_times = config.get('retry_times', 3)
        self.retry_interval = config.get('retry_interval', 5)
        logger.info(f"爬虫管理器初始化完成目标URL: {config.get('target_url', '')}")

    def fetch(self, url: Optional[str] = None, xpath: Optional[str] = None) -> List[Dict]:
        """Download a page and extract its comments.

        Args:
            url: Page to fetch; falls back to ``config['target_url']``.
            xpath: Expression selecting the comment nodes; falls back to
                ``config['xpath']``.

        Returns:
            A list of ``{'content': str, 'url': str}`` dictionaries, or an
            empty list when no URL is configured or the download failed.
        """
        target_url = url or self.config.get('target_url', '')
        target_xpath = xpath or self.config.get('xpath', '')
        if not target_url:
            logger.warning("未设置目标URL")
            return []

        logger.info(f"开始抓取: {target_url}")
        html = self._fetch_with_retry(target_url)
        if not html:
            logger.warning("网页获取失败")
            return []

        comments = self._parse_comments(html, target_xpath, target_url)
        logger.info(f"解析完成,获取到 {len(comments)} 条评论")
        return comments

    def _fetch_with_retry(self, url: str, max_retries: Optional[int] = None) -> Optional[str]:
        """Download ``url``, retrying on ``requests.RequestException``.

        Args:
            url: Page to download.
            max_retries: Total number of attempts; a falsy value falls back
                to ``self.retry_times``.

        Returns:
            The decoded response body, or ``None`` when every attempt failed.
        """
        # Always try at least once: a configured retry count of 0 (or a
        # negative number) previously produced an empty loop, so nothing was
        # requested and None was returned without any log output.
        attempts = max(1, int(max_retries or self.retry_times or 1))

        for attempt in range(1, attempts + 1):
            try:
                logger.debug(f"尝试 {attempt}/{attempts} 获取网页")
                response = self.session.get(url, timeout=30)
                response.raise_for_status()
                # Use the encoding detected from the body rather than the
                # one declared in the response headers.
                response.encoding = response.apparent_encoding
                logger.debug(f"网页获取成功,大小: {len(response.text)} 字节")
                return response.text
            except requests.RequestException as e:
                logger.warning(f"请求失败 (尝试 {attempt}/{attempts}): {e}")
                if attempt < attempts:
                    # Random jitter so retries are not evenly spaced.
                    time.sleep(self.retry_interval + random.uniform(0, 2))
                else:
                    logger.error(f"所有重试均失败: {url}")
        return None

    def get_page_title(self, url: Optional[str] = None) -> str:
        """Return the text of the page's ``<title>`` element.

        Args:
            url: Page to fetch; falls back to ``config['target_url']``.

        Returns:
            The stripped title, or ``""`` when no URL is configured, the
            download failed, no title exists, or parsing raised.
        """
        target_url = url or self.config.get('target_url', '')
        if not target_url:
            logger.warning("未设置目标URL")
            return ""

        logger.info(f"获取页面标题: {target_url}")
        html = self._fetch_with_retry(target_url)
        if not html:
            logger.warning("网页获取失败")
            return ""

        try:
            # Parse with lxml and read the first <title> text node.
            tree = etree.HTML(html)
            title_elements = tree.xpath('//title/text()')
            if title_elements:
                title = title_elements[0].strip()
                logger.info(f"获取到页面标题: {title}")
                return title
            logger.warning("未找到页面标题")
            return ""
        except Exception as e:
            logger.error(f"解析页面标题失败: {e}")
            return ""

    @staticmethod
    def _extract_text_and_href(node):
        """Return ``(text, href)`` for one XPath match.

        Handles both element matches and string matches (the results of
        ``text()`` / ``@attr`` expressions). ``href`` is ``None`` when no
        link can be associated with the match.
        """
        if isinstance(node, str):
            owner = None
            # lxml string results expose getparent(); for tail text the
            # "parent" is the preceding sibling, so it is not used as the
            # link owner.
            if hasattr(node, 'getparent') and not getattr(node, 'is_tail', False):
                owner = node.getparent()
            href = owner.get('href') if owner is not None else None
            return node.strip(), href

        # with_tail=False: by default tostring() also serialises the text
        # that FOLLOWS the element, which leaked unrelated text into the
        # comment content.
        text = etree.tostring(
            node, method='text', encoding='unicode', with_tail=False
        )
        return text.strip(), node.get('href')

    def _parse_comments(self, html: str, xpath: str, base_url: str) -> List[Dict]:
        """Extract comments from ``html`` using ``xpath``.

        Each non-empty match becomes ``{'content': text, 'url': link}`` where
        ``link`` is the match's ``href`` resolved against ``base_url``, or
        ``base_url`` itself when there is no ``href``. If the XPath stage
        raises (invalid expression, unparsable document, ...), the result of
        ``_fallback_parse`` is returned instead.
        """
        comments = []
        try:
            tree = etree.HTML(html)
            matches = tree.xpath(xpath)
            # An expression such as string(...) yields one string, not a list.
            if isinstance(matches, str):
                matches = [matches]
            logger.debug(f"XPath 匹配到 {len(matches)} 个元素")

            for node in matches:
                try:
                    text, href = self._extract_text_and_href(node)
                    if text:
                        full_url = urljoin(base_url, href) if href else base_url
                        comments.append({
                            'content': text,
                            'url': full_url
                        })
                except Exception as e:
                    # One bad node must not abort the whole page.
                    logger.error(f"解析元素失败: {e}")
                    continue
        except Exception as e:
            logger.error(f"XPath解析失败: {e}")
            # Fallback parsing strategy.
            comments = self._fallback_parse(html, base_url)
        return comments

    def _fallback_parse(self, html: str, base_url: str) -> List[Dict]:
        """Fallback extraction using BeautifulSoup.

        Looks at the first 50 ``a``/``div``/``p``/``span`` tags whose class
        contains ``linkblack`` and keeps those whose stripped text is longer
        than five characters. Every entry uses ``base_url`` as its URL.
        """
        comments = []
        try:
            logger.debug("使用备选解析方法")
            soup = BeautifulSoup(html, 'lxml')
            # Selector for common comment elements; adjust to the actual
            # structure of the target site if needed.
            elements = soup.find_all(
                ['a', 'div', 'p', 'span'],
                class_=lambda value: bool(value) and 'linkblack' in value,
            )
            for elem in elements[:50]:  # cap the number of inspected tags
                text = elem.get_text().strip()
                if len(text) > 5:
                    comments.append({
                        'content': text,
                        'url': base_url
                    })
            logger.debug(f"备选解析获取到 {len(comments)} 条评论")
        except Exception as e:
            logger.error(f"备选解析失败: {e}")
        return comments

    def _fetch_sse_with_selenium(self, url: str) -> Optional[str]:
        """Load ``url`` in headless Chrome and return the rendered HTML.

        Waits up to 30 seconds for an element with id ``wmt_china`` and then
        two more seconds before reading the page source. Returns ``None`` on
        any failure; the driver is always shut down.
        """
        driver = None
        try:
            # Configure Chrome.
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-gpu')
            chrome_options.add_argument('--window-size=1920,1080')
            chrome_options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')

            driver = webdriver.Chrome(options=chrome_options)
            driver.get(url)

            # Wait for the page to load.
            WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.ID, "wmt_china"))
            )
            # Extra pause so late-loading data can arrive.
            time.sleep(2)

            html = driver.page_source
            logger.debug(f"Selenium 获取页面成功,大小: {len(html)} 字节")
            return html
        except Exception as e:
            logger.error(f"Selenium 获取页面失败: {e}")
            return None
        finally:
            if driver:
                driver.quit()

    def fetch_sse_stock_data(self) -> Dict[str, float]:
        """Fetch the SSE Composite Index quote from the Sina Finance JS API.

        Returns:
            ``{'time': 'HH:MM', 'value': price}`` where ``time`` is the local
            wall-clock time of the call (a string, despite the declared
            value type) and ``price`` is field 3 of the quote parsed as a
            float (0.0 when that field is empty). An empty dict is returned
            on any request or parsing failure.
        """
        from datetime import datetime
        import re

        sse_url = "https://hq.sinajs.cn/list=sh000001"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Referer': 'https://finance.sina.com.cn/'
        }
        logger.info(f"通过新浪JS接口获取上证指数数据: {sse_url}")

        try:
            response = self.session.get(sse_url, timeout=30, headers=headers)
            response.raise_for_status()
            content = response.text
            logger.debug(f"获取到响应: {content}")

            # The payload is a JS assignment whose quoted value is a
            # comma-separated record.
            pattern = r'var hq_str_sh000001="([^"]+)"'
            match = re.search(pattern, content)
            if not match:
                logger.warning("未能解析新浪JS接口返回数据")
                return {}

            data_fields = match.group(1).split(',')
            if len(data_fields) < 32:
                logger.warning(f"数据字段不足: {len(data_fields)}")
                return {}

            stock_name = data_fields[0]
            current_price = float(data_fields[3]) if data_fields[3] else 0.0
            current_time = datetime.now().strftime("%H:%M")
            logger.info(f"✓ 成功获取 {stock_name}: {current_price} (时间: {current_time})")
            return {
                'time': current_time,
                'value': current_price
            }
        except requests.RequestException as e:
            logger.error(f"请求新浪JS接口失败: {e}")
            return {}
        except (ValueError, IndexError) as e:
            logger.error(f"解析数据失败: {e}")
            return {}
        except Exception as e:
            logger.error(f"获取股票数据异常: {e}")
            return {}

    def set_user_agent(self, user_agent: str):
        """Replace the User-Agent header sent by the shared session."""
        self.session.headers.update({'User-Agent': user_agent})

    def update_config(self, config: Dict):
        """Merge ``config`` into the stored configuration.

        ``user_agent``, ``retry_times`` and ``retry_interval`` are also
        applied to the live session / attributes when present.
        """
        self.config.update(config)
        if 'user_agent' in config:
            self.set_user_agent(config['user_agent'])
        if 'retry_times' in config:
            self.retry_times = config['retry_times']
        if 'retry_interval' in config:
            self.retry_interval = config['retry_interval']

    def get_fetch_interval(self) -> int:
        """Return ``config['fetch_interval']``, defaulting to 60."""
        return self.config.get('fetch_interval', 60)

    def fetch_sse_screenshot(self) -> str:
        """Screenshot one element of the SSE home page with Playwright.

        The image is written to ``sse_screenshot.png`` next to the
        executable (frozen build) or next to this source file (development).

        Returns:
            The screenshot path, or ``""`` when the element was not found or
            any step failed.
        """
        from playwright.sync_api import sync_playwright
        import os
        import sys

        url = "https://www.sse.com.cn/"
        xpath = "//div[contains(@class,'gray_bg')]//div[contains(@class,'col-md-7')]"
        logger.info(f"开始爬取上证所网页截图: {url}")
        logger.info(f"目标XPath: {xpath}")

        # Resolve the working directory.
        if getattr(sys, 'frozen', False):
            # Packaged (frozen) build: use the browser shipped alongside
            # the executable.
            current_dir = os.path.dirname(sys.executable)
            playwright_dir = os.path.join(current_dir, '_internal', 'ms-playwright')
            logger.info(f"打包环境Playwright浏览器路径: {playwright_dir}")
        else:
            # Development environment: let Playwright locate its browser.
            current_dir = os.path.dirname(os.path.abspath(__file__))
            playwright_dir = None
            logger.info(f"开发环境,当前目录: {current_dir}")

        output_dir = current_dir
        screenshot_path = os.path.join(output_dir, "sse_screenshot.png")

        try:
            with sync_playwright() as p:
                browser_launch_options = {
                    'headless': True
                }
                if playwright_dir:
                    browser_launch_options['executable_path'] = os.path.join(playwright_dir, 'chromium-1091', 'chrome-win', 'chrome.exe')
                    logger.info(f"使用自定义浏览器路径: {browser_launch_options['executable_path']}")

                browser = p.chromium.launch(**browser_launch_options)
                try:
                    page = browser.new_page()
                    page.set_default_timeout(60000)

                    logger.info("正在访问页面...")
                    page.goto(url, wait_until="networkidle")
                    logger.info("等待页面加载完成...")
                    page.wait_for_load_state("domcontentloaded")
                    page.wait_for_timeout(5000)

                    logger.info(f"查找XPath元素: {xpath}")
                    # Locators are strict: is_visible()/screenshot() raise
                    # when several elements match, so pin the first match.
                    element = page.locator(f"xpath={xpath}").first
                    if element.count() == 0:
                        logger.warning("✗ 未找到目标元素")
                        return ""

                    logger.info("✓ 找到目标元素")
                    is_visible = element.is_visible()
                    logger.info(f"元素可见: {is_visible}")
                    if not is_visible:
                        logger.info("元素不可见,尝试滚动到可见区域...")
                        element.scroll_into_view_if_needed()
                        page.wait_for_timeout(2000)

                    logger.info(f"正在截取元素截图到: {screenshot_path}")
                    element.screenshot(path=screenshot_path)
                    logger.info("✓ 截屏成功")
                    return screenshot_path
                finally:
                    # Close the browser on every path, including errors
                    # raised while navigating or taking the screenshot.
                    browser.close()
        except Exception as e:
            logger.error(f"爬取上证所截图失败: {e}")
            logger.exception(e)  # log the full traceback
            return ""