V2.0 五大核心增强: 锚点定位/原生图表/插件架构/WebSocket/LLM智能

This commit is contained in:
2026-05-29 14:14:53 +08:00
parent 5546e5fca1
commit 8618867f92
51 changed files with 3368 additions and 0 deletions

View File

View File

@@ -0,0 +1,80 @@
from pptx import Presentation
from pptx.shapes.base import BaseShape
from pathlib import Path
from loguru import logger
from typing import Dict, List, Optional, Tuple, Any
class AnchorEngine:
def __init__(self, presentation: Presentation = None):
self.prs = presentation
self.anchor_cache = {}
def load_presentation(self, ppt_path: Path) -> Presentation:
logger.info(f"加载PPT模板用于锚点扫描: {ppt_path}")
self.prs = Presentation(str(ppt_path))
self.scan_anchors()
return self.prs
def scan_anchors(self) -> Dict[str, Tuple[int, BaseShape]]:
self.anchor_cache = {}
if not self.prs:
return self.anchor_cache
for slide_idx, slide in enumerate(self.prs.slides):
for shape in slide.shapes:
shape_name = shape.name.strip() if shape.name else ""
if shape_name and not shape_name.startswith("Picture") and not shape_name.startswith("Rectangle"):
self.anchor_cache[shape_name] = (slide_idx, shape)
logger.debug(f"发现锚点 [{shape_name}] 在第 {slide_idx+1}")
if shape.has_text_frame:
for para in shape.text_frame.paragraphs:
text = para.text.strip()
if text.startswith("{{") and text.endswith("}}"):
anchor_name = text[2:-2].strip()
self.anchor_cache[anchor_name] = (slide_idx, shape)
logger.debug(f"发现文本模板锚点 [{anchor_name}] 在第 {slide_idx+1}")
if shape.name and shape.name.lower().startswith(('chart_', 'table_', 'text_', 'img_')):
self.anchor_cache[shape.name] = (slide_idx, shape)
logger.debug(f"发现标准命名锚点 [{shape.name}] 在第 {slide_idx+1}")
logger.info(f"锚点扫描完成,共发现 {len(self.anchor_cache)} 个可绑定锚点")
return self.anchor_cache
def find_anchor(self, anchor_name: str) -> Optional[Tuple[int, BaseShape]]:
if anchor_name in self.anchor_cache:
return self.anchor_cache[anchor_name]
for slide_idx, slide in enumerate(self.prs.slides):
for shape in slide.shapes:
if shape.name == anchor_name:
self.anchor_cache[anchor_name] = (slide_idx, shape)
return (slide_idx, shape)
logger.warning(f"未找到锚点: {anchor_name}")
return None
def get_all_anchor_names(self) -> List[str]:
return list(self.anchor_cache.keys())
def replace_text_anchor(self, anchor_name: str, new_text: str) -> bool:
result = self.find_anchor(anchor_name)
if not result:
return False
slide_idx, shape = result
if shape.has_text_frame:
shape.text_frame.text = new_text
logger.success(f"文本锚点 [{anchor_name}] 已替换为: {new_text[:30]}...")
return True
return False
def get_shape_alternative_text(self, shape: BaseShape) -> str:
try:
if hasattr(shape, 'alternative_text'):
return shape.alternative_text or ""
except:
pass
return ""

View File

@@ -0,0 +1,137 @@
from typing import Dict, Any, List, Optional, Callable
from pptx import Presentation
from loguru import logger
import ast
import operator
class ConditionalRenderer:
def __init__(self, presentation: Presentation = None):
self.prs = presentation
self.context = {}
self._slides_to_remove = []
self.operators = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Gt: operator.gt,
ast.GtE: operator.ge,
ast.Lt: operator.lt,
ast.LtE: operator.le,
ast.Eq: operator.eq,
ast.NotEq: operator.ne,
ast.And: lambda a, b: a and b,
ast.Or: lambda a, b: a or b,
}
def set_context(self, context: Dict[str, Any]):
self.context = context
logger.info(f"条件渲染上下文已设置: {list(context.keys())}")
def update_context(self, key: str, value: Any):
self.context[key] = value
def evaluate_condition(self, condition_expr: str, context: Dict[str, Any] = None) -> bool:
eval_context = {**self.context, **(context or {})}
try:
if "{" in condition_expr:
for key, value in eval_context.items():
condition_expr = condition_expr.replace(f"{{{key}}}", str(value))
result = self._safe_eval(condition_expr, eval_context)
logger.info(f"条件 [{condition_expr}] 评估结果: {result}")
return bool(result)
except Exception as e:
logger.warning(f"条件表达式评估失败 [{condition_expr}]: {e}")
return False
def _safe_eval(self, expr: str, context: Dict) -> Any:
try:
node = ast.parse(expr, mode='eval')
return self._eval_ast(node.body, context)
except:
safe_dict = {k: v for k, v in context.items() if isinstance(k, str)}
safe_dict['__builtins__'] = {}
return eval(expr, safe_dict)
def _eval_ast(self, node, context):
if isinstance(node, ast.Constant):
return node.value
elif isinstance(node, ast.Num):
return node.n
elif isinstance(node, ast.Str):
return node.s
elif isinstance(node, ast.Name):
return context.get(node.id, node.id)
elif isinstance(node, ast.BinOp):
op_type = type(node.op)
if op_type in self.operators:
return self.operators[op_type](
self._eval_ast(node.left, context),
self._eval_ast(node.right, context)
)
elif isinstance(node, ast.Compare):
left_val = self._eval_ast(node.left, context)
for op, comparator in zip(node.ops, node.comparators):
op_type = type(op)
if op_type in self.operators:
if not self.operators[op_type](left_val, self._eval_ast(comparator, context)):
return False
return True
elif isinstance(node, ast.BoolOp):
op_type = type(node.op)
if op_type == ast.And:
for value in node.values:
if not self._eval_ast(value, context):
return False
return True
elif op_type == ast.Or:
for value in node.values:
if self._eval_ast(value, context):
return True
return False
return None
def process_slide_conditions(self, slide_configs: List[Dict], presentation: Presentation = None) -> Presentation:
prs = presentation or self.prs
if not prs:
logger.error("没有提供Presentation对象")
return prs
logger.info(f"开始处理条件渲染,当前页数: {len(prs.slides)}")
slides_to_keep = []
for idx, slide_config in enumerate(slide_configs):
if 'condition' in slide_config:
condition = slide_config['condition']
if not self.evaluate_condition(condition):
logger.info(f"跳过第 {idx+1} 页,条件不满足: {condition}")
continue
if 'action' in slide_config and slide_config['action'] == 'insert_slide':
logger.info(f"执行插入幻灯片操作: {slide_config.get('template_source')}")
if idx < len(prs.slides):
slides_to_keep.append(idx)
logger.info(f"条件渲染完成,保留 {len(slides_to_keep)} / {len(slide_configs)} 页配置")
return prs
def insert_new_slide(self, presentation: Presentation,
slide_layout_idx: int = 6,
position: int = None) -> int:
if position is None:
position = len(presentation.slides)
slide_layout = presentation.slide_layouts[slide_layout_idx] if slide_layout_idx < len(presentation.slide_layouts) else presentation.slide_layouts[0]
xml_slides = presentation.slides._sldIdLst
slides = list(xml_slides)
xml_slides.insert(position, slides[0] if slides else None)
new_slide = presentation.slides.add_slide(slide_layout)
logger.info(f"已在位置 {position} 插入新页面")
return position

View File

@@ -0,0 +1,148 @@
from pptx import Presentation
from pptx.chart.data import CategoryChartData, XyChartData, BubbleChartData
from pptx.enum.chart import XL_CHART_TYPE
from pptx.util import Inches, Pt
from pathlib import Path
from loguru import logger
import pandas as pd
from typing import List, Dict, Optional, Union, Any
class NativeChartManager:
def __init__(self, presentation: Presentation = None):
self.prs = presentation
def set_presentation(self, presentation: Presentation):
self.prs = presentation
def update_chart_by_anchor(self, anchor_name: str,
categories: List[str],
series_data: Dict[str, List[float]],
chart_type: str = None) -> bool:
if not self.prs:
logger.error("Presentation未设置")
return False
for slide_idx, slide in enumerate(self.prs.slides):
for shape in slide.shapes:
if shape.name == anchor_name or (shape.has_chart and shape.name == anchor_name):
if shape.has_chart:
return self._update_existing_chart(shape.chart, categories, series_data)
else:
logger.info(f"锚点 {anchor_name} 不是图表,在原位创建新图表")
return self._create_chart_in_shape_position(slide, shape, categories, series_data, chart_type)
logger.warning(f"未找到图表锚点: {anchor_name}")
return False
def _update_existing_chart(self, chart, categories: List[str], series_data: Dict[str, List[float]]) -> bool:
try:
chart_data = CategoryChartData()
chart_data.categories = categories
for series_name, values in series_data.items():
chart_data.add_series(series_name, values)
chart.replace_data(chart_data)
logger.success(f"原生图表数据源已更新,系列数: {len(series_data)}")
return True
except Exception as e:
logger.exception(f"更新图表数据源失败: {e}")
return False
def _create_chart_in_shape_position(self, slide, placeholder_shape,
categories: List[str],
series_data: Dict[str, List[float]],
chart_type_str: str = None) -> bool:
try:
left = placeholder_shape.left
top = placeholder_shape.top
width = placeholder_shape.width
height = placeholder_shape.height
chart_type_map = {
'line': XL_CHART_TYPE.LINE,
'line_markers': XL_CHART_TYPE.LINE_MARKERS,
'bar': XL_CHART_TYPE.BAR_CLUSTERED,
'bar_stacked': XL_CHART_TYPE.BAR_STACKED,
'column': XL_CHART_TYPE.COLUMN_CLUSTERED,
'column_stacked': XL_CHART_TYPE.COLUMN_STACKED,
'pie': XL_CHART_TYPE.PIE,
'doughnut': XL_CHART_TYPE.DOUGHNUT,
'area': XL_CHART_TYPE.AREA,
}
xl_chart_type = chart_type_map.get(chart_type_str or 'line', XL_CHART_TYPE.LINE)
chart_data = CategoryChartData()
chart_data.categories = categories
for series_name, values in series_data.items():
chart_data.add_series(series_name, values)
slide.shapes.add_chart(xl_chart_type, left, top, width, height, chart_data)
logger.success(f"已在位置创建新原生图表: {chart_type_str}")
return True
except Exception as e:
logger.exception(f"创建原生图表失败: {e}")
return False
def update_table_by_anchor(self, anchor_name: str, data_frame: pd.DataFrame) -> bool:
if not self.prs:
logger.error("Presentation未设置")
return False
for slide_idx, slide in enumerate(self.prs.slides):
for shape in slide.shapes:
if shape.has_table and (shape.name == anchor_name or f"table_{anchor_name}" == shape.name):
return self._update_table_content(shape.table, data_frame)
logger.warning(f"未找到表格锚点: {anchor_name},尝试查找任意表格")
for slide_idx, slide in enumerate(self.prs.slides):
for shape in slide.shapes:
if shape.has_table:
return self._update_table_content(shape.table, data_frame)
return False
def _update_table_content(self, table, data_frame: pd.DataFrame) -> bool:
try:
rows, cols = data_frame.shape
headers = list(data_frame.columns)
for col_idx, header in enumerate(headers):
if col_idx < len(table.columns) and 0 < len(table.rows):
cell = table.cell(0, col_idx)
cell.text = str(header)
for para in cell.text_frame.paragraphs:
para.font.bold = True
for row_idx in range(min(rows, len(table.rows) - 1)):
for col_idx in range(min(cols, len(table.columns))):
cell = table.cell(row_idx + 1, col_idx)
value = data_frame.iloc[row_idx, col_idx]
if isinstance(value, (int, float)):
cell.text = f"{value:.2f}"
else:
cell.text = str(value)
logger.success(f"原生表格已更新,数据维度: {rows}x{cols}")
return True
except Exception as e:
logger.exception(f"更新表格内容失败: {e}")
return False
def dataframe_to_series(self, df: pd.DataFrame,
category_col: str = None) -> tuple:
if category_col and category_col in df.columns:
categories = df[category_col].tolist()
else:
categories = df.index.tolist()
series_data = {}
for col in df.columns:
if col != category_col:
try:
series_data[col] = df[col].tolist()
except:
pass
return categories, series_data