Files
guba-indicator/spider.py
xiaji 96f206ea78 feat: 新增股票数据波形图和截图功能
refactor: 重构数据库和LLM分析器逻辑

fix: 修复爬虫解析和UI显示问题

docs: 更新配置文件和注释

style: 优化代码格式和日志输出
2026-01-12 09:19:38 +08:00

272 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
爬虫模块 - 网站评论抓取
"""
import random
import re
import time
from datetime import datetime
from datetime import time as dt_time
from typing import List, Dict, Optional
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from loguru import logger
from lxml import etree
class SpiderManager:
    """Download web pages and extract comment text and SSE quote values.

    One ``requests.Session`` is created per manager so the ``User-Agent``
    header is shared by every request the manager makes.

    Config keys read by this class:
        user_agent: ``User-Agent`` header value (default ``'Mozilla/5.0'``).
        retry_times: attempts per page download (default 3).
        retry_interval: base delay in seconds between attempts (default 5).
        target_url: default page for :meth:`fetch` and :meth:`get_page_title`.
        xpath: default XPath expression for :meth:`fetch`.
        fetch_interval: value returned by :meth:`get_fetch_interval`
            (default 60).
    """

    # Shanghai Stock Exchange page and the XPath expressions tried against it.
    SSE_URL = "https://www.sse.com.cn/"
    SSE_XPATH = "//*[@id=\"hq_area\"]"
    # Only consulted when SSE_XPATH matches nothing. These end in ``text()``,
    # so they yield strings rather than elements.
    SSE_BACKUP_XPATHS = (
        "//*[@id='hq_controller']//td[contains(@class, 'price')]//text()",
        "//*[contains(@class, 'stock-price')]//text()",
        "//*[contains(@class, 'price')]//text()",
    )

    # Inclusive (start, end) wall-clock windows: 09:30-11:30 and 13:00-15:00.
    _TRADING_SESSIONS = (
        (dt_time(9, 30), dt_time(11, 30)),
        (dt_time(13, 0), dt_time(15, 0)),
    )

    # A number must start with a digit, so a stray "," is never taken as one.
    _NUMBER_RE = re.compile(r'\d[\d,]*(?:\.\d+)?')

    def __init__(self, config: Dict):
        """Create the HTTP session and read retry settings from ``config``.

        Args:
            config: Settings dict; see the class docstring for the keys used.
                The dict is stored by reference, not copied.
        """
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': config.get('user_agent', 'Mozilla/5.0'),
        })
        self.retry_times = config.get('retry_times', 3)
        self.retry_interval = config.get('retry_interval', 5)
        logger.info(f"爬虫管理器初始化完成目标URL: {config.get('target_url', '')}")

    def fetch(self, url: Optional[str] = None, xpath: Optional[str] = None) -> List[Dict]:
        """Download a page and extract its comments.

        Args:
            url: Page to download; falls back to ``config['target_url']``.
            xpath: XPath selecting the comment nodes; falls back to
                ``config['xpath']``.

        Returns:
            A list of ``{'content': str, 'url': str}`` dicts. The list is
            empty when no URL is configured or the download fails.
        """
        target_url = url or self.config.get('target_url', '')
        target_xpath = xpath or self.config.get('xpath', '')
        if not target_url:
            logger.warning("未设置目标URL")
            return []
        logger.info(f"开始抓取: {target_url}")
        html = self._fetch_with_retry(target_url)
        if not html:
            logger.warning("网页获取失败")
            return []
        comments = self._parse_comments(html, target_xpath, target_url)
        logger.info(f"解析完成,获取到 {len(comments)} 条评论")
        return comments

    def _fetch_with_retry(self, url: str, max_retries: Optional[int] = None) -> Optional[str]:
        """Download ``url``, retrying on ``requests.RequestException``.

        Args:
            url: Page to download.
            max_retries: Total number of attempts. ``None`` (or 0) falls back
                to ``self.retry_times``.

        Returns:
            The decoded response body, or ``None`` when every attempt failed.
        """
        # Clamp to one attempt: a configured retry count of 0 used to skip the
        # request entirely and return None without logging anything.
        max_retries = max(1, max_retries or self.retry_times)
        for attempt in range(max_retries):
            try:
                logger.debug(f"尝试 {attempt + 1}/{max_retries} 获取网页")
                response = self.session.get(url, timeout=30)
                response.raise_for_status()
                # Decode with the charset detected from the body instead of
                # the one declared in the HTTP headers.
                response.encoding = response.apparent_encoding
                logger.debug(f"网页获取成功,大小: {len(response.text)} 字节")
                return response.text
            except requests.RequestException as e:
                logger.warning(f"请求失败 (尝试 {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    # Base delay plus up to 2 s of random jitter.
                    time.sleep(self.retry_interval + random.uniform(0, 2))
        logger.error(f"所有重试均失败: {url}")
        return None

    def get_page_title(self, url: Optional[str] = None) -> str:
        """Return the text of the page's ``<title>`` element.

        Args:
            url: Page to download; falls back to ``config['target_url']``.

        Returns:
            The stripped title, or ``""`` when no URL is configured, the
            download fails, the page has no title, or parsing raises.
        """
        target_url = url or self.config.get('target_url', '')
        if not target_url:
            logger.warning("未设置目标URL")
            return ""
        logger.info(f"获取页面标题: {target_url}")
        html = self._fetch_with_retry(target_url)
        if not html:
            logger.warning("网页获取失败")
            return ""
        try:
            title_elements = etree.HTML(html).xpath('//title/text()')
        except Exception as e:
            # Best effort: a page that cannot be parsed just has no title.
            logger.error(f"解析页面标题失败: {e}")
            return ""
        if not title_elements:
            logger.warning("未找到页面标题")
            return ""
        title = title_elements[0].strip()
        logger.info(f"获取到页面标题: {title}")
        return title

    @staticmethod
    def _node_text(node) -> str:
        """Return the stripped text of one XPath result.

        XPath expressions ending in ``text()`` or ``@attr`` yield strings
        rather than elements, and ``etree.tostring`` rejects those, so strings
        are returned as they are (stripped).
        """
        if isinstance(node, str):
            return node.strip()
        return etree.tostring(node, method='text', encoding='unicode').strip()

    @classmethod
    def _extract_first_number(cls, text: str) -> Optional[float]:
        """Return the first number in ``text`` as a float, or ``None``.

        Commas inside the number are treated as thousands separators and
        removed before conversion.
        """
        match = cls._NUMBER_RE.search(text)
        if match is None:
            return None
        return float(match.group().replace(',', ''))

    @classmethod
    def _within_trading_hours(cls, moment: datetime) -> bool:
        """Return True when ``moment`` is on a weekday inside a session window.

        Only the weekday and the wall-clock time of ``moment`` are examined.
        """
        if moment.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
            return False
        clock = moment.time()
        return any(start <= clock <= end for start, end in cls._TRADING_SESSIONS)

    def _parse_comments(self, html: str, xpath: str, base_url: str) -> List[Dict]:
        """Extract comments from ``html`` with ``xpath``.

        Args:
            html: Page source.
            xpath: Expression selecting comment nodes. It may select elements
                or strings (``text()`` / ``@attr`` / ``string()``).
            base_url: Used to resolve relative ``href`` values; also the
                ``url`` of any match that has no ``href``.

        Returns:
            A list of ``{'content': str, 'url': str}`` dicts. When evaluating
            the XPath raises (for example on an empty expression), the result
            of :meth:`_fallback_parse` is returned instead.
        """
        try:
            matches = etree.HTML(html).xpath(xpath)
            if isinstance(matches, str):
                # string(...) expressions return one string, not a list.
                matches = [matches]
            logger.debug(f"XPath 匹配到 {len(matches)} 个元素")
        except Exception as e:
            logger.error(f"XPath解析失败: {e}")
            return self._fallback_parse(html, base_url)

        comments = []
        for node in matches:
            try:
                text = self._node_text(node)
                if not text:
                    continue
                # Only elements can carry an href; string results cannot.
                href = None if isinstance(node, str) else node.get('href')
                comments.append({
                    'content': text,
                    'url': urljoin(base_url, href) if href else base_url,
                })
            except Exception as e:
                # One bad node must not discard the rest of the page.
                logger.error(f"解析元素失败: {e}")
        return comments

    def _fallback_parse(self, html: str, base_url: str) -> List[Dict]:
        """Extract comments with BeautifulSoup when the XPath route fails.

        Looks at ``a``/``div``/``p``/``span`` tags having a CSS class that
        contains ``'linkblack'``, considers at most the first 50 of them, and
        keeps those whose text is longer than 5 characters.

        Returns:
            A list of ``{'content': str, 'url': str}`` dicts; empty when
            parsing raises.
        """
        comments = []
        try:
            logger.debug("使用备选解析方法")
            soup = BeautifulSoup(html, 'lxml')
            # The selector is site specific; adjust it to the target markup.
            elements = soup.find_all(
                ['a', 'div', 'p', 'span'],
                class_=lambda value: bool(value) and 'linkblack' in value,
            )
            for elem in elements[:50]:
                text = elem.get_text().strip()
                if len(text) > 5:
                    # Resolve the element's own link, as _parse_comments does.
                    href = elem.get('href')
                    comments.append({
                        'content': text,
                        'url': urljoin(base_url, href) if href else base_url,
                    })
            logger.debug(f"备选解析获取到 {len(comments)} 条评论")
        except Exception as e:
            logger.error(f"备选解析失败: {e}")
        return comments

    def is_trading_time(self) -> bool:
        """Return True when the current local time is inside trading hours.

        Trading hours are Monday-Friday, 09:30-11:30 and 13:00-15:00, both
        ends inclusive. Public holidays are not taken into account.

        NOTE(review): this reads the machine's local clock via
        ``datetime.now()``; it presumably expects the host to run on
        exchange-local time - confirm for deployments elsewhere.
        """
        now = datetime.now()
        is_trading = self._within_trading_hours(now)
        logger.debug(f"当前时间 {now.strftime('%H:%M')} 是否为交易时间: {is_trading}")
        return is_trading

    def fetch_sse_stock_data(self) -> Dict[str, object]:
        """Scrape one numeric value from the Shanghai Stock Exchange page.

        Returns:
            ``{'time': 'HH:MM', 'value': float}`` on success, where ``time``
            is the local time of the scrape. An empty dict is returned outside
            trading hours, when the download fails, when no XPath matches,
            when the matched text contains no number, or when parsing raises.
        """
        if not self.is_trading_time():
            logger.info("当前为非交易时间,跳过股票数据爬取")
            return {}
        logger.info(f"开始爬取上海证券交易所数据: {self.SSE_URL}")
        html = self._fetch_with_retry(self.SSE_URL)
        if not html:
            logger.warning("上海证券交易所网页获取失败")
            return {}
        try:
            tree = etree.HTML(html)
            nodes = tree.xpath(self.SSE_XPATH)
            if not nodes:
                logger.warning("未找到股票数据元素尝试备用XPath")
                for backup_xpath in self.SSE_BACKUP_XPATHS:
                    nodes = tree.xpath(backup_xpath)
                    if nodes:
                        logger.info("使用备用XPath获取到数据")
                        break
            if not nodes:
                logger.warning("未找到股票数据元素")
                return {}

            # Backup XPaths return text nodes, many of them whitespace only,
            # so take the first match that has visible text.
            text_content = next(
                (text for text in map(self._node_text, nodes) if text), ''
            )
            logger.info(f"获取到股票数据文本: {text_content}")

            stock_value = self._extract_first_number(text_content)
            if stock_value is None:
                logger.warning(f"未找到有效数值: {text_content}")
                return {}

            current_time = datetime.now().strftime("%H:%M")
            logger.info(f"成功获取股票数据: 时间={current_time}, 值={stock_value}")
            return {
                'time': current_time,
                'value': stock_value,
            }
        except Exception as e:
            # Best effort: any parsing problem yields "no data".
            logger.error(f"解析上海证券交易所数据失败: {e}")
        return {}

    def set_user_agent(self, user_agent: str):
        """Replace the ``User-Agent`` header sent by the shared session."""
        self.session.headers.update({'User-Agent': user_agent})

    def update_config(self, config: Dict):
        """Merge ``config`` into the current settings.

        The ``user_agent``, ``retry_times`` and ``retry_interval`` keys, when
        present, are also applied to the live session and retry attributes.
        """
        self.config.update(config)
        if 'user_agent' in config:
            self.set_user_agent(config['user_agent'])
        if 'retry_times' in config:
            self.retry_times = config['retry_times']
        if 'retry_interval' in config:
            self.retry_interval = config['retry_interval']

    def get_fetch_interval(self) -> int:
        """Return ``config['fetch_interval']``, defaulting to 60."""
        return self.config.get('fetch_interval', 60)