from pptx import Presentation from pptx.chart.data import CategoryChartData, XyChartData, BubbleChartData from pptx.enum.chart import XL_CHART_TYPE from pptx.util import Inches, Pt from pathlib import Path from loguru import logger import pandas as pd from typing import List, Dict, Optional, Union, Any class NativeChartManager: def __init__(self, presentation: Presentation = None): self.prs = presentation def set_presentation(self, presentation: Presentation): self.prs = presentation def update_chart_by_anchor(self, anchor_name: str, categories: List[str], series_data: Dict[str, List[float]], chart_type: str = None) -> bool: if not self.prs: logger.error("Presentation未设置") return False for slide_idx, slide in enumerate(self.prs.slides): for shape in slide.shapes: if shape.name == anchor_name or (shape.has_chart and shape.name == anchor_name): if shape.has_chart: return self._update_existing_chart(shape.chart, categories, series_data) else: logger.info(f"锚点 {anchor_name} 不是图表,在原位创建新图表") return self._create_chart_in_shape_position(slide, shape, categories, series_data, chart_type) logger.warning(f"未找到图表锚点: {anchor_name}") return False def _update_existing_chart(self, chart, categories: List[str], series_data: Dict[str, List[float]]) -> bool: try: chart_data = CategoryChartData() chart_data.categories = categories for series_name, values in series_data.items(): chart_data.add_series(series_name, values) chart.replace_data(chart_data) logger.success(f"原生图表数据源已更新,系列数: {len(series_data)}") return True except Exception as e: logger.exception(f"更新图表数据源失败: {e}") return False def _create_chart_in_shape_position(self, slide, placeholder_shape, categories: List[str], series_data: Dict[str, List[float]], chart_type_str: str = None) -> bool: try: left = placeholder_shape.left top = placeholder_shape.top width = placeholder_shape.width height = placeholder_shape.height chart_type_map = { 'line': XL_CHART_TYPE.LINE, 'line_markers': XL_CHART_TYPE.LINE_MARKERS, 'bar': XL_CHART_TYPE.BAR_CLUSTERED, 'bar_stacked': XL_CHART_TYPE.BAR_STACKED, 'column': XL_CHART_TYPE.COLUMN_CLUSTERED, 'column_stacked': XL_CHART_TYPE.COLUMN_STACKED, 'pie': XL_CHART_TYPE.PIE, 'doughnut': XL_CHART_TYPE.DOUGHNUT, 'area': XL_CHART_TYPE.AREA, } xl_chart_type = chart_type_map.get(chart_type_str or 'line', XL_CHART_TYPE.LINE) chart_data = CategoryChartData() chart_data.categories = categories for series_name, values in series_data.items(): chart_data.add_series(series_name, values) slide.shapes.add_chart(xl_chart_type, left, top, width, height, chart_data) logger.success(f"已在位置创建新原生图表: {chart_type_str}") return True except Exception as e: logger.exception(f"创建原生图表失败: {e}") return False def update_table_by_anchor(self, anchor_name: str, data_frame: pd.DataFrame) -> bool: if not self.prs: logger.error("Presentation未设置") return False for slide_idx, slide in enumerate(self.prs.slides): for shape in slide.shapes: if shape.has_table and (shape.name == anchor_name or f"table_{anchor_name}" == shape.name): return self._update_table_content(shape.table, data_frame) logger.warning(f"未找到表格锚点: {anchor_name},尝试查找任意表格") for slide_idx, slide in enumerate(self.prs.slides): for shape in slide.shapes: if shape.has_table: return self._update_table_content(shape.table, data_frame) return False def _update_table_content(self, table, data_frame: pd.DataFrame) -> bool: try: rows, cols = data_frame.shape headers = list(data_frame.columns) for col_idx, header in enumerate(headers): if col_idx < len(table.columns) and 0 < len(table.rows): cell = table.cell(0, col_idx) cell.text = str(header) for para in cell.text_frame.paragraphs: para.font.bold = True for row_idx in range(min(rows, len(table.rows) - 1)): for col_idx in range(min(cols, len(table.columns))): cell = table.cell(row_idx + 1, col_idx) value = data_frame.iloc[row_idx, col_idx] if isinstance(value, (int, float)): cell.text = f"{value:.2f}" else: cell.text = str(value) logger.success(f"原生表格已更新,数据维度: {rows}x{cols}") return True except Exception as e: logger.exception(f"更新表格内容失败: {e}") return False def dataframe_to_series(self, df: pd.DataFrame, category_col: str = None) -> tuple: if category_col and category_col in df.columns: categories = df[category_col].tolist() else: categories = df.index.tolist() series_data = {} for col in df.columns: if col != category_col: try: series_data[col] = df[col].tolist() except: pass return categories, series_data