Initial commit: 股吧人气指示器 - PySide6桌面悬浮工具

This commit is contained in:
2026-01-07 17:32:58 +08:00
commit 5b8b9ec35a
9 changed files with 1316 additions and 0 deletions

52
.gitignore vendored Normal file
View File

@@ -0,0 +1,52 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# 虚拟环境
venv/
ENV/
env/
.venv
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# 配置文件(包含敏感信息)
config.json
# 数据库
*.db
*.sqlite
# 日志
*.log
# 系统文件
.DS_Store
Thumbs.db
# 临时文件
*.tmp
*.bak

172
config_manager.py Normal file
View File

@@ -0,0 +1,172 @@
"""
配置管理模块 - 负责配置的读取、验证和持久化
"""
import json
import os
from typing import Any, Dict
from pathlib import Path
class ConfigManager:
"""配置管理器"""
DEFAULT_CONFIG = {
"llm_api": {
"base_url": "https://api.openai.com/v1",
"api_key": "",
"model": "gpt-3.5-turbo",
"timeout": 30,
"retry_times": 3
},
"spider": {
"target_url": "https://example.com",
"xpath": "//a[contains(@class, 'linkblack')]",
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"fetch_interval": 60,
"retry_times": 3,
"retry_interval": 5
},
"ui": {
"opacity": 0.9,
"is_on_top": True,
"thresholds": {
"cold": 30,
"warm": 70
}
},
"database": {
"path": "guba.db"
},
"logging": {
"level": "INFO",
"path": "guba.log"
}
}
def __init__(self, config_path: str = "config.json"):
self.config_path = Path(config_path)
self.config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""加载配置文件"""
if self.config_path.exists():
try:
with open(self.config_path, 'r', encoding='utf-8') as f:
loaded_config = json.load(f)
# 合并默认配置,确保所有键都存在
return self._merge_config(self.DEFAULT_CONFIG, loaded_config)
except (json.JSONDecodeError, IOError) as e:
print(f"配置文件加载失败,使用默认配置: {e}")
return self.DEFAULT_CONFIG.copy()
else:
return self.DEFAULT_CONFIG.copy()
def _merge_config(self, default: Dict, loaded: Dict) -> Dict:
"""递归合并配置"""
result = default.copy()
for key, value in loaded.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = self._merge_config(result[key], value)
else:
result[key] = value
return result
def save_config(self) -> bool:
"""保存配置到文件"""
try:
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(self.config, f, ensure_ascii=False, indent=4)
return True
except IOError as e:
print(f"配置保存失败: {e}")
return False
def get(self, *keys: str, default: Any = None) -> Any:
"""获取嵌套配置值"""
value = self.config
for key in keys:
if isinstance(value, dict) and key in value:
value = value[key]
else:
return default
return value
def set(self, value: Any, *keys: str) -> bool:
"""设置嵌套配置值"""
if len(keys) < 1:
return False
current = self.config
for key in keys[:-1]:
if key not in current:
current[key] = {}
current = current[key]
current[keys[-1]] = value
return self.save_config()
def update_llm_api(self, base_url: str = None, api_key: str = None,
model: str = None, timeout: int = None, retry_times: int = None):
"""更新LLM API配置"""
if base_url:
self.config["llm_api"]["base_url"] = base_url
if api_key:
self.config["llm_api"]["api_key"] = api_key
if model:
self.config["llm_api"]["model"] = model
if timeout:
self.config["llm_api"]["timeout"] = timeout
if retry_times:
self.config["llm_api"]["retry_times"] = retry_times
self.save_config()
def update_spider(self, target_url: str = None, xpath: str = None,
user_agent: str = None, fetch_interval: int = None,
retry_times: int = None, retry_interval: int = None):
"""更新爬虫配置"""
if target_url:
self.config["spider"]["target_url"] = target_url
if xpath:
self.config["spider"]["xpath"] = xpath
if user_agent:
self.config["spider"]["user_agent"] = user_agent
if fetch_interval:
self.config["spider"]["fetch_interval"] = fetch_interval
if retry_times:
self.config["spider"]["retry_times"] = retry_times
if retry_interval:
self.config["spider"]["retry_interval"] = retry_interval
self.save_config()
def update_ui(self, opacity: float = None, is_on_top: bool = None,
cold_threshold: int = None, warm_threshold: int = None):
"""更新UI配置"""
if opacity is not None:
self.config["ui"]["opacity"] = max(0.3, min(1.0, opacity))
if is_on_top is not None:
self.config["ui"]["is_on_top"] = is_on_top
if cold_threshold is not None:
self.config["ui"]["thresholds"]["cold"] = cold_threshold
if warm_threshold is not None:
self.config["ui"]["thresholds"]["warm"] = warm_threshold
self.save_config()
@property
def llm_api_config(self) -> Dict:
return self.config["llm_api"]
@property
def spider_config(self) -> Dict:
return self.config["spider"]
@property
def ui_config(self) -> Dict:
return self.config["ui"]
@property
def database_config(self) -> Dict:
return self.config["database"]
@property
def logging_config(self) -> Dict:
return self.config["logging"]

219
database.py Normal file
View File

@@ -0,0 +1,219 @@
"""
数据库模块 - SQLite存储评论和分析结果
"""
import sqlite3
import hashlib
import json
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from pathlib import Path
class DatabaseManager:
"""数据库管理器"""
def __init__(self, db_path: str = "guba.db"):
self.db_path = Path(db_path)
self._init_db()
def _init_db(self):
"""初始化数据库表"""
conn = self._get_connection()
cursor = conn.cursor()
# 评论表
cursor.execute('''
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
content_hash TEXT UNIQUE NOT NULL,
url TEXT,
created_at TEXT,
fetched_at TEXT DEFAULT CURRENT_TIMESTAMP,
analyzed INTEGER DEFAULT 0,
sentiment_score REAL,
analyzed_at TEXT
)
''')
# 分析历史表
cursor.execute('''
CREATE TABLE IF NOT EXISTS analysis_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
comment_id INTEGER,
sentiment_score REAL NOT NULL,
analysis_text TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (comment_id) REFERENCES comments(id)
)
''')
# 配置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
def _get_connection(self) -> sqlite3.Connection:
"""获取数据库连接"""
return sqlite3.connect(str(self.db_path))
@staticmethod
def hash_content(content: str) -> str:
"""计算内容哈希值用于去重"""
return hashlib.md5(content.encode('utf-8')).hexdigest()
def is_comment_exists(self, content_hash: str) -> bool:
"""检查评论是否已存在"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute('SELECT 1 FROM comments WHERE content_hash = ?', (content_hash,))
exists = cursor.fetchone() is not None
conn.close()
return exists
def add_comment(self, content: str, url: str = None) -> Optional[int]:
"""添加评论返回评论ID"""
content_hash = self.hash_content(content)
if self.is_comment_exists(content_hash):
return None # 已存在
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO comments (content, content_hash, url, created_at)
VALUES (?, ?, ?, ?)
''', (content, content_hash, url, datetime.now().isoformat()))
comment_id = cursor.lastrowid
conn.commit()
conn.close()
return comment_id
def add_comments_batch(self, comments: List[Dict]) -> List[int]:
"""批量添加评论返回新添加的ID列表"""
new_ids = []
conn = self._get_connection()
cursor = conn.cursor()
for comment in comments:
content = comment.get('content', '')
url = comment.get('url')
content_hash = self.hash_content(content)
if self.is_comment_exists(content_hash):
continue
cursor.execute('''
INSERT INTO comments (content, content_hash, url, created_at)
VALUES (?, ?, ?, ?)
''', (content, content_hash, url, datetime.now().isoformat()))
new_ids.append(cursor.lastrowid)
conn.commit()
conn.close()
return new_ids
def get_unanalyzed_comments(self, limit: int = 50) -> List[Dict]:
"""获取未分析的评论"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT id, content, url FROM comments
WHERE analyzed = 0
ORDER BY fetched_at ASC
LIMIT ?
''', (limit,))
rows = cursor.fetchall()
conn.close()
return [{'id': row[0], 'content': row[1], 'url': row[2]} for row in rows]
def mark_analyzed(self, comment_id: int, sentiment_score: float, analysis_text: str):
"""标记评论已分析"""
conn = self._get_connection()
cursor = conn.cursor()
# 更新评论状态
cursor.execute('''
UPDATE comments
SET analyzed = 1, sentiment_score = ?, analyzed_at = ?
WHERE id = ?
''', (sentiment_score, datetime.now().isoformat(), comment_id))
# 添加分析历史
cursor.execute('''
INSERT INTO analysis_history (comment_id, sentiment_score, analysis_text)
VALUES (?, ?, ?)
''', (comment_id, sentiment_score, analysis_text))
conn.commit()
conn.close()
def get_latest_sentiment_score(self) -> Optional[float]:
"""获取最新的情感分数"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT sentiment_score FROM comments
WHERE analyzed = 1 AND sentiment_score IS NOT NULL
ORDER BY analyzed_at DESC
LIMIT 1
''')
row = cursor.fetchone()
conn.close()
return row[0] if row else None
def get_all_scores(self) -> List[float]:
"""获取所有已分析的分数"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT sentiment_score FROM comments
WHERE analyzed = 1 AND sentiment_score IS NOT NULL
ORDER BY analyzed_at DESC
''')
rows = cursor.fetchall()
conn.close()
return [row[0] for row in rows if row[0] is not None]
def get_comment_count(self) -> int:
"""获取评论总数"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM comments')
count = cursor.fetchone()[0]
conn.close()
return count
def get_analyzed_count(self) -> int:
"""获取已分析评论数"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM comments WHERE analyzed = 1')
count = cursor.fetchone()[0]
conn.close()
return count
def get_recent_comments(self, limit: int = 10) -> List[Dict]:
"""获取最近的评论"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT id, content, sentiment_score, analyzed_at
FROM comments
ORDER BY fetched_at DESC
LIMIT ?
''', (limit,))
rows = cursor.fetchall()
conn.close()
return [
{'id': row[0], 'content': row[1][:50] + '...' if len(row[1]) > 50 else row[1],
'score': row[2], 'analyzed_at': row[3]}
for row in rows
]

163
llm_analyzer.py Normal file
View File

@@ -0,0 +1,163 @@
"""
大模型分析模块 - 调用LLM API分析评论情感
"""
import json
import time
import re
from typing import Dict, Optional, Tuple
from openai import OpenAI, OpenAIError
class LLMAnalyzer:
"""大模型情感分析器"""
SYSTEM_PROMPT = """你是一个专业的情感分析助手。你的任务是分析股吧/论坛评论的情感倾向,判断投资者对该股票的态度。
评分规则:
- 0-30: 极度悲观/看空(利空、暴跌、绝望等情绪)
- 31-50: 偏悲观/中性(担忧、谨慎、观望等情绪)
- 51-70: 偏乐观/中性(看好、希望、期待等情绪)
- 71-100: 极度乐观/看涨(利好、暴涨、兴奋等情绪)
请直接输出一个JSON格式的结果包含两个字段
- score: 0-100的整数评分
- label: 简短的态度描述(如"强烈看跌""谨慎观望""温和看涨""强烈看涨"等)
注意:
1. 只返回JSON不要有其他文字
2. 如果无法判断返回50和"无法判断"
3. 分析要客观,不要被表面文字迷惑
"""
def __init__(self, config: Dict):
self.config = config
self.base_url = config.get('base_url', 'https://api.openai.com/v1')
self.api_key = config.get('api_key', '')
self.model = config.get('model', 'gpt-3.5-turbo')
self.timeout = config.get('timeout', 30)
self.retry_times = config.get('retry_times', 3)
self.client = None
if self.api_key:
self._init_client()
def _init_client(self):
"""初始化OpenAI客户端"""
try:
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url,
timeout=self.timeout
)
except Exception as e:
print(f"初始化LLM客户端失败: {e}")
def update_config(self, config: Dict):
"""更新配置"""
self.config.update(config)
self.base_url = config.get('base_url', self.base_url)
self.api_key = config.get('api_key', self.api_key)
self.model = config.get('model', self.model)
self.timeout = config.get('timeout', self.timeout)
self.retry_times = config.get('retry_times', self.retry_times)
if self.api_key:
self._init_client()
def analyze(self, comment: str) -> Tuple[Optional[int], Optional[str]]:
"""
分析单条评论
返回 (score, label)
"""
if not self.client:
return None, "LLM未配置"
if not comment or not comment.strip():
return None, "评论为空"
for attempt in range(self.retry_times):
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": f"请分析以下评论的情感倾向:\n\n{comment}"}
],
temperature=0.3,
max_tokens=200
)
result_text = response.choices[0].message.content.strip()
score, label = self._parse_response(result_text)
if score is not None:
return score, label
except OpenAIError as e:
print(f"API调用失败 (尝试 {attempt + 1}/{self.retry_times}): {e}")
if attempt < self.retry_times - 1:
time.sleep(2 ** attempt) # 指数退避
except Exception as e:
print(f"分析过程出错: {e}")
break
return None, "分析失败"
def _parse_response(self, response: str) -> Tuple[Optional[int], Optional[str]]:
"""解析LLM返回的结果"""
try:
# 尝试直接解析JSON
result = json.loads(response)
score = result.get('score', 50)
label = result.get('label', '无法判断')
# 验证分数范围
score = max(0, min(100, int(score)))
return score, label
except json.JSONDecodeError:
# 尝试从文本中提取
pass
# 尝试从文本中提取数字
numbers = re.findall(r'\b(\d{1,3})\b', response)
if numbers:
score = int(numbers[0])
score = max(0, min(100, score))
# 提取标签
label_match = re.search(r'["']([^"']+)["']', response)
if label_match:
label = label_match.group(1)
else:
label = response.split('\n')[0][:20] if response else '无法判断'
return score, label
return None, "解析失败"
def analyze_batch(self, comments: list, delay: float = 1.0) -> list:
"""
批量分析评论
delay: 每次调用之间的延迟(秒)
"""
results = []
for i, comment in enumerate(comments):
print(f"分析评论 {i + 1}/{len(comments)}...")
score, label = self.analyze(comment)
results.append({
'content': comment,
'score': score,
'label': label
})
if delay > 0 and i < len(comments) - 1:
time.sleep(delay)
return results
def is_configured(self) -> bool:
"""检查是否已配置"""
return bool(self.client and self.api_key)

201
main.py Normal file
View File

@@ -0,0 +1,201 @@
"""
主程序入口 - 股吧人气指示器
"""
import sys
import logging
import time
from datetime import datetime
from PySide6.QtWidgets import QApplication
from PySide6.QtCore import QTimer, Signal, QObject
from config_manager import ConfigManager
from database import DatabaseManager
from spider import SpiderManager
from llm_analyzer import LLMAnalyzer
from main_window import MainWindow
class BackendWorker(QObject):
"""后台工作器 - 处理爬取和分析任务"""
fetch_finished = Signal(list)
analysis_finished = Signal(float)
error_occurred = Signal(str)
status_update = Signal(str)
def __init__(self, config_manager: ConfigManager, db_manager: DatabaseManager,
spider: SpiderManager, analyzer: LLMAnalyzer):
super().__init__()
self.config = config_manager
self.db = db_manager
self.spider = spider
self.analyzer = analyzer
self.running = False
self.last_fetch_time = 0
self.fetch_interval = 60 # 默认60秒
self.no_new_content_count = 0 # 无新内容计数
def start(self):
"""启动后台任务"""
self.running = True
self._run_cycle()
def stop(self):
"""停止后台任务"""
self.running = False
def _run_cycle(self):
"""运行一个周期"""
if not self.running:
return
try:
# 1. 爬取评论
self.status_update.emit("正在爬取评论...")
comments = self.spider.fetch()
if not comments:
self.no_new_content_count += 1
interval = self.fetch_interval * (1 + min(self.no_new_content_count * 0.5, 2))
self.status_update.emit(f"无新内容,{int(interval)}秒后重试...")
QTimer.singleShot(int(interval * 1000), self._run_cycle)
return
# 2. 写入数据库
self.status_update.emit(f"获取到 {len(comments)} 条评论...")
new_ids = self.db.add_comments_batch(comments)
if new_ids:
self.no_new_content_count = 0
self.status_update.emit(f"新增 {len(new_ids)} 条评论")
# 3. 获取未分析评论并分析
unanalyzed = self.db.get_unanalyzed_comments(limit=10)
if unanalyzed:
self.status_update.emit(f"开始分析 {len(unanalyzed)} 条评论...")
self._analyze_comments(unanalyzed)
else:
self.no_new_content_count += 1
# 4. 更新指示器
self._update_indicator()
except Exception as e:
self.error_occurred.emit(f"运行错误: {str(e)}")
# 安排下一次执行
if self.running:
interval = self.fetch_interval * (1 + min(self.no_new_content_count * 0.5, 2))
QTimer.singleShot(int(interval * 1000), self._run_cycle)
def _analyze_comments(self, comments):
"""分析评论"""
for i, comment in enumerate(comments):
if not self.running:
break
try:
self.status_update.emit(f"分析 {i+1}/{len(comments)}...")
score, label = self.analyzer.analyze(comment['content'])
if score is not None:
self.db.mark_analyzed(comment['id'], score, label)
time.sleep(1.0) # 延迟避免API限流
else:
self.db.mark_analyzed(comment['id'], 50, "无法判断")
except Exception as e:
self.error_occurred.emit(f"分析失败: {str(e)}")
self.db.mark_analyzed(comment['id'], 50, "分析异常")
def _update_indicator(self):
"""更新指示器显示"""
scores = self.db.get_all_scores()
if not scores:
return
# 计算平均分
avg_score = sum(scores) / len(scores)
# 根据阈值确定标签
thresholds = self.config.get('ui', 'thresholds', default={'cold': 30, 'warm': 70})
cold = thresholds.get('cold', 30)
warm = thresholds.get('warm', 70)
if avg_score < cold:
label = "看跌"
elif avg_score > warm:
label = "看涨"
else:
label = "中性"
self.analysis_finished.emit(avg_score)
def manual_refresh(self):
"""手动刷新"""
self.no_new_content_count = 0
self._run_cycle()
def update_fetch_interval(self, interval: int):
"""更新爬取间隔"""
self.fetch_interval = interval
def setup_logging(log_path: str, level: str = "INFO"):
"""配置日志"""
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_path, encoding='utf-8'),
logging.StreamHandler()
]
)
def main():
"""主函数"""
# 创建应用
app = QApplication(sys.argv)
app.setQuitOnLastWindowClosed(False) # 允许最小化到托盘
# 加载配置
config = ConfigManager("config.json")
# 配置日志
log_config = config.logging_config
setup_logging(log_config.get('path', 'guba.log'), log_config.get('level', 'INFO'))
# 初始化组件
db = DatabaseManager(config.database_config.get('path', 'guba.db'))
spider = SpiderManager(config.spider_config)
analyzer = LLMAnalyzer(config.llm_api_config)
# 创建后台工作器
worker = BackendWorker(config, db, spider, analyzer)
worker.update_fetch_interval(config.spider_config.get('fetch_interval', 60))
# 创建主窗口
window = MainWindow(config)
window.show()
# 连接信号
worker.status_update.connect(window.update_status)
worker.analysis_finished.connect(window.update_indicator)
worker.error_occurred.connect(lambda msg: window.show_message("错误", msg))
# 设置按钮回调
window.set_refresh_callback(worker.manual_refresh)
window.set_config_callback(window.show_config)
# 启动后台任务
worker.start()
# 运行应用
sys.exit(app.exec())
if __name__ == "__main__":
main()

362
main_window.py Normal file
View File

@@ -0,0 +1,362 @@
"""
PySide6 GUI界面模块
"""
from PySide6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QSlider, QDialog, QFormLayout,
QLineEdit, QSpinBox, QMessageBox, QSystemTrayIcon,
QMenu, QTextEdit, QGroupBox, QDialogButtonBox)
from PySide6.QtCore import Qt, QTimer, Signal, QPoint
from PySide6.QtGui import QFont, QColor, QPainter, QBrush, QPen, QIcon, QAction
from typing import Callable, Optional
class SentimentIndicator(QWidget):
"""情感指示灯组件"""
def __init__(self, parent=None):
super().__init__(parent)
self.score = 50
self.label_text = "中性"
self.setMinimumSize(100, 100)
def set_value(self, score: int, label: str = None):
"""设置数值和标签"""
self.score = max(0, min(100, score))
if label:
self.label_text = label
self.update()
def paintEvent(self, event):
"""绘制指示灯"""
painter = QPainter(self)
painter.setRenderHint(QPainter.Antialiasing)
center = self.rect().center()
radius = min(self.width(), self.height()) // 2 - 10
# 根据分数确定颜色
color = self._get_color(self.score)
# 绘制外圈
painter.setPen(QPen(QColor(100, 100, 100), 2))
painter.setBrush(QBrush(QColor(30, 30, 30)))
painter.drawEllipse(center, radius, radius)
# 绘制内圈(渐变效果)
gradient_color = QColor(color)
painter.setPen(Qt.NoPen)
painter.setBrush(QBrush(gradient_color))
painter.drawEllipse(center, radius - 4, radius - 4)
# 绘制发光效果
for i in range(3, 0, -1):
glow_color = QColor(color)
glow_color.setAlpha(50 // i)
painter.setBrush(QBrush(glow_color))
painter.drawEllipse(center, radius - i * 5, radius - i * 5)
def _get_color(self, score: int) -> QColor:
"""根据分数获取颜色"""
if score < 30:
# 冷色系 - 蓝色/青色
ratio = score / 30
return QColor(int(0 + 100 * ratio), int(150 + 50 * ratio), 255)
elif score < 70:
# 中性 - 灰色/绿色
if score < 50:
ratio = (score - 30) / 20
return QColor(int(100 + 50 * ratio), int(200 + 20 * ratio), int(200 - 50 * ratio))
else:
ratio = (score - 50) / 20
return QColor(int(150 + 50 * ratio), int(220 - 20 * ratio), int(150 - 50 * ratio))
else:
# 暖色系 - 橙色/红色
ratio = (score - 70) / 30
return QColor(255, int(200 - 100 * ratio), int(50 + 50 * ratio))
def get_description(self, score: int) -> str:
"""获取描述文本"""
if score < 20:
return "极度看跌"
elif score < 40:
return "偏悲观"
elif score < 60:
return "中性"
elif score < 80:
return "偏乐观"
else:
return "极度看涨"
class ConfigDialog(QDialog):
"""配置对话框"""
def __init__(self, config_manager, parent=None):
super().__init__(parent)
self.config_manager = config_manager
self.setWindowTitle("配置")
self.setMinimumWidth(400)
self._init_ui()
def _init_ui(self):
layout = QFormLayout(self)
# LLM API 配置
llm_config = self.config_manager.llm_api_config
self.base_url_edit = QLineEdit(llm_config.get('base_url', ''))
self.api_key_edit = QLineEdit(llm_config.get('api_key', ''))
self.api_key_edit.setEchoMode(QLineEdit.Password)
self.model_edit = QLineEdit(llm_config.get('model', ''))
self.timeout_spin = QSpinBox()
self.timeout_spin.setRange(10, 300)
self.timeout_spin.setValue(llm_config.get('timeout', 30))
layout.addRow("API Base URL:", self.base_url_edit)
layout.addRow("API Key:", self.api_key_edit)
layout.addRow("Model:", self.model_edit)
layout.addRow("Timeout (s):", self.timeout_spin)
# 爬虫配置
spider_config = self.config_manager.spider_config
self.url_edit = QLineEdit(spider_config.get('target_url', ''))
self.xpath_edit = QLineEdit(spider_config.get('xpath', ''))
self.user_agent_edit = QLineEdit(spider_config.get('user_agent', ''))
self.interval_spin = QSpinBox()
self.interval_spin.setRange(10, 3600)
self.interval_spin.setValue(spider_config.get('fetch_interval', 60))
layout.addRow("目标URL:", self.url_edit)
layout.addRow("XPath:", self.xpath_edit)
layout.addRow("User Agent:", self.user_agent_edit)
layout.addRow("刷新间隔(s):", self.interval_spin)
# UI 配置
ui_config = self.config_manager.ui_config
self.opacity_slider = QSlider(Qt.Horizontal)
self.opacity_slider.setRange(30, 100)
self.opacity_slider.setValue(int(ui_config.get('opacity', 0.9) * 100))
self.ontop_check = QCheckBox() if hasattr(self, 'QCheckBox') else None
# 使用 QPushButton 替代 QCheckBox
self.ontop_btn = QPushButton("置顶")
self.ontop_btn.setCheckable(True)
self.ontop_btn.setChecked(ui_config.get('is_on_top', True))
layout.addRow("透明度:", self.opacity_slider)
layout.addRow("窗口置顶:", self.ontop_btn)
# 阈值配置
thresholds = ui_config.get('thresholds', {})
self.cold_spin = QSpinBox()
self.cold_spin.setRange(0, 50)
self.cold_spin.setValue(thresholds.get('cold', 30))
self.warm_spin = QSpinBox()
self.warm_spin.setRange(50, 100)
self.warm_spin.setValue(thresholds.get('warm', 70))
layout.addRow("寒冷阈值:", self.cold_spin)
layout.addRow("温暖阈值:", self.warm_spin)
# 按钮
button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
button_box.accepted.connect(self._save_config)
button_box.rejected.connect(self.reject)
layout.addRow(button_box)
def _save_config(self):
"""保存配置"""
# LLM API
self.config_manager.update_llm_api(
base_url=self.base_url_edit.text(),
api_key=self.api_key_edit.text(),
model=self.model_edit.text(),
timeout=self.timeout_spin.value()
)
# 爬虫
self.config_manager.update_spider(
target_url=self.url_edit.text(),
xpath=self.xpath_edit.text(),
user_agent=self.user_agent_edit.text(),
fetch_interval=self.interval_spin.value()
)
# UI
self.config_manager.update_ui(
opacity=self.opacity_slider.value() / 100.0,
is_on_top=self.ontop_btn.isChecked(),
cold_threshold=self.cold_spin.value(),
warm_threshold=self.warm_spin.value()
)
self.accept()
class MainWindow(QWidget):
"""主窗口"""
def __init__(self, config_manager, parent=None):
super().__init__(parent)
self.config_manager = config_manager
self.setWindowTitle("股吧人气指示器")
self._init_ui()
self._apply_config()
# 拖拽相关
self.dragging = False
self.drag_position = QPoint()
# 系统托盘
self._init_tray_icon()
def _init_ui(self):
"""初始化UI"""
layout = QVBoxLayout(self)
layout.setContentsMargins(10, 10, 10, 10)
# 标题
self.title_label = QLabel("股吧人气")
self.title_label.setAlignment(Qt.AlignCenter)
title_font = QFont()
title_font.setPointSize(14)
title_font.setBold(True)
self.title_label.setFont(title_font)
# 指示灯
self.indicator = SentimentIndicator()
self.score_label = QLabel("50 - 中性")
self.score_label.setAlignment(Qt.AlignCenter)
# 状态信息
self.status_label = QLabel("等待数据...")
self.status_label.setAlignment(Qt.AlignCenter)
status_font = QFont()
status_font.setPointSize(10)
self.status_label.setFont(status_font)
# 按钮
btn_layout = QHBoxLayout()
self.refresh_btn = QPushButton("刷新")
self.config_btn = QPushButton("配置")
btn_layout.addWidget(self.refresh_btn)
btn_layout.addWidget(self.config_btn)
# 添加到主布局
layout.addWidget(self.title_label)
layout.addWidget(self.indicator)
layout.addWidget(self.score_label)
layout.addWidget(self.status_label)
layout.addLayout(btn_layout)
# 设置窗口标志(无边框、可拖拽)
self.setWindowFlags(Qt.FramelessWindowHint | Qt.WindowStaysOnTopHint)
self.setAttribute(Qt.WA_TranslucentBackground)
def _init_tray_icon(self):
"""初始化系统托盘"""
self.tray_icon = QSystemTrayIcon(self)
self.tray_icon.setToolTip("股吧人气指示器")
# 创建托盘菜单
tray_menu = QMenu()
show_action = QAction("显示", self)
hide_action = QAction("隐藏", self)
quit_action = QAction("退出", self)
show_action.triggered.connect(self.show)
hide_action.triggered.connect(self.hide)
quit_action.triggered.connect(self.quit_app)
tray_menu.addAction(show_action)
tray_menu.addAction(hide_action)
tray_menu.addAction(quit_action)
self.tray_icon.setContextMenu(tray_menu)
self.tray_icon.show()
def quit_app(self):
"""退出应用"""
self.close()
import sys
sys.exit(0)
def _apply_config(self):
"""应用配置"""
ui_config = self.config_manager.ui_config
self.setWindowOpacity(ui_config.get('opacity', 0.9))
if ui_config.get('is_on_top', True):
self.setWindowFlags(self.windowFlags() | Qt.WindowStaysOnTopHint)
else:
self.setWindowFlags(self.windowFlags() & ~Qt.WindowStaysOnTopHint)
thresholds = ui_config.get('thresholds', {})
def mousePressEvent(self, event):
"""鼠标按下事件"""
if event.button() == Qt.LeftButton:
self.dragging = True
self.drag_position = event.globalPosition().toPoint() - self.frameGeometry().topLeft()
def mouseMoveEvent(self, event):
"""鼠标移动事件"""
if self.dragging:
self.move(event.globalPosition().toPoint() - self.drag_position)
def mouseReleaseEvent(self, event):
"""鼠标释放事件"""
if event.button() == Qt.LeftButton:
self.dragging = False
def contextMenuEvent(self, event):
"""右键菜单"""
context_menu = QMenu(self)
config_action = QAction("配置", self)
opacity_action = QAction("透明度", self)
quit_action = QAction("退出", self)
config_action.triggered.connect(self.show_config)
quit_action.triggered.connect(self.quit_app)
context_menu.addAction(config_action)
context_menu.addAction(quit_action)
context_menu.exec(event.globalPosition().toPoint())
def show_config(self):
"""显示配置对话框"""
dialog = ConfigDialog(self.config_manager, self)
if dialog.exec() == QDialog.Accepted:
self._apply_config()
def update_indicator(self, score: int, label: str = None):
"""更新指示灯"""
if label is None:
label = self.indicator.get_description(score)
self.indicator.set_value(score, label)
self.score_label.setText(f"{score} - {label}")
def update_status(self, text: str):
"""更新状态"""
self.status_label.setText(text)
def set_refresh_callback(self, callback: Callable):
"""设置刷新按钮回调"""
self.refresh_btn.clicked.connect(callback)
def set_config_callback(self, callback: Callable):
"""设置配置按钮回调"""
self.config_btn.clicked.connect(callback)
def show_message(self, title: str, message: str, icon=QMessageBox.Information):
"""显示消息"""
QMessageBox.information(self, title, message)
class QCheckBox(QPushButton):
"""自定义复选框"""
def __init__(self, text=""):
super().__init__(text)
self.setCheckable(True)
self.setChecked(False)

5
requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
PySide6>=6.6.0
requests>=2.31.0
beautifulsoup4>=4.12.0
lxml>=4.9.0
openai>=1.0.0

130
spider.py Normal file
View File

@@ -0,0 +1,130 @@
"""
爬虫模块 - 网站评论抓取
"""
import requests
from lxml import etree
import time
from typing import List, Dict, Optional
from urllib.parse import urljoin
from bs4 import BeautifulSoup
import random
class SpiderManager:
"""爬虫管理器"""
def __init__(self, config: Dict):
self.config = config
self.session = requests.Session()
self.session.headers.update({
'User-Agent': config.get('user_agent', 'Mozilla/5.0')
})
self.retry_times = config.get('retry_times', 3)
self.retry_interval = config.get('retry_interval', 5)
def fetch(self, url: str = None, xpath: str = None) -> List[Dict]:
"""
抓取网页评论
返回评论列表,每个元素包含 content 和 url
"""
target_url = url or self.config.get('target_url', '')
target_xpath = xpath or self.config.get('xpath', '')
if not target_url:
return []
html = self._fetch_with_retry(target_url)
if not html:
return []
return self._parse_comments(html, target_xpath, target_url)
def _fetch_with_retry(self, url: str, max_retries: int = None) -> Optional[str]:
"""带重试的网页获取"""
max_retries = max_retries or self.retry_times
for attempt in range(max_retries):
try:
response = self.session.get(url, timeout=30)
response.raise_for_status()
response.encoding = response.apparent_encoding
return response.text
except requests.RequestException as e:
print(f"请求失败 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(self.retry_interval + random.uniform(0, 2))
else:
return None
return None
def _parse_comments(self, html: str, xpath: str, base_url: str) -> List[Dict]:
"""解析评论"""
comments = []
try:
# 使用 lxml 解析
tree = etree.HTML(html)
elements = tree.xpath(xpath)
for elem in elements:
try:
text = elem.text_content().strip()
if text:
# 获取链接的 href如果存在
href = elem.get('href')
full_url = urljoin(base_url, href) if href else base_url
comments.append({
'content': text,
'url': full_url
})
except Exception as e:
print(f"解析元素失败: {e}")
continue
except Exception as e:
print(f"XPath解析失败: {e}")
# 备选解析方法
comments = self._fallback_parse(html, base_url)
return comments
def _fallback_parse(self, html: str, base_url: str) -> List[Dict]:
"""备选解析方法 - 使用 BeautifulSoup"""
comments = []
try:
soup = BeautifulSoup(html, 'lxml')
# 尝试查找常见的评论元素
# 这里可以根据实际网站结构调整选择器
elements = soup.find_all(['a', 'div', 'p', 'span'], class_=lambda x: x and 'linkblack' in x if x else False)
for elem in elements[:50]: # 限制数量
text = elem.get_text().strip()
if text and len(text) > 5:
comments.append({
'content': text,
'url': base_url
})
except Exception as e:
print(f"备选解析失败: {e}")
return comments
def set_user_agent(self, user_agent: str):
"""更新User-Agent"""
self.session.headers.update({'User-Agent': user_agent})
def update_config(self, config: Dict):
"""更新配置"""
self.config.update(config)
if 'user_agent' in config:
self.set_user_agent(config['user_agent'])
if 'retry_times' in config:
self.retry_times = config['retry_times']
if 'retry_interval' in config:
self.retry_interval = config['retry_interval']
def get_fetch_interval(self) -> int:
"""获取爬取间隔"""
return self.config.get('fetch_interval', 60)

12
需求.txt Normal file
View File

@@ -0,0 +1,12 @@
开发一个基于 Python + PySide6 的桌面悬浮小工具,核心功能是:爬取指定网站的评论(定期刷新、去重),调用大模型 API 分析评论的 “人气冷暖”(返回 0-100 的数值0 为极冷、100 为极暖),并通过桌面指示灯可视化展示;工具界面包含标题、冷暖指示灯、配置按钮,配置项需支持修改大模型 API、调整界面透明度爬取的网站的地址的配置爬取内容的xpath配置。
配置持久化 配置API、透明度、爬取规则等保存到本地 JSON 文件,避免每次启动重新配置
窗口交互 悬浮窗口可拖拽位置、置顶显示、最小化 / 关闭选项(桌面工具必备)
爬取增强 评论去重(哈希校验)、爬取失败重试、爬取频率 / UA 配置、反爬基础适配
大模型交互 API 超时 / 重试、响应格式校验(确保返回 0-100 数值)、多模型请求格式适配
可视化增强 指示灯显示具体数值(如 “88 - 强烈看好”)、冷暖阈值可自定义(冷 / 中性 / 暖分界)
异常处理 爬取 / API 调用失败的弹窗提示、日志记录、错误兜底(如默认显示 “中性”)
如果爬取的内容已经存在,则增大刷新时间。
比如//a[contains(@class, "linkblack")]返回是列表先写入当前的sqlite数据库再调用大模型一条条分析加一个延迟再存入当前的sqlite数据库。