From 5b8b9ec35ab324ce25505d01ca9667bac0cba7af Mon Sep 17 00:00:00 2001 From: xiaji Date: Wed, 7 Jan 2026 17:32:58 +0800 Subject: [PATCH] =?UTF-8?q?Initial=20commit:=20=E8=82=A1=E5=90=A7=E4=BA=BA?= =?UTF-8?q?=E6=B0=94=E6=8C=87=E7=A4=BA=E5=99=A8=20-=20PySide6=E6=A1=8C?= =?UTF-8?q?=E9=9D=A2=E6=82=AC=E6=B5=AE=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 52 +++++++ config_manager.py | 172 ++++++++++++++++++++++ database.py | 219 ++++++++++++++++++++++++++++ llm_analyzer.py | 163 +++++++++++++++++++++ main.py | 201 +++++++++++++++++++++++++ main_window.py | 362 ++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 5 + spider.py | 130 +++++++++++++++++ 需求.txt | 12 ++ 9 files changed, 1316 insertions(+) create mode 100644 .gitignore create mode 100644 config_manager.py create mode 100644 database.py create mode 100644 llm_analyzer.py create mode 100644 main.py create mode 100644 main_window.py create mode 100644 requirements.txt create mode 100644 spider.py create mode 100644 需求.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5feab33 --- /dev/null +++ b/.gitignore @@ -0,0 +1,52 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# 虚拟环境 +venv/ +ENV/ +env/ +.venv + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# 配置文件(包含敏感信息) +config.json + +# 数据库 +*.db +*.sqlite + +# 日志 +*.log + +# 系统文件 +.DS_Store +Thumbs.db + +# 临时文件 +*.tmp +*.bak diff --git a/config_manager.py b/config_manager.py new file mode 100644 index 0000000..90fc92b --- /dev/null +++ b/config_manager.py @@ -0,0 +1,172 @@ +""" +配置管理模块 - 负责配置的读取、验证和持久化 +""" +import json +import os +from typing import Any, Dict +from pathlib import Path + + +class ConfigManager: + """配置管理器""" + + DEFAULT_CONFIG = { + "llm_api": { + "base_url": "https://api.openai.com/v1", + "api_key": "", + "model": "gpt-3.5-turbo", + "timeout": 30, + "retry_times": 3 + }, + "spider": { + "target_url": "https://example.com", + "xpath": "//a[contains(@class, 'linkblack')]", + "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + "fetch_interval": 60, + "retry_times": 3, + "retry_interval": 5 + }, + "ui": { + "opacity": 0.9, + "is_on_top": True, + "thresholds": { + "cold": 30, + "warm": 70 + } + }, + "database": { + "path": "guba.db" + }, + "logging": { + "level": "INFO", + "path": "guba.log" + } + } + + def __init__(self, config_path: str = "config.json"): + self.config_path = Path(config_path) + self.config = self._load_config() + + def _load_config(self) -> Dict[str, Any]: + """加载配置文件""" + if self.config_path.exists(): + try: + with open(self.config_path, 'r', encoding='utf-8') as f: + loaded_config = json.load(f) + # 合并默认配置,确保所有键都存在 + return self._merge_config(self.DEFAULT_CONFIG, loaded_config) + except (json.JSONDecodeError, IOError) as e: + print(f"配置文件加载失败,使用默认配置: {e}") + return self.DEFAULT_CONFIG.copy() + else: + return self.DEFAULT_CONFIG.copy() + + def _merge_config(self, default: Dict, loaded: Dict) -> Dict: + """递归合并配置""" + result = default.copy() + for key, value in loaded.items(): + if key in result and isinstance(result[key], dict) and isinstance(value, dict): + result[key] = self._merge_config(result[key], value) + else: + result[key] = value + return result + + def save_config(self) -> bool: + """保存配置到文件""" + try: + with open(self.config_path, 'w', encoding='utf-8') as f: + json.dump(self.config, f, ensure_ascii=False, indent=4) + return True + except IOError as e: + print(f"配置保存失败: {e}") + return False + + def get(self, *keys: str, default: Any = None) -> Any: + """获取嵌套配置值""" + value = self.config + for key in keys: + if isinstance(value, dict) and key in value: + value = value[key] + else: + return default + return value + + def set(self, value: Any, *keys: str) -> bool: + """设置嵌套配置值""" + if len(keys) < 1: + return False + + current = self.config + for key in keys[:-1]: + if key not in current: + current[key] = {} + current = current[key] + + current[keys[-1]] = value + return self.save_config() + + def update_llm_api(self, base_url: str = None, api_key: str = None, + model: str = None, timeout: int = None, retry_times: int = None): + """更新LLM API配置""" + if base_url: + self.config["llm_api"]["base_url"] = base_url + if api_key: + self.config["llm_api"]["api_key"] = api_key + if model: + self.config["llm_api"]["model"] = model + if timeout: + self.config["llm_api"]["timeout"] = timeout + if retry_times: + self.config["llm_api"]["retry_times"] = retry_times + self.save_config() + + def update_spider(self, target_url: str = None, xpath: str = None, + user_agent: str = None, fetch_interval: int = None, + retry_times: int = None, retry_interval: int = None): + """更新爬虫配置""" + if target_url: + self.config["spider"]["target_url"] = target_url + if xpath: + self.config["spider"]["xpath"] = xpath + if user_agent: + self.config["spider"]["user_agent"] = user_agent + if fetch_interval: + self.config["spider"]["fetch_interval"] = fetch_interval + if retry_times: + self.config["spider"]["retry_times"] = retry_times + if retry_interval: + self.config["spider"]["retry_interval"] = retry_interval + self.save_config() + + def update_ui(self, opacity: float = None, is_on_top: bool = None, + cold_threshold: int = None, warm_threshold: int = None): + """更新UI配置""" + if opacity is not None: + self.config["ui"]["opacity"] = max(0.3, min(1.0, opacity)) + if is_on_top is not None: + self.config["ui"]["is_on_top"] = is_on_top + if cold_threshold is not None: + self.config["ui"]["thresholds"]["cold"] = cold_threshold + if warm_threshold is not None: + self.config["ui"]["thresholds"]["warm"] = warm_threshold + self.save_config() + + @property + def llm_api_config(self) -> Dict: + return self.config["llm_api"] + + @property + def spider_config(self) -> Dict: + return self.config["spider"] + + @property + def ui_config(self) -> Dict: + return self.config["ui"] + + @property + def database_config(self) -> Dict: + return self.config["database"] + + @property + def logging_config(self) -> Dict: + return self.config["logging"] diff --git a/database.py b/database.py new file mode 100644 index 0000000..ad7f54e --- /dev/null +++ b/database.py @@ -0,0 +1,219 @@ +""" +数据库模块 - SQLite存储评论和分析结果 +""" +import sqlite3 +import hashlib +import json +from datetime import datetime +from typing import List, Dict, Optional, Tuple +from pathlib import Path + + +class DatabaseManager: + """数据库管理器""" + + def __init__(self, db_path: str = "guba.db"): + self.db_path = Path(db_path) + self._init_db() + + def _init_db(self): + """初始化数据库表""" + conn = self._get_connection() + cursor = conn.cursor() + + # 评论表 + cursor.execute(''' + CREATE TABLE IF NOT EXISTS comments ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + content TEXT NOT NULL, + content_hash TEXT UNIQUE NOT NULL, + url TEXT, + created_at TEXT, + fetched_at TEXT DEFAULT CURRENT_TIMESTAMP, + analyzed INTEGER DEFAULT 0, + sentiment_score REAL, + analyzed_at TEXT + ) + ''') + + # 分析历史表 + cursor.execute(''' + CREATE TABLE IF NOT EXISTS analysis_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + comment_id INTEGER, + sentiment_score REAL NOT NULL, + analysis_text TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (comment_id) REFERENCES comments(id) + ) + ''') + + # 配置表 + cursor.execute(''' + CREATE TABLE IF NOT EXISTS config ( + key TEXT PRIMARY KEY, + value TEXT, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP + ) + ''') + + conn.commit() + conn.close() + + def _get_connection(self) -> sqlite3.Connection: + """获取数据库连接""" + return sqlite3.connect(str(self.db_path)) + + @staticmethod + def hash_content(content: str) -> str: + """计算内容哈希值用于去重""" + return hashlib.md5(content.encode('utf-8')).hexdigest() + + def is_comment_exists(self, content_hash: str) -> bool: + """检查评论是否已存在""" + conn = self._get_connection() + cursor = conn.cursor() + cursor.execute('SELECT 1 FROM comments WHERE content_hash = ?', (content_hash,)) + exists = cursor.fetchone() is not None + conn.close() + return exists + + def add_comment(self, content: str, url: str = None) -> Optional[int]: + """添加评论,返回评论ID""" + content_hash = self.hash_content(content) + + if self.is_comment_exists(content_hash): + return None # 已存在 + + conn = self._get_connection() + cursor = conn.cursor() + cursor.execute(''' + INSERT INTO comments (content, content_hash, url, created_at) + VALUES (?, ?, ?, ?) + ''', (content, content_hash, url, datetime.now().isoformat())) + comment_id = cursor.lastrowid + conn.commit() + conn.close() + return comment_id + + def add_comments_batch(self, comments: List[Dict]) -> List[int]: + """批量添加评论,返回新添加的ID列表""" + new_ids = [] + conn = self._get_connection() + cursor = conn.cursor() + + for comment in comments: + content = comment.get('content', '') + url = comment.get('url') + content_hash = self.hash_content(content) + + if self.is_comment_exists(content_hash): + continue + + cursor.execute(''' + INSERT INTO comments (content, content_hash, url, created_at) + VALUES (?, ?, ?, ?) + ''', (content, content_hash, url, datetime.now().isoformat())) + new_ids.append(cursor.lastrowid) + + conn.commit() + conn.close() + return new_ids + + def get_unanalyzed_comments(self, limit: int = 50) -> List[Dict]: + """获取未分析的评论""" + conn = self._get_connection() + cursor = conn.cursor() + cursor.execute(''' + SELECT id, content, url FROM comments + WHERE analyzed = 0 + ORDER BY fetched_at ASC + LIMIT ? + ''', (limit,)) + rows = cursor.fetchall() + conn.close() + return [{'id': row[0], 'content': row[1], 'url': row[2]} for row in rows] + + def mark_analyzed(self, comment_id: int, sentiment_score: float, analysis_text: str): + """标记评论已分析""" + conn = self._get_connection() + cursor = conn.cursor() + + # 更新评论状态 + cursor.execute(''' + UPDATE comments + SET analyzed = 1, sentiment_score = ?, analyzed_at = ? + WHERE id = ? + ''', (sentiment_score, datetime.now().isoformat(), comment_id)) + + # 添加分析历史 + cursor.execute(''' + INSERT INTO analysis_history (comment_id, sentiment_score, analysis_text) + VALUES (?, ?, ?) + ''', (comment_id, sentiment_score, analysis_text)) + + conn.commit() + conn.close() + + def get_latest_sentiment_score(self) -> Optional[float]: + """获取最新的情感分数""" + conn = self._get_connection() + cursor = conn.cursor() + cursor.execute(''' + SELECT sentiment_score FROM comments + WHERE analyzed = 1 AND sentiment_score IS NOT NULL + ORDER BY analyzed_at DESC + LIMIT 1 + ''') + row = cursor.fetchone() + conn.close() + return row[0] if row else None + + def get_all_scores(self) -> List[float]: + """获取所有已分析的分数""" + conn = self._get_connection() + cursor = conn.cursor() + cursor.execute(''' + SELECT sentiment_score FROM comments + WHERE analyzed = 1 AND sentiment_score IS NOT NULL + ORDER BY analyzed_at DESC + ''') + rows = cursor.fetchall() + conn.close() + return [row[0] for row in rows if row[0] is not None] + + def get_comment_count(self) -> int: + """获取评论总数""" + conn = self._get_connection() + cursor = conn.cursor() + cursor.execute('SELECT COUNT(*) FROM comments') + count = cursor.fetchone()[0] + conn.close() + return count + + def get_analyzed_count(self) -> int: + """获取已分析评论数""" + conn = self._get_connection() + cursor = conn.cursor() + cursor.execute('SELECT COUNT(*) FROM comments WHERE analyzed = 1') + count = cursor.fetchone()[0] + conn.close() + return count + + def get_recent_comments(self, limit: int = 10) -> List[Dict]: + """获取最近的评论""" + conn = self._get_connection() + cursor = conn.cursor() + cursor.execute(''' + SELECT id, content, sentiment_score, analyzed_at + FROM comments + ORDER BY fetched_at DESC + LIMIT ? + ''', (limit,)) + rows = cursor.fetchall() + conn.close() + return [ + {'id': row[0], 'content': row[1][:50] + '...' if len(row[1]) > 50 else row[1], + 'score': row[2], 'analyzed_at': row[3]} + for row in rows + ] diff --git a/llm_analyzer.py b/llm_analyzer.py new file mode 100644 index 0000000..365e8e0 --- /dev/null +++ b/llm_analyzer.py @@ -0,0 +1,163 @@ +""" +大模型分析模块 - 调用LLM API分析评论情感 +""" +import json +import time +import re +from typing import Dict, Optional, Tuple +from openai import OpenAI, OpenAIError + + +class LLMAnalyzer: + """大模型情感分析器""" + + SYSTEM_PROMPT = """你是一个专业的情感分析助手。你的任务是分析股吧/论坛评论的情感倾向,判断投资者对该股票的态度。 + +评分规则: +- 0-30: 极度悲观/看空(利空、暴跌、绝望等情绪) +- 31-50: 偏悲观/中性(担忧、谨慎、观望等情绪) +- 51-70: 偏乐观/中性(看好、希望、期待等情绪) +- 71-100: 极度乐观/看涨(利好、暴涨、兴奋等情绪) + +请直接输出一个JSON格式的结果,包含两个字段: +- score: 0-100的整数评分 +- label: 简短的态度描述(如"强烈看跌"、"谨慎观望"、"温和看涨"、"强烈看涨"等) + +注意: +1. 只返回JSON,不要有其他文字 +2. 如果无法判断,返回50和"无法判断" +3. 分析要客观,不要被表面文字迷惑 +""" + + def __init__(self, config: Dict): + self.config = config + self.base_url = config.get('base_url', 'https://api.openai.com/v1') + self.api_key = config.get('api_key', '') + self.model = config.get('model', 'gpt-3.5-turbo') + self.timeout = config.get('timeout', 30) + self.retry_times = config.get('retry_times', 3) + + self.client = None + if self.api_key: + self._init_client() + + def _init_client(self): + """初始化OpenAI客户端""" + try: + self.client = OpenAI( + api_key=self.api_key, + base_url=self.base_url, + timeout=self.timeout + ) + except Exception as e: + print(f"初始化LLM客户端失败: {e}") + + def update_config(self, config: Dict): + """更新配置""" + self.config.update(config) + self.base_url = config.get('base_url', self.base_url) + self.api_key = config.get('api_key', self.api_key) + self.model = config.get('model', self.model) + self.timeout = config.get('timeout', self.timeout) + self.retry_times = config.get('retry_times', self.retry_times) + + if self.api_key: + self._init_client() + + def analyze(self, comment: str) -> Tuple[Optional[int], Optional[str]]: + """ + 分析单条评论 + 返回 (score, label) + """ + if not self.client: + return None, "LLM未配置" + + if not comment or not comment.strip(): + return None, "评论为空" + + for attempt in range(self.retry_times): + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": self.SYSTEM_PROMPT}, + {"role": "user", "content": f"请分析以下评论的情感倾向:\n\n{comment}"} + ], + temperature=0.3, + max_tokens=200 + ) + + result_text = response.choices[0].message.content.strip() + score, label = self._parse_response(result_text) + + if score is not None: + return score, label + + except OpenAIError as e: + print(f"API调用失败 (尝试 {attempt + 1}/{self.retry_times}): {e}") + if attempt < self.retry_times - 1: + time.sleep(2 ** attempt) # 指数退避 + except Exception as e: + print(f"分析过程出错: {e}") + break + + return None, "分析失败" + + def _parse_response(self, response: str) -> Tuple[Optional[int], Optional[str]]: + """解析LLM返回的结果""" + try: + # 尝试直接解析JSON + result = json.loads(response) + score = result.get('score', 50) + label = result.get('label', '无法判断') + + # 验证分数范围 + score = max(0, min(100, int(score))) + + return score, label + + except json.JSONDecodeError: + # 尝试从文本中提取 + pass + + # 尝试从文本中提取数字 + numbers = re.findall(r'\b(\d{1,3})\b', response) + if numbers: + score = int(numbers[0]) + score = max(0, min(100, score)) + + # 提取标签 + label_match = re.search(r'["']([^"']+)["']', response) + if label_match: + label = label_match.group(1) + else: + label = response.split('\n')[0][:20] if response else '无法判断' + + return score, label + + return None, "解析失败" + + def analyze_batch(self, comments: list, delay: float = 1.0) -> list: + """ + 批量分析评论 + delay: 每次调用之间的延迟(秒) + """ + results = [] + + for i, comment in enumerate(comments): + print(f"分析评论 {i + 1}/{len(comments)}...") + score, label = self.analyze(comment) + results.append({ + 'content': comment, + 'score': score, + 'label': label + }) + + if delay > 0 and i < len(comments) - 1: + time.sleep(delay) + + return results + + def is_configured(self) -> bool: + """检查是否已配置""" + return bool(self.client and self.api_key) \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..6b26198 --- /dev/null +++ b/main.py @@ -0,0 +1,201 @@ +""" +主程序入口 - 股吧人气指示器 +""" +import sys +import logging +import time +from datetime import datetime +from PySide6.QtWidgets import QApplication +from PySide6.QtCore import QTimer, Signal, QObject + +from config_manager import ConfigManager +from database import DatabaseManager +from spider import SpiderManager +from llm_analyzer import LLMAnalyzer +from main_window import MainWindow + + +class BackendWorker(QObject): + """后台工作器 - 处理爬取和分析任务""" + + fetch_finished = Signal(list) + analysis_finished = Signal(float) + error_occurred = Signal(str) + status_update = Signal(str) + + def __init__(self, config_manager: ConfigManager, db_manager: DatabaseManager, + spider: SpiderManager, analyzer: LLMAnalyzer): + super().__init__() + self.config = config_manager + self.db = db_manager + self.spider = spider + self.analyzer = analyzer + self.running = False + self.last_fetch_time = 0 + self.fetch_interval = 60 # 默认60秒 + self.no_new_content_count = 0 # 无新内容计数 + + def start(self): + """启动后台任务""" + self.running = True + self._run_cycle() + + def stop(self): + """停止后台任务""" + self.running = False + + def _run_cycle(self): + """运行一个周期""" + if not self.running: + return + + try: + # 1. 爬取评论 + self.status_update.emit("正在爬取评论...") + comments = self.spider.fetch() + + if not comments: + self.no_new_content_count += 1 + interval = self.fetch_interval * (1 + min(self.no_new_content_count * 0.5, 2)) + self.status_update.emit(f"无新内容,{int(interval)}秒后重试...") + QTimer.singleShot(int(interval * 1000), self._run_cycle) + return + + # 2. 写入数据库 + self.status_update.emit(f"获取到 {len(comments)} 条评论...") + new_ids = self.db.add_comments_batch(comments) + + if new_ids: + self.no_new_content_count = 0 + self.status_update.emit(f"新增 {len(new_ids)} 条评论") + + # 3. 获取未分析评论并分析 + unanalyzed = self.db.get_unanalyzed_comments(limit=10) + + if unanalyzed: + self.status_update.emit(f"开始分析 {len(unanalyzed)} 条评论...") + self._analyze_comments(unanalyzed) + else: + self.no_new_content_count += 1 + + # 4. 更新指示器 + self._update_indicator() + + except Exception as e: + self.error_occurred.emit(f"运行错误: {str(e)}") + + # 安排下一次执行 + if self.running: + interval = self.fetch_interval * (1 + min(self.no_new_content_count * 0.5, 2)) + QTimer.singleShot(int(interval * 1000), self._run_cycle) + + def _analyze_comments(self, comments): + """分析评论""" + for i, comment in enumerate(comments): + if not self.running: + break + + try: + self.status_update.emit(f"分析 {i+1}/{len(comments)}...") + score, label = self.analyzer.analyze(comment['content']) + + if score is not None: + self.db.mark_analyzed(comment['id'], score, label) + time.sleep(1.0) # 延迟,避免API限流 + else: + self.db.mark_analyzed(comment['id'], 50, "无法判断") + + except Exception as e: + self.error_occurred.emit(f"分析失败: {str(e)}") + self.db.mark_analyzed(comment['id'], 50, "分析异常") + + def _update_indicator(self): + """更新指示器显示""" + scores = self.db.get_all_scores() + + if not scores: + return + + # 计算平均分 + avg_score = sum(scores) / len(scores) + + # 根据阈值确定标签 + thresholds = self.config.get('ui', 'thresholds', default={'cold': 30, 'warm': 70}) + cold = thresholds.get('cold', 30) + warm = thresholds.get('warm', 70) + + if avg_score < cold: + label = "看跌" + elif avg_score > warm: + label = "看涨" + else: + label = "中性" + + self.analysis_finished.emit(avg_score) + + def manual_refresh(self): + """手动刷新""" + self.no_new_content_count = 0 + self._run_cycle() + + def update_fetch_interval(self, interval: int): + """更新爬取间隔""" + self.fetch_interval = interval + + +def setup_logging(log_path: str, level: str = "INFO"): + """配置日志""" + logging.basicConfig( + level=getattr(logging, level.upper(), logging.INFO), + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(log_path, encoding='utf-8'), + logging.StreamHandler() + ] + ) + + +def main(): + """主函数""" + # 创建应用 + app = QApplication(sys.argv) + app.setQuitOnLastWindowClosed(False) # 允许最小化到托盘 + + # 加载配置 + config = ConfigManager("config.json") + + # 配置日志 + log_config = config.logging_config + setup_logging(log_config.get('path', 'guba.log'), log_config.get('level', 'INFO')) + + # 初始化组件 + db = DatabaseManager(config.database_config.get('path', 'guba.db')) + spider = SpiderManager(config.spider_config) + analyzer = LLMAnalyzer(config.llm_api_config) + + # 创建后台工作器 + worker = BackendWorker(config, db, spider, analyzer) + worker.update_fetch_interval(config.spider_config.get('fetch_interval', 60)) + + # 创建主窗口 + window = MainWindow(config) + window.show() + + # 连接信号 + worker.status_update.connect(window.update_status) + worker.analysis_finished.connect(window.update_indicator) + worker.error_occurred.connect(lambda msg: window.show_message("错误", msg)) + + # 设置按钮回调 + window.set_refresh_callback(worker.manual_refresh) + window.set_config_callback(window.show_config) + + # 启动后台任务 + worker.start() + + # 运行应用 + sys.exit(app.exec()) + + +if __name__ == "__main__": + main() diff --git a/main_window.py b/main_window.py new file mode 100644 index 0000000..dd679a2 --- /dev/null +++ b/main_window.py @@ -0,0 +1,362 @@ +""" +PySide6 GUI界面模块 +""" +from PySide6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel, + QPushButton, QSlider, QDialog, QFormLayout, + QLineEdit, QSpinBox, QMessageBox, QSystemTrayIcon, + QMenu, QTextEdit, QGroupBox, QDialogButtonBox) +from PySide6.QtCore import Qt, QTimer, Signal, QPoint +from PySide6.QtGui import QFont, QColor, QPainter, QBrush, QPen, QIcon, QAction +from typing import Callable, Optional + + +class SentimentIndicator(QWidget): + """情感指示灯组件""" + + def __init__(self, parent=None): + super().__init__(parent) + self.score = 50 + self.label_text = "中性" + self.setMinimumSize(100, 100) + + def set_value(self, score: int, label: str = None): + """设置数值和标签""" + self.score = max(0, min(100, score)) + if label: + self.label_text = label + self.update() + + def paintEvent(self, event): + """绘制指示灯""" + painter = QPainter(self) + painter.setRenderHint(QPainter.Antialiasing) + + center = self.rect().center() + radius = min(self.width(), self.height()) // 2 - 10 + + # 根据分数确定颜色 + color = self._get_color(self.score) + + # 绘制外圈 + painter.setPen(QPen(QColor(100, 100, 100), 2)) + painter.setBrush(QBrush(QColor(30, 30, 30))) + painter.drawEllipse(center, radius, radius) + + # 绘制内圈(渐变效果) + gradient_color = QColor(color) + painter.setPen(Qt.NoPen) + painter.setBrush(QBrush(gradient_color)) + painter.drawEllipse(center, radius - 4, radius - 4) + + # 绘制发光效果 + for i in range(3, 0, -1): + glow_color = QColor(color) + glow_color.setAlpha(50 // i) + painter.setBrush(QBrush(glow_color)) + painter.drawEllipse(center, radius - i * 5, radius - i * 5) + + def _get_color(self, score: int) -> QColor: + """根据分数获取颜色""" + if score < 30: + # 冷色系 - 蓝色/青色 + ratio = score / 30 + return QColor(int(0 + 100 * ratio), int(150 + 50 * ratio), 255) + elif score < 70: + # 中性 - 灰色/绿色 + if score < 50: + ratio = (score - 30) / 20 + return QColor(int(100 + 50 * ratio), int(200 + 20 * ratio), int(200 - 50 * ratio)) + else: + ratio = (score - 50) / 20 + return QColor(int(150 + 50 * ratio), int(220 - 20 * ratio), int(150 - 50 * ratio)) + else: + # 暖色系 - 橙色/红色 + ratio = (score - 70) / 30 + return QColor(255, int(200 - 100 * ratio), int(50 + 50 * ratio)) + + def get_description(self, score: int) -> str: + """获取描述文本""" + if score < 20: + return "极度看跌" + elif score < 40: + return "偏悲观" + elif score < 60: + return "中性" + elif score < 80: + return "偏乐观" + else: + return "极度看涨" + + +class ConfigDialog(QDialog): + """配置对话框""" + + def __init__(self, config_manager, parent=None): + super().__init__(parent) + self.config_manager = config_manager + self.setWindowTitle("配置") + self.setMinimumWidth(400) + self._init_ui() + + def _init_ui(self): + layout = QFormLayout(self) + + # LLM API 配置 + llm_config = self.config_manager.llm_api_config + + self.base_url_edit = QLineEdit(llm_config.get('base_url', '')) + self.api_key_edit = QLineEdit(llm_config.get('api_key', '')) + self.api_key_edit.setEchoMode(QLineEdit.Password) + self.model_edit = QLineEdit(llm_config.get('model', '')) + self.timeout_spin = QSpinBox() + self.timeout_spin.setRange(10, 300) + self.timeout_spin.setValue(llm_config.get('timeout', 30)) + + layout.addRow("API Base URL:", self.base_url_edit) + layout.addRow("API Key:", self.api_key_edit) + layout.addRow("Model:", self.model_edit) + layout.addRow("Timeout (s):", self.timeout_spin) + + # 爬虫配置 + spider_config = self.config_manager.spider_config + + self.url_edit = QLineEdit(spider_config.get('target_url', '')) + self.xpath_edit = QLineEdit(spider_config.get('xpath', '')) + self.user_agent_edit = QLineEdit(spider_config.get('user_agent', '')) + self.interval_spin = QSpinBox() + self.interval_spin.setRange(10, 3600) + self.interval_spin.setValue(spider_config.get('fetch_interval', 60)) + + layout.addRow("目标URL:", self.url_edit) + layout.addRow("XPath:", self.xpath_edit) + layout.addRow("User Agent:", self.user_agent_edit) + layout.addRow("刷新间隔(s):", self.interval_spin) + + # UI 配置 + ui_config = self.config_manager.ui_config + + self.opacity_slider = QSlider(Qt.Horizontal) + self.opacity_slider.setRange(30, 100) + self.opacity_slider.setValue(int(ui_config.get('opacity', 0.9) * 100)) + self.ontop_check = QCheckBox() if hasattr(self, 'QCheckBox') else None + # 使用 QPushButton 替代 QCheckBox + self.ontop_btn = QPushButton("置顶") + self.ontop_btn.setCheckable(True) + self.ontop_btn.setChecked(ui_config.get('is_on_top', True)) + + layout.addRow("透明度:", self.opacity_slider) + layout.addRow("窗口置顶:", self.ontop_btn) + + # 阈值配置 + thresholds = ui_config.get('thresholds', {}) + self.cold_spin = QSpinBox() + self.cold_spin.setRange(0, 50) + self.cold_spin.setValue(thresholds.get('cold', 30)) + self.warm_spin = QSpinBox() + self.warm_spin.setRange(50, 100) + self.warm_spin.setValue(thresholds.get('warm', 70)) + + layout.addRow("寒冷阈值:", self.cold_spin) + layout.addRow("温暖阈值:", self.warm_spin) + + # 按钮 + button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) + button_box.accepted.connect(self._save_config) + button_box.rejected.connect(self.reject) + layout.addRow(button_box) + + def _save_config(self): + """保存配置""" + # LLM API + self.config_manager.update_llm_api( + base_url=self.base_url_edit.text(), + api_key=self.api_key_edit.text(), + model=self.model_edit.text(), + timeout=self.timeout_spin.value() + ) + + # 爬虫 + self.config_manager.update_spider( + target_url=self.url_edit.text(), + xpath=self.xpath_edit.text(), + user_agent=self.user_agent_edit.text(), + fetch_interval=self.interval_spin.value() + ) + + # UI + self.config_manager.update_ui( + opacity=self.opacity_slider.value() / 100.0, + is_on_top=self.ontop_btn.isChecked(), + cold_threshold=self.cold_spin.value(), + warm_threshold=self.warm_spin.value() + ) + + self.accept() + + +class MainWindow(QWidget): + """主窗口""" + + def __init__(self, config_manager, parent=None): + super().__init__(parent) + self.config_manager = config_manager + self.setWindowTitle("股吧人气指示器") + self._init_ui() + self._apply_config() + + # 拖拽相关 + self.dragging = False + self.drag_position = QPoint() + + # 系统托盘 + self._init_tray_icon() + + def _init_ui(self): + """初始化UI""" + layout = QVBoxLayout(self) + layout.setContentsMargins(10, 10, 10, 10) + + # 标题 + self.title_label = QLabel("股吧人气") + self.title_label.setAlignment(Qt.AlignCenter) + title_font = QFont() + title_font.setPointSize(14) + title_font.setBold(True) + self.title_label.setFont(title_font) + + # 指示灯 + self.indicator = SentimentIndicator() + self.score_label = QLabel("50 - 中性") + self.score_label.setAlignment(Qt.AlignCenter) + + # 状态信息 + self.status_label = QLabel("等待数据...") + self.status_label.setAlignment(Qt.AlignCenter) + status_font = QFont() + status_font.setPointSize(10) + self.status_label.setFont(status_font) + + # 按钮 + btn_layout = QHBoxLayout() + self.refresh_btn = QPushButton("刷新") + self.config_btn = QPushButton("配置") + btn_layout.addWidget(self.refresh_btn) + btn_layout.addWidget(self.config_btn) + + # 添加到主布局 + layout.addWidget(self.title_label) + layout.addWidget(self.indicator) + layout.addWidget(self.score_label) + layout.addWidget(self.status_label) + layout.addLayout(btn_layout) + + # 设置窗口标志(无边框、可拖拽) + self.setWindowFlags(Qt.FramelessWindowHint | Qt.WindowStaysOnTopHint) + self.setAttribute(Qt.WA_TranslucentBackground) + + def _init_tray_icon(self): + """初始化系统托盘""" + self.tray_icon = QSystemTrayIcon(self) + self.tray_icon.setToolTip("股吧人气指示器") + + # 创建托盘菜单 + tray_menu = QMenu() + show_action = QAction("显示", self) + hide_action = QAction("隐藏", self) + quit_action = QAction("退出", self) + + show_action.triggered.connect(self.show) + hide_action.triggered.connect(self.hide) + quit_action.triggered.connect(self.quit_app) + + tray_menu.addAction(show_action) + tray_menu.addAction(hide_action) + tray_menu.addAction(quit_action) + + self.tray_icon.setContextMenu(tray_menu) + self.tray_icon.show() + + def quit_app(self): + """退出应用""" + self.close() + import sys + sys.exit(0) + + def _apply_config(self): + """应用配置""" + ui_config = self.config_manager.ui_config + self.setWindowOpacity(ui_config.get('opacity', 0.9)) + + if ui_config.get('is_on_top', True): + self.setWindowFlags(self.windowFlags() | Qt.WindowStaysOnTopHint) + else: + self.setWindowFlags(self.windowFlags() & ~Qt.WindowStaysOnTopHint) + + thresholds = ui_config.get('thresholds', {}) + + def mousePressEvent(self, event): + """鼠标按下事件""" + if event.button() == Qt.LeftButton: + self.dragging = True + self.drag_position = event.globalPosition().toPoint() - self.frameGeometry().topLeft() + + def mouseMoveEvent(self, event): + """鼠标移动事件""" + if self.dragging: + self.move(event.globalPosition().toPoint() - self.drag_position) + + def mouseReleaseEvent(self, event): + """鼠标释放事件""" + if event.button() == Qt.LeftButton: + self.dragging = False + + def contextMenuEvent(self, event): + """右键菜单""" + context_menu = QMenu(self) + config_action = QAction("配置", self) + opacity_action = QAction("透明度", self) + quit_action = QAction("退出", self) + + config_action.triggered.connect(self.show_config) + quit_action.triggered.connect(self.quit_app) + + context_menu.addAction(config_action) + context_menu.addAction(quit_action) + context_menu.exec(event.globalPosition().toPoint()) + + def show_config(self): + """显示配置对话框""" + dialog = ConfigDialog(self.config_manager, self) + if dialog.exec() == QDialog.Accepted: + self._apply_config() + + def update_indicator(self, score: int, label: str = None): + """更新指示灯""" + if label is None: + label = self.indicator.get_description(score) + self.indicator.set_value(score, label) + self.score_label.setText(f"{score} - {label}") + + def update_status(self, text: str): + """更新状态""" + self.status_label.setText(text) + + def set_refresh_callback(self, callback: Callable): + """设置刷新按钮回调""" + self.refresh_btn.clicked.connect(callback) + + def set_config_callback(self, callback: Callable): + """设置配置按钮回调""" + self.config_btn.clicked.connect(callback) + + def show_message(self, title: str, message: str, icon=QMessageBox.Information): + """显示消息""" + QMessageBox.information(self, title, message) + + +class QCheckBox(QPushButton): + """自定义复选框""" + def __init__(self, text=""): + super().__init__(text) + self.setCheckable(True) + self.setChecked(False) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..62917cf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +PySide6>=6.6.0 +requests>=2.31.0 +beautifulsoup4>=4.12.0 +lxml>=4.9.0 +openai>=1.0.0 diff --git a/spider.py b/spider.py new file mode 100644 index 0000000..03fb0d6 --- /dev/null +++ b/spider.py @@ -0,0 +1,130 @@ +""" +爬虫模块 - 网站评论抓取 +""" +import requests +from lxml import etree +import time +from typing import List, Dict, Optional +from urllib.parse import urljoin +from bs4 import BeautifulSoup +import random + + +class SpiderManager: + """爬虫管理器""" + + def __init__(self, config: Dict): + self.config = config + self.session = requests.Session() + self.session.headers.update({ + 'User-Agent': config.get('user_agent', 'Mozilla/5.0') + }) + self.retry_times = config.get('retry_times', 3) + self.retry_interval = config.get('retry_interval', 5) + + def fetch(self, url: str = None, xpath: str = None) -> List[Dict]: + """ + 抓取网页评论 + 返回评论列表,每个元素包含 content 和 url + """ + target_url = url or self.config.get('target_url', '') + target_xpath = xpath or self.config.get('xpath', '') + + if not target_url: + return [] + + html = self._fetch_with_retry(target_url) + if not html: + return [] + + return self._parse_comments(html, target_xpath, target_url) + + def _fetch_with_retry(self, url: str, max_retries: int = None) -> Optional[str]: + """带重试的网页获取""" + max_retries = max_retries or self.retry_times + + for attempt in range(max_retries): + try: + response = self.session.get(url, timeout=30) + response.raise_for_status() + response.encoding = response.apparent_encoding + return response.text + except requests.RequestException as e: + print(f"请求失败 (尝试 {attempt + 1}/{max_retries}): {e}") + if attempt < max_retries - 1: + time.sleep(self.retry_interval + random.uniform(0, 2)) + else: + return None + return None + + def _parse_comments(self, html: str, xpath: str, base_url: str) -> List[Dict]: + """解析评论""" + comments = [] + + try: + # 使用 lxml 解析 + tree = etree.HTML(html) + elements = tree.xpath(xpath) + + for elem in elements: + try: + text = elem.text_content().strip() + if text: + # 获取链接的 href(如果存在) + href = elem.get('href') + full_url = urljoin(base_url, href) if href else base_url + + comments.append({ + 'content': text, + 'url': full_url + }) + except Exception as e: + print(f"解析元素失败: {e}") + continue + + except Exception as e: + print(f"XPath解析失败: {e}") + # 备选解析方法 + comments = self._fallback_parse(html, base_url) + + return comments + + def _fallback_parse(self, html: str, base_url: str) -> List[Dict]: + """备选解析方法 - 使用 BeautifulSoup""" + comments = [] + try: + soup = BeautifulSoup(html, 'lxml') + + # 尝试查找常见的评论元素 + # 这里可以根据实际网站结构调整选择器 + elements = soup.find_all(['a', 'div', 'p', 'span'], class_=lambda x: x and 'linkblack' in x if x else False) + + for elem in elements[:50]: # 限制数量 + text = elem.get_text().strip() + if text and len(text) > 5: + comments.append({ + 'content': text, + 'url': base_url + }) + except Exception as e: + print(f"备选解析失败: {e}") + + return comments + + def set_user_agent(self, user_agent: str): + """更新User-Agent""" + self.session.headers.update({'User-Agent': user_agent}) + + def update_config(self, config: Dict): + """更新配置""" + self.config.update(config) + if 'user_agent' in config: + self.set_user_agent(config['user_agent']) + if 'retry_times' in config: + self.retry_times = config['retry_times'] + if 'retry_interval' in config: + self.retry_interval = config['retry_interval'] + + def get_fetch_interval(self) -> int: + """获取爬取间隔""" + return self.config.get('fetch_interval', 60) diff --git a/需求.txt b/需求.txt new file mode 100644 index 0000000..ac4f4f6 --- /dev/null +++ b/需求.txt @@ -0,0 +1,12 @@ +开发一个基于 Python + PySide6 的桌面悬浮小工具,核心功能是:爬取指定网站的评论(定期刷新、去重),调用大模型 API 分析评论的 “人气冷暖”(返回 0-100 的数值,0 为极冷、100 为极暖),并通过桌面指示灯可视化展示;工具界面包含标题、冷暖指示灯、配置按钮,配置项需支持修改大模型 API、调整界面透明度,爬取的网站的地址的配置,爬取内容的xpath配置。 + +配置持久化 配置(API、透明度、爬取规则等)保存到本地 JSON 文件,避免每次启动重新配置 +窗口交互 悬浮窗口可拖拽位置、置顶显示、最小化 / 关闭选项(桌面工具必备) +爬取增强 评论去重(哈希校验)、爬取失败重试、爬取频率 / UA 配置、反爬基础适配 +大模型交互 API 超时 / 重试、响应格式校验(确保返回 0-100 数值)、多模型请求格式适配 +可视化增强 指示灯显示具体数值(如 “88 - 强烈看好”)、冷暖阈值可自定义(冷 / 中性 / 暖分界) +异常处理 爬取 / API 调用失败的弹窗提示、日志记录、错误兜底(如默认显示 “中性”) + +如果爬取的内容已经存在,则增大刷新时间。 + +比如//a[contains(@class, "linkblack")]返回是列表,先写入当前的sqlite数据库,再调用大模型一条条分析(加一个延迟),再存入当前的sqlite数据库。 \ No newline at end of file