Files
guba-indicator/llm_analyzer.py

227 lines
8.4 KiB
Python
Raw Normal View History

"""
大模型分析模块 - 调用LLM API分析评论情感
支持智谱AI API
"""
import json
import time
import re
from typing import Dict, Optional, Tuple, Any
from zai import ZhipuAiClient
from loguru import logger
class LLMAnalyzer:
"""大模型情感分析器"""
SYSTEM_PROMPT = """你是一个专业的情感分析助手。你的任务是分析股吧/论坛评论的情感倾向,判断投资者对该股票的态度。
评分规则
- 0-30: 极度悲观利空暴跌绝望等情绪
- 30-39: 悲观看空担忧谨慎等情绪
- 39-45: 偏悲观谨慎观望保守等情绪
- 45-55: 中立观望客观等情绪
- 55-65: 偏乐观看好希望等情绪
- 65-70: 乐观看涨信心等情绪
- 70-100: 极度乐观利好暴涨兴奋等情绪
请直接输出一个JSON格式的结果包含两个字段
- score: 0-100的整数评分
- label: 简短的态度描述"极度悲观""悲观""偏悲观""中立""偏乐观""乐观""极度乐观"
注意
1. 只返回JSON不要有其他文字
2. 如果无法判断返回50和"无法判断"
3. 分析要客观不要被表面文字迷惑
"""
def __init__(self, config: Dict):
self.config = config
self.api_key = config.get('api_key', '')
self.base_url = config.get('base_url', '')
self.model = config.get('model', '')
self.timeout = config.get('timeout', 120)
self.retry_times = config.get('retry_times', 3)
self.client = None
self.last_result = None # 保存最后一次分析结果
logger.info(f"LLM分析器配置 - base_url: {self.base_url}, model: {self.model}, timeout: {self.timeout}s, retry: {self.retry_times}")
if self.api_key:
self._init_client()
else:
logger.warning("LLM API 未配置api_key 为空")
def _init_client(self):
"""初始化智谱AI客户端"""
try:
logger.info(f"初始化智谱AI客户端: {self.base_url}")
self.client = ZhipuAiClient(
api_key=self.api_key,
base_url=self.base_url
)
logger.info("智谱AI客户端初始化成功")
except Exception as e:
logger.error(f"初始化智谱AI客户端失败: {e}")
def update_config(self, config: Dict):
"""更新配置"""
self.config.update(config)
self.api_key = config.get('api_key', self.api_key)
self.base_url = config.get('base_url', self.base_url)
self.model = config.get('model', self.model)
self.timeout = config.get('timeout', self.timeout)
self.retry_times = config.get('retry_times', self.retry_times)
if self.api_key:
self._init_client()
def analyze(self, comment: str) -> Tuple[Optional[int], Optional[str]]:
"""
分析单条评论
返回 (score, label)
"""
if not self.client:
logger.error("LLM客户端未初始化请检查API配置")
return None, "LLM未配置"
if not comment or not comment.strip():
logger.warning("评论内容为空")
return None, "评论为空"
logger.debug(f"开始分析评论: {comment[:50]}...")
logger.debug(f"使用模型: {self.model}, 超时设置: {self.timeout}")
for attempt in range(self.retry_times):
try:
logger.info(f"API调用尝试 {attempt + 1}/{self.retry_times}")
logger.debug("发送请求到智谱AI API")
response = self.client.chat.completions.create(
model="glm-4.7-flash",
messages=[
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": f"请分析以下评论的情感倾向:\n\n{comment}"}
],
thinking={
"type": "disabled", # 禁用深度思考模式
},
temperature=0.3,
max_tokens=500
)
# 处理 deepseek-r1 的特殊结构(可能有 reasoning_content
message = response.choices[0].message
# 获取推理过程(如果有)
reasoning = getattr(message, 'reasoning_content', None)
if reasoning:
logger.debug(f"推理过程: {reasoning[:100]}...")
# 获取最终回答
result_text = message.content.strip() if message.content else ""
logger.debug(f"API返回原始内容: {result_text[:100]}...")
score, label = self._parse_response(result_text)
# 保存最后结果
self.last_result = {
'score': score,
'label': label,
'reasoning': reasoning,
'raw_response': result_text
}
if score is not None:
logger.info(f"分析完成: {score}分 - {label}")
return score, label
except Exception as e:
logger.warning(f"API调用失败 (尝试 {attempt + 1}/{self.retry_times}): {type(e).__name__}: {e}")
logger.debug(f"错误详情: {str(e)}")
if attempt < self.retry_times - 1:
wait_time = 2 ** attempt
logger.info(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time) # 指数退避
logger.error(f"所有 {self.retry_times} 次重试均失败")
return None, "分析失败"
def _parse_response(self, response: str) -> Tuple[Optional[int], Optional[str]]:
"""解析LLM返回的结果"""
try:
# 尝试直接解析JSON
result = json.loads(response)
score = result.get('score', 50)
label = result.get('label', '无法判断')
# 验证分数范围
score = max(0, min(100, int(score)))
logger.debug(f"JSON解析成功: {score} - {label}")
return score, label
except json.JSONDecodeError:
logger.debug("JSON解析失败尝试文本提取")
# 尝试从文本中提取
numbers = re.findall(r'\b(\d{1,3})\b', response)
if numbers:
score = int(numbers[0])
score = max(0, min(100, score))
# 提取标签
label_match = re.search(r'"([^"]+)"', response)
if label_match:
label = label_match.group(1)
else:
label = response.split('\n')[0][:20] if response else '无法判断'
logger.debug(f"文本提取成功: {score} - {label}")
return score, label
logger.warning("无法解析响应")
return None, "解析失败"
def get_last_result(self) -> Optional[Dict[str, Any]]:
"""获取最后一次分析结果"""
return self.last_result
def analyze_batch(self, comments: list, delay: float = 1.0) -> list:
"""
批量分析评论
delay: 每次调用之间的延迟
"""
logger.info(f"开始批量分析 {len(comments)} 条评论,每次间隔 {delay}")
results = []
success_count = 0
fail_count = 0
for i, comment in enumerate(comments):
logger.info(f"正在分析第 {i + 1}/{len(comments)} 条评论")
score, label = self.analyze(comment)
if score is not None:
success_count += 1
logger.debug(f"{i + 1} 条评论分析成功: {score}分 - {label}")
else:
fail_count += 1
logger.warning(f"{i + 1} 条评论分析失败: {label}")
results.append({
'content': comment,
'score': score,
'label': label
})
if delay > 0 and i < len(comments) - 1:
logger.debug(f"等待 {delay} 秒后继续...")
time.sleep(delay)
logger.info(f"批量分析完成,成功 {success_count} 条,失败 {fail_count}")
return results
def is_configured(self) -> bool:
"""检查是否已配置"""
return bool(self.client and self.api_key)