125 lines
5.0 KiB
Python
125 lines
5.0 KiB
Python
from typing import Dict, Any, List, Optional
|
||
from loguru import logger
|
||
import json
|
||
|
||
class LLMAnalyst:
    """Produce a short macro-economic commentary from a dict of indicators.

    The backend is selected by ``config['provider']``:

    * ``'mock'`` (default) fills an offline text template;
    * ``'openai'`` calls the OpenAI chat-completions SDK;
    * ``'tongyi'`` calls the DashScope (Tongyi Qianwen) SDK.

    Unknown providers and failed remote calls fall back to the mock text,
    rendered from the caller's own data.
    """

    # Default prompt; ``{data_context}`` and ``{max_words}`` are filled in by
    # ``str.format`` when a remote provider is used.
    _DEFAULT_PROMPT = (
        "\n"
        "基于以下宏观经济数据,用专业分析师的口吻撰写市场洞察:\n"
        "{data_context}\n"
        "\n"
        "要求:\n"
        "1. 不超过 {max_words} 汉字\n"
        "2. 专业、客观、具有洞察力\n"
        "3. 突出核心指标的边际变化\n"
        "4. 适合放在PPT首页作为摘要\n"
    )

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Store the configuration and resolve the provider name.

        Args:
            config: Optional settings. Keys read by this class are
                ``provider``, ``api_key`` and ``model``.
        """
        self.config = config or {}
        self.provider = self.config.get('provider', 'mock')
        logger.info(f"LLM分析师初始化,提供者: {self.provider}")

    def generate_analysis(self, data_context: Dict[str, Any],
                          prompt_template: Optional[str] = None,
                          max_words: int = 200) -> str:
        """Return an analysis paragraph for ``data_context``.

        Args:
            data_context: Indicator values. The mock text reads the keys
                ``gdp_growth``, ``cpi`` and ``unemployment``.
            prompt_template: Optional ``str.format`` template with
                ``{data_context}`` and ``{max_words}`` placeholders. Only used
                by remote providers.
            max_words: Length limit substituted into the prompt.

        Returns:
            The generated (or mock) analysis text.

        Raises:
            KeyError, IndexError, ValueError: If a remote provider is selected
                and ``prompt_template`` is not a valid format string for the
                two supported placeholders.
        """
        if self.provider == 'mock':
            # The mock text never uses the prompt, so do not build it: a bad
            # template or a non-JSON value must not break the offline path.
            return self._mock_analysis(data_context)

        if self.provider == 'openai':
            backend = self._call_openai
        elif self.provider == 'tongyi':
            backend = self._call_tongyi
        else:
            logger.warning(f"未知的LLM提供者: {self.provider},回退到mock分析")
            return self._mock_analysis(data_context)

        final_prompt = self._build_prompt(data_context, prompt_template, max_words)
        # Pass the data along so a failed remote call can still fall back to a
        # mock text that shows the real numbers.
        return backend(final_prompt, data_context)

    def _build_prompt(self, data_context: Dict[str, Any],
                      prompt_template: Optional[str],
                      max_words: int) -> str:
        """Render the final prompt sent to a remote provider."""
        template = prompt_template or self._DEFAULT_PROMPT
        # default=str is deliberate: the JSON is only prompt text, so values
        # json cannot encode natively (timestamps, numpy ints, ...) are
        # stringified instead of aborting the whole analysis.
        data_str = json.dumps(data_context, ensure_ascii=False, indent=2,
                              default=str)
        return template.format(data_context=data_str, max_words=max_words)

    def _mock_analysis(self, context: Optional[Dict]) -> str:
        """Fill the offline template; missing indicators render as ``N/A``."""
        context = context or {}
        gdp = context.get('gdp_growth', 'N/A')
        cpi = context.get('cpi', 'N/A')
        unemployment = context.get('unemployment', 'N/A')

        return f"""本月宏观经济洞察:GDP增长{gdp}%,经济扩张动能平稳。CPI同比{cpi}%,通胀水平温和,为货币政策留出空间。就业市场{self._format_unemployment(unemployment)},青年失业率需重点关注。整体来看,经济处于弱复苏通道,建议关注基建投资与消费修复的进度。"""

    def _format_unemployment(self, val) -> str:
        """Describe an unemployment rate.

        Thresholds: above 5.5 is "under pressure", above 5 is "stable",
        otherwise "good". Values that are missing, non-numeric or NaN yield
        the "data pending" placeholder.
        """
        try:
            v = float(val)
        except (TypeError, ValueError, OverflowError):
            return "数据待更新"
        if v != v:
            # NaN compares false against every threshold and would otherwise
            # be reported as a good reading.
            return "数据待更新"
        if v > 5.5:
            return f"压力较大({val}%)"
        if v > 5:
            return f"基本稳定({val}%)"
        return f"表现良好({val}%)"

    def _call_openai(self, prompt: str,
                     data_context: Optional[Dict[str, Any]] = None) -> str:
        """Call OpenAI chat completions; fall back to the mock text on failure.

        Args:
            prompt: Fully rendered prompt.
            data_context: Indicator values used for the fallback text.
        """
        try:
            # Imported lazily so the SDK is only required for this provider.
            from openai import OpenAI
            client = OpenAI(api_key=self.config.get('api_key'))
            response = client.chat.completions.create(
                model=self.config.get('model', 'gpt-3.5-turbo'),
                messages=[{"role": "user", "content": prompt}],
            )
            content = response.choices[0].message.content
            if not content:
                raise ValueError("empty completion")
            return content
        except Exception as e:
            # Best-effort boundary: any SDK or network error degrades to the
            # mock text instead of failing report generation.
            logger.warning(f"OpenAI调用失败: {e}")
            return self._mock_analysis(data_context)

    def _call_tongyi(self, prompt: str,
                     data_context: Optional[Dict[str, Any]] = None) -> str:
        """Call DashScope generation; fall back to the mock text on failure.

        Args:
            prompt: Fully rendered prompt.
            data_context: Indicator values used for the fallback text.
        """
        try:
            # Imported lazily so the SDK is only required for this provider.
            import dashscope
            dashscope.api_key = self.config.get('api_key')
            response = dashscope.Generation.call(
                model=self.config.get('model', 'qwen-turbo'),
                prompt=prompt,
            )
            text = response.output.text
            if not text:
                raise ValueError("empty completion")
            return text
        except Exception as e:
            # Best-effort boundary, same policy as the OpenAI backend.
            logger.warning(f"通义千问调用失败: {e}")
            return self._mock_analysis(data_context)
|
||
|
||
class DiffAnalyzer:
    """Detect and report changes between the latest rows of two DataFrames.

    ``compare_dataframes`` stores its findings in ``self.changes``;
    ``generate_diff_report`` renders them as a short bullet list.
    """

    def __init__(self):
        # Result of the most recent comparison: a list of dicts with the keys
        # indicator / old / new / delta / direction.
        self.changes = []

    @staticmethod
    def _is_real_number(value) -> bool:
        """Return True for Python and NumPy real scalars.

        ``isinstance(value, (int, float))`` rejects ``numpy.int64`` and
        ``numpy.float32``, which is what ``Series.iloc`` returns for numeric
        columns. NumPy registers its scalar types with the ``numbers`` ABCs,
        so ``numbers.Real`` accepts both families.
        """
        import numbers
        return isinstance(value, numbers.Real)

    def compare_dataframes(self, df_old, df_new, key_cols: Optional[List] = None,
                           threshold: float = 0.01):
        """Compare the last row of every shared non-key column.

        Args:
            df_old: Previous-period DataFrame.
            df_new: Current-period DataFrame.
            key_cols: Columns to skip. Defaults to the first column of
                ``df_old``.
            threshold: Minimum absolute difference recorded as a change.

        Returns:
            The list of change dicts, also stored in ``self.changes``. An
            empty frame contributes 0 as its value.
        """
        key_cols = key_cols or df_old.columns[:1].tolist()

        # Start fresh so a reused analyzer does not report stale or duplicated
        # changes from an earlier comparison.
        self.changes = []

        for col in df_old.columns:
            if col in key_cols or col not in df_new.columns:
                continue
            try:
                old_val = df_old[col].iloc[-1] if len(df_old) > 0 else 0
                new_val = df_new[col].iloc[-1] if len(df_new) > 0 else 0

                if not (self._is_real_number(old_val)
                        and self._is_real_number(new_val)):
                    continue

                diff = float(new_val) - float(old_val)
                # A NaN difference compares false here and is skipped.
                if abs(diff) > threshold:
                    self.changes.append({
                        'indicator': col,
                        'old': round(float(old_val), 2),
                        'new': round(float(new_val), 2),
                        'delta': round(diff, 2),
                        'direction': 'up' if diff > 0 else 'down',
                    })
            except (TypeError, ValueError, OverflowError, IndexError, KeyError) as e:
                # Best-effort: one odd column must not abort the comparison,
                # but the reason is recorded instead of being swallowed.
                logger.debug(f"Diff分析跳过列 {col}: {e}")

        logger.info(f"Diff分析发现 {len(self.changes)} 项显著变动")
        return self.changes

    def generate_diff_report(self, max_items: int = 5) -> str:
        """Render the stored changes as a bullet list.

        Args:
            max_items: Maximum number of changes listed.

        Returns:
            The report text, or a fixed "no significant change" sentence when
            nothing was recorded.
        """
        if not self.changes:
            return "本月与上月数据相比无显著变动。"

        report_parts = ["📊 数据异动 Diff 简报:\n"]
        for chg in self.changes[:max_items]:
            arrow = "↑" if chg['direction'] == 'up' else "↓"
            report_parts.append(
                f"• {chg['indicator']}: {chg['old']} → {chg['new']} "
                f"({arrow}{abs(chg['delta'])})"
            )

        return "\n".join(report_parts)
|