283 lines
11 KiB
Python
283 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
TopHub网站数据抓取脚本
|
||
负责从tophub.today网站抓取数据,根据指定规则过滤并保存
|
||
"""
|
||
|
||
import requests
|
||
from lxml import html
|
||
import json
|
||
import time
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
from datetime import datetime
|
||
from loguru import logger
|
||
|
||
# 配置日志
|
||
logger.add("tophub_scraper.log", rotation="10 MB", level="INFO")
|
||
|
||
class TopHubScraper:
|
||
"""TopHub网站数据抓取器"""
|
||
|
||
def __init__(self):
|
||
"""
|
||
初始化抓取器
|
||
"""
|
||
self.base_url = "https://tophub.today/"
|
||
self.ban_list_file = "tophub_ban_column.txt"
|
||
self.session = requests.Session()
|
||
self.session.headers.update({
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
||
})
|
||
self.ban_list = self.load_ban_list()
|
||
|
||
def load_ban_list(self):
|
||
"""
|
||
加载需要过滤的栏目列表
|
||
|
||
Returns:
|
||
set: 需要过滤的栏目集合
|
||
"""
|
||
ban_list = set()
|
||
try:
|
||
if os.path.exists(self.ban_list_file):
|
||
with open(self.ban_list_file, 'r', encoding='utf-8') as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if line:
|
||
ban_list.add(line)
|
||
logger.info(f"已加载 {len(ban_list)} 个需要过滤的栏目")
|
||
else:
|
||
logger.warning(f"过滤文件 {self.ban_list_file} 不存在,将不过滤任何栏目")
|
||
except Exception as e:
|
||
logger.error(f"加载过滤文件失败: {e}")
|
||
return ban_list
|
||
|
||
def fetch_webpage(self):
|
||
"""
|
||
获取网页内容
|
||
|
||
Returns:
|
||
str: 网页HTML内容
|
||
"""
|
||
logger.info(f"正在获取网页内容: {self.base_url}")
|
||
try:
|
||
response = self.session.get(self.base_url, timeout=10)
|
||
response.raise_for_status()
|
||
logger.info("网页内容获取成功")
|
||
return response.text
|
||
except requests.RequestException as e:
|
||
logger.error(f"获取网页内容失败: {e}")
|
||
raise
|
||
|
||
def delete_date_txt_files(self):
|
||
"""
|
||
删除本地目录下所有以日期格式开头的txt文件
|
||
匹配格式: YYYY年MM月DD日HHMMSS.txt
|
||
"""
|
||
logger.info("开始删除日期格式的txt文件")
|
||
deleted_count = 0
|
||
|
||
# 定义日期格式的正则表达式模式
|
||
date_pattern = r'^\d{4}年\d{1,2}月\d{1,2}日\d{6}\.txt$'
|
||
|
||
try:
|
||
# 获取当前目录下的所有txt文件
|
||
for filename in os.listdir('.'):
|
||
if filename.endswith('.txt') and re.match(date_pattern, filename):
|
||
try:
|
||
os.remove(filename)
|
||
logger.info(f"已删除文件: {filename}")
|
||
deleted_count += 1
|
||
except Exception as e:
|
||
logger.error(f"删除文件 {filename} 失败: {e}")
|
||
|
||
logger.info(f"删除完成,共删除 {deleted_count} 个日期格式的txt文件")
|
||
except Exception as e:
|
||
logger.error(f"删除文件时出错: {e}")
|
||
|
||
def scrape_by_node_ids(self):
|
||
"""
|
||
根据节点ID范围抓取数据
|
||
|
||
Returns:
|
||
list: 包含已抓取数据的列表
|
||
"""
|
||
try:
|
||
# 运行逻辑前,先删除所有日期格式的txt文件
|
||
self.delete_date_txt_files()
|
||
|
||
# 1. 获取网页内容
|
||
html_content = self.fetch_webpage()
|
||
tree = html.fromstring(html_content)
|
||
|
||
# 2. 创建输出文件名(基于当前日期时间)
|
||
now = datetime.now()
|
||
output_file = f"{now.year}年{now.month}月{now.day}日{now.hour}{now.minute}{now.second}.txt"
|
||
|
||
scraped_data = []
|
||
|
||
# 3. 遍历节点ID范围
|
||
for node_id in range(1, 1000): # 从1到999
|
||
xpath = f'//*[@id="node-{node_id}"]'
|
||
logger.info(f"正在查找节点: {xpath}")
|
||
|
||
# 查找节点
|
||
nodes = tree.xpath(xpath)
|
||
if not nodes:
|
||
continue # 没有找到节点,跳过下一个数字
|
||
|
||
node = nodes[0]
|
||
|
||
# 查找span标签
|
||
spans = node.xpath('.//span')
|
||
if not spans:
|
||
logger.info(f"节点 {node_id} 中未找到span标签,跳过")
|
||
continue
|
||
|
||
# 获取第一个span的文本内容
|
||
span_text = spans[0].text_content().strip()
|
||
if not span_text:
|
||
logger.info(f"节点 {node_id} 的span标签为空,跳过")
|
||
continue
|
||
|
||
# 检查是否在过滤列表中(部分匹配)
|
||
should_skip = False
|
||
for ban_word in self.ban_list:
|
||
if ban_word in span_text:
|
||
logger.info(f"节点 {node_id} 的内容 '{span_text}' 包含过滤词 '{ban_word}',跳过")
|
||
should_skip = True
|
||
break
|
||
|
||
if should_skip:
|
||
continue
|
||
|
||
logger.info(f"节点 {node_id} 的内容 '{span_text}' 通过过滤,继续处理")
|
||
|
||
# 查找a元素
|
||
links = node.xpath('.//a')
|
||
if not links:
|
||
logger.info(f"节点 {node_id} 中未找到a元素,跳过")
|
||
continue
|
||
|
||
# 提取所有链接和文本
|
||
for link in links:
|
||
link_text = link.text_content().strip()
|
||
href = link.get('href', '')
|
||
|
||
if link_text and href:
|
||
# 补全相对链接
|
||
if not href.startswith('http'):
|
||
href = f"https://tophub.today{href}"
|
||
|
||
# 当category和text的值相同时,跳过当前循环
|
||
if span_text == link_text:
|
||
logger.info(f"节点 {node_id} 的分类和标题相同 ({span_text}),跳过")
|
||
continue
|
||
|
||
scraped_data.append({
|
||
'node_id': node_id,
|
||
'category': span_text,
|
||
'text': link_text,
|
||
'link': href
|
||
})
|
||
|
||
# 4. 保存数据到文件
|
||
if scraped_data:
|
||
self.save_to_file(scraped_data, output_file)
|
||
logger.info(f"成功抓取 {len(scraped_data)} 条数据,保存到 {output_file}")
|
||
else:
|
||
logger.warning("未抓取到任何数据")
|
||
|
||
return scraped_data
|
||
|
||
except Exception as e:
|
||
logger.error(f"抓取数据时出错: {e}")
|
||
raise
|
||
|
||
def save_to_file(self, data, filename):
|
||
"""
|
||
将数据保存到文件
|
||
|
||
Args:
|
||
data (list): 要保存的数据
|
||
filename (str): 文件名
|
||
"""
|
||
try:
|
||
with open(filename, 'w', encoding='utf-8') as f:
|
||
for item in data:
|
||
f.write(f"节点ID: {item['node_id']}\n")
|
||
f.write(f"分类: {item['category']}\n")
|
||
|
||
# 使用正则表达式清洗标题,去除数字序号和多余空白
|
||
title_text = item['text']
|
||
# 处理多行标题,提取实际内容
|
||
lines = title_text.strip().split('\n')
|
||
if len(lines) >= 2:
|
||
# 第二行通常是实际标题内容
|
||
cleaned_title = lines[1].strip()
|
||
else:
|
||
# 如果只有一行,尝试使用正则表达式
|
||
match = re.match(r'^\d+\s+(.+)$', title_text.strip(), re.DOTALL)
|
||
if match:
|
||
cleaned_title = match.group(1).strip()
|
||
else:
|
||
cleaned_title = title_text.strip()
|
||
|
||
f.write(f"标题: {cleaned_title}\n")
|
||
f.write(f"链接: {item['link']}\n")
|
||
f.write("-" * 50 + "\n")
|
||
logger.info(f"数据已保存到 {filename}")
|
||
except Exception as e:
|
||
logger.error(f"保存文件失败: {e}")
|
||
raise
|
||
|
||
def call_add_data_script(self):
|
||
"""
|
||
调用本地的tophub_add_data_to_db.py脚本
|
||
"""
|
||
logger.info("准备调用tophub_add_data_to_db.py脚本")
|
||
|
||
try:
|
||
# 检查tophub_add_data_to_db.py是否存在
|
||
if not os.path.exists("tophub_add_data_to_db.py"):
|
||
logger.error("tophub_add_data_to_db.py文件不存在,无法调用")
|
||
return
|
||
|
||
# 调用tophub_add_data_to_db.py脚本
|
||
logger.info("正在调用tophub_add_data_to_db.py...")
|
||
result = subprocess.run([sys.executable, "tophub_add_data_to_db.py"],
|
||
capture_output=True, text=True, encoding='utf-8')
|
||
|
||
if result.returncode == 0:
|
||
logger.info("tophub_add_data_to_db.py调用成功")
|
||
if result.stdout:
|
||
logger.info(f"脚本输出: {result.stdout}")
|
||
else:
|
||
logger.error(f"tophub_add_data_to_db.py调用失败,返回码: {result.returncode}")
|
||
if result.stderr:
|
||
logger.error(f"错误信息: {result.stderr}")
|
||
if result.stdout:
|
||
logger.info(f"脚本输出: {result.stdout}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"调用tophub_add_data_to_db.py时出错: {e}")
|
||
|
||
if __name__ == "__main__":
|
||
scraper = TopHubScraper()
|
||
try:
|
||
# 抓取数据
|
||
scraped_data = scraper.scrape_by_node_ids()
|
||
|
||
# 抓取完成后调用tophub_add_data_to_db.py脚本
|
||
if scraped_data:
|
||
scraper.call_add_data_script()
|
||
else:
|
||
logger.warning("未抓取到数据,跳过调用tophub_add_data_to_db.py脚本")
|
||
|
||
except Exception as e:
|
||
logger.error(f"程序执行出错: {e}")
|
||
raise |