234 lines
7.5 KiB
Python
234 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
处理临时文件并写入数据库的脚本
|
||
读取指定格式的临时文件,提取标题和链接,调用API进行分类,然后写入SQLite数据库
|
||
"""
|
||
|
||
import sqlite3
|
||
import requests
|
||
import os
|
||
import re
|
||
from datetime import datetime
|
||
from tqdm import tqdm
|
||
from loguru import logger
|
||
import glob
|
||
|
||
# 配置日志
|
||
logger.add("tophub_add_data_to_db.log", rotation="10 MB", level="INFO")
|
||
|
||
# API配置
|
||
API_URL = "http://localhost:11434/api/generate"
|
||
API_MODEL = "gemma3:4b"
|
||
|
||
def init_database():
|
||
"""初始化数据库,创建表结构"""
|
||
conn = sqlite3.connect('tophub_data.db')
|
||
cursor = conn.cursor()
|
||
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS articles (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
title TEXT NOT NULL,
|
||
url TEXT NOT NULL,
|
||
category TEXT,
|
||
source_date TEXT NOT NULL,
|
||
created_at TEXT NOT NULL,
|
||
UNIQUE(title, source_date)
|
||
)
|
||
''')
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
logger.info("数据库初始化完成")
|
||
|
||
def find_temp_files():
|
||
"""查找符合格式的临时文件"""
|
||
pattern = "*年*月*日*.txt"
|
||
files = glob.glob(pattern)
|
||
logger.info(f"找到 {len(files)} 个临时文件: {files}")
|
||
return files
|
||
|
||
def parse_file_content(file_path):
|
||
"""解析文件内容,按5行一个循环提取数据"""
|
||
articles = []
|
||
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
lines = f.readlines()
|
||
|
||
# 按5行一组进行解析
|
||
for i in range(0, len(lines), 5):
|
||
if i + 4 < len(lines):
|
||
node_id = lines[i].strip()
|
||
category = lines[i+1].strip()
|
||
title = lines[i+2].strip()
|
||
url = lines[i+3].strip()
|
||
separator = lines[i+4].strip() if i+4 < len(lines) else ""
|
||
|
||
# 提取关键信息
|
||
title_match = re.search(r'标题: (.+)', title)
|
||
url_match = re.search(r'链接: (.+)', url)
|
||
|
||
if title_match and url_match:
|
||
articles.append({
|
||
'title': title_match.group(1),
|
||
'url': url_match.group(1),
|
||
'category': category.split(': ')[1] if ': ' in category else '未知'
|
||
})
|
||
|
||
logger.info(f"从文件 {file_path} 解析出 {len(articles)} 条数据")
|
||
return articles
|
||
|
||
except Exception as e:
|
||
logger.error(f"解析文件 {file_path} 失败: {e}")
|
||
return []
|
||
|
||
def check_duplicate(title, date_str):
|
||
"""检查标题在最近三天(前天、昨天和今天)是否已存在"""
|
||
from datetime import datetime, timedelta
|
||
|
||
conn = sqlite3.connect('tophub_data.db')
|
||
cursor = conn.cursor()
|
||
|
||
try:
|
||
# 将输入日期字符串转换为datetime对象
|
||
current_date = datetime.strptime(date_str, '%Y-%m-%d')
|
||
|
||
# 计算前天、昨天和今天的日期
|
||
yesterday = current_date - timedelta(days=1)
|
||
day_before_yesterday = current_date - timedelta(days=2)
|
||
|
||
# 检查这三天内是否有相同标题的文章
|
||
cursor.execute('''
|
||
SELECT COUNT(*) FROM articles
|
||
WHERE title = ? AND source_date IN (?, ?, ?)
|
||
''', (title,
|
||
day_before_yesterday.strftime('%Y-%m-%d'),
|
||
yesterday.strftime('%Y-%m-%d'),
|
||
date_str))
|
||
|
||
count = cursor.fetchone()[0]
|
||
logger.info(f"检查标题 '{title}' 在最近三天的重复情况: 找到 {count} 条相同记录")
|
||
|
||
return count > 0
|
||
|
||
finally:
|
||
conn.close()
|
||
|
||
def classify_title(title):
|
||
"""调用API对标题进行分类"""
|
||
try:
|
||
prompt = f"目标:对以下文字内容进行分类,返回结果为类别,如\"社会新闻\",\"金融\",\"历史\",\"购物\",\"新质科技\"等等。目的:只返回2-4个字,不返回其它内容。内容:{title}"
|
||
|
||
data = {
|
||
"model": API_MODEL,
|
||
"prompt": prompt,
|
||
"stream": False
|
||
}
|
||
|
||
response = requests.post(API_URL, json=data, timeout=30)
|
||
response.raise_for_status()
|
||
|
||
result = response.json()
|
||
category = result.get('response', '').strip()
|
||
|
||
# 验证分类结果长度
|
||
if len(category) < 2 or len(category) > 8:
|
||
category = '其他'
|
||
|
||
logger.info(f"标题 '{title}' 分类为: {category}")
|
||
return category
|
||
|
||
except Exception as e:
|
||
logger.error(f"API调用失败,标题 '{title}': {e}")
|
||
return '其他'
|
||
|
||
def insert_article(title, url, category, source_date):
|
||
"""插入文章到数据库"""
|
||
conn = sqlite3.connect('tophub_data.db')
|
||
cursor = conn.cursor()
|
||
|
||
try:
|
||
created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
cursor.execute('''
|
||
INSERT INTO articles (title, url, category, source_date, created_at)
|
||
VALUES (?, ?, ?, ?, ?)
|
||
''', (title, url, category, source_date, created_at))
|
||
|
||
conn.commit()
|
||
logger.info(f"成功插入文章: {title}")
|
||
return True
|
||
|
||
except sqlite3.IntegrityError:
|
||
logger.warning(f"文章已存在,跳过: {title}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"插入文章失败: {e}")
|
||
return False
|
||
|
||
finally:
|
||
conn.close()
|
||
|
||
def process_temp_files():
|
||
"""主处理函数"""
|
||
logger.info("开始处理临时文件...")
|
||
|
||
# 初始化数据库
|
||
init_database()
|
||
|
||
# 查找临时文件
|
||
temp_files = find_temp_files()
|
||
|
||
if not temp_files:
|
||
logger.warning("未找到临时文件")
|
||
return
|
||
|
||
total_processed = 0
|
||
total_inserted = 0
|
||
|
||
# 处理每个文件
|
||
for file_path in temp_files:
|
||
logger.info(f"处理文件: {file_path}")
|
||
|
||
# 从文件名提取日期
|
||
date_match = re.search(r'(\d{4})年(\d{1,2})月(\d{1,2})日', file_path)
|
||
if date_match:
|
||
source_date = f"{date_match.group(1)}-{int(date_match.group(2)):02d}-{int(date_match.group(3)):02d}"
|
||
else:
|
||
source_date = datetime.now().strftime('%Y-%m-%d')
|
||
|
||
# 解析文件内容
|
||
articles = parse_file_content(file_path)
|
||
|
||
if not articles:
|
||
continue
|
||
|
||
# 处理每篇文章
|
||
for i, article in tqdm(enumerate(articles), desc=f"处理 {file_path}", total=len(articles)):
|
||
total_processed += 1
|
||
|
||
# 每处理10篇文章记录一次进度
|
||
if i % 10 == 0 and i > 0:
|
||
logger.info(f"已处理 {i}/{len(articles)} 篇文章,完成 {i/len(articles)*100:.1f}%")
|
||
|
||
# 检查重复
|
||
if check_duplicate(article['title'], source_date):
|
||
logger.info(f"跳过重复文章(最近三天已存在): {article['title']}")
|
||
continue
|
||
|
||
# 分类标题
|
||
category = classify_title(article['title'])
|
||
|
||
# 插入数据库
|
||
if insert_article(article['title'], article['url'], category, source_date):
|
||
total_inserted += 1
|
||
|
||
logger.info(f"处理完成! 总计处理: {total_processed}, 成功插入: {total_inserted}")
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
process_temp_files()
|
||
except Exception as e:
|
||
logger.error(f"程序执行失败: {e}")
|
||
raise |