Files
guba-indicator/main.py
xiaji 3457a2d5f0 refactor: 移除交易时间检查逻辑并优化数据获取流程
- 删除 SpiderManager 和 WaveformWidget 中的 is_trading_time 方法及相关检查
- 在 BackendWorker 中添加截图获取步骤
- 更新波形图显示逻辑,移除非交易时间相关提示
2026-01-16 16:51:26 +08:00

440 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
主程序入口 - 股吧人气指示器
"""
import sys
import time
from datetime import datetime
from PySide6.QtWidgets import QApplication
from PySide6.QtCore import QTimer, Signal, QObject, QThread
from loguru import logger
from config_manager import ConfigManager
from database import DatabaseManager
from spider import SpiderManager
from llm_analyzer import LLMAnalyzer
from main_window import MainWindow
class ScreenshotWorker(QObject):
    """Worker dedicated to capturing the SSE (Shanghai Stock Exchange) page screenshot.

    ``do_fetch`` asks the spider for one screenshot and reports the outcome via
    signals. It does nothing unless ``start`` has been called (and ``stop`` has
    not been called since).
    """

    sse_screenshot_fetched = Signal(str)  # emitted with the screenshot path on success
    error_occurred = Signal(str)  # emitted with a description when fetching raises

    def __init__(self, spider: SpiderManager):
        super().__init__()
        self.spider = spider
        self.running = False  # gate checked by do_fetch()
        logger.info("ScreenshotWorker 初始化完成")

    def start(self):
        """Allow ``do_fetch`` to run."""
        self._switch(True, "截图任务已启动")

    def stop(self):
        """Turn subsequent ``do_fetch`` calls into no-ops."""
        self._switch(False, "截图任务已停止")

    def _switch(self, enabled, message):
        """Set the running flag and log ``message``."""
        self.running = enabled
        logger.info(message)

    def do_fetch(self):
        """Fetch one screenshot; any exception is logged and re-emitted via ``error_occurred``."""
        if self.running:
            try:
                logger.info("开始爬取上证所网页截图")
                shot = self.spider.fetch_sse_screenshot()
                if not shot:
                    logger.warning("未能获取有效的截图")
                    return
                logger.info(f"成功获取截图: {shot}")
                self.sse_screenshot_fetched.emit(shot)
            except Exception as exc:
                reason = str(exc)
                logger.error(f"爬取截图失败: {reason}")
                self.error_occurred.emit(f"截图失败: {reason}")
class BackendWorker(QObject):
    """Background worker that crawls forum comments, scores them and drives the indicator.

    Intended to be moved to its own ``QThread``. Each cycle runs these steps:
    screenshot -> fetch comments -> store -> analyze up to ``ANALYSIS_BATCH_SIZE``
    unanalyzed comments -> recompute the indicator.

    Cycles reschedule themselves. Only the most recently scheduled cycle is
    allowed to fire, so manual refreshes never add extra scheduling chains.
    When a fetch stores no new comments, the interval backs off up to
    ``(1 + MAX_BACKOFF_STEPS) * fetch_interval``.
    """

    fetch_finished = Signal(list)  # declared for listeners; not emitted in this class
    analysis_finished = Signal(float)  # mean score of the latest scores (emitted as an int)
    analysis_result = Signal(dict)  # per-comment details from analyzer.get_last_result()
    error_occurred = Signal(str)
    status_update = Signal(str)
    stock_data_fetched = Signal(str, float)  # (time, value) of one SSE stock sample
    sse_screenshot_fetched = Signal(str)  # path of a freshly captured SSE screenshot
    # Internal: carries manual refresh requests to the thread this object lives in
    _manual_refresh_requested = Signal()

    EMPTY_FETCH_RETRY_SECONDS = 15  # fixed retry delay when the spider returns nothing
    MAX_BACKOFF_STEPS = 4.0  # cap on no-new-content steps added to the interval multiplier
    ANALYSIS_BATCH_SIZE = 10  # unanalyzed comments scored per cycle
    INDICATOR_WINDOW = 100  # number of latest scores averaged by the indicator
    API_CALL_DELAY_SECONDS = 1.0  # pause between LLM calls to avoid API rate limiting
    FALLBACK_SCORE = 50  # score recorded when a comment cannot be scored

    def __init__(self, config_manager: ConfigManager, db_manager: DatabaseManager,
                 spider: SpiderManager, analyzer: LLMAnalyzer):
        super().__init__()
        self.config = config_manager
        self.db = db_manager
        self.spider = spider
        self.analyzer = analyzer
        self.running = False
        self.last_fetch_time = 0  # kept for compatibility; not read inside this class
        self.fetch_interval = 15  # base cycle interval in seconds
        self.no_new_content_count = 0  # consecutive cycles that stored no new comment
        self.is_running_cycle = False  # re-entrancy guard for _run_cycle
        self.fetch_count = 0  # number of crawl attempts
        self.analysis_count = 0  # number of LLM analysis requests
        # Bumped on every schedule; a pending timer carrying a stale generation does
        # nothing, so at most one self-rescheduling cycle chain is ever alive.
        self._schedule_generation = 0
        # Qt's default AutoConnection should queue this onto the worker's thread when
        # the request comes from another thread (e.g. a GUI button callback).
        self._manual_refresh_requested.connect(self._handle_manual_refresh)
        logger.info("BackendWorker 初始化完成")

    def start(self):
        """Start the periodic cycle; the first cycle runs after one second."""
        self.running = True
        logger.info("后台任务已启动")
        # Run the first cycle shortly after start-up
        self._schedule_next(1.0)

    def stop(self):
        """Stop the worker; pending cycles return immediately and analysis loops exit early."""
        self.running = False
        logger.info("后台任务已停止")

    def _schedule_next(self, delay_seconds):
        """Schedule ``_run_cycle`` after ``delay_seconds``, superseding any earlier schedule."""
        self._schedule_generation += 1
        generation = self._schedule_generation

        def fire():
            # A later schedule (e.g. triggered by a manual refresh) makes this one stale
            if generation == self._schedule_generation:
                self._run_cycle()

        QTimer.singleShot(int(delay_seconds * 1000), fire)

    def _backoff_interval(self):
        """Return the base interval scaled by ``1 + min(no_new_content_count, MAX_BACKOFF_STEPS)``."""
        steps = min(self.no_new_content_count * 1.0, self.MAX_BACKOFF_STEPS)
        return self.fetch_interval * (1 + steps)

    def _run_cycle(self):
        """Run one cycle, then schedule the next one while the worker is running."""
        if self.is_running_cycle:
            logger.debug("上一个周期仍在执行,跳过本次")
            return
        if not self.running:
            return

        self.is_running_cycle = True
        retry_delay = None
        try:
            retry_delay = self._execute_cycle()
        except Exception as e:
            logger.exception(f"运行错误: {str(e)}")
            self.error_occurred.emit(f"运行错误: {str(e)}")
        finally:
            # Always release the guard; a stuck flag would skip every later cycle and refresh
            self.is_running_cycle = False

        if not self.running:
            return
        if retry_delay is None:
            retry_delay = self._backoff_interval()
            logger.debug(f"下次执行将在 {int(retry_delay)} 秒后")
        self._schedule_next(retry_delay)

    def _execute_cycle(self):
        """Screenshot, crawl, store, analyze and refresh the indicator once.

        Returns:
            ``EMPTY_FETCH_RETRY_SECONDS`` when the spider returned no comments (the
            back-off counter is left untouched). Otherwise ``None``, so the caller
            applies the regular back-off interval.
        """
        # 1. Screenshot first
        logger.info("开始获取截图")
        self.status_update.emit("正在获取截图...")
        self.fetch_sse_screenshot()

        # 2. Crawl comments
        logger.info("开始爬取评论")
        self.status_update.emit("正在爬取评论...")
        self.fetch_count += 1
        comments = self.spider.fetch()
        if not comments:
            interval = self.EMPTY_FETCH_RETRY_SECONDS
            logger.warning(f"未获取到新评论,{int(interval)}秒后重试")
            self.status_update.emit(f"无新内容,{int(interval)}秒后重试...")
            return interval

        # 3. Persist. A falsy new_ids is treated as "nothing new stored"
        #    (presumably add_comments_batch skips duplicates — confirm in database.py)
        logger.info(f"获取到 {len(comments)} 条评论")
        self.status_update.emit(f"获取到 {len(comments)} 条评论...")
        new_ids = self.db.add_comments_batch(comments)
        if new_ids:
            self.no_new_content_count = 0
            logger.info(f"新增 {len(new_ids)} 条评论到数据库")
            self.status_update.emit(f"新增 {len(new_ids)} 条评论")
            # 4. Score a batch of not-yet-analyzed comments
            unanalyzed = self.db.get_unanalyzed_comments(limit=self.ANALYSIS_BATCH_SIZE)
            logger.info(f"获取到 {len(unanalyzed)} 条未分析评论")
            if unanalyzed:
                self.status_update.emit(f"开始分析 {len(unanalyzed)} 条评论...")
                self._analyze_comments(unanalyzed)
        else:
            self.no_new_content_count += 1
            logger.info("评论已存在,未新增")

        # 5. Refresh the indicator
        self._update_indicator()
        return None

    def _analyze_comments(self, comments):
        """Score ``comments`` one at a time with the LLM analyzer and persist each result.

        Each comment is read as a mapping with ``'id'`` and ``'content'`` keys.
        Comments that cannot be scored, or whose analysis raises, are recorded via
        ``mark_analyzed`` with ``FALLBACK_SCORE`` and a placeholder label.
        The loop stops early once the worker has been stopped.
        """
        total = len(comments)
        logger.info(f"开始分析 {total} 条评论")
        for i, comment in enumerate(comments):
            if not self.running:
                logger.warning("分析被中断")
                break
            try:
                self.status_update.emit(f"分析 {i+1}/{total}...")
                logger.debug(f"分析第 {i+1} 条评论: {comment['content'][:50]}...")
                self.analysis_count += 1
                score, label = self.analyzer.analyze(comment['content'])
                # Forward the analyzer's detailed result to listeners of analysis_result
                details = self.analyzer.get_last_result()
                if isinstance(details, dict):
                    self.analysis_result.emit(details)
                if score is not None:
                    self.db.mark_analyzed(comment['id'], score, label)
                    logger.info(f"评论 {comment['id']} 分析完成: {score}分 - {label}")
                    self.status_update.emit(f"分析 {i+1}/{total}...返回{score}")
                    # Refresh the indicator after every scored comment
                    self._update_indicator()
                else:
                    self._mark_fallback(comment, "无法判断")
                    logger.warning(f"评论 {comment['id']} 无法判断")
            except Exception as e:
                logger.error(f"分析评论 {comment.get('id', 'unknown')} 失败: {str(e)}")
                self.error_occurred.emit(f"分析失败: {str(e)}")
                self._mark_fallback(comment, "分析异常")
            # Throttle between LLM calls whatever the outcome; no wait after the last one
            if i < total - 1 and self.running:
                time.sleep(self.API_CALL_DELAY_SECONDS)

    def _mark_fallback(self, comment, label):
        """Best-effort: record ``FALLBACK_SCORE``/``label`` for a comment that could not be scored."""
        try:
            self.db.mark_analyzed(comment['id'], self.FALLBACK_SCORE, label)
        except Exception as e:
            # Never let one bad row abort the rest of the batch
            logger.error(f"标记评论 {comment.get('id', 'unknown')} 失败: {str(e)}")

    def _update_indicator(self):
        """Average the latest ``INDICATOR_WINDOW`` scores, log the sentiment label and emit the mean."""
        scores = self.db.get_all_scores(limit=self.INDICATOR_WINDOW)
        if not scores:
            logger.debug("暂无分析分数")
            return

        avg_score = sum(scores) / len(scores)
        logger.info(f"当前平均分: {avg_score:.2f} (基于最新的 {len(scores)} 条评论)")

        # Thresholds come from config; a null entry falls back to the built-in defaults
        thresholds = self.config.get('ui', 'thresholds', default={'cold': 30, 'warm': 70}) or {}
        cold = thresholds.get('cold', 30)
        warm = thresholds.get('warm', 70)
        if avg_score < cold:
            label = "看跌"
        elif avg_score > warm:
            label = "看涨"
        else:
            label = "中性"
        logger.info(f"情感倾向: {label}")

        # The indicator receives the mean truncated to an integer
        self.analysis_finished.emit(int(avg_score))

    def fetch_stock_data(self):
        """Fetch one SSE stock sample and emit it via ``stock_data_fetched`` (errors are only logged)."""
        try:
            logger.info("开始爬取股票数据")
            stock_data = self.spider.fetch_sse_stock_data()
            if stock_data and 'time' in stock_data and 'value' in stock_data:
                logger.info(f"成功获取股票数据: {stock_data}")
                self.stock_data_fetched.emit(stock_data['time'], stock_data['value'])
            else:
                logger.warning("未能获取有效的股票数据")
        except Exception as e:
            logger.error(f"爬取股票数据失败: {str(e)}")

    def fetch_sse_screenshot(self):
        """Capture the SSE page screenshot and emit its path via ``sse_screenshot_fetched`` (errors are only logged)."""
        try:
            logger.info("开始爬取上证所网页截图")
            screenshot_path = self.spider.fetch_sse_screenshot()
            if screenshot_path:
                logger.info(f"成功获取截图: {screenshot_path}")
                self.sse_screenshot_fetched.emit(screenshot_path)
            else:
                logger.warning("未能获取有效的截图")
        except Exception as e:
            logger.error(f"爬取截图失败: {str(e)}")

    def manual_refresh(self):
        """Request an immediate cycle; safe to call from any thread.

        The request is delivered through ``_manual_refresh_requested``, so the work
        runs in the worker's thread rather than in the caller's (e.g. GUI) thread.
        """
        self._manual_refresh_requested.emit()

    def _handle_manual_refresh(self):
        """Reset the back-off and run a cycle now, unless one is already executing."""
        logger.info("用户手动刷新")
        self.no_new_content_count = 0
        if self.is_running_cycle:
            logger.info("上一个周期仍在执行,跳过手动刷新")
            return
        # _run_cycle schedules its successor with a new generation, which
        # invalidates the previously pending cycle instead of adding a second chain.
        self._run_cycle()

    def update_fetch_interval(self, interval: int):
        """Set the base cycle interval in seconds; non-numeric or non-positive values are ignored."""
        logger.info(f"更新爬取间隔: {interval}")
        try:
            value = float(interval)
        except (TypeError, ValueError):
            logger.warning(f"无效的爬取间隔: {interval},保持 {self.fetch_interval}")
            return
        # Rejects 0, negatives, NaN and infinity (all comparisons with NaN are False)
        if not 0 < value < float("inf"):
            logger.warning(f"无效的爬取间隔: {interval},保持 {self.fetch_interval}")
            return
        self.fetch_interval = value
def setup_logging(log_path: str, level: str = "INFO"):
"""配置日志"""
logger.remove() # 移除默认的处理器
# 确保log_path不为None
if log_path is None:
log_path = 'guba.log'
logger.add(
log_path,
rotation="10 MB",
retention="7 days",
level=level,
encoding="utf-8",
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"
)
# 仅当sys.stdout可用时才添加控制台日志
if sys.stdout is not None:
logger.add(
sys.stdout,
level=level,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
)
def _write_statistics(worker, stats_file="statistics.txt"):
    """Append the worker's run counters to ``stats_file``; failures are logged, not raised."""
    try:
        with open(stats_file, "a", encoding="utf-8") as f:
            f.write(f"=== 程序运行统计 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===\n")
            f.write(f"爬取网站次数: {worker.fetch_count}\n")
            f.write(f"提交API分析次数: {worker.analysis_count}\n")
            f.write("=== 统计结束 ===\n\n")
        logger.info(f"统计信息已写入到文件: {stats_file}")
    except Exception as e:
        # Best-effort during shutdown: never let a stats write failure abort cleanup
        logger.error(f"写入统计文件失败: {str(e)}")


def main():
    """Start the guba sentiment indicator GUI and exit with the Qt event loop's return code.

    Loads ``config.json`` and configures logging. Builds the database, spider and
    LLM analyzer. Runs ``BackendWorker`` and ``ScreenshotWorker`` on their own
    ``QThread`` objects, wires their signals to ``MainWindow`` and starts the periodic
    timers. Stops everything again when the application is about to quit.
    """
    # Create the application
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(False)  # allow minimizing to the system tray

    # Load configuration (it also holds the logging settings)
    logger.info("加载配置文件...")
    config = ConfigManager("config.json")

    # Configure logging
    log_config = config.logging_config
    log_path = log_config.get('path') or 'guba.log'
    log_level = log_config.get('level') or 'INFO'
    setup_logging(log_path, log_level)
    # Logged only after setup_logging() so the start banner also reaches the log file
    logger.info("=== 股吧人气指示器启动 ===")
    logger.info(f"日志配置完成: {log_path}, 级别: {log_level}")

    # Initialise components
    logger.info("初始化数据库...")
    db = DatabaseManager(config.database_config.get('path', 'guba.db'))
    logger.info("初始化爬虫...")
    spider = SpiderManager(config.spider_config)
    logger.info("初始化LLM分析器...")
    analyzer = LLMAnalyzer(config.llm_api_config)

    # Background worker for crawling and analysis
    worker = BackendWorker(config, db, spider, analyzer)
    worker.update_fetch_interval(config.spider_config.get('fetch_interval', 15))

    logger.info("创建后台线程...")
    worker_thread = QThread()
    worker.moveToThread(worker_thread)
    worker_thread.started.connect(worker.start)
    worker_thread.finished.connect(worker.stop)

    # Separate worker/thread for the SSE screenshot
    logger.info("创建截图后台线程...")
    screenshot_worker = ScreenshotWorker(spider)
    screenshot_thread = QThread()
    screenshot_worker.moveToThread(screenshot_thread)
    screenshot_thread.started.connect(screenshot_worker.start)
    screenshot_thread.finished.connect(screenshot_worker.stop)

    # Main window (created before connecting signals that target it)
    logger.info("创建主窗口...")
    window = MainWindow(config, spider)
    window.show()

    screenshot_worker.sse_screenshot_fetched.connect(window.update_sse_screenshot)
    screenshot_worker.error_occurred.connect(lambda msg: logger.error(msg))

    worker.status_update.connect(window.update_status)
    worker.analysis_finished.connect(window.update_indicator)
    worker.error_occurred.connect(lambda msg: window.show_message("错误", msg))
    worker.stock_data_fetched.connect(window.add_waveform_data)
    worker.sse_screenshot_fetched.connect(window.update_sse_screenshot)

    # Show the indicator from data already stored in the database
    worker._update_indicator()
    logger.info("初始化指示器显示完成")

    window.set_refresh_callback(worker.manual_refresh)
    window.set_config_callback(window.show_config)

    worker_thread.start()
    logger.info("后台线程已启动")
    screenshot_thread.start()
    logger.info("截图后台线程已启动")

    # Stock data every 60 seconds
    stock_timer = QTimer()
    stock_timer.timeout.connect(worker.fetch_stock_data)
    stock_timer.start(60000)
    logger.info("股票数据爬取定时器已启动间隔60秒")

    # SSE screenshot every 60 seconds, handled by the dedicated screenshot thread
    screenshot_timer = QTimer()
    screenshot_timer.timeout.connect(screenshot_worker.do_fetch)
    screenshot_timer.start(60000)
    logger.info("上证所截图爬取定时器已启动间隔60秒")

    def cleanup():
        """Stop timers and worker threads, then log and persist the run statistics."""
        logger.info("清理资源,停止后台线程...")
        # Stop the periodic triggers before tearing down the threads they feed
        stock_timer.stop()
        screenshot_timer.stop()

        # wait() blocks until the thread's event loop has returned
        worker.stop()
        worker_thread.quit()
        worker_thread.wait()

        screenshot_worker.stop()
        screenshot_thread.quit()
        screenshot_thread.wait()

        logger.info("=== 程序运行统计 ===")
        logger.info(f"爬取网站次数: {worker.fetch_count}")
        logger.info(f"提交API分析次数: {worker.analysis_count}")
        logger.info("=== 统计结束 ===")
        _write_statistics(worker)
        logger.info("后台线程已停止")

    app.aboutToQuit.connect(cleanup)
    logger.info("应用启动完成,进入主循环")

    sys.exit(app.exec())
# Launch the GUI only when this file is executed as a script, not when imported
if __name__ == "__main__":
    main()