149 lines
6.5 KiB
Python
149 lines
6.5 KiB
Python
|
|
from pptx import Presentation
|
||
|
|
from pptx.chart.data import CategoryChartData, XyChartData, BubbleChartData
|
||
|
|
from pptx.enum.chart import XL_CHART_TYPE
|
||
|
|
from pptx.util import Inches, Pt
|
||
|
|
from pathlib import Path
|
||
|
|
from loguru import logger
|
||
|
|
import pandas as pd
|
||
|
|
from typing import List, Dict, Optional, Union, Any
|
||
|
|
|
||
|
|
class NativeChartManager:
|
||
|
|
def __init__(self, presentation: Presentation = None):
|
||
|
|
self.prs = presentation
|
||
|
|
|
||
|
|
def set_presentation(self, presentation: Presentation):
|
||
|
|
self.prs = presentation
|
||
|
|
|
||
|
|
def update_chart_by_anchor(self, anchor_name: str,
|
||
|
|
categories: List[str],
|
||
|
|
series_data: Dict[str, List[float]],
|
||
|
|
chart_type: str = None) -> bool:
|
||
|
|
if not self.prs:
|
||
|
|
logger.error("Presentation未设置")
|
||
|
|
return False
|
||
|
|
|
||
|
|
for slide_idx, slide in enumerate(self.prs.slides):
|
||
|
|
for shape in slide.shapes:
|
||
|
|
if shape.name == anchor_name or (shape.has_chart and shape.name == anchor_name):
|
||
|
|
if shape.has_chart:
|
||
|
|
return self._update_existing_chart(shape.chart, categories, series_data)
|
||
|
|
else:
|
||
|
|
logger.info(f"锚点 {anchor_name} 不是图表,在原位创建新图表")
|
||
|
|
return self._create_chart_in_shape_position(slide, shape, categories, series_data, chart_type)
|
||
|
|
|
||
|
|
logger.warning(f"未找到图表锚点: {anchor_name}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
def _update_existing_chart(self, chart, categories: List[str], series_data: Dict[str, List[float]]) -> bool:
|
||
|
|
try:
|
||
|
|
chart_data = CategoryChartData()
|
||
|
|
chart_data.categories = categories
|
||
|
|
|
||
|
|
for series_name, values in series_data.items():
|
||
|
|
chart_data.add_series(series_name, values)
|
||
|
|
|
||
|
|
chart.replace_data(chart_data)
|
||
|
|
logger.success(f"原生图表数据源已更新,系列数: {len(series_data)}")
|
||
|
|
return True
|
||
|
|
except Exception as e:
|
||
|
|
logger.exception(f"更新图表数据源失败: {e}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
def _create_chart_in_shape_position(self, slide, placeholder_shape,
|
||
|
|
categories: List[str],
|
||
|
|
series_data: Dict[str, List[float]],
|
||
|
|
chart_type_str: str = None) -> bool:
|
||
|
|
try:
|
||
|
|
left = placeholder_shape.left
|
||
|
|
top = placeholder_shape.top
|
||
|
|
width = placeholder_shape.width
|
||
|
|
height = placeholder_shape.height
|
||
|
|
|
||
|
|
chart_type_map = {
|
||
|
|
'line': XL_CHART_TYPE.LINE,
|
||
|
|
'line_markers': XL_CHART_TYPE.LINE_MARKERS,
|
||
|
|
'bar': XL_CHART_TYPE.BAR_CLUSTERED,
|
||
|
|
'bar_stacked': XL_CHART_TYPE.BAR_STACKED,
|
||
|
|
'column': XL_CHART_TYPE.COLUMN_CLUSTERED,
|
||
|
|
'column_stacked': XL_CHART_TYPE.COLUMN_STACKED,
|
||
|
|
'pie': XL_CHART_TYPE.PIE,
|
||
|
|
'doughnut': XL_CHART_TYPE.DOUGHNUT,
|
||
|
|
'area': XL_CHART_TYPE.AREA,
|
||
|
|
}
|
||
|
|
|
||
|
|
xl_chart_type = chart_type_map.get(chart_type_str or 'line', XL_CHART_TYPE.LINE)
|
||
|
|
|
||
|
|
chart_data = CategoryChartData()
|
||
|
|
chart_data.categories = categories
|
||
|
|
for series_name, values in series_data.items():
|
||
|
|
chart_data.add_series(series_name, values)
|
||
|
|
|
||
|
|
slide.shapes.add_chart(xl_chart_type, left, top, width, height, chart_data)
|
||
|
|
logger.success(f"已在位置创建新原生图表: {chart_type_str}")
|
||
|
|
return True
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.exception(f"创建原生图表失败: {e}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
def update_table_by_anchor(self, anchor_name: str, data_frame: pd.DataFrame) -> bool:
|
||
|
|
if not self.prs:
|
||
|
|
logger.error("Presentation未设置")
|
||
|
|
return False
|
||
|
|
|
||
|
|
for slide_idx, slide in enumerate(self.prs.slides):
|
||
|
|
for shape in slide.shapes:
|
||
|
|
if shape.has_table and (shape.name == anchor_name or f"table_{anchor_name}" == shape.name):
|
||
|
|
return self._update_table_content(shape.table, data_frame)
|
||
|
|
|
||
|
|
logger.warning(f"未找到表格锚点: {anchor_name},尝试查找任意表格")
|
||
|
|
for slide_idx, slide in enumerate(self.prs.slides):
|
||
|
|
for shape in slide.shapes:
|
||
|
|
if shape.has_table:
|
||
|
|
return self._update_table_content(shape.table, data_frame)
|
||
|
|
return False
|
||
|
|
|
||
|
|
def _update_table_content(self, table, data_frame: pd.DataFrame) -> bool:
|
||
|
|
try:
|
||
|
|
rows, cols = data_frame.shape
|
||
|
|
headers = list(data_frame.columns)
|
||
|
|
|
||
|
|
for col_idx, header in enumerate(headers):
|
||
|
|
if col_idx < len(table.columns) and 0 < len(table.rows):
|
||
|
|
cell = table.cell(0, col_idx)
|
||
|
|
cell.text = str(header)
|
||
|
|
for para in cell.text_frame.paragraphs:
|
||
|
|
para.font.bold = True
|
||
|
|
|
||
|
|
for row_idx in range(min(rows, len(table.rows) - 1)):
|
||
|
|
for col_idx in range(min(cols, len(table.columns))):
|
||
|
|
cell = table.cell(row_idx + 1, col_idx)
|
||
|
|
value = data_frame.iloc[row_idx, col_idx]
|
||
|
|
if isinstance(value, (int, float)):
|
||
|
|
cell.text = f"{value:.2f}"
|
||
|
|
else:
|
||
|
|
cell.text = str(value)
|
||
|
|
|
||
|
|
logger.success(f"原生表格已更新,数据维度: {rows}x{cols}")
|
||
|
|
return True
|
||
|
|
except Exception as e:
|
||
|
|
logger.exception(f"更新表格内容失败: {e}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
def dataframe_to_series(self, df: pd.DataFrame,
|
||
|
|
category_col: str = None) -> tuple:
|
||
|
|
if category_col and category_col in df.columns:
|
||
|
|
categories = df[category_col].tolist()
|
||
|
|
else:
|
||
|
|
categories = df.index.tolist()
|
||
|
|
|
||
|
|
series_data = {}
|
||
|
|
for col in df.columns:
|
||
|
|
if col != category_col:
|
||
|
|
try:
|
||
|
|
series_data[col] = df[col].tolist()
|
||
|
|
except:
|
||
|
|
pass
|
||
|
|
|
||
|
|
return categories, series_data
|