""" 大模型分析模块 - 调用LLM API分析评论情感 支持智谱AI API """ import json import time import re from typing import Dict, Optional, Tuple, Any from zai import ZhipuAiClient from loguru import logger class LLMAnalyzer: """大模型情感分析器""" SYSTEM_PROMPT = """你是一个专业的情感分析助手。你的任务是分析股吧/论坛评论的情感倾向,判断投资者对该股票的态度。 评分规则: - 0-30: 极度悲观(利空、暴跌、绝望等情绪) - 30-39: 悲观(看空、担忧、谨慎等情绪) - 39-45: 偏悲观(谨慎观望、保守等情绪) - 45-55: 中立(观望、客观等情绪) - 55-65: 偏乐观(看好、希望等情绪) - 65-70: 乐观(看涨、信心等情绪) - 70-100: 极度乐观(利好、暴涨、兴奋等情绪) 请直接输出一个JSON格式的结果,包含两个字段: - score: 0-100的整数评分 - label: 简短的态度描述(如"极度悲观"、"悲观"、"偏悲观"、"中立"、"偏乐观"、"乐观"、"极度乐观") 注意: 1. 只返回JSON,不要有其他文字 2. 如果无法判断,返回50和"无法判断" 3. 分析要客观,不要被表面文字迷惑 """ def __init__(self, config: Dict): self.config = config self.api_key = config.get('api_key', '') self.base_url = config.get('base_url', '') self.model = config.get('model', '') self.timeout = config.get('timeout', 120) self.retry_times = config.get('retry_times', 3) self.client = None self.last_result = None # 保存最后一次分析结果 logger.info(f"LLM分析器配置 - base_url: {self.base_url}, model: {self.model}, timeout: {self.timeout}s, retry: {self.retry_times}次") if self.api_key: self._init_client() else: logger.warning("LLM API 未配置,api_key 为空") def _init_client(self): """初始化智谱AI客户端""" try: logger.info(f"初始化智谱AI客户端: {self.base_url}") self.client = ZhipuAiClient( api_key=self.api_key, base_url=self.base_url ) logger.info("智谱AI客户端初始化成功") except Exception as e: logger.error(f"初始化智谱AI客户端失败: {e}") def update_config(self, config: Dict): """更新配置""" self.config.update(config) self.api_key = config.get('api_key', self.api_key) self.base_url = config.get('base_url', self.base_url) self.model = config.get('model', self.model) self.timeout = config.get('timeout', self.timeout) self.retry_times = config.get('retry_times', self.retry_times) if self.api_key: self._init_client() def analyze(self, comment: str) -> Tuple[Optional[int], Optional[str]]: """ 分析单条评论 返回 (score, label) """ if not self.client: logger.error("LLM客户端未初始化,请检查API配置") return None, "LLM未配置" if not comment or not comment.strip(): logger.warning("评论内容为空") return None, "评论为空" logger.debug(f"开始分析评论: {comment[:50]}...") logger.debug(f"使用模型: {self.model}, 超时设置: {self.timeout}秒") for attempt in range(self.retry_times): try: logger.info(f"API调用尝试 {attempt + 1}/{self.retry_times}") logger.debug("发送请求到智谱AI API") response = self.client.chat.completions.create( model="glm-4.7-flash", messages=[ {"role": "system", "content": self.SYSTEM_PROMPT}, {"role": "user", "content": f"请分析以下评论的情感倾向:\n\n{comment}"} ], thinking={ "type": "disabled", # 禁用深度思考模式 }, temperature=0.3, max_tokens=500 ) # 处理 deepseek-r1 的特殊结构(可能有 reasoning_content) message = response.choices[0].message # 获取推理过程(如果有) reasoning = getattr(message, 'reasoning_content', None) if reasoning: logger.debug(f"推理过程: {reasoning[:100]}...") # 获取最终回答 result_text = message.content.strip() if message.content else "" logger.debug(f"API返回原始内容: {result_text[:100]}...") score, label = self._parse_response(result_text) # 保存最后结果 self.last_result = { 'score': score, 'label': label, 'reasoning': reasoning, 'raw_response': result_text } if score is not None: logger.info(f"分析完成: {score}分 - {label}") return score, label except Exception as e: logger.warning(f"API调用失败 (尝试 {attempt + 1}/{self.retry_times}): {type(e).__name__}: {e}") logger.debug(f"错误详情: {str(e)}") if attempt < self.retry_times - 1: wait_time = 2 ** attempt logger.info(f"等待 {wait_time} 秒后重试...") time.sleep(wait_time) # 指数退避 logger.error(f"所有 {self.retry_times} 次重试均失败") return None, "分析失败" def _parse_response(self, response: str) -> Tuple[Optional[int], Optional[str]]: """解析LLM返回的结果""" try: # 尝试直接解析JSON result = json.loads(response) score = result.get('score', 50) label = result.get('label', '无法判断') # 验证分数范围 score = max(0, min(100, int(score))) logger.debug(f"JSON解析成功: {score} - {label}") return score, label except json.JSONDecodeError: logger.debug("JSON解析失败,尝试文本提取") # 尝试从文本中提取 numbers = re.findall(r'\b(\d{1,3})\b', response) if numbers: score = int(numbers[0]) score = max(0, min(100, score)) # 提取标签 label_match = re.search(r'"([^"]+)"', response) if label_match: label = label_match.group(1) else: label = response.split('\n')[0][:20] if response else '无法判断' logger.debug(f"文本提取成功: {score} - {label}") return score, label logger.warning("无法解析响应") return None, "解析失败" def get_last_result(self) -> Optional[Dict[str, Any]]: """获取最后一次分析结果""" return self.last_result def analyze_batch(self, comments: list, delay: float = 1.0) -> list: """ 批量分析评论 delay: 每次调用之间的延迟(秒) """ logger.info(f"开始批量分析 {len(comments)} 条评论,每次间隔 {delay} 秒") results = [] success_count = 0 fail_count = 0 for i, comment in enumerate(comments): logger.info(f"正在分析第 {i + 1}/{len(comments)} 条评论") score, label = self.analyze(comment) if score is not None: success_count += 1 logger.debug(f"第 {i + 1} 条评论分析成功: {score}分 - {label}") else: fail_count += 1 logger.warning(f"第 {i + 1} 条评论分析失败: {label}") results.append({ 'content': comment, 'score': score, 'label': label }) if delay > 0 and i < len(comments) - 1: logger.debug(f"等待 {delay} 秒后继续...") time.sleep(delay) logger.info(f"批量分析完成,成功 {success_count} 条,失败 {fail_count} 条") return results def is_configured(self) -> bool: """检查是否已配置""" return bool(self.client and self.api_key)