Files
guba-indicator/src/database.rs

335 lines
10 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use rusqlite::{Connection, params, OptionalExtension};
use chrono::Local;
use md5::{Md5, Digest};
use anyhow::{Result, Context};
use tracing::{info, debug, warn};
/// Comment record, mirroring one row of the `comments` table created by `init_db`.
///
/// Note that `get_recent_comments` returns partially populated values: only `id`,
/// `content` (possibly truncated), `analyzed`, `sentiment_score` and `analyzed_at`
/// carry data there; the remaining fields are left empty / `None`.
#[derive(Debug, Clone)]
pub struct Comment {
/// Row id (`INTEGER PRIMARY KEY AUTOINCREMENT` in the schema).
pub id: i64,
/// Comment text.
pub content: String,
/// Hex digest of `content` as produced by `DatabaseManager::hash_content`;
/// the column is declared `UNIQUE` and is what deduplication keys on.
pub content_hash: String,
/// Optional source URL supplied by the caller when the comment was added.
pub url: Option<String>,
/// Timestamp column; `add_comment` / `add_comments_batch` store the local
/// insertion time in RFC 3339 form here.
pub created_at: Option<String>,
/// Timestamp column that defaults to SQLite's `CURRENT_TIMESTAMP` on insert.
pub fetched_at: String,
/// Whether the comment has been analyzed (column defaults to 0; `mark_analyzed` sets it to 1).
pub analyzed: bool,
/// Sentiment score written by `mark_analyzed`; `None` until then.
pub sentiment_score: Option<f64>,
/// RFC 3339 local timestamp written by `mark_analyzed`; `None` until then.
pub analyzed_at: Option<String>,
}
/// A comment that has not been analyzed yet (input for the analysis step).
///
/// Produced by `DatabaseManager::get_unanalyzed_comments`, which selects only
/// these three columns from rows with `analyzed = 0`.
#[derive(Debug, Clone)]
pub struct UnanalyzedComment {
/// Row id of the comment in the `comments` table.
pub id: i64,
/// Full comment text.
pub content: String,
/// Optional source URL stored with the comment.
pub url: Option<String>,
}
/// Analysis history record.
///
/// The fields match the columns of the `analysis_history` table created by
/// `init_db`. NOTE(review): nothing in this part of the file constructs this
/// type — presumably it is used by callers elsewhere; confirm.
#[derive(Debug, Clone)]
pub struct AnalysisHistory {
/// Row id (`INTEGER PRIMARY KEY AUTOINCREMENT` in the schema).
pub id: i64,
/// Id of the analyzed comment (`comment_id` references `comments(id)` in the schema).
pub comment_id: i64,
/// Sentiment score recorded for that analysis (`REAL NOT NULL` in the schema).
pub sentiment_score: f64,
/// Optional free-form analysis text.
pub analysis_text: Option<String>,
/// Timestamp column that defaults to SQLite's `CURRENT_TIMESTAMP` on insert.
pub created_at: String,
}
/// Database manager.
///
/// Holds only the path of the SQLite file; each operation opens its own
/// connection through `get_connection` rather than keeping one open.
pub struct DatabaseManager {
// Path handed to `Connection::open` for every operation.
db_path: String,
}
impl DatabaseManager {
/// Creates a database manager for the SQLite file at `db_path`.
///
/// The tables are created (if they do not exist yet) before the manager is
/// handed back, so a successfully returned manager always has its schema.
///
/// # Errors
/// Propagates any failure from `init_db` (opening the database or running the DDL).
pub fn new(db_path: &str) -> Result<Self> {
    let path = String::from(db_path);
    let this = Self { db_path: path };

    // Fail construction early if the schema cannot be set up.
    this.init_db()?;

    info!("数据库管理器初始化完成,数据库路径: {}", db_path);
    Ok(this)
}
/// Opens a new connection to the configured database file.
///
/// # Errors
/// Returns the open error, annotated with the database path as context.
fn get_connection(&self) -> Result<Connection> {
    let path = &self.db_path;
    let opened = Connection::open(path);
    opened.with_context(|| format!("无法打开数据库: {}", path))
}
/// Creates the `comments`, `analysis_history` and `config` tables if they are missing.
///
/// # Errors
/// Fails if the database cannot be opened or if any `CREATE TABLE` statement fails;
/// statements after the failing one are not executed.
fn init_db(&self) -> Result<()> {
    // DDL in creation order. `comments` comes first because `analysis_history`
    // declares a foreign key on it.
    let schema: [&str; 3] = [
        // Comments table
        "CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
content_hash TEXT UNIQUE NOT NULL,
url TEXT,
created_at TEXT,
fetched_at TEXT DEFAULT CURRENT_TIMESTAMP,
analyzed INTEGER DEFAULT 0,
sentiment_score REAL,
analyzed_at TEXT
)",
        // Analysis history table
        "CREATE TABLE IF NOT EXISTS analysis_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
comment_id INTEGER,
sentiment_score REAL NOT NULL,
analysis_text TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (comment_id) REFERENCES comments(id)
)",
        // Config table
        "CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)",
    ];

    let conn = self.get_connection()?;
    for ddl in schema.iter().copied() {
        conn.execute(ddl, [])?;
    }

    debug!("数据库表初始化完成");
    Ok(())
}
/// Computes the hash of `content` that is used for deduplication.
///
/// Returns the MD5 digest of the UTF-8 bytes, formatted as lowercase hex.
pub fn hash_content(content: &str) -> String {
    let digest = {
        let mut md5 = Md5::new();
        md5.update(content.as_bytes());
        md5.finalize()
    };
    format!("{:x}", digest)
}
/// Checks whether a comment with the given content hash is already stored.
///
/// # Errors
/// Fails if the database cannot be opened or the lookup query fails.
pub fn is_comment_exists(&self, content_hash: &str) -> Result<bool> {
    let conn = self.get_connection()?;

    // `optional()` turns "no rows" into `None` instead of an error.
    let hit = conn
        .query_row(
            "SELECT 1 FROM comments WHERE content_hash = ?1",
            [content_hash],
            |row| row.get::<_, i64>(0),
        )
        .optional()?;

    Ok(hit.is_some())
}
/// Adds a single comment and returns its id.
///
/// Returns `Ok(None)` when a comment with the same content hash is already
/// stored, and `Ok(Some(id))` with the new row id otherwise. `created_at` is
/// set to the current local time in RFC 3339 form.
///
/// # Errors
/// Fails if the database cannot be opened or a query fails.
pub fn add_comment(&self, content: &str, url: Option<&str>) -> Result<Option<i64>> {
    let content_hash = Self::hash_content(content);

    // Log preview: at most 30 bytes, backed off to a UTF-8 character boundary.
    // Slicing at a fixed byte offset panics when it lands inside a multi-byte
    // character, which is common for CJK text mixed with ASCII.
    let mut preview_end = content.len().min(30);
    while !content.is_char_boundary(preview_end) {
        preview_end -= 1;
    }
    let preview = &content[..preview_end];

    if self.is_comment_exists(&content_hash)? {
        debug!("评论已存在,跳过: {}", preview);
        return Ok(None);
    }

    let conn = self.get_connection()?;
    let now = Local::now().to_rfc3339();

    // `OR IGNORE` closes the window between the existence check above and this
    // insert: if another writer stored the same hash in between, the UNIQUE
    // constraint makes this a no-op instead of an error.
    let inserted = conn.execute(
        "INSERT OR IGNORE INTO comments (content, content_hash, url, created_at)
VALUES (?1, ?2, ?3, ?4)",
        params![content, content_hash, url, now],
    )?;
    if inserted == 0 {
        // Nothing was written, so `last_insert_rowid` would be stale here.
        debug!("评论已存在,跳过: {}", preview);
        return Ok(None);
    }

    let comment_id = conn.last_insert_rowid();
    info!("添加新评论ID: {}", comment_id);
    Ok(Some(comment_id))
}
/// Adds several comments and returns the ids of the rows that were newly inserted.
///
/// Each element is `(content, optional url)`. Comments whose content hash is
/// already stored (including duplicates inside the same batch) are skipped.
/// A failing insert is logged as a warning and does not abort the batch.
///
/// # Errors
/// Fails if the database cannot be opened, a statement cannot be prepared, or
/// an existence check fails.
pub fn add_comments_batch(&self, comments: &[(String, Option<String>)]) -> Result<Vec<i64>> {
    let mut new_ids = Vec::new();
    let conn = self.get_connection()?;
    let now = Local::now().to_rfc3339();

    // Prepare both statements once and run the existence check on this
    // connection, instead of opening a new connection for every element.
    let mut exists_stmt =
        conn.prepare("SELECT 1 FROM comments WHERE content_hash = ?1")?;
    let mut insert_stmt = conn.prepare(
        "INSERT OR IGNORE INTO comments (content, content_hash, url, created_at)
VALUES (?1, ?2, ?3, ?4)",
    )?;

    for (content, url) in comments {
        let content_hash = Self::hash_content(content);

        if exists_stmt.exists([content_hash.as_str()])? {
            // Log preview: at most 30 bytes, backed off to a UTF-8 character
            // boundary so the slice cannot panic on multi-byte text.
            let mut preview_end = content.len().min(30);
            while !content.is_char_boundary(preview_end) {
                preview_end -= 1;
            }
            debug!("评论已存在,跳过: {}", &content[..preview_end]);
            continue;
        }

        match insert_stmt.execute(params![content, content_hash, url.as_deref(), now]) {
            Ok(rows) if rows > 0 => {
                new_ids.push(conn.last_insert_rowid());
            }
            // Ignored by `OR IGNORE`: nothing inserted, nothing to report.
            Ok(_) => {}
            Err(e) => {
                warn!("插入评论失败(可能已存在): {}", e);
            }
        }
    }

    info!("批量添加评论完成,新增 {} 条", new_ids.len());
    Ok(new_ids)
}
/// Fetches up to `limit` comments that have not been analyzed yet.
///
/// Rows are ordered by `fetched_at`, oldest first.
///
/// # Errors
/// Fails if the database cannot be opened, the query fails, or a row cannot be decoded.
pub fn get_unanalyzed_comments(&self, limit: i64) -> Result<Vec<UnanalyzedComment>> {
    let conn = self.get_connection()?;
    let mut stmt = conn.prepare(
        "SELECT id, content, url FROM comments
WHERE analyzed = 0
ORDER BY fetched_at ASC
LIMIT ?1"
    )?;

    let rows = stmt.query_map([limit], |row| {
        let id: i64 = row.get(0)?;
        let content: String = row.get(1)?;
        let url: Option<String> = row.get(2)?;
        Ok(UnanalyzedComment { id, content, url })
    })?;

    // Stop at the first row that fails to decode.
    let mut pending = Vec::new();
    for row in rows {
        pending.push(row?);
    }

    debug!("获取到 {} 条未分析评论", pending.len());
    Ok(pending)
}
/// Marks a comment as analyzed and records the result in the analysis history.
///
/// Sets `analyzed = 1`, `sentiment_score` and `analyzed_at` (current local time,
/// RFC 3339) on the comment, and appends a row to `analysis_history`.
/// Both writes happen in one transaction, so a failure of either leaves the
/// database unchanged rather than leaving a comment flagged as analyzed
/// without a history row.
///
/// # Errors
/// Fails if the database cannot be opened, either statement fails, or the
/// transaction cannot be committed.
pub fn mark_analyzed(&self, comment_id: i64, sentiment_score: f64, analysis_text: &str) -> Result<()> {
    let mut conn = self.get_connection()?;
    let now = Local::now().to_rfc3339();

    // Dropping `tx` without `commit()` (e.g. on an early `?` return) rolls back.
    let tx = conn.transaction()?;

    // Update the comment state
    tx.execute(
        "UPDATE comments
SET analyzed = 1, sentiment_score = ?1, analyzed_at = ?2
WHERE id = ?3",
        params![sentiment_score, now, comment_id],
    )?;

    // Append the analysis history entry
    tx.execute(
        "INSERT INTO analysis_history (comment_id, sentiment_score, analysis_text)
VALUES (?1, ?2, ?3)",
        params![comment_id, sentiment_score, analysis_text],
    )?;

    tx.commit()?;

    debug!("标记评论 {} 已分析,分数: {}", comment_id, sentiment_score);
    Ok(())
}
/// Returns the sentiment score of the most recently analyzed comment.
///
/// "Most recent" is decided by `analyzed_at` (descending). Returns `None`
/// when no analyzed comment with a non-NULL score exists.
///
/// # Errors
/// Fails if the database cannot be opened or the query fails.
pub fn get_latest_sentiment_score(&self) -> Result<Option<f64>> {
    let conn = self.get_connection()?;

    let latest = conn
        .query_row(
            "SELECT sentiment_score FROM comments
WHERE analyzed = 1 AND sentiment_score IS NOT NULL
ORDER BY analyzed_at DESC
LIMIT 1",
            [],
            |row| row.get::<_, f64>(0),
        )
        .optional()?;

    debug!("最新情感分数: {:?}", latest);
    Ok(latest)
}
/// Returns the sentiment scores of analyzed comments, newest analysis first.
///
/// With `Some(limit)` at most `limit` rows are read; with `None` all rows are
/// read. NaN scores are dropped from the result after the rows are fetched.
///
/// # Errors
/// Fails if the database cannot be opened, the query fails, or a row cannot be decoded.
pub fn get_all_scores(&self, limit: Option<i64>) -> Result<Vec<f64>> {
    let conn = self.get_connection()?;

    let mut scores: Vec<f64> = match limit {
        Some(max_rows) => {
            let mut stmt = conn.prepare(
                "SELECT sentiment_score FROM comments
WHERE analyzed = 1 AND sentiment_score IS NOT NULL
ORDER BY analyzed_at DESC
LIMIT ?1"
            )?;
            let rows = stmt.query_map([max_rows], |row| row.get::<_, f64>(0))?;
            let fetched = rows.collect::<Result<Vec<_>, _>>()?;
            fetched
        }
        None => {
            let mut stmt = conn.prepare(
                "SELECT sentiment_score FROM comments
WHERE analyzed = 1 AND sentiment_score IS NOT NULL
ORDER BY analyzed_at DESC"
            )?;
            let rows = stmt.query_map([], |row| row.get::<_, f64>(0))?;
            let fetched = rows.collect::<Result<Vec<_>, _>>()?;
            fetched
        }
    };

    // Discard NaN values in place; order of the remaining scores is kept.
    scores.retain(|score| !score.is_nan());

    debug!("获取到 {} 个分数", scores.len());
    Ok(scores)
}
/// Returns the total number of stored comments.
///
/// # Errors
/// Fails if the database cannot be opened or the count query fails.
pub fn get_comment_count(&self) -> Result<i64> {
    let total = self.get_connection()?.query_row(
        "SELECT COUNT(*) FROM comments",
        [],
        |row| row.get::<_, i64>(0),
    )?;

    debug!("评论总数: {}", total);
    Ok(total)
}
/// Returns the number of comments flagged as analyzed (`analyzed = 1`).
///
/// # Errors
/// Fails if the database cannot be opened or the count query fails.
pub fn get_analyzed_count(&self) -> Result<i64> {
    let analyzed_total = self.get_connection()?.query_row(
        "SELECT COUNT(*) FROM comments WHERE analyzed = 1",
        [],
        |row| row.get::<_, i64>(0),
    )?;

    debug!("已分析评论数: {}", analyzed_total);
    Ok(analyzed_total)
}
/// Returns the `limit` most recently fetched comments (by `fetched_at`, newest first).
///
/// The returned values are display-oriented and only partially populated:
/// `content` is cut to at most 50 bytes (followed by `...` when truncated),
/// `analyzed` is derived from whether a sentiment score is present, and
/// `content_hash`, `url`, `created_at` and `fetched_at` are left empty / `None`.
///
/// # Errors
/// Fails if the database cannot be opened, the query fails, or a row cannot be decoded.
pub fn get_recent_comments(&self, limit: i64) -> Result<Vec<Comment>> {
    // Maximum number of content bytes kept for display.
    const MAX_DISPLAY_BYTES: usize = 50;

    let conn = self.get_connection()?;
    let mut stmt = conn.prepare(
        "SELECT id, content, sentiment_score, analyzed_at
FROM comments
ORDER BY fetched_at DESC
LIMIT ?1"
    )?;

    let comments = stmt.query_map([limit], |row| {
        let id: i64 = row.get(0)?;
        let content: String = row.get(1)?;

        let display_content = if content.len() > MAX_DISPLAY_BYTES {
            // Back off to a UTF-8 character boundary. Slicing at a fixed byte
            // offset panics inside a multi-byte character, and with 3-byte CJK
            // characters byte 50 is never a boundary for pure-Chinese text.
            let mut cut = MAX_DISPLAY_BYTES;
            while !content.is_char_boundary(cut) {
                cut -= 1;
            }
            format!("{}...", &content[..cut])
        } else {
            content
        };

        // Read once; used both for the score itself and for the `analyzed` flag.
        let sentiment_score: Option<f64> = row.get(2)?;

        Ok(Comment {
            id,
            content: display_content,
            content_hash: String::new(),
            url: None,
            created_at: None,
            fetched_at: String::new(),
            analyzed: sentiment_score.is_some(),
            sentiment_score,
            analyzed_at: row.get(3)?,
        })
    })?
    .collect::<Result<Vec<_>, _>>()?;

    debug!("获取到 {} 条最近评论", comments.len());
    Ok(comments)
}
}