Files
tophux_scrape/tophub_add_data_to_db.py

234 lines
7.5 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
处理临时文件并写入数据库的脚本
读取指定格式的临时文件提取标题和链接调用API进行分类然后写入SQLite数据库
"""
import sqlite3
import requests
import os
import re
from datetime import datetime
from tqdm import tqdm
from loguru import logger
import glob
# 配置日志
logger.add("tophub_add_data_to_db.log", rotation="10 MB", level="INFO")
# API配置
API_URL = "http://localhost:11434/api/generate"
API_MODEL = "gemma3:4b"
def init_database():
"""初始化数据库,创建表结构"""
conn = sqlite3.connect('tophub_data.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
url TEXT NOT NULL,
category TEXT,
source_date TEXT NOT NULL,
created_at TEXT NOT NULL,
UNIQUE(title, source_date)
)
''')
conn.commit()
conn.close()
logger.info("数据库初始化完成")
def find_temp_files():
"""查找符合格式的临时文件"""
pattern = "*年*月*日*.txt"
files = glob.glob(pattern)
logger.info(f"找到 {len(files)} 个临时文件: {files}")
return files
def parse_file_content(file_path):
"""解析文件内容按5行一个循环提取数据"""
articles = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 按5行一组进行解析
for i in range(0, len(lines), 5):
if i + 4 < len(lines):
node_id = lines[i].strip()
category = lines[i+1].strip()
title = lines[i+2].strip()
url = lines[i+3].strip()
separator = lines[i+4].strip() if i+4 < len(lines) else ""
# 提取关键信息
title_match = re.search(r'标题: (.+)', title)
url_match = re.search(r'链接: (.+)', url)
if title_match and url_match:
articles.append({
'title': title_match.group(1),
'url': url_match.group(1),
'category': category.split(': ')[1] if ': ' in category else '未知'
})
logger.info(f"从文件 {file_path} 解析出 {len(articles)} 条数据")
return articles
except Exception as e:
logger.error(f"解析文件 {file_path} 失败: {e}")
return []
def check_duplicate(title, date_str):
"""检查标题在最近三天(前天、昨天和今天)是否已存在"""
from datetime import datetime, timedelta
conn = sqlite3.connect('tophub_data.db')
cursor = conn.cursor()
try:
# 将输入日期字符串转换为datetime对象
current_date = datetime.strptime(date_str, '%Y-%m-%d')
# 计算前天、昨天和今天的日期
yesterday = current_date - timedelta(days=1)
day_before_yesterday = current_date - timedelta(days=2)
# 检查这三天内是否有相同标题的文章
cursor.execute('''
SELECT COUNT(*) FROM articles
WHERE title = ? AND source_date IN (?, ?, ?)
''', (title,
day_before_yesterday.strftime('%Y-%m-%d'),
yesterday.strftime('%Y-%m-%d'),
date_str))
count = cursor.fetchone()[0]
logger.info(f"检查标题 '{title}' 在最近三天的重复情况: 找到 {count} 条相同记录")
return count > 0
finally:
conn.close()
def classify_title(title):
"""调用API对标题进行分类"""
try:
prompt = f"目标:对以下文字内容进行分类,返回结果为类别,如\"社会新闻\"\"金融\"\"历史\"\"购物\"\"新质科技\"等等。目的只返回2-4个字不返回其它内容。内容{title}"
data = {
"model": API_MODEL,
"prompt": prompt,
"stream": False
}
response = requests.post(API_URL, json=data, timeout=30)
response.raise_for_status()
result = response.json()
category = result.get('response', '').strip()
# 验证分类结果长度
if len(category) < 2 or len(category) > 8:
category = '其他'
logger.info(f"标题 '{title}' 分类为: {category}")
return category
except Exception as e:
logger.error(f"API调用失败标题 '{title}': {e}")
return '其他'
def insert_article(title, url, category, source_date):
"""插入文章到数据库"""
conn = sqlite3.connect('tophub_data.db')
cursor = conn.cursor()
try:
created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute('''
INSERT INTO articles (title, url, category, source_date, created_at)
VALUES (?, ?, ?, ?, ?)
''', (title, url, category, source_date, created_at))
conn.commit()
logger.info(f"成功插入文章: {title}")
return True
except sqlite3.IntegrityError:
logger.warning(f"文章已存在,跳过: {title}")
return False
except Exception as e:
logger.error(f"插入文章失败: {e}")
return False
finally:
conn.close()
def process_temp_files():
"""主处理函数"""
logger.info("开始处理临时文件...")
# 初始化数据库
init_database()
# 查找临时文件
temp_files = find_temp_files()
if not temp_files:
logger.warning("未找到临时文件")
return
total_processed = 0
total_inserted = 0
# 处理每个文件
for file_path in temp_files:
logger.info(f"处理文件: {file_path}")
# 从文件名提取日期
date_match = re.search(r'(\d{4})年(\d{1,2})月(\d{1,2})日', file_path)
if date_match:
source_date = f"{date_match.group(1)}-{int(date_match.group(2)):02d}-{int(date_match.group(3)):02d}"
else:
source_date = datetime.now().strftime('%Y-%m-%d')
# 解析文件内容
articles = parse_file_content(file_path)
if not articles:
continue
# 处理每篇文章
2026-01-12 20:36:44 +08:00
for i, article in tqdm(enumerate(articles), desc=f"处理 {file_path}", total=len(articles)):
total_processed += 1
2026-01-12 20:36:44 +08:00
# 每处理10篇文章记录一次进度
if i % 10 == 0 and i > 0:
logger.info(f"已处理 {i}/{len(articles)} 篇文章,完成 {i/len(articles)*100:.1f}%")
# 检查重复
if check_duplicate(article['title'], source_date):
logger.info(f"跳过重复文章(最近三天已存在): {article['title']}")
continue
# 分类标题
category = classify_title(article['title'])
# 插入数据库
if insert_article(article['title'], article['url'], category, source_date):
total_inserted += 1
logger.info(f"处理完成! 总计处理: {total_processed}, 成功插入: {total_inserted}")
if __name__ == "__main__":
try:
process_temp_files()
except Exception as e:
logger.error(f"程序执行失败: {e}")
raise