Files
ppt/ppt_manager_v2/orchestrator.py

252 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sys
from pathlib import Path
from typing import Dict, Any, List, Optional
from loguru import logger
from datetime import datetime
import yaml
sys.path.insert(0, str(Path(__file__).parent))
from core.anchor_engine import AnchorEngine
from core.native_chart import NativeChartManager
from core.conditional_renderer import ConditionalRenderer
from plugins.base_generator import GeneratorPluginManager
from ai.llm_analyst import LLMAnalyst
class Orchestrator:
def __init__(self, config_path: str = None):
self.base_dir = Path(__file__).parent
if config_path:
self.config_path = Path(config_path)
else:
self.config_path = self.base_dir / "config" / "project_config_v2.yaml"
self.config = self._load_config()
self.prs = None
self.results: Dict[str, Any] = {}
self.progress_callback = None
log_file = self.base_dir / "logs" / f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
log_file.parent.mkdir(parents=True, exist_ok=True)
logger.add(str(log_file), rotation="10 MB", level="INFO", encoding="utf-8")
self.plugin_manager = GeneratorPluginManager(self.base_dir / "plugins" / "generators")
self.plugin_manager.discover_plugins()
self.anchor_engine = AnchorEngine()
self.chart_manager = NativeChartManager()
self.conditional_renderer = ConditionalRenderer()
ai_config = self.config.get('ai', {})
self.llm = LLMAnalyst(ai_config)
def _load_config(self) -> Dict:
if self.config_path.exists():
with open(self.config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f) or {}
return {}
def set_progress_callback(self, callback):
self.progress_callback = callback
def _report_progress(self, step: str, message: str, percent: int = 0):
logger.info(f"[{step}] {message}")
if self.progress_callback:
self.progress_callback({
'step': step,
'message': message,
'percent': percent,
'timestamp': datetime.now().isoformat()
})
def load_template(self, template_path: str = None) -> bool:
template_file = self.base_dir / "template_ppt" / (template_path or self.config.get('template', ''))
if not template_file.exists():
self._report_progress("LOAD", f"创建新演示文稿(模板不存在: {template_file.name}", 5)
from pptx import Presentation
self.prs = Presentation()
for i in range(8):
layout_idx = 5 if i == 0 else 1
if layout_idx < len(self.prs.slide_layouts):
slide = self.prs.slides.add_slide(self.prs.slide_layouts[layout_idx])
if slide.shapes.title:
slide.shapes.title.text = f"示例页 {i+1}"
if i == 3:
if slide.shapes.title:
slide.shapes.title.text = "GDP趋势图"
for shp in slide.shapes:
if shp != slide.shapes.title and hasattr(shp, 'name'):
shp.name = "chart_gdp"
break
if i == 4:
if slide.shapes.title:
slide.shapes.title.text = "CPI/PPI走势图"
for shp in slide.shapes:
if shp != slide.shapes.title and hasattr(shp, 'name'):
shp.name = "chart_cpi"
break
logger.debug(f"创建示例页 {i+1}")
else:
self._report_progress("LOAD", f"加载模板: {template_file.name}", 5)
self.prs = self.anchor_engine.load_presentation(template_file)
self.anchor_engine.prs = self.prs
self.chart_manager.set_presentation(self.prs)
self.conditional_renderer.prs = self.prs
anchors_found = self.anchor_engine.scan_anchors()
self._report_progress("LOAD", f"发现 {len(anchors_found)} 个可绑定锚点", 10)
return True
def run_plugins(self, params: Dict[str, Any] = None) -> Dict:
self._report_progress("PLUGINS", "开始执行数据插件...", 15)
run_params = {**self.config.get('params', {}), **(params or {})}
plugin_results = {}
total = len(self.plugin_manager.plugins)
for idx, (pid, plugin_class) in enumerate(self.plugin_manager.plugins.items()):
self._report_progress("PLUGINS", f"执行插件: {plugin_class.generator_name}", 20 + int(idx/total*30))
try:
plugin = plugin_class(run_params)
if plugin.fetch_data(run_params):
render_result = plugin.render()
plugin_results[pid] = {
'success': True,
'data': plugin.get_data(),
'render': render_result
}
logger.success(f"插件 [{pid}] 执行成功")
else:
plugin_results[pid] = {'success': False}
except Exception as e:
logger.exception(f"插件 [{pid}] 执行失败: {e}")
plugin_results[pid] = {'success': False, 'error': str(e)}
self.results['plugins'] = plugin_results
return plugin_results
def update_native_charts(self) -> int:
self._report_progress("CHARTS", "开始更新原生图表...", 55)
updates_count = 0
anchors_config = self.config.get('anchors', [])
for anchor_cfg in anchors_config:
anchor_type = anchor_cfg.get('type', '')
anchor_name = anchor_cfg.get('name', '')
if 'native_chart' in anchor_type:
plugin_id = anchor_cfg.get('plugin')
chart_type = anchor_cfg.get('chart_type', 'line')
if plugin_id and plugin_id in self.results.get('plugins', {}):
plugin_result = self.results['plugins'][plugin_id]
if plugin_result.get('success'):
render = plugin_result['render']
success = self.chart_manager.update_chart_by_anchor(
anchor_name=render.get('anchor', anchor_name),
categories=render.get('categories', []),
series_data=render.get('series', {}),
chart_type=chart_type
)
if success:
updates_count += 1
self._report_progress("CHARTS", f"完成 {updates_count} 个原生图表更新", 70)
return updates_count
def ai_generate_summary(self) -> str:
self._report_progress("AI", "LLM正在生成分析摘要...", 75)
context = {}
for pid, presult in self.results.get('plugins', {}).items():
if presult.get('success') and 'data' in presult:
df = presult['data']
if hasattr(df, 'iloc'):
for col in df.columns:
try:
context[f"{pid}_{col}"] = round(float(df[col].iloc[-1]), 2)
except:
pass
try:
summary = self.llm.generate_analysis(context, max_words=200)
self.results['llm_summary'] = summary
for anchor_cfg in self.config.get('anchors', []):
if anchor_cfg.get('type') == 'text_replace' and 'summary' in anchor_cfg.get('name', ''):
self.anchor_engine.replace_text_anchor(anchor_cfg['name'], summary)
if anchor_cfg.get('type') == 'ai_text_generate':
anchor_name = anchor_cfg.get('name', 'text_llm_summary')
self.anchor_engine.replace_text_anchor(anchor_name, summary)
logger.success(f"LLM分析完成: {summary[:50]}...")
return summary
except Exception as e:
logger.warning(f"LLM分析失败: {e}")
return ""
def process_conditions(self):
self._report_progress("CONDITIONS", "处理条件渲染规则...", 80)
context = {}
for pid, presult in self.results.get('plugins', {}).items():
if presult.get('success') and 'data' in presult:
df = presult['data']
if hasattr(df, 'iloc'):
for col in df.columns:
try:
val = float(df[col].iloc[-1])
context[col] = val
if 'GDP' in col: context['gdp_growth'] = val
if 'unemploy' in col.lower(): context['unemployment_rate'] = val
except:
pass
self.conditional_renderer.set_context(context)
slide_conditions = self.config.get('slide_conditions', [])
for cond_cfg in slide_conditions:
if 'condition' in cond_cfg:
cond_result = self.conditional_renderer.evaluate_condition(cond_cfg['condition'])
logger.info(f"条件规则: {cond_cfg['condition']} -> {'满足' if cond_result else '不满足'}")
return True
def save(self, output_name: str = None) -> str:
self._report_progress("SAVE", "正在保存最终PPT...", 95)
output_dir = self.base_dir / self.config.get('output_dir', 'output')
output_dir.mkdir(parents=True, exist_ok=True)
if not output_name:
output_name = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pptx"
output_path = output_dir / output_name
self.prs.save(str(output_path))
self._report_progress("DONE", f"PPT已保存: {output_name}", 100)
logger.success(f"=" * 50)
logger.success(f"生成完成: {output_path}")
logger.success(f"=" * 50)
return str(output_path)
def run_full_pipeline(self, template_path: str = None, params: Dict = None) -> str:
self.load_template(template_path)
self.run_plugins(params)
self.update_native_charts()
self.ai_generate_summary()
self.process_conditions()
return self.save()
if __name__ == "__main__":
orch = Orchestrator()
orch.run_full_pipeline()