Files
guba-indicator/screenshot_manager.py
xiaji f256bd0852 refactor: 重构配置管理和截图功能,切换LLM提供商至智谱AI
重构配置管理器以支持打包环境路径处理
将LLM分析器从OpenAI迁移至智谱AI API
替换Playwright截图功能为Selenium实现
更新默认配置中的API端点和模型
2026-01-27 11:11:24 +08:00

198 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
截图管理器 - 用于在非交易时间截取上海证券交易所网站图表
"""
import os
import sys
from datetime import datetime
from loguru import logger
# 使用Selenium替代Playwright
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class ScreenshotManager:
"""截图管理器"""
def __init__(self, screenshot_dir: str = "screenshots"):
"""
初始化截图管理器
"""
# 确定截图目录的正确路径
if getattr(sys, 'frozen', False):
# 打包后的环境
current_dir = os.path.dirname(sys.executable)
self.screenshot_dir = os.path.join(current_dir, screenshot_dir)
else:
# 开发环境
self.screenshot_dir = screenshot_dir
self.target_url = "https://www.sse.com.cn/"
self.chart_xpath_pattern = "//*[@id=\"hq_area\"]"
# 创建截图目录
os.makedirs(self.screenshot_dir, exist_ok=True)
logger.info(f"截图管理器初始化完成,截图目录: {self.screenshot_dir}")
def capture_chart_screenshot(self) -> str:
"""
截取上海证券交易所网站图表
返回截图文件路径,失败时返回空字符串
"""
try:
# 配置Chrome选项
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
# 创建WebDriver
logger.info("初始化Chrome WebDriver...")
driver = webdriver.Chrome(options=chrome_options)
driver.set_page_load_timeout(60)
driver.implicitly_wait(30)
# 访问目标网页
logger.info(f"访问上海证券交易所网站: {self.target_url}")
driver.get(self.target_url)
# 等待页面加载完成
logger.info("等待页面加载完成...")
WebDriverWait(driver, 30).until(
EC.presence_of_element_located((By.XPATH, "//body"))
)
# 额外等待确保数据加载完成
import time
time.sleep(5)
# 等待图表元素出现
logger.info("等待图表元素加载...")
# 使用动态XPath模式查找图表元素
chart_element = None
selectors = [
self.chart_xpath_pattern, # 主要选择器highcharts-xxxxxxx-0格式
"//*[contains(@class, 'highcharts')]",
"//*[contains(@id, 'highcharts')]",
"//svg",
"//canvas",
"//div[contains(@class, 'chart')]",
"//div[contains(@class, 'graph')]"
]
for selector in selectors:
try:
# 等待选择器出现
element = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, selector))
)
# 检查元素是否可见
if element.is_displayed():
chart_element = element
logger.info(f"找到图表元素: {selector}")
break
except Exception as e:
logger.debug(f"选择器错误 {selector}: {e}")
continue
# 生成截图文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if not chart_element:
logger.warning("未找到任何图表元素,尝试截取整个页面")
# 如果找不到图表元素,截取整个页面
screenshot_path = os.path.join(self.screenshot_dir, f"sse_page_{timestamp}.png")
driver.save_screenshot(screenshot_path)
logger.info(f"截取整个页面: {screenshot_path}")
driver.quit()
return screenshot_path
# 检查元素是否可见
if not chart_element.is_displayed():
logger.warning("图表元素不可见,尝试滚动到元素位置")
driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center', inline: 'center'});", chart_element)
time.sleep(2)
# 生成截图文件名
screenshot_path = os.path.join(self.screenshot_dir, f"sse_chart_{timestamp}.png")
# 截取图表元素
logger.info("开始截取图表元素")
# 直接使用元素进行截图
chart_element.screenshot(screenshot_path)
logger.info(f"✅ 图表截图完成,保存至: {screenshot_path}")
# 关闭浏览器
driver.quit()
return screenshot_path
except Exception as e:
logger.error(f"截图过程中发生错误: {e}")
import traceback
traceback.print_exc()
return ""
def get_latest_screenshot(self) -> str:
"""获取最新的截图文件路径"""
try:
if not os.path.exists(self.screenshot_dir):
return ""
# 获取所有截图文件
screenshot_files = []
for file in os.listdir(self.screenshot_dir):
if file.startswith("sse_chart_") and file.endswith(".png"):
file_path = os.path.join(self.screenshot_dir, file)
screenshot_files.append((file_path, os.path.getmtime(file_path)))
if not screenshot_files:
return ""
# 按修改时间排序,获取最新的文件
screenshot_files.sort(key=lambda x: x[1], reverse=True)
return screenshot_files[0][0]
except Exception as e:
logger.error(f"获取最新截图失败: {e}")
return ""
def cleanup_old_screenshots(self, keep_count: int = 10):
"""清理旧的截图文件,只保留最新的几个"""
try:
if not os.path.exists(self.screenshot_dir):
return
# 获取所有截图文件
screenshot_files = []
for file in os.listdir(self.screenshot_dir):
if file.startswith("sse_chart_") and file.endswith(".png"):
file_path = os.path.join(self.screenshot_dir, file)
screenshot_files.append((file_path, os.path.getmtime(file_path)))
if len(screenshot_files) <= keep_count:
return
# 按修改时间排序,删除旧的文件
screenshot_files.sort(key=lambda x: x[1])
files_to_delete = screenshot_files[:-keep_count]
for file_path, _ in files_to_delete:
try:
os.remove(file_path)
logger.info(f"删除旧截图: {file_path}")
except Exception as e:
logger.error(f"删除截图文件失败 {file_path}: {e}")
except Exception as e:
logger.error(f"清理旧截图失败: {e}")