#!/usr/bin/env python3
"""
Process temporary files and write their contents to the database.

Reads temp files named like ``<YYYY>年<M>月<D>日*.txt`` from the current
working directory, extracts the title and link of every record, asks a local
LLM HTTP API to classify each title, and stores the result in a SQLite
database.
"""

import glob
import os
import re
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta

import requests
from loguru import logger
from tqdm import tqdm

# Logging configuration
logger.add("tophub_add_data_to_db.log", rotation="10 MB", level="INFO")

# API configuration
API_URL = "http://localhost:11434/api/generate"
API_MODEL = "gemma3:4b"

# SQLite database file used by every database helper below.
DB_PATH = 'tophub_data.db'

# Glob pattern (relative to the current working directory) for input files.
TEMP_FILE_PATTERN = "*年*月*日*.txt"

# Each record occupies five lines: node id, category, title, url, separator.
RECORD_STRIDE = 5
# Only the first four lines of a record carry data; the separator is optional
# for the last record in a file.
RECORD_DATA_LINES = 4

_TITLE_RE = re.compile(r'标题: (.+)')
_URL_RE = re.compile(r'链接: (.+)')
_FILENAME_DATE_RE = re.compile(r'(\d{4})年(\d{1,2})月(\d{1,2})日')


def init_database(db_path=DB_PATH):
    """Create the ``articles`` table if it does not exist yet.

    Args:
        db_path: Path of the SQLite database file.
    """
    # closing() guarantees the connection is released even if the DDL fails;
    # sqlite3's own context manager only manages the transaction.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS articles (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                url TEXT NOT NULL,
                category TEXT,
                source_date TEXT NOT NULL,
                created_at TEXT NOT NULL,
                UNIQUE(title, source_date)
            )
        ''')
        conn.commit()

    logger.info("数据库初始化完成")


def find_temp_files():
    """Return the temp files in the working directory that match the pattern.

    Returns:
        A sorted list of matching file names (sorted so runs are
        deterministic; glob order is otherwise arbitrary).
    """
    files = sorted(glob.glob(TEMP_FILE_PATTERN))
    logger.info(f"找到 {len(files)} 个临时文件: {files}")
    return files


def _parse_records(lines):
    """Turn raw file lines into article dicts, one per five-line record."""
    articles = []
    for start in range(0, len(lines), RECORD_STRIDE):
        record = lines[start:start + RECORD_STRIDE]
        if len(record) < RECORD_DATA_LINES:
            # Truncated tail: not enough lines left for a full record.
            break

        category_line = record[1].strip()
        title_match = _TITLE_RE.search(record[2].strip())
        url_match = _URL_RE.search(record[3].strip())
        if not (title_match and url_match):
            continue

        # Keep everything after the first ': ' so values that themselves
        # contain ': ' are not truncated.
        _, sep, category = category_line.partition(': ')
        articles.append({
            'title': title_match.group(1),
            'url': url_match.group(1),
            'category': category if sep else '未知',
        })
    return articles


def parse_file_content(file_path):
    """Parse a temp file into a list of article dicts.

    The file is read as UTF-8 and interpreted in groups of five lines
    (node id, category, title, url, separator). A record is kept only when
    its title line matches ``标题: ...`` and its url line matches
    ``链接: ...``. The trailing separator of the final record may be missing.

    Args:
        file_path: Path of the file to parse.

    Returns:
        A list of dicts with the keys ``title``, ``url`` and ``category``;
        an empty list if the file cannot be read or decoded.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except (OSError, UnicodeError) as e:
        logger.error(f"解析文件 {file_path} 失败: {e}")
        return []

    articles = _parse_records(lines)
    logger.info(f"从文件 {file_path} 解析出 {len(articles)} 条数据")
    return articles


def check_duplicate(title, date_str, db_path=DB_PATH):
    """Check whether *title* already exists within the last three days.

    The window covers the day before yesterday, yesterday and the given day.

    Args:
        title: Article title to look up.
        date_str: Reference date formatted as ``YYYY-MM-DD``.
        db_path: Path of the SQLite database file.

    Returns:
        True if at least one row with the same title exists in the window.

    Raises:
        ValueError: If *date_str* is not a valid ``YYYY-MM-DD`` date.
    """
    current_date = datetime.strptime(date_str, '%Y-%m-%d')
    day_before_yesterday = current_date - timedelta(days=2)
    yesterday = current_date - timedelta(days=1)

    with closing(sqlite3.connect(db_path)) as conn:
        row = conn.execute('''
            SELECT COUNT(*) FROM articles
            WHERE title = ? AND source_date IN (?, ?, ?)
        ''', (title,
              day_before_yesterday.strftime('%Y-%m-%d'),
              yesterday.strftime('%Y-%m-%d'),
              date_str)).fetchone()

    count = row[0]
    logger.info(f"检查标题 '{title}' 在最近三天的重复情况: 找到 {count} 条相同记录")
    return count > 0


def classify_title(title):
    """Ask the classification API for a short category label for *title*.

    Args:
        title: Article title to classify.

    Returns:
        The label returned by the API, or ``'其他'`` when the call fails or
        the answer is shorter than 2 or longer than 8 characters.
    """
    try:
        prompt = f"目标:对以下文字内容进行分类,返回结果为类别,如\"社会新闻\",\"金融\",\"历史\",\"购物\",\"新质科技\"等等。目的:只返回2-4个字,不返回其它内容。内容:{title}"

        data = {
            "model": API_MODEL,
            "prompt": prompt,
            "stream": False
        }

        response = requests.post(API_URL, json=data, timeout=30)
        response.raise_for_status()

        result = response.json()
        category = result.get('response', '').strip()

        # Reject answers that are clearly not a short category label.
        if len(category) < 2 or len(category) > 8:
            category = '其他'

        logger.info(f"标题 '{title}' 分类为: {category}")
        return category

    except Exception as e:
        # Deliberately best-effort: any network, HTTP or payload problem
        # must not abort the import, so fall back to the generic category.
        logger.error(f"API调用失败,标题 '{title}': {e}")
        return '其他'


def insert_article(title, url, category, source_date, db_path=DB_PATH):
    """Insert one article row into the database.

    Args:
        title: Article title.
        url: Article link.
        category: Category label.
        source_date: Date of the source file, ``YYYY-MM-DD``.
        db_path: Path of the SQLite database file.

    Returns:
        True if the row was inserted; False if it already exists
        (``UNIQUE(title, source_date)``) or a database error occurred.
    """
    created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    try:
        with closing(sqlite3.connect(db_path)) as conn:
            conn.execute('''
                INSERT INTO articles (title, url, category, source_date, created_at)
                VALUES (?, ?, ?, ?, ?)
            ''', (title, url, category, source_date, created_at))
            conn.commit()
    except sqlite3.IntegrityError:
        logger.warning(f"文章已存在,跳过: {title}")
        return False
    except sqlite3.Error as e:
        logger.error(f"插入文章失败: {e}")
        return False

    logger.info(f"成功插入文章: {title}")
    return True


def _extract_source_date(file_path):
    """Return the ``YYYY-MM-DD`` date encoded in *file_path*, or today's date."""
    match = _FILENAME_DATE_RE.search(file_path)
    if match:
        year, month, day = (int(part) for part in match.groups())
        try:
            # Validate the calendar date; an impossible date such as month 13
            # would otherwise crash check_duplicate() later on.
            datetime(year, month, day)
        except ValueError:
            logger.warning(f"文件名中的日期无效,改用当前日期: {file_path}")
        else:
            return f"{year:04d}-{month:02d}-{day:02d}"
    return datetime.now().strftime('%Y-%m-%d')


def process_temp_files():
    """Run the whole pipeline: find, parse, de-duplicate, classify, store."""
    logger.info("开始处理临时文件...")

    # Make sure the table exists before anything is queried or inserted.
    init_database()

    temp_files = find_temp_files()
    if not temp_files:
        logger.warning("未找到临时文件")
        return

    total_processed = 0
    total_inserted = 0

    # check_duplicate() only looks backwards in time, so handle the files in
    # chronological order to let later days see the earlier days' rows.
    dated_files = sorted(
        (_extract_source_date(path), path) for path in temp_files
    )

    for source_date, file_path in dated_files:
        logger.info(f"处理文件: {file_path}")

        articles = parse_file_content(file_path)
        if not articles:
            continue

        for i, article in tqdm(enumerate(articles), desc=f"处理 {file_path}", total=len(articles)):
            total_processed += 1

            # Log progress every 10 articles.
            if i % 10 == 0 and i > 0:
                logger.info(f"已处理 {i}/{len(articles)} 篇文章,完成 {i/len(articles)*100:.1f}%")

            # Skip titles already stored within the last three days.
            if check_duplicate(article['title'], source_date):
                logger.info(f"跳过重复文章(最近三天已存在): {article['title']}")
                continue

            category = classify_title(article['title'])

            if insert_article(article['title'], article['url'], category, source_date):
                total_inserted += 1

    logger.info(f"处理完成! 总计处理: {total_processed}, 成功插入: {total_inserted}")


if __name__ == "__main__":
    try:
        process_temp_files()
    except Exception as e:
        logger.error(f"程序执行失败: {e}")
        raise