""" 截图管理器 - 用于在非交易时间截取上海证券交易所网站图表 """ import os import sys from datetime import datetime from loguru import logger # 使用Selenium替代Playwright from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC class ScreenshotManager: """截图管理器""" def __init__(self, screenshot_dir: str = "screenshots"): """ 初始化截图管理器 """ # 确定截图目录的正确路径 if getattr(sys, 'frozen', False): # 打包后的环境 current_dir = os.path.dirname(sys.executable) self.screenshot_dir = os.path.join(current_dir, screenshot_dir) else: # 开发环境 self.screenshot_dir = screenshot_dir self.target_url = "https://www.sse.com.cn/" self.chart_xpath_pattern = "//*[@id=\"hq_area\"]" # 创建截图目录 os.makedirs(self.screenshot_dir, exist_ok=True) logger.info(f"截图管理器初始化完成,截图目录: {self.screenshot_dir}") def capture_chart_screenshot(self) -> str: """ 截取上海证券交易所网站图表 返回截图文件路径,失败时返回空字符串 """ try: # 配置Chrome选项 chrome_options = Options() chrome_options.add_argument('--headless') chrome_options.add_argument('--no-sandbox') chrome_options.add_argument('--disable-dev-shm-usage') chrome_options.add_argument('--disable-gpu') chrome_options.add_argument('--window-size=1920,1080') chrome_options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36') # 创建WebDriver logger.info("初始化Chrome WebDriver...") driver = webdriver.Chrome(options=chrome_options) driver.set_page_load_timeout(60) driver.implicitly_wait(30) # 访问目标网页 logger.info(f"访问上海证券交易所网站: {self.target_url}") driver.get(self.target_url) # 等待页面加载完成 logger.info("等待页面加载完成...") WebDriverWait(driver, 30).until( EC.presence_of_element_located((By.XPATH, "//body")) ) # 额外等待确保数据加载完成 import time time.sleep(5) # 等待图表元素出现 logger.info("等待图表元素加载...") # 使用动态XPath模式查找图表元素 chart_element = None selectors = [ self.chart_xpath_pattern, # 主要选择器:highcharts-xxxxxxx-0格式 "//*[contains(@class, 'highcharts')]", "//*[contains(@id, 'highcharts')]", "//svg", "//canvas", "//div[contains(@class, 'chart')]", "//div[contains(@class, 'graph')]" ] for selector in selectors: try: # 等待选择器出现 element = WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.XPATH, selector)) ) # 检查元素是否可见 if element.is_displayed(): chart_element = element logger.info(f"找到图表元素: {selector}") break except Exception as e: logger.debug(f"选择器错误 {selector}: {e}") continue # 生成截图文件名 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if not chart_element: logger.warning("未找到任何图表元素,尝试截取整个页面") # 如果找不到图表元素,截取整个页面 screenshot_path = os.path.join(self.screenshot_dir, f"sse_page_{timestamp}.png") driver.save_screenshot(screenshot_path) logger.info(f"截取整个页面: {screenshot_path}") driver.quit() return screenshot_path # 检查元素是否可见 if not chart_element.is_displayed(): logger.warning("图表元素不可见,尝试滚动到元素位置") driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center', inline: 'center'});", chart_element) time.sleep(2) # 生成截图文件名 screenshot_path = os.path.join(self.screenshot_dir, f"sse_chart_{timestamp}.png") # 截取图表元素 logger.info("开始截取图表元素") # 直接使用元素进行截图 chart_element.screenshot(screenshot_path) logger.info(f"✅ 图表截图完成,保存至: {screenshot_path}") # 关闭浏览器 driver.quit() return screenshot_path except Exception as e: logger.error(f"截图过程中发生错误: {e}") import traceback traceback.print_exc() return "" def get_latest_screenshot(self) -> str: """获取最新的截图文件路径""" try: if not os.path.exists(self.screenshot_dir): return "" # 获取所有截图文件 screenshot_files = [] for file in os.listdir(self.screenshot_dir): if file.startswith("sse_chart_") and file.endswith(".png"): file_path = os.path.join(self.screenshot_dir, file) screenshot_files.append((file_path, os.path.getmtime(file_path))) if not screenshot_files: return "" # 按修改时间排序,获取最新的文件 screenshot_files.sort(key=lambda x: x[1], reverse=True) return screenshot_files[0][0] except Exception as e: logger.error(f"获取最新截图失败: {e}") return "" def cleanup_old_screenshots(self, keep_count: int = 10): """清理旧的截图文件,只保留最新的几个""" try: if not os.path.exists(self.screenshot_dir): return # 获取所有截图文件 screenshot_files = [] for file in os.listdir(self.screenshot_dir): if file.startswith("sse_chart_") and file.endswith(".png"): file_path = os.path.join(self.screenshot_dir, file) screenshot_files.append((file_path, os.path.getmtime(file_path))) if len(screenshot_files) <= keep_count: return # 按修改时间排序,删除旧的文件 screenshot_files.sort(key=lambda x: x[1]) files_to_delete = screenshot_files[:-keep_count] for file_path, _ in files_to_delete: try: os.remove(file_path) logger.info(f"删除旧截图: {file_path}") except Exception as e: logger.error(f"删除截图文件失败 {file_path}: {e}") except Exception as e: logger.error(f"清理旧截图失败: {e}")