"""LLM-backed market commentary and latest-row DataFrame diff helpers."""

from typing import Dict, Any, List, Optional
import json
import math
import numbers

try:
    from loguru import logger
except ImportError:
    # loguru is optional: fall back to the stdlib logger so the module stays
    # importable in minimal environments. Only .debug/.info/.warning are used.
    import logging

    logger = logging.getLogger(__name__)


class LLMAnalyst:
    """Generate a short macro-economic commentary via an LLM provider.

    The provider is read from ``config['provider']`` (``'mock'`` by default).
    ``'openai'`` and ``'tongyi'`` call the respective SDKs; any other value,
    and any failure of a remote call, falls back to a templated mock text.
    """

    # Prompt used when the caller does not supply ``prompt_template``.
    _DEFAULT_PROMPT = """
基于以下宏观经济数据,用专业分析师的口吻撰写市场洞察:

{data_context}

要求:
1. 不超过 {max_words} 汉字
2. 专业、客观、具有洞察力
3. 突出核心指标的边际变化
4. 适合放在PPT首页作为摘要
"""

    # Model used per provider when ``config['model']`` is not set.
    _DEFAULT_MODELS = {'openai': 'gpt-3.5-turbo', 'tongyi': 'qwen-turbo'}

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Store the configuration and resolve the provider name.

        Args:
            config: Optional settings. Keys read by this class are
                ``provider``, ``api_key`` and ``model``.
        """
        self.config = config or {}
        self.provider = self.config.get('provider', 'mock')
        logger.info(f"LLM分析师初始化,提供者: {self.provider}")

    def generate_analysis(self, data_context: Dict[str, Any],
                          prompt_template: Optional[str] = None,
                          max_words: int = 200) -> str:
        """Return an analysis text for ``data_context``.

        Args:
            data_context: Indicator values. The mock text reads the keys
                ``gdp_growth``, ``cpi`` and ``unemployment``.
            prompt_template: Optional ``str.format`` template with the
                ``{data_context}`` and ``{max_words}`` placeholders. Only
                used for remote providers.
            max_words: Word limit substituted into the prompt.

        Returns:
            The provider's answer, or the mock text for the mock provider,
            an unknown provider, or a failed remote call.

        Raises:
            KeyError, IndexError, ValueError: If ``prompt_template`` contains
                placeholders or braces that ``str.format`` cannot resolve
                (remote providers only).
        """
        if self.provider == 'mock':
            return self._mock_analysis(data_context)

        remote_calls = {'openai': self._call_openai, 'tongyi': self._call_tongyi}
        remote_call = remote_calls.get(self.provider)
        if remote_call is None:
            logger.warning(f"未知的LLM提供者: {self.provider},使用模拟分析")
            return self._mock_analysis(data_context)

        # The prompt is only built when it will actually be sent.
        # default=str is deliberate: the JSON is prompt text for the model, so
        # dates/Decimals/etc. are stringified rather than aborting the call.
        template = prompt_template or self._DEFAULT_PROMPT
        data_str = json.dumps(data_context, ensure_ascii=False, indent=2, default=str)
        final_prompt = template.format(data_context=data_str, max_words=max_words)
        return remote_call(final_prompt, data_context)

    def _mock_analysis(self, context: Dict) -> str:
        """Build the templated commentary from ``context`` without any API call.

        Missing indicators are rendered as ``N/A``.
        """
        context = context or {}
        gdp = context.get('gdp_growth', 'N/A')
        cpi = context.get('cpi', 'N/A')
        unemployment = context.get('unemployment', 'N/A')

        return f"""本月宏观经济洞察:GDP增长{gdp}%,经济扩张动能平稳。CPI同比{cpi}%,通胀水平温和,为货币政策留出空间。就业市场{self._format_unemployment(unemployment)},青年失业率需重点关注。整体来看,经济处于弱复苏通道,建议关注基建投资与消费修复的进度。"""

    def _format_unemployment(self, val):
        """Describe an unemployment rate; non-numeric or NaN input yields a placeholder.

        Above 5.5 is "high pressure", above 5 is "broadly stable", anything
        else is "good".
        """
        try:
            rate = float(val)
        except (TypeError, ValueError):
            return "数据待更新"
        if math.isnan(rate):
            # NaN compares False against every threshold and would otherwise
            # be reported as a good reading.
            return "数据待更新"
        if rate > 5.5:
            return f"压力较大({val}%)"
        if rate > 5:
            return f"基本稳定({val}%)"
        return f"表现良好({val}%)"

    def _call_openai(self, prompt: str,
                     fallback_context: Optional[Dict[str, Any]] = None) -> str:
        """Send ``prompt`` to the OpenAI chat completions API.

        Args:
            prompt: Fully formatted prompt text.
            fallback_context: Data used for the mock text if the call fails,
                so the fallback still reports the caller's indicators.

        Returns:
            The first choice's message content, or the mock text on any error
            (including the ``openai`` package not being installed).
        """
        try:
            from openai import OpenAI
            client = OpenAI(api_key=self.config.get('api_key'))
            response = client.chat.completions.create(
                model=self.config.get('model') or self._DEFAULT_MODELS['openai'],
                messages=[{"role": "user", "content": prompt}],
            )
            return response.choices[0].message.content
        except Exception as e:  # boundary: never let a provider failure propagate
            logger.warning(f"OpenAI调用失败: {e}")
            return self._mock_analysis(fallback_context or {})

    def _call_tongyi(self, prompt: str,
                     fallback_context: Optional[Dict[str, Any]] = None) -> str:
        """Send ``prompt`` through ``dashscope.Generation.call``.

        Args:
            prompt: Fully formatted prompt text.
            fallback_context: Data used for the mock text if the call fails.

        Returns:
            ``response.output.text``, or the mock text on any error
            (including the ``dashscope`` package not being installed).
        """
        try:
            import dashscope
            # NOTE: this sets the key on the dashscope module, i.e. process-wide.
            dashscope.api_key = self.config.get('api_key')
            response = dashscope.Generation.call(
                model=self.config.get('model') or self._DEFAULT_MODELS['tongyi'],
                prompt=prompt,
            )
            return response.output.text
        except Exception as e:  # boundary: never let a provider failure propagate
            logger.warning(f"通义千问调用失败: {e}")
            return self._mock_analysis(fallback_context or {})


class DiffAnalyzer:
    """Collect notable changes between the last rows of two DataFrames."""

    def __init__(self):
        # Change records accumulated by compare_dataframes(); each is a dict
        # with the keys indicator / old / new / delta / direction.
        self.changes: List[Dict[str, Any]] = []

    def compare_dataframes(self, df_old, df_new, key_cols: Optional[List] = None,
                           threshold: float = 0.01, reset: bool = False):
        """Compare the last row of ``df_old`` and ``df_new`` column by column.

        Only columns present in both frames and not listed in ``key_cols``
        are compared, and only when both last-row values are real numbers.
        An empty frame contributes 0 for every column.

        Args:
            df_old: Previous-period DataFrame.
            df_new: Current-period DataFrame.
            key_cols: Columns to exclude from comparison. ``None`` excludes
                the first column of ``df_old``; an empty list excludes nothing.
            threshold: A change is recorded when ``abs(new - old)`` exceeds it.
            reset: If true, discard previously collected changes first. By
                default results accumulate across calls.

        Returns:
            ``self.changes``, the accumulated list of change records.
        """
        if reset:
            self.changes = []
        if key_cols is None:
            key_cols = df_old.columns[:1].tolist()

        for col in df_old.columns:
            if col in key_cols or col not in df_new.columns:
                continue
            try:
                old_val = df_old[col].iloc[-1] if len(df_old) > 0 else 0
                new_val = df_new[col].iloc[-1] if len(df_new) > 0 else 0
                # numbers.Real also matches NumPy integer/float scalars;
                # numpy.int64 is not a subclass of the builtin int.
                if not (isinstance(old_val, numbers.Real)
                        and isinstance(new_val, numbers.Real)):
                    continue
                diff = float(new_val) - float(old_val)
                if abs(diff) > threshold:
                    self.changes.append({
                        'indicator': col,
                        'old': round(float(old_val), 2),
                        'new': round(float(new_val), 2),
                        'delta': round(diff, 2),
                        'direction': 'up' if diff > 0 else 'down',
                    })
            except Exception as exc:
                # Best effort per column: one bad column must not abort the
                # whole comparison, but the skip is no longer silent.
                logger.debug(f"Diff分析跳过列 {col}: {exc}")

        logger.info(f"Diff分析发现 {len(self.changes)} 项显著变动")
        return self.changes

    def generate_diff_report(self, max_items: int = 5) -> str:
        """Render the collected changes as a bullet-list text report.

        Args:
            max_items: Maximum number of changes listed, in collection order.

        Returns:
            The report, or a fixed "no notable change" sentence when nothing
            has been collected.
        """
        if not self.changes:
            return "本月与上月数据相比无显著变动。"

        report_parts = ["📊 数据异动 Diff 简报:\n"]
        for chg in self.changes[:max_items]:
            arrow = "↑" if chg['direction'] == 'up' else "↓"
            report_parts.append(
                f"• {chg['indicator']}: {chg['old']} → {chg['new']} "
                f"({arrow}{abs(chg['delta'])})"
            )

        return "\n".join(report_parts)