use rusqlite::{Connection, params, OptionalExtension}; use chrono::Local; use md5::{Md5, Digest}; use anyhow::{Result, Context}; use tracing::{info, debug, warn}; /// 评论数据结构 #[derive(Debug, Clone)] pub struct Comment { pub id: i64, pub content: String, pub content_hash: String, pub url: Option, pub created_at: Option, pub fetched_at: String, pub analyzed: bool, pub sentiment_score: Option, pub analyzed_at: Option, } /// 未分析评论(用于分析) #[derive(Debug, Clone)] pub struct UnanalyzedComment { pub id: i64, pub content: String, pub url: Option, } /// 分析历史记录 #[derive(Debug, Clone)] pub struct AnalysisHistory { pub id: i64, pub comment_id: i64, pub sentiment_score: f64, pub analysis_text: Option, pub created_at: String, } /// 数据库管理器 pub struct DatabaseManager { db_path: String, } impl DatabaseManager { /// 创建新的数据库管理器 pub fn new(db_path: &str) -> Result { let manager = Self { db_path: db_path.to_string(), }; manager.init_db()?; info!("数据库管理器初始化完成,数据库路径: {}", db_path); Ok(manager) } /// 获取数据库连接 fn get_connection(&self) -> Result { Connection::open(&self.db_path) .with_context(|| format!("无法打开数据库: {}", self.db_path)) } /// 初始化数据库表 fn init_db(&self) -> Result<()> { let conn = self.get_connection()?; // 评论表 conn.execute( "CREATE TABLE IF NOT EXISTS comments ( id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT NOT NULL, content_hash TEXT UNIQUE NOT NULL, url TEXT, created_at TEXT, fetched_at TEXT DEFAULT CURRENT_TIMESTAMP, analyzed INTEGER DEFAULT 0, sentiment_score REAL, analyzed_at TEXT )", [], )?; // 分析历史表 conn.execute( "CREATE TABLE IF NOT EXISTS analysis_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, comment_id INTEGER, sentiment_score REAL NOT NULL, analysis_text TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (comment_id) REFERENCES comments(id) )", [], )?; // 配置表 conn.execute( "CREATE TABLE IF NOT EXISTS config ( key TEXT PRIMARY KEY, value TEXT, updated_at TEXT DEFAULT CURRENT_TIMESTAMP )", [], )?; debug!("数据库表初始化完成"); Ok(()) } /// 计算内容哈希值用于去重 pub fn hash_content(content: &str) -> String { let mut hasher = Md5::new(); hasher.update(content.as_bytes()); format!("{:x}", hasher.finalize()) } /// 检查评论是否已存在 pub fn is_comment_exists(&self, content_hash: &str) -> Result { let conn = self.get_connection()?; let exists: Option = conn.query_row( "SELECT 1 FROM comments WHERE content_hash = ?1", [content_hash], |row| row.get(0), ).optional()?; Ok(exists.is_some()) } /// 添加单条评论,返回评论ID pub fn add_comment(&self, content: &str, url: Option<&str>) -> Result> { let content_hash = Self::hash_content(content); if self.is_comment_exists(&content_hash)? { debug!("评论已存在,跳过: {}", &content[..content.len().min(30)]); return Ok(None); } let conn = self.get_connection()?; let now = Local::now().to_rfc3339(); conn.execute( "INSERT INTO comments (content, content_hash, url, created_at) VALUES (?1, ?2, ?3, ?4)", params![content, content_hash, url, now], )?; let comment_id = conn.last_insert_rowid(); info!("添加新评论,ID: {}", comment_id); Ok(Some(comment_id)) } /// 批量添加评论,返回新添加的ID列表 pub fn add_comments_batch(&self, comments: &[(String, Option)]) -> Result> { let mut new_ids = Vec::new(); let conn = self.get_connection()?; let now = Local::now().to_rfc3339(); for (content, url) in comments { let content_hash = Self::hash_content(content); if self.is_comment_exists(&content_hash)? { debug!("评论已存在,跳过: {}", &content[..content.len().min(30)]); continue; } match conn.execute( "INSERT OR IGNORE INTO comments (content, content_hash, url, created_at) VALUES (?1, ?2, ?3, ?4)", params![content, content_hash, url.as_deref(), now], ) { Ok(rows) if rows > 0 => { new_ids.push(conn.last_insert_rowid()); } Ok(_) => {} Err(e) => { warn!("插入评论失败(可能已存在): {}", e); } } } info!("批量添加评论完成,新增 {} 条", new_ids.len()); Ok(new_ids) } /// 获取未分析的评论 pub fn get_unanalyzed_comments(&self, limit: i64) -> Result> { let conn = self.get_connection()?; let mut stmt = conn.prepare( "SELECT id, content, url FROM comments WHERE analyzed = 0 ORDER BY fetched_at ASC LIMIT ?1" )?; let comments = stmt.query_map([limit], |row| { Ok(UnanalyzedComment { id: row.get(0)?, content: row.get(1)?, url: row.get(2)?, }) })? .collect::, _>>()?; debug!("获取到 {} 条未分析评论", comments.len()); Ok(comments) } /// 标记评论已分析 pub fn mark_analyzed(&self, comment_id: i64, sentiment_score: f64, analysis_text: &str) -> Result<()> { let conn = self.get_connection()?; let now = Local::now().to_rfc3339(); // 更新评论状态 conn.execute( "UPDATE comments SET analyzed = 1, sentiment_score = ?1, analyzed_at = ?2 WHERE id = ?3", params![sentiment_score, now, comment_id], )?; // 添加分析历史 conn.execute( "INSERT INTO analysis_history (comment_id, sentiment_score, analysis_text) VALUES (?1, ?2, ?3)", params![comment_id, sentiment_score, analysis_text], )?; debug!("标记评论 {} 已分析,分数: {}", comment_id, sentiment_score); Ok(()) } /// 获取最新的情感分数 pub fn get_latest_sentiment_score(&self) -> Result> { let conn = self.get_connection()?; let score: Option = conn.query_row( "SELECT sentiment_score FROM comments WHERE analyzed = 1 AND sentiment_score IS NOT NULL ORDER BY analyzed_at DESC LIMIT 1", [], |row| row.get(0), ).optional()?; debug!("最新情感分数: {:?}", score); Ok(score) } /// 获取已分析的分数,可指定数量限制 pub fn get_all_scores(&self, limit: Option) -> Result> { let conn = self.get_connection()?; let scores: Vec = if let Some(limit) = limit { let mut stmt = conn.prepare( "SELECT sentiment_score FROM comments WHERE analyzed = 1 AND sentiment_score IS NOT NULL ORDER BY analyzed_at DESC LIMIT ?1" )?; stmt.query_map([limit], |row| row.get(0))? .collect::, _>>()? } else { let mut stmt = conn.prepare( "SELECT sentiment_score FROM comments WHERE analyzed = 1 AND sentiment_score IS NOT NULL ORDER BY analyzed_at DESC" )?; stmt.query_map([], |row| row.get(0))? .collect::, _>>()? }; let valid_scores: Vec = scores.into_iter().filter(|s| !s.is_nan()).collect(); debug!("获取到 {} 个分数", valid_scores.len()); Ok(valid_scores) } /// 获取评论总数 pub fn get_comment_count(&self) -> Result { let conn = self.get_connection()?; let count: i64 = conn.query_row( "SELECT COUNT(*) FROM comments", [], |row| row.get(0), )?; debug!("评论总数: {}", count); Ok(count) } /// 获取已分析评论数 pub fn get_analyzed_count(&self) -> Result { let conn = self.get_connection()?; let count: i64 = conn.query_row( "SELECT COUNT(*) FROM comments WHERE analyzed = 1", [], |row| row.get(0), )?; debug!("已分析评论数: {}", count); Ok(count) } /// 获取最近的评论 pub fn get_recent_comments(&self, limit: i64) -> Result> { let conn = self.get_connection()?; let mut stmt = conn.prepare( "SELECT id, content, sentiment_score, analyzed_at FROM comments ORDER BY fetched_at DESC LIMIT ?1" )?; let comments = stmt.query_map([limit], |row| { let content: String = row.get(1)?; let display_content = if content.len() > 50 { format!("{}...", &content[..50]) } else { content.clone() }; Ok(Comment { id: row.get(0)?, content: display_content, content_hash: String::new(), url: None, created_at: None, fetched_at: String::new(), analyzed: row.get::<_, Option>(2)?.is_some(), sentiment_score: row.get(2)?, analyzed_at: row.get(3)?, }) })? .collect::, _>>()?; debug!("获取到 {} 条最近评论", comments.len()); Ok(comments) } }