#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ TopHub网站数据抓取脚本 负责从tophub.today网站抓取数据,根据指定规则过滤并保存 """ import requests from lxml import html import json import time import os import re from datetime import datetime from loguru import logger # 配置日志 logger.add("tophub_scraper.log", rotation="10 MB", level="INFO") class TopHubScraper: """TopHub网站数据抓取器""" def __init__(self): """ 初始化抓取器 """ self.base_url = "https://tophub.today/" self.ban_list_file = "tophub_ban_column.txt" self.session = requests.Session() self.session.headers.update({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }) self.ban_list = self.load_ban_list() def load_ban_list(self): """ 加载需要过滤的栏目列表 Returns: set: 需要过滤的栏目集合 """ ban_list = set() try: if os.path.exists(self.ban_list_file): with open(self.ban_list_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line: ban_list.add(line) logger.info(f"已加载 {len(ban_list)} 个需要过滤的栏目") else: logger.warning(f"过滤文件 {self.ban_list_file} 不存在,将不过滤任何栏目") except Exception as e: logger.error(f"加载过滤文件失败: {e}") return ban_list def fetch_webpage(self): """ 获取网页内容 Returns: str: 网页HTML内容 """ logger.info(f"正在获取网页内容: {self.base_url}") try: response = self.session.get(self.base_url, timeout=10) response.raise_for_status() logger.info("网页内容获取成功") return response.text except requests.RequestException as e: logger.error(f"获取网页内容失败: {e}") raise def scrape_by_node_ids(self): """ 根据节点ID范围抓取数据 Returns: list: 包含已抓取数据的列表 """ try: # 1. 获取网页内容 html_content = self.fetch_webpage() tree = html.fromstring(html_content) # 2. 创建输出文件名(基于当前日期时间) now = datetime.now() output_file = f"{now.year}年{now.month}月{now.day}日{now.hour}{now.minute}{now.second}.txt" scraped_data = [] # 3. 遍历节点ID范围 for node_id in range(1, 1000): # 从1到999 xpath = f'//*[@id="node-{node_id}"]' logger.info(f"正在查找节点: {xpath}") # 查找节点 nodes = tree.xpath(xpath) if not nodes: continue # 没有找到节点,跳过下一个数字 node = nodes[0] # 查找span标签 spans = node.xpath('.//span') if not spans: logger.info(f"节点 {node_id} 中未找到span标签,跳过") continue # 获取第一个span的文本内容 span_text = spans[0].text_content().strip() if not span_text: logger.info(f"节点 {node_id} 的span标签为空,跳过") continue # 检查是否在过滤列表中(部分匹配) should_skip = False for ban_word in self.ban_list: if ban_word in span_text: logger.info(f"节点 {node_id} 的内容 '{span_text}' 包含过滤词 '{ban_word}',跳过") should_skip = True break if should_skip: continue logger.info(f"节点 {node_id} 的内容 '{span_text}' 通过过滤,继续处理") # 查找a元素 links = node.xpath('.//a') if not links: logger.info(f"节点 {node_id} 中未找到a元素,跳过") continue # 提取所有链接和文本 for link in links: link_text = link.text_content().strip() href = link.get('href', '') if link_text and href: # 补全相对链接 if not href.startswith('http'): href = f"https://tophub.today{href}" # 当category和text的值相同时,跳过当前循环 if span_text == link_text: logger.info(f"节点 {node_id} 的分类和标题相同 ({span_text}),跳过") continue scraped_data.append({ 'node_id': node_id, 'category': span_text, 'text': link_text, 'link': href }) # 4. 保存数据到文件 if scraped_data: self.save_to_file(scraped_data, output_file) logger.info(f"成功抓取 {len(scraped_data)} 条数据,保存到 {output_file}") else: logger.warning("未抓取到任何数据") return scraped_data except Exception as e: logger.error(f"抓取数据时出错: {e}") raise def save_to_file(self, data, filename): """ 将数据保存到文件 Args: data (list): 要保存的数据 filename (str): 文件名 """ try: with open(filename, 'w', encoding='utf-8') as f: for item in data: f.write(f"节点ID: {item['node_id']}\n") f.write(f"分类: {item['category']}\n") # 使用正则表达式清洗标题,去除数字序号和多余空白 title_text = item['text'] # 处理多行标题,提取实际内容 lines = title_text.strip().split('\n') if len(lines) >= 2: # 第二行通常是实际标题内容 cleaned_title = lines[1].strip() else: # 如果只有一行,尝试使用正则表达式 match = re.match(r'^\d+\s+(.+)$', title_text.strip(), re.DOTALL) if match: cleaned_title = match.group(1).strip() else: cleaned_title = title_text.strip() f.write(f"标题: {cleaned_title}\n") f.write(f"链接: {item['link']}\n") f.write("-" * 50 + "\n") logger.info(f"数据已保存到 {filename}") except Exception as e: logger.error(f"保存文件失败: {e}") raise if __name__ == "__main__": scraper = TopHubScraper() scraper.scrape_by_node_ids()