Files
tophux_scrape/tophub_scraper.py

304 lines
12 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TopHub网站数据抓取脚本
负责从tophub.today网站抓取数据根据指定规则过滤并保存
"""
import requests
from lxml import html
import json
import time
import os
import re
import subprocess
import sys
from datetime import datetime
from loguru import logger
# 配置日志
logger.add("tophub_scraper.log", rotation="10 MB", level="INFO")
class TopHubScraper:
"""TopHub网站数据抓取器"""
def __init__(self):
"""
初始化抓取器
"""
self.base_url = "https://tophub.today/"
self.ban_list_file = "tophub_ban_column.txt"
self.session = requests.Session()
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
})
self.ban_list = self.load_ban_list()
def load_ban_list(self):
"""
加载需要过滤的栏目列表
Returns:
set: 需要过滤的栏目集合
"""
ban_list = set()
try:
if os.path.exists(self.ban_list_file):
with open(self.ban_list_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
ban_list.add(line)
logger.info(f"已加载 {len(ban_list)} 个需要过滤的栏目")
else:
logger.warning(f"过滤文件 {self.ban_list_file} 不存在,将不过滤任何栏目")
except Exception as e:
logger.error(f"加载过滤文件失败: {e}")
return ban_list
def fetch_webpage(self):
"""
获取网页内容
Returns:
str: 网页HTML内容
"""
logger.info(f"正在获取网页内容: {self.base_url}")
try:
response = self.session.get(self.base_url, timeout=10)
response.raise_for_status()
logger.info("网页内容获取成功")
return response.text
except requests.RequestException as e:
logger.error(f"获取网页内容失败: {e}")
raise
def delete_date_txt_files(self):
"""
删除本地目录下所有以日期格式开头的txt文件
匹配格式: YYYY年MM月DD日HHMMSS.txt
"""
logger.info("开始删除日期格式的txt文件")
deleted_count = 0
# 定义日期格式的正则表达式模式
date_pattern = r'^\d{4}\d{1,2}月\d{1,2}日\d{6}\.txt$'
try:
# 获取当前目录下的所有txt文件
for filename in os.listdir('.'):
if filename.endswith('.txt') and re.match(date_pattern, filename):
try:
os.remove(filename)
logger.info(f"已删除文件: {filename}")
deleted_count += 1
except Exception as e:
logger.error(f"删除文件 {filename} 失败: {e}")
logger.info(f"删除完成,共删除 {deleted_count} 个日期格式的txt文件")
except Exception as e:
logger.error(f"删除文件时出错: {e}")
def scrape_by_node_ids(self):
"""
根据节点ID范围抓取数据
Returns:
list: 包含已抓取数据的列表
"""
try:
# 运行逻辑前先删除所有日期格式的txt文件
self.delete_date_txt_files()
# 1. 获取网页内容
html_content = self.fetch_webpage()
tree = html.fromstring(html_content)
# 2. 创建输出文件名(基于当前日期时间)
now = datetime.now()
output_file = f"{now.year}{now.month}{now.day}{now.hour}{now.minute}{now.second}.txt"
scraped_data = []
# 3. 遍历节点ID范围
for node_id in range(1, 1000): # 从1到999
xpath = f'//*[@id="node-{node_id}"]'
logger.info(f"正在查找节点: {xpath}")
# 查找节点
nodes = tree.xpath(xpath)
if not nodes:
continue # 没有找到节点,跳过下一个数字
node = nodes[0]
# 查找span标签
spans = node.xpath('.//span')
if not spans:
logger.info(f"节点 {node_id} 中未找到span标签跳过")
continue
# 获取第一个span的文本内容
span_text = spans[0].text_content().strip()
if not span_text:
logger.info(f"节点 {node_id} 的span标签为空跳过")
continue
# 检查是否在过滤列表中(部分匹配)
should_skip = False
for ban_word in self.ban_list:
if ban_word in span_text:
logger.info(f"节点 {node_id} 的内容 '{span_text}' 包含过滤词 '{ban_word}',跳过")
should_skip = True
break
if should_skip:
continue
logger.info(f"节点 {node_id} 的内容 '{span_text}' 通过过滤,继续处理")
# 查找a元素
links = node.xpath('.//a')
if not links:
logger.info(f"节点 {node_id} 中未找到a元素跳过")
continue
# 提取所有链接和文本
for link in links:
link_text = link.text_content().strip()
href = link.get('href', '')
if link_text and href:
# 补全相对链接
if not href.startswith('http'):
href = f"https://tophub.today{href}"
# 当category和text的值相同时跳过当前循环
if span_text == link_text:
logger.info(f"节点 {node_id} 的分类和标题相同 ({span_text}),跳过")
continue
scraped_data.append({
'node_id': node_id,
'category': span_text,
'text': link_text,
'link': href
})
# 4. 保存数据到文件
if scraped_data:
self.save_to_file(scraped_data, output_file)
logger.info(f"成功抓取 {len(scraped_data)} 条数据,保存到 {output_file}")
else:
logger.warning("未抓取到任何数据")
return scraped_data
except Exception as e:
logger.error(f"抓取数据时出错: {e}")
raise
def save_to_file(self, data, filename):
"""
将数据保存到文件
Args:
data (list): 要保存的数据
filename (str): 文件名
"""
try:
with open(filename, 'w', encoding='utf-8') as f:
for item in data:
f.write(f"节点ID: {item['node_id']}\n")
f.write(f"分类: {item['category']}\n")
# 使用正则表达式清洗标题,去除数字序号和多余空白
title_text = item['text']
# 处理多行标题,提取实际内容
lines = title_text.strip().split('\n')
if len(lines) >= 2:
# 第二行通常是实际标题内容
cleaned_title = lines[1].strip()
else:
# 如果只有一行,尝试使用正则表达式
match = re.match(r'^\d+\s+(.+)$', title_text.strip(), re.DOTALL)
if match:
cleaned_title = match.group(1).strip()
else:
cleaned_title = title_text.strip()
f.write(f"标题: {cleaned_title}\n")
f.write(f"链接: {item['link']}\n")
f.write("-" * 50 + "\n")
logger.info(f"数据已保存到 {filename}")
except Exception as e:
logger.error(f"保存文件失败: {e}")
raise
def call_add_data_script(self):
"""
调用本地的tophub_add_data_to_db.py脚本
"""
logger.info("准备调用tophub_add_data_to_db.py脚本")
try:
# 检查tophub_add_data_to_db.py是否存在
if not os.path.exists("tophub_add_data_to_db.py"):
logger.error("tophub_add_data_to_db.py文件不存在无法调用")
return
# 调用tophub_add_data_to_db.py脚本
2025-11-14 21:03:48 +08:00
logger.info("正在调用tophub_add_data_to_db.py...")
2025-11-13 22:27:05 +08:00
# 使用Popen方式处理可能的编码问题
process = subprocess.Popen([sys.executable, "tophub_add_data_to_db.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding='utf-8',
errors='replace') # 使用replace模式处理无法解码的字符
# 实时读取输出以避免编码问题
try:
2026-01-12 20:36:44 +08:00
stdout, stderr = process.communicate(timeout=3600) # 1小时超时
2025-11-13 22:27:05 +08:00
except subprocess.TimeoutExpired:
process.kill()
logger.error("tophub_add_data_to_db.py执行超时")
return
if process.returncode == 0:
logger.info("tophub_add_data_to_db.py调用成功")
2025-11-13 22:27:05 +08:00
if stdout:
logger.info(f"脚本输出: {stdout}")
else:
2025-11-13 22:27:05 +08:00
logger.error(f"tophub_add_data_to_db.py调用失败返回码: {process.returncode}")
if stderr:
logger.error(f"错误信息: {stderr}")
if stdout:
logger.info(f"脚本输出: {stdout}")
2025-11-13 22:27:05 +08:00
except UnicodeDecodeError as e:
logger.error(f"编码解码错误: {e}")
logger.info("可能是脚本输出包含非UTF-8编码字符已尝试使用replace模式处理")
except Exception as e:
logger.error(f"调用tophub_add_data_to_db.py时出错: {e}")
if __name__ == "__main__":
scraper = TopHubScraper()
try:
# 抓取数据
scraped_data = scraper.scrape_by_node_ids()
# 抓取完成后调用tophub_add_data_to_db.py脚本
if scraped_data:
scraper.call_add_data_script()
else:
logger.warning("未抓取到数据跳过调用tophub_add_data_to_db.py脚本")
except Exception as e:
logger.error(f"程序执行出错: {e}")
raise