Files
ppt/ppt_manager_v2/core/native_chart.py

149 lines
6.5 KiB
Python

from pptx import Presentation
from pptx.chart.data import CategoryChartData, XyChartData, BubbleChartData
from pptx.enum.chart import XL_CHART_TYPE
from pptx.util import Inches, Pt
from pathlib import Path
from loguru import logger
import pandas as pd
from typing import List, Dict, Optional, Union, Any
class NativeChartManager:
    """Update native charts and tables of a python-pptx presentation in place.

    Shapes are located by their name (the "anchor"). Charts are filled from a
    list of categories plus a mapping of series name to values; tables are
    filled from a pandas DataFrame. Every public update method reports success
    as a boolean and logs failures instead of raising.
    """

    def __init__(self, presentation: "Optional[Presentation]" = None):
        """Store the presentation to operate on (may be supplied later)."""
        self.prs = presentation

    def set_presentation(self, presentation: "Presentation"):
        """Replace the presentation this manager operates on."""
        self.prs = presentation

    # ------------------------------------------------------------------
    # Charts
    # ------------------------------------------------------------------
    def update_chart_by_anchor(self, anchor_name: str,
                               categories: List[str],
                               series_data: Dict[str, List[float]],
                               chart_type: Optional[str] = None) -> bool:
        """Fill the chart anchored at the first shape named ``anchor_name``.

        If the matching shape already holds a chart, its data is replaced and
        ``chart_type`` is not used. Otherwise a new chart of ``chart_type``
        is added at the shape's position and size.

        Args:
            anchor_name: Exact shape name to look for on every slide.
            categories: Category labels of the chart.
            series_data: Mapping of series name to its values.
            chart_type: Key of the chart type to create when the anchor is
                not a chart (e.g. ``'line'``, ``'bar'``, ``'pie'``).

        Returns:
            True when a chart was updated or created, False when no
            presentation is set, no shape matches, or the update failed.
        """
        if not self.prs:
            logger.error("Presentation未设置")
            return False

        for slide in self.prs.slides:
            for shape in slide.shapes:
                if shape.name != anchor_name:
                    continue
                if shape.has_chart:
                    return self._update_existing_chart(shape.chart, categories, series_data)
                logger.info(f"锚点 {anchor_name} 不是图表,在原位创建新图表")
                return self._create_chart_in_shape_position(
                    slide, shape, categories, series_data, chart_type
                )

        logger.warning(f"未找到图表锚点: {anchor_name}")
        return False

    @staticmethod
    def _clean_series_values(values) -> list:
        """Return ``values`` as a list with None/NaN entries replaced by None.

        NaN cannot be written to the chart's embedded workbook, whereas None
        is treated as a gap, so missing values are normalised here.
        """
        cleaned = []
        for value in values:
            try:
                missing = value is None or bool(pd.isna(value))
            except (TypeError, ValueError):
                # Non-scalar entries make pd.isna return an array; keep them as-is.
                missing = False
            cleaned.append(None if missing else value)
        return cleaned

    def _build_chart_data(self, categories: List[str],
                          series_data: Dict[str, List[float]]):
        """Build a CategoryChartData from categories and named series."""
        chart_data = CategoryChartData()
        chart_data.categories = categories
        for series_name, values in series_data.items():
            chart_data.add_series(series_name, self._clean_series_values(values))
        return chart_data

    def _resolve_chart_type(self, chart_type_str: Optional[str]):
        """Map a chart-type key to ``(key, XL_CHART_TYPE member)``.

        An empty/None key means ``'line'``. Unknown keys also fall back to a
        line chart, but a warning is logged so the fallback is visible.
        """
        chart_type_map = {
            'line': XL_CHART_TYPE.LINE,
            'line_markers': XL_CHART_TYPE.LINE_MARKERS,
            'bar': XL_CHART_TYPE.BAR_CLUSTERED,
            'bar_stacked': XL_CHART_TYPE.BAR_STACKED,
            'column': XL_CHART_TYPE.COLUMN_CLUSTERED,
            'column_stacked': XL_CHART_TYPE.COLUMN_STACKED,
            'pie': XL_CHART_TYPE.PIE,
            'doughnut': XL_CHART_TYPE.DOUGHNUT,
            'area': XL_CHART_TYPE.AREA,
        }
        if not chart_type_str:
            return 'line', chart_type_map['line']

        key = chart_type_str
        if isinstance(key, str):
            # Accept e.g. "Bar" or " pie " as well as the canonical keys.
            key = key.strip().lower()
        if key in chart_type_map:
            return key, chart_type_map[key]

        logger.warning(f"未知图表类型: {chart_type_str},使用默认折线图")
        return 'line', chart_type_map['line']

    def _update_existing_chart(self, chart, categories: List[str],
                               series_data: Dict[str, List[float]]) -> bool:
        """Replace the data of an existing chart; return False on failure."""
        try:
            chart.replace_data(self._build_chart_data(categories, series_data))
            logger.success(f"原生图表数据源已更新,系列数: {len(series_data)}")
            return True
        except Exception as e:
            logger.exception(f"更新图表数据源失败: {e}")
            return False

    def _create_chart_in_shape_position(self, slide, placeholder_shape,
                                        categories: List[str],
                                        series_data: Dict[str, List[float]],
                                        chart_type_str: Optional[str] = None) -> bool:
        """Add a new chart to ``slide`` with the geometry of ``placeholder_shape``.

        The placeholder shape itself is left on the slide. Returns False and
        logs the exception when the chart cannot be created.
        """
        try:
            type_key, xl_chart_type = self._resolve_chart_type(chart_type_str)
            chart_data = self._build_chart_data(categories, series_data)
            slide.shapes.add_chart(
                xl_chart_type,
                placeholder_shape.left,
                placeholder_shape.top,
                placeholder_shape.width,
                placeholder_shape.height,
                chart_data,
            )
            # Log the type actually used, not the (possibly None/unknown) request.
            logger.success(f"已在位置创建新原生图表: {type_key}")
            return True
        except Exception as e:
            logger.exception(f"创建原生图表失败: {e}")
            return False

    # ------------------------------------------------------------------
    # Tables
    # ------------------------------------------------------------------
    def update_table_by_anchor(self, anchor_name: str, data_frame: pd.DataFrame) -> bool:
        """Fill a table shape from ``data_frame``.

        The first table shape named ``anchor_name`` or ``table_<anchor_name>``
        is used. When none matches, a warning is logged and the first table
        found in the presentation is used instead.

        Returns:
            True when a table was updated; False when no presentation is set,
            the presentation has no table at all, or the update failed.
        """
        if not self.prs:
            logger.error("Presentation未设置")
            return False

        accepted_names = (anchor_name, f"table_{anchor_name}")
        first_table_shape = None
        for slide in self.prs.slides:
            for shape in slide.shapes:
                if not shape.has_table:
                    continue
                if shape.name in accepted_names:
                    return self._update_table_content(shape.table, data_frame)
                if first_table_shape is None:
                    # Remember the fallback so the slides are walked only once.
                    first_table_shape = shape

        logger.warning(f"未找到表格锚点: {anchor_name},尝试查找任意表格")
        if first_table_shape is not None:
            return self._update_table_content(first_table_shape.table, data_frame)
        return False

    @staticmethod
    def _format_cell_value(value) -> str:
        """Render a cell value: ints/floats with two decimals, others via str.

        ``bool`` is excluded explicitly because it is a subclass of ``int``
        and would otherwise be rendered as ``1.00`` / ``0.00``.
        """
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return f"{value:.2f}"
        return str(value)

    def _update_table_content(self, table, data_frame: pd.DataFrame) -> bool:
        """Write the DataFrame into ``table`` without resizing the table.

        Row 0 receives the column labels (set bold); the following rows
        receive the data. Rows/columns that do not fit into the existing
        table are skipped and a warning is logged. Table rows beyond the
        DataFrame's length are left untouched.
        """
        try:
            rows, cols = data_frame.shape
            table_rows = len(table.rows)
            table_cols = len(table.columns)

            if table_rows > 0:
                headers = list(data_frame.columns)[:table_cols]
                for col_idx, header in enumerate(headers):
                    cell = table.cell(0, col_idx)
                    cell.text = str(header)
                    for para in cell.text_frame.paragraphs:
                        para.font.bold = True

            # One table row is reserved for the header.
            body_capacity = max(table_rows - 1, 0)
            for row_idx in range(min(rows, body_capacity)):
                for col_idx in range(min(cols, table_cols)):
                    cell = table.cell(row_idx + 1, col_idx)
                    cell.text = self._format_cell_value(data_frame.iloc[row_idx, col_idx])

            if rows > body_capacity or cols > table_cols:
                logger.warning(
                    f"数据超出表格容量,已截断: 数据 {rows}x{cols},"
                    f"表格可容纳 {body_capacity}x{table_cols}"
                )

            logger.success(f"原生表格已更新,数据维度: {rows}x{cols}")
            return True
        except Exception as e:
            logger.exception(f"更新表格内容失败: {e}")
            return False

    # ------------------------------------------------------------------
    # DataFrame helpers
    # ------------------------------------------------------------------
    def dataframe_to_series(self, df: pd.DataFrame,
                            category_col: Optional[str] = None) -> tuple:
        """Split a DataFrame into chart categories and series.

        Args:
            df: Source data; each column other than ``category_col`` becomes
                one series.
            category_col: Column holding the category labels. When it is not
                given or not present in ``df``, the index is used instead.

        Returns:
            ``(categories, series_data)`` where ``categories`` is a list and
            ``series_data`` maps column label to the column's values as a list.
        """
        if category_col and category_col in df.columns:
            categories = df[category_col].tolist()
        else:
            if category_col:
                logger.warning(f"未找到分类列: {category_col},改用索引作为分类")
            categories = df.index.tolist()

        series_data = {}
        for col in df.columns:
            if col == category_col:
                continue
            column = df[col]
            if not isinstance(column, pd.Series):
                # A duplicated label selects several columns at once; such
                # labels are skipped (and reported) rather than guessed at.
                logger.warning(f"列 {col} 无法转换为单一系列,已跳过")
                continue
            series_data[col] = column.tolist()
        return categories, series_data