Files
guba-indicator/llm_analyzer.py
xiaji f256bd0852 refactor: 重构配置管理和截图功能,切换LLM提供商至智谱AI
重构配置管理器以支持打包环境路径处理
将LLM分析器从OpenAI迁移至智谱AI API
替换Playwright截图功能为Selenium实现
更新默认配置中的API端点和模型
2026-01-27 11:11:24 +08:00

227 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
大模型分析模块 - 调用LLM API分析评论情感
支持智谱AI API
"""
import json
import time
import re
from typing import Dict, Optional, Tuple, Any
from zai import ZhipuAiClient
from loguru import logger
class LLMAnalyzer:
"""大模型情感分析器"""
SYSTEM_PROMPT = """你是一个专业的情感分析助手。你的任务是分析股吧/论坛评论的情感倾向,判断投资者对该股票的态度。
评分规则:
- 0-30: 极度悲观(利空、暴跌、绝望等情绪)
- 30-39: 悲观(看空、担忧、谨慎等情绪)
- 39-45: 偏悲观(谨慎观望、保守等情绪)
- 45-55: 中立(观望、客观等情绪)
- 55-65: 偏乐观(看好、希望等情绪)
- 65-70: 乐观(看涨、信心等情绪)
- 70-100: 极度乐观(利好、暴涨、兴奋等情绪)
请直接输出一个JSON格式的结果包含两个字段
- score: 0-100的整数评分
- label: 简短的态度描述(如"极度悲观""悲观""偏悲观""中立""偏乐观""乐观""极度乐观"
注意:
1. 只返回JSON不要有其他文字
2. 如果无法判断返回50和"无法判断"
3. 分析要客观,不要被表面文字迷惑
"""
def __init__(self, config: Dict):
self.config = config
self.api_key = config.get('api_key', '')
self.base_url = config.get('base_url', '')
self.model = config.get('model', '')
self.timeout = config.get('timeout', 120)
self.retry_times = config.get('retry_times', 3)
self.client = None
self.last_result = None # 保存最后一次分析结果
logger.info(f"LLM分析器配置 - base_url: {self.base_url}, model: {self.model}, timeout: {self.timeout}s, retry: {self.retry_times}")
if self.api_key:
self._init_client()
else:
logger.warning("LLM API 未配置api_key 为空")
def _init_client(self):
"""初始化智谱AI客户端"""
try:
logger.info(f"初始化智谱AI客户端: {self.base_url}")
self.client = ZhipuAiClient(
api_key=self.api_key,
base_url=self.base_url
)
logger.info("智谱AI客户端初始化成功")
except Exception as e:
logger.error(f"初始化智谱AI客户端失败: {e}")
def update_config(self, config: Dict):
"""更新配置"""
self.config.update(config)
self.api_key = config.get('api_key', self.api_key)
self.base_url = config.get('base_url', self.base_url)
self.model = config.get('model', self.model)
self.timeout = config.get('timeout', self.timeout)
self.retry_times = config.get('retry_times', self.retry_times)
if self.api_key:
self._init_client()
def analyze(self, comment: str) -> Tuple[Optional[int], Optional[str]]:
"""
分析单条评论
返回 (score, label)
"""
if not self.client:
logger.error("LLM客户端未初始化请检查API配置")
return None, "LLM未配置"
if not comment or not comment.strip():
logger.warning("评论内容为空")
return None, "评论为空"
logger.debug(f"开始分析评论: {comment[:50]}...")
logger.debug(f"使用模型: {self.model}, 超时设置: {self.timeout}")
for attempt in range(self.retry_times):
try:
logger.info(f"API调用尝试 {attempt + 1}/{self.retry_times}")
logger.debug("发送请求到智谱AI API")
response = self.client.chat.completions.create(
model="glm-4.7-flash",
messages=[
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": f"请分析以下评论的情感倾向:\n\n{comment}"}
],
thinking={
"type": "disabled", # 禁用深度思考模式
},
temperature=0.3,
max_tokens=500
)
# 处理 deepseek-r1 的特殊结构(可能有 reasoning_content
message = response.choices[0].message
# 获取推理过程(如果有)
reasoning = getattr(message, 'reasoning_content', None)
if reasoning:
logger.debug(f"推理过程: {reasoning[:100]}...")
# 获取最终回答
result_text = message.content.strip() if message.content else ""
logger.debug(f"API返回原始内容: {result_text[:100]}...")
score, label = self._parse_response(result_text)
# 保存最后结果
self.last_result = {
'score': score,
'label': label,
'reasoning': reasoning,
'raw_response': result_text
}
if score is not None:
logger.info(f"分析完成: {score}分 - {label}")
return score, label
except Exception as e:
logger.warning(f"API调用失败 (尝试 {attempt + 1}/{self.retry_times}): {type(e).__name__}: {e}")
logger.debug(f"错误详情: {str(e)}")
if attempt < self.retry_times - 1:
wait_time = 2 ** attempt
logger.info(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time) # 指数退避
logger.error(f"所有 {self.retry_times} 次重试均失败")
return None, "分析失败"
def _parse_response(self, response: str) -> Tuple[Optional[int], Optional[str]]:
"""解析LLM返回的结果"""
try:
# 尝试直接解析JSON
result = json.loads(response)
score = result.get('score', 50)
label = result.get('label', '无法判断')
# 验证分数范围
score = max(0, min(100, int(score)))
logger.debug(f"JSON解析成功: {score} - {label}")
return score, label
except json.JSONDecodeError:
logger.debug("JSON解析失败尝试文本提取")
# 尝试从文本中提取
numbers = re.findall(r'\b(\d{1,3})\b', response)
if numbers:
score = int(numbers[0])
score = max(0, min(100, score))
# 提取标签
label_match = re.search(r'"([^"]+)"', response)
if label_match:
label = label_match.group(1)
else:
label = response.split('\n')[0][:20] if response else '无法判断'
logger.debug(f"文本提取成功: {score} - {label}")
return score, label
logger.warning("无法解析响应")
return None, "解析失败"
def get_last_result(self) -> Optional[Dict[str, Any]]:
"""获取最后一次分析结果"""
return self.last_result
def analyze_batch(self, comments: list, delay: float = 1.0) -> list:
"""
批量分析评论
delay: 每次调用之间的延迟(秒)
"""
logger.info(f"开始批量分析 {len(comments)} 条评论,每次间隔 {delay}")
results = []
success_count = 0
fail_count = 0
for i, comment in enumerate(comments):
logger.info(f"正在分析第 {i + 1}/{len(comments)} 条评论")
score, label = self.analyze(comment)
if score is not None:
success_count += 1
logger.debug(f"{i + 1} 条评论分析成功: {score}分 - {label}")
else:
fail_count += 1
logger.warning(f"{i + 1} 条评论分析失败: {label}")
results.append({
'content': comment,
'score': score,
'label': label
})
if delay > 0 and i < len(comments) - 1:
logger.debug(f"等待 {delay} 秒后继续...")
time.sleep(delay)
logger.info(f"批量分析完成,成功 {success_count} 条,失败 {fail_count}")
return results
def is_configured(self) -> bool:
"""检查是否已配置"""
return bool(self.client and self.api_key)