Files
guba-indicator/screenshot_manager.py

185 lines
7.6 KiB
Python
Raw Normal View History

"""
截图管理器 - 用于在非交易时间截取上海证券交易所网站图表
"""
import os
from datetime import datetime
from loguru import logger
try:
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
except ImportError:
logger.warning("playwright未安装截图功能将不可用")
class ScreenshotManager:
"""截图管理器"""
def __init__(self, screenshot_dir: str = "screenshots"):
"""初始化截图管理器"""
self.screenshot_dir = screenshot_dir
self.target_url = "https://www.sse.com.cn/"
self.chart_xpath_pattern = "//*[@id=\"hq_area\"]"
# 创建截图目录
os.makedirs(self.screenshot_dir, exist_ok=True)
logger.info(f"截图管理器初始化完成,截图目录: {self.screenshot_dir}")
def capture_chart_screenshot(self) -> str:
"""
截取上海证券交易所网站图表
返回截图文件路径失败时返回空字符串
"""
try:
# 检查playwright是否可用
if 'sync_playwright' not in globals():
logger.error("playwright未安装无法使用截图功能")
return ""
with sync_playwright() as p:
# 启动浏览器(无头模式,后台运行)
browser = p.chromium.launch(headless=True)
page = browser.new_page()
# 设置页面超时
page.set_default_timeout(60000) # 60秒超时
# 访问目标网页
logger.info(f"访问上海证券交易所网站: {self.target_url}")
page.goto(self.target_url, wait_until="domcontentloaded")
# 等待页面加载完成
page.wait_for_load_state("networkidle")
# 等待页面完全加载
page.wait_for_timeout(5000) # 额外等待5秒
# 等待图表元素出现
logger.info("等待图表元素加载...")
# 使用动态XPath模式查找图表元素
chart_element = None
selectors = [
self.chart_xpath_pattern, # 主要选择器highcharts-xxxxxxx-0格式
"//*[contains(@class, 'highcharts')]",
"//*[contains(@id, 'highcharts')]",
"//svg",
"//canvas",
"//div[contains(@class, 'chart')]",
"//div[contains(@class, 'graph')]"
]
for selector in selectors:
try:
# 等待选择器出现
page.wait_for_selector(selector, timeout=10000)
elements = page.query_selector_all(selector)
if elements:
# 选择第一个可见的元素
for element in elements:
if element.is_visible():
chart_element = element
logger.info(f"找到图表元素: {selector}")
break
if chart_element:
break
except PlaywrightTimeoutError:
logger.debug(f"选择器超时: {selector}")
continue
except Exception as e:
logger.debug(f"选择器错误 {selector}: {e}")
continue
if not chart_element:
logger.warning("未找到任何图表元素,尝试截取整个页面")
# 如果找不到图表元素,截取整个页面
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshot_path = os.path.join(self.screenshot_dir, f"sse_page_{timestamp}.png")
page.screenshot(path=screenshot_path)
logger.info(f"截取整个页面: {screenshot_path}")
browser.close()
return screenshot_path
# 检查元素是否可见
if not chart_element.is_visible():
logger.warning("图表元素不可见,尝试滚动到元素位置")
chart_element.scroll_into_view_if_needed()
# 生成截图文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshot_path = os.path.join(self.screenshot_dir, f"sse_chart_{timestamp}.png")
# 截取图表元素
logger.info("开始截取图表元素")
# 直接使用元素进行截图
chart_element.screenshot(path=screenshot_path)
logger.info(f"✅ 图表截图完成,保存至: {screenshot_path}")
# 关闭浏览器
browser.close()
return screenshot_path
except PlaywrightTimeoutError as e:
logger.error(f"页面加载超时: {e}")
return ""
except Exception as e:
logger.error(f"截图过程中发生错误: {e}")
return ""
def get_latest_screenshot(self) -> str:
"""获取最新的截图文件路径"""
try:
if not os.path.exists(self.screenshot_dir):
return ""
# 获取所有截图文件
screenshot_files = []
for file in os.listdir(self.screenshot_dir):
if file.startswith("sse_chart_") and file.endswith(".png"):
file_path = os.path.join(self.screenshot_dir, file)
screenshot_files.append((file_path, os.path.getmtime(file_path)))
if not screenshot_files:
return ""
# 按修改时间排序,获取最新的文件
screenshot_files.sort(key=lambda x: x[1], reverse=True)
return screenshot_files[0][0]
except Exception as e:
logger.error(f"获取最新截图失败: {e}")
return ""
def cleanup_old_screenshots(self, keep_count: int = 10):
"""清理旧的截图文件,只保留最新的几个"""
try:
if not os.path.exists(self.screenshot_dir):
return
# 获取所有截图文件
screenshot_files = []
for file in os.listdir(self.screenshot_dir):
if file.startswith("sse_chart_") and file.endswith(".png"):
file_path = os.path.join(self.screenshot_dir, file)
screenshot_files.append((file_path, os.path.getmtime(file_path)))
if len(screenshot_files) <= keep_count:
return
# 按修改时间排序,删除旧的文件
screenshot_files.sort(key=lambda x: x[1])
files_to_delete = screenshot_files[:-keep_count]
for file_path, _ in files_to_delete:
try:
os.remove(file_path)
logger.info(f"删除旧截图: {file_path}")
except Exception as e:
logger.error(f"删除截图文件失败 {file_path}: {e}")
except Exception as e:
logger.error(f"清理旧截图失败: {e}")