125 lines
5.0 KiB
Python
125 lines
5.0 KiB
Python
|
|
from typing import Dict, Any, List, Optional
|
|||
|
|
from loguru import logger
|
|||
|
|
import json
|
|||
|
|
|
|||
|
|
class LLMAnalyst:
    """Produce a short macro-economic commentary from a dict of indicators.

    The text comes either from a remote LLM (``openai`` or ``tongyi``) or from
    a built-in template (``mock``). Remote failures never propagate: they are
    logged and the template-based text is returned instead.

    Recognised ``config`` keys:
        provider: ``'mock'`` (default), ``'openai'`` or ``'tongyi'``.
        api_key:  credential forwarded to the remote provider.
        model:    optional model name; defaults to ``gpt-3.5-turbo`` for
                  OpenAI and ``qwen-turbo`` for Tongyi.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Store the configuration and resolve the provider name."""
        self.config = config or {}
        self.provider = self.config.get('provider', 'mock')
        logger.info(f"LLM分析师初始化,提供者: {self.provider}")

    def generate_analysis(self, data_context: Dict[str, Any],
                          prompt_template: Optional[str] = None,
                          max_words: int = 200) -> str:
        """Return the analysis text for ``data_context``.

        Args:
            data_context: Indicator values. It is serialised to JSON for the
                prompt; the mock path reads the keys ``gdp_growth``, ``cpi``
                and ``unemployment``.
            prompt_template: Optional ``str.format`` template that may use the
                ``{data_context}`` and ``{max_words}`` placeholders. When
                falsy, the built-in prompt is used.
            max_words: Length limit substituted into the prompt. The mock
                path does not use it.

        Returns:
            The provider's answer, or the template-based text when the
            provider is ``mock``, unknown, or fails.
        """
        default_prompt = """
        基于以下宏观经济数据,用专业分析师的口吻撰写市场洞察:
        {data_context}

        要求:
        1. 不超过 {max_words} 汉字
        2. 专业、客观、具有洞察力
        3. 突出核心指标的边际变化
        4. 适合放在PPT首页作为摘要
        """

        prompt = prompt_template or default_prompt
        data_str = json.dumps(data_context, ensure_ascii=False, indent=2)
        final_prompt = prompt.format(data_context=data_str, max_words=max_words)

        # The context is handed to the remote callers so that their fallback
        # text still carries the real figures instead of "N/A".
        if self.provider == 'openai':
            return self._call_openai(final_prompt, data_context)
        if self.provider == 'tongyi':
            return self._call_tongyi(final_prompt, data_context)
        if self.provider != 'mock':
            logger.warning(f"未知的LLM提供者 {self.provider},使用mock分析")
        return self._mock_analysis(data_context)

    def _mock_analysis(self, context: Dict) -> str:
        """Fill the fixed commentary template with values from ``context``.

        Missing keys are rendered as ``N/A``.
        """
        gdp = context.get('gdp_growth', 'N/A')
        cpi = context.get('cpi', 'N/A')
        unemployment = context.get('unemployment', 'N/A')

        return f"""本月宏观经济洞察:GDP增长{gdp}%,经济扩张动能平稳。CPI同比{cpi}%,通胀水平温和,为货币政策留出空间。就业市场{self._format_unemployment(unemployment)},青年失业率需重点关注。整体来看,经济处于弱复苏通道,建议关注基建投资与消费修复的进度。"""

    def _format_unemployment(self, val):
        """Turn an unemployment rate into a short qualitative phrase.

        Rates above 5.5 are labelled as under pressure, rates above 5 as
        broadly stable, and anything else as good. Values that cannot be
        read as a number (including NaN) yield a "data pending" placeholder.
        """
        try:
            rate = float(val)
        except (TypeError, ValueError, OverflowError):
            return "数据待更新"

        # NaN compares false against every threshold and would otherwise be
        # reported as a good reading.
        if rate != rate:
            return "数据待更新"

        if rate > 5.5:
            return f"压力较大({val}%)"
        if rate > 5:
            return f"基本稳定({val}%)"
        return f"表现良好({val}%)"

    def _call_openai(self, prompt: str,
                     data_context: Optional[Dict[str, Any]] = None) -> str:
        """Send ``prompt`` to the OpenAI chat completions API.

        Args:
            prompt: Fully formatted prompt text.
            data_context: Indicators used to build the fallback text if the
                call fails; an empty context is used when omitted.

        Returns:
            The completion text, or the template-based text on any failure
            (missing package, network/API error, empty completion).
        """
        try:
            # Imported lazily so the package is only required for this provider.
            from openai import OpenAI
            client = OpenAI(api_key=self.config.get('api_key'))
            response = client.chat.completions.create(
                model=self.config.get('model', 'gpt-3.5-turbo'),
                messages=[{"role": "user", "content": prompt}]
            )
            content = response.choices[0].message.content
            if content is None:
                # Keep the ``-> str`` contract by routing through the fallback.
                raise ValueError("empty completion")
            return content
        except Exception as e:
            logger.warning(f"OpenAI调用失败: {e}")
            return self._mock_analysis(data_context or {})

    def _call_tongyi(self, prompt: str,
                     data_context: Optional[Dict[str, Any]] = None) -> str:
        """Send ``prompt`` to Tongyi Qianwen through the ``dashscope`` package.

        Args:
            prompt: Fully formatted prompt text.
            data_context: Indicators used to build the fallback text if the
                call fails; an empty context is used when omitted.

        Returns:
            The generated text, or the template-based text on any failure.
        """
        try:
            # Imported lazily so the package is only required for this provider.
            import dashscope
            dashscope.api_key = self.config.get('api_key')
            response = dashscope.Generation.call(
                model=self.config.get('model', 'qwen-turbo'),
                prompt=prompt
            )
            return response.output.text
        except Exception as e:
            logger.warning(f"通义千问调用失败: {e}")
            return self._mock_analysis(data_context or {})
|
|||
|
|
|
|||
|
|
class DiffAnalyzer:
    """Detect and report changes between two snapshots of indicator data.

    Detected changes are appended to ``self.changes``; the list is not
    cleared between calls, so repeated comparisons accumulate.
    """

    def __init__(self):
        """Start with an empty list of detected changes."""
        self.changes = []

    def compare_dataframes(self, df_old, df_new, key_cols: Optional[List] = None):
        """Compare the last row of every shared, non-key column.

        Args:
            df_old: Previous snapshot (indexed like a pandas DataFrame:
                ``.columns``, ``[col].iloc[-1]``, ``len``).
            df_new: Current snapshot, same interface.
            key_cols: Columns to exclude from the comparison. When falsy,
                the first column of ``df_old`` is treated as the key.

        Returns:
            ``self.changes`` after appending one entry per column whose
            numeric value moved by more than 0.01. An empty frame
            contributes 0 as its value.
        """
        key_cols = key_cols or df_old.columns[:1].tolist()

        for col in df_old.columns:
            if col in key_cols or col not in df_new.columns:
                continue
            try:
                old_val = df_old[col].iloc[-1] if len(df_old) > 0 else 0
                new_val = df_new[col].iloc[-1] if len(df_new) > 0 else 0
                change = self._build_change(col, old_val, new_val)
            except Exception as exc:
                # Best effort: one unreadable column must not abort the
                # comparison, but the skip is recorded rather than hidden.
                logger.debug(f"Diff分析跳过列 {col}: {exc}")
                continue
            if change is not None:
                self.changes.append(change)

        logger.info(f"Diff分析发现 {len(self.changes)} 项显著变动")
        return self.changes

    @staticmethod
    def _build_change(indicator, old_val, new_val, threshold: float = 0.01):
        """Return the change record for one indicator, or ``None``.

        ``None`` is returned when either value is not a real number or when
        the absolute difference does not exceed ``threshold``.
        """
        import numbers

        # numbers.Real also matches NumPy integer scalars (e.g. np.int64),
        # which are not subclasses of the built-in int.
        if not (isinstance(old_val, numbers.Real)
                and isinstance(new_val, numbers.Real)):
            return None

        diff = float(new_val) - float(old_val)
        # Written as "not >" so a NaN difference is rejected as well.
        if not abs(diff) > threshold:
            return None

        return {
            'indicator': indicator,
            'old': round(float(old_val), 2),
            'new': round(float(new_val), 2),
            'delta': round(diff, 2),
            'direction': 'up' if diff > 0 else 'down',
        }

    def generate_diff_report(self, max_items: int = 5) -> str:
        """Render the recorded changes as a bullet-point briefing.

        Args:
            max_items: Maximum number of changes listed, taken from the
                start of ``self.changes``.

        Returns:
            A fixed "no significant change" sentence when nothing was
            recorded, otherwise a header followed by one line per change.
        """
        if not self.changes:
            return "本月与上月数据相比无显著变动。"

        report_parts = ["📊 数据异动 Diff 简报:\n"]
        for chg in self.changes[:max_items]:
            arrow = "↑" if chg['direction'] == 'up' else "↓"
            report_parts.append(
                f"• {chg['indicator']}: {chg['old']} → {chg['new']} "
                f"({arrow}{abs(chg['delta'])})"
            )

        return "\n".join(report_parts)
|