""" 爬虫模块 - 网站评论抓取 """ import requests from lxml import etree import time from typing import List, Dict, Optional from urllib.parse import urljoin from bs4 import BeautifulSoup import random from loguru import logger class SpiderManager: """爬虫管理器""" def __init__(self, config: Dict): self.config = config self.session = requests.Session() self.session.headers.update({ 'User-Agent': config.get('user_agent', 'Mozilla/5.0') }) self.retry_times = config.get('retry_times', 3) self.retry_interval = config.get('retry_interval', 5) logger.info(f"爬虫管理器初始化完成,目标URL: {config.get('target_url', '')}") def fetch(self, url: str = None, xpath: str = None) -> List[Dict]: """ 抓取网页评论 返回评论列表,每个元素包含 content 和 url """ target_url = url or self.config.get('target_url', '') target_xpath = xpath or self.config.get('xpath', '') if not target_url: logger.warning("未设置目标URL") return [] logger.info(f"开始抓取: {target_url}") html = self._fetch_with_retry(target_url) if not html: logger.warning("网页获取失败") return [] comments = self._parse_comments(html, target_xpath, target_url) logger.info(f"解析完成,获取到 {len(comments)} 条评论") return comments def _fetch_with_retry(self, url: str, max_retries: int = None) -> Optional[str]: """带重试的网页获取""" max_retries = max_retries or self.retry_times for attempt in range(max_retries): try: logger.debug(f"尝试 {attempt + 1}/{max_retries} 获取网页") response = self.session.get(url, timeout=30) response.raise_for_status() response.encoding = response.apparent_encoding logger.debug(f"网页获取成功,大小: {len(response.text)} 字节") return response.text except requests.RequestException as e: logger.warning(f"请求失败 (尝试 {attempt + 1}/{max_retries}): {e}") if attempt < max_retries - 1: time.sleep(self.retry_interval + random.uniform(0, 2)) else: logger.error(f"所有重试均失败: {url}") return None return None def get_page_title(self, url: str = None) -> str: """获取页面标题""" target_url = url or self.config.get('target_url', '') if not target_url: logger.warning("未设置目标URL") return "" logger.info(f"获取页面标题: {target_url}") html = self._fetch_with_retry(target_url) if not html: logger.warning("网页获取失败") return "" try: # 使用 lxml 解析页面标题 tree = etree.HTML(html) title_elements = tree.xpath('//title/text()') if title_elements: title = title_elements[0].strip() logger.info(f"获取到页面标题: {title}") return title else: logger.warning("未找到页面标题") return "" except Exception as e: logger.error(f"解析页面标题失败: {e}") return "" def _parse_comments(self, html: str, xpath: str, base_url: str) -> List[Dict]: """解析评论""" comments = [] try: # 使用 lxml 解析 tree = etree.HTML(html) elements = tree.xpath(xpath) logger.debug(f"XPath 匹配到 {len(elements)} 个元素") for elem in elements: try: text = etree.tostring(elem, method='text', encoding='unicode').strip() if text: # 获取链接的 href(如果存在) href = elem.get('href') full_url = urljoin(base_url, href) if href else base_url comments.append({ 'content': text, 'url': full_url }) except Exception as e: logger.error(f"解析元素失败: {e}") continue except Exception as e: logger.error(f"XPath解析失败: {e}") # 备选解析方法 comments = self._fallback_parse(html, base_url) return comments def _fallback_parse(self, html: str, base_url: str) -> List[Dict]: """备选解析方法 - 使用 BeautifulSoup""" comments = [] try: logger.debug("使用备选解析方法") soup = BeautifulSoup(html, 'lxml') # 尝试查找常见的评论元素 # 这里可以根据实际网站结构调整选择器 elements = soup.find_all(['a', 'div', 'p', 'span'], class_=lambda x: x and 'linkblack' in x if x else False) for elem in elements[:50]: # 限制数量 text = elem.get_text().strip() if text and len(text) > 5: comments.append({ 'content': text, 'url': base_url }) logger.debug(f"备选解析获取到 {len(comments)} 条评论") except Exception as e: logger.error(f"备选解析失败: {e}") return comments def is_trading_time(self) -> bool: """判断当前是否为交易时间""" from datetime import datetime, time current_time = datetime.now().time() # 上午交易时间: 9:30-11:30 morning_start = time(9, 30) morning_end = time(11, 30) # 下午交易时间: 13:00-15:00 afternoon_start = time(13, 0) afternoon_end = time(15, 0) # 判断是否在交易时间内 is_trading = ((morning_start <= current_time <= morning_end) or (afternoon_start <= current_time <= afternoon_end)) logger.debug(f"当前时间 {current_time.strftime('%H:%M')} 是否为交易时间: {is_trading}") return is_trading def fetch_sse_stock_data(self) -> Dict[str, float]: """ 爬取上海证券交易所股票数据 返回包含时间和数值的字典 """ # 检查是否为交易时间 if not self.is_trading_time(): logger.info("当前为非交易时间,跳过股票数据爬取") return {} sse_url = "https://www.sse.com.cn/" xpath = "//*[@id=\"hq_area\"]" logger.info(f"开始爬取上海证券交易所数据: {sse_url}") html = self._fetch_with_retry(sse_url) if not html: logger.warning("上海证券交易所网页获取失败") return {} try: # 使用 lxml 解析 tree = etree.HTML(html) # 尝试获取股票数值 elements = tree.xpath(xpath) if not elements: logger.warning("未找到股票数据元素,尝试备用XPath") # 尝试备用XPath backup_xpaths = [ "//*[@id='hq_controller']//td[contains(@class, 'price')]//text()", "//*[contains(@class, 'stock-price')]//text()", "//*[contains(@class, 'price')]//text()" ] for backup_xpath in backup_xpaths: elements = tree.xpath(backup_xpath) if elements: logger.info(f"使用备用XPath获取到数据") break if elements: # 获取文本内容并尝试转换为数值 text_content = etree.tostring(elements[0], method='text', encoding='unicode').strip() logger.info(f"获取到股票数据文本: {text_content}") # 提取数值(可能包含逗号、小数点等) import re # 匹配数字(包括小数点和逗号分隔符) numbers = re.findall(r'[\d,]+(?:\.\d+)?', text_content) if numbers: # 去除逗号并转换为浮点数 value_str = numbers[0].replace(',', '') try: stock_value = float(value_str) # 获取当前时间 from datetime import datetime current_time = datetime.now().strftime("%H:%M") logger.info(f"成功获取股票数据: 时间={current_time}, 值={stock_value}") return { 'time': current_time, 'value': stock_value } except ValueError as e: logger.error(f"数值转换失败: {value_str}, 错误: {e}") else: logger.warning(f"未找到有效数值: {text_content}") else: logger.warning("未找到股票数据元素") except Exception as e: logger.error(f"解析上海证券交易所数据失败: {e}") return {} def set_user_agent(self, user_agent: str): """更新User-Agent""" self.session.headers.update({'User-Agent': user_agent}) def update_config(self, config: Dict): """更新配置""" self.config.update(config) if 'user_agent' in config: self.set_user_agent(config['user_agent']) if 'retry_times' in config: self.retry_times = config['retry_times'] if 'retry_interval' in config: self.retry_interval = config['retry_interval'] def get_fetch_interval(self) -> int: """获取爬取间隔""" return self.config.get('fetch_interval', 60)