Files
guba-indicator/llm_analyzer.py
xiaji 9d33a8e179 feat: 添加上证所截图功能并优化股票数据获取
- 新增上证所网页元素截图功能,使用Playwright实现
- 优化股票数据获取方式,改用新浪财经JS接口
- 调整情感分析评分规则为7级分类
- 添加截图显示组件到主界面
- 更新依赖项,替换playwright为selenium
2026-01-13 17:06:18 +08:00

226 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
大模型分析模块 - 调用LLM API分析评论情感
支持 OpenAI 兼容 API包括 NVIDIA API
"""
import json
import time
import re
from typing import Dict, Optional, Tuple, Any
from openai import OpenAI
from loguru import logger
class LLMAnalyzer:
"""大模型情感分析器"""
SYSTEM_PROMPT = """你是一个专业的情感分析助手。你的任务是分析股吧/论坛评论的情感倾向,判断投资者对该股票的态度。
评分规则:
- 0-30: 极度悲观(利空、暴跌、绝望等情绪)
- 30-39: 悲观(看空、担忧、谨慎等情绪)
- 39-45: 偏悲观(谨慎观望、保守等情绪)
- 45-55: 中立(观望、客观等情绪)
- 55-65: 偏乐观(看好、希望等情绪)
- 65-70: 乐观(看涨、信心等情绪)
- 70-100: 极度乐观(利好、暴涨、兴奋等情绪)
请直接输出一个JSON格式的结果包含两个字段
- score: 0-100的整数评分
- label: 简短的态度描述(如"极度悲观""悲观""偏悲观""中立""偏乐观""乐观""极度乐观"
注意:
1. 只返回JSON不要有其他文字
2. 如果无法判断返回50和"无法判断"
3. 分析要客观,不要被表面文字迷惑
"""
def __init__(self, config: Dict):
self.config = config
self.base_url = config.get('base_url', '')
self.api_key = config.get('api_key', '')
self.model = config.get('model', '')
self.timeout = config.get('timeout', 120)
self.retry_times = config.get('retry_times', 3)
self.client = None
self.last_result = None # 保存最后一次分析结果
logger.info(f"LLM分析器配置 - base_url: {self.base_url}, model: {self.model}, timeout: {self.timeout}s, retry: {self.retry_times}")
if self.base_url and self.api_key:
self._init_client()
else:
logger.warning("LLM API 未配置base_url 或 api_key 为空")
def _init_client(self):
"""初始化OpenAI客户端"""
try:
logger.info(f"初始化LLM客户端: {self.base_url}")
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url,
timeout=self.timeout
)
logger.info("LLM客户端初始化成功")
except Exception as e:
logger.error(f"初始化LLM客户端失败: {e}")
def update_config(self, config: Dict):
"""更新配置"""
self.config.update(config)
self.base_url = config.get('base_url', self.base_url)
self.api_key = config.get('api_key', self.api_key)
self.model = config.get('model', self.model)
self.timeout = config.get('timeout', self.timeout)
self.retry_times = config.get('retry_times', self.retry_times)
if self.base_url and self.api_key:
self._init_client()
def analyze(self, comment: str) -> Tuple[Optional[int], Optional[str]]:
"""
分析单条评论
返回 (score, label)
"""
if not self.client:
logger.error("LLM客户端未初始化请检查API配置")
return None, "LLM未配置"
if not comment or not comment.strip():
logger.warning("评论内容为空")
return None, "评论为空"
logger.debug(f"开始分析评论: {comment[:50]}...")
logger.debug(f"使用模型: {self.model}, 超时设置: {self.timeout}")
for attempt in range(self.retry_times):
try:
logger.info(f"API调用尝试 {attempt + 1}/{self.retry_times}")
logger.debug(f"发送请求到 {self.base_url}")
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": f"请分析以下评论的情感倾向:\n\n{comment}"}
],
temperature=0.3,
max_tokens=500,
timeout=self.timeout
)
# 处理 deepseek-r1 的特殊结构(可能有 reasoning_content
message = response.choices[0].message
# 获取推理过程(如果有)
reasoning = getattr(message, 'reasoning_content', None)
if reasoning:
logger.debug(f"推理过程: {reasoning[:100]}...")
# 获取最终回答
result_text = message.content.strip() if message.content else ""
logger.debug(f"API返回原始内容: {result_text[:100]}...")
score, label = self._parse_response(result_text)
# 保存最后结果
self.last_result = {
'score': score,
'label': label,
'reasoning': reasoning,
'raw_response': result_text
}
if score is not None:
logger.info(f"分析完成: {score}分 - {label}")
return score, label
except Exception as e:
logger.warning(f"API调用失败 (尝试 {attempt + 1}/{self.retry_times}): {type(e).__name__}: {e}")
logger.debug(f"错误详情: {str(e)}")
if attempt < self.retry_times - 1:
wait_time = 2 ** attempt
logger.info(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time) # 指数退避
logger.error(f"所有 {self.retry_times} 次重试均失败")
return None, "分析失败"
def _parse_response(self, response: str) -> Tuple[Optional[int], Optional[str]]:
"""解析LLM返回的结果"""
try:
# 尝试直接解析JSON
result = json.loads(response)
score = result.get('score', 50)
label = result.get('label', '无法判断')
# 验证分数范围
score = max(0, min(100, int(score)))
logger.debug(f"JSON解析成功: {score} - {label}")
return score, label
except json.JSONDecodeError:
logger.debug("JSON解析失败尝试文本提取")
# 尝试从文本中提取
numbers = re.findall(r'\b(\d{1,3})\b', response)
if numbers:
score = int(numbers[0])
score = max(0, min(100, score))
# 提取标签
label_match = re.search(r'"([^"]+)"', response)
if label_match:
label = label_match.group(1)
else:
label = response.split('\n')[0][:20] if response else '无法判断'
logger.debug(f"文本提取成功: {score} - {label}")
return score, label
logger.warning("无法解析响应")
return None, "解析失败"
def get_last_result(self) -> Optional[Dict[str, Any]]:
"""获取最后一次分析结果"""
return self.last_result
def analyze_batch(self, comments: list, delay: float = 1.0) -> list:
"""
批量分析评论
delay: 每次调用之间的延迟(秒)
"""
logger.info(f"开始批量分析 {len(comments)} 条评论,每次间隔 {delay}")
results = []
success_count = 0
fail_count = 0
for i, comment in enumerate(comments):
logger.info(f"正在分析第 {i + 1}/{len(comments)} 条评论")
score, label = self.analyze(comment)
if score is not None:
success_count += 1
logger.debug(f"{i + 1} 条评论分析成功: {score}分 - {label}")
else:
fail_count += 1
logger.warning(f"{i + 1} 条评论分析失败: {label}")
results.append({
'content': comment,
'score': score,
'label': label
})
if delay > 0 and i < len(comments) - 1:
logger.debug(f"等待 {delay} 秒后继续...")
time.sleep(delay)
logger.info(f"批量分析完成,成功 {success_count} 条,失败 {fail_count}")
return results
def is_configured(self) -> bool:
"""检查是否已配置"""
return bool(self.client and self.api_key)