Files
guba-indicator/screenshot_manager.py
xiaji 96f206ea78 feat: 新增股票数据波形图和截图功能
refactor: 重构数据库和LLM分析器逻辑

fix: 修复爬虫解析和UI显示问题

docs: 更新配置文件和注释

style: 优化代码格式和日志输出
2026-01-12 09:19:38 +08:00

185 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
截图管理器 - 用于在非交易时间截取上海证券交易所网站图表
"""
import os
from datetime import datetime
from loguru import logger
try:
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
except ImportError:
logger.warning("playwright未安装截图功能将不可用")
class ScreenshotManager:
"""截图管理器"""
def __init__(self, screenshot_dir: str = "screenshots"):
"""初始化截图管理器"""
self.screenshot_dir = screenshot_dir
self.target_url = "https://www.sse.com.cn/"
self.chart_xpath_pattern = "//*[@id=\"hq_area\"]"
# 创建截图目录
os.makedirs(self.screenshot_dir, exist_ok=True)
logger.info(f"截图管理器初始化完成,截图目录: {self.screenshot_dir}")
def capture_chart_screenshot(self) -> str:
"""
截取上海证券交易所网站图表
返回截图文件路径,失败时返回空字符串
"""
try:
# 检查playwright是否可用
if 'sync_playwright' not in globals():
logger.error("playwright未安装无法使用截图功能")
return ""
with sync_playwright() as p:
# 启动浏览器(无头模式,后台运行)
browser = p.chromium.launch(headless=True)
page = browser.new_page()
# 设置页面超时
page.set_default_timeout(60000) # 60秒超时
# 访问目标网页
logger.info(f"访问上海证券交易所网站: {self.target_url}")
page.goto(self.target_url, wait_until="domcontentloaded")
# 等待页面加载完成
page.wait_for_load_state("networkidle")
# 等待页面完全加载
page.wait_for_timeout(5000) # 额外等待5秒
# 等待图表元素出现
logger.info("等待图表元素加载...")
# 使用动态XPath模式查找图表元素
chart_element = None
selectors = [
self.chart_xpath_pattern, # 主要选择器highcharts-xxxxxxx-0格式
"//*[contains(@class, 'highcharts')]",
"//*[contains(@id, 'highcharts')]",
"//svg",
"//canvas",
"//div[contains(@class, 'chart')]",
"//div[contains(@class, 'graph')]"
]
for selector in selectors:
try:
# 等待选择器出现
page.wait_for_selector(selector, timeout=10000)
elements = page.query_selector_all(selector)
if elements:
# 选择第一个可见的元素
for element in elements:
if element.is_visible():
chart_element = element
logger.info(f"找到图表元素: {selector}")
break
if chart_element:
break
except PlaywrightTimeoutError:
logger.debug(f"选择器超时: {selector}")
continue
except Exception as e:
logger.debug(f"选择器错误 {selector}: {e}")
continue
if not chart_element:
logger.warning("未找到任何图表元素,尝试截取整个页面")
# 如果找不到图表元素,截取整个页面
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshot_path = os.path.join(self.screenshot_dir, f"sse_page_{timestamp}.png")
page.screenshot(path=screenshot_path)
logger.info(f"截取整个页面: {screenshot_path}")
browser.close()
return screenshot_path
# 检查元素是否可见
if not chart_element.is_visible():
logger.warning("图表元素不可见,尝试滚动到元素位置")
chart_element.scroll_into_view_if_needed()
# 生成截图文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshot_path = os.path.join(self.screenshot_dir, f"sse_chart_{timestamp}.png")
# 截取图表元素
logger.info("开始截取图表元素")
# 直接使用元素进行截图
chart_element.screenshot(path=screenshot_path)
logger.info(f"✅ 图表截图完成,保存至: {screenshot_path}")
# 关闭浏览器
browser.close()
return screenshot_path
except PlaywrightTimeoutError as e:
logger.error(f"页面加载超时: {e}")
return ""
except Exception as e:
logger.error(f"截图过程中发生错误: {e}")
return ""
def get_latest_screenshot(self) -> str:
"""获取最新的截图文件路径"""
try:
if not os.path.exists(self.screenshot_dir):
return ""
# 获取所有截图文件
screenshot_files = []
for file in os.listdir(self.screenshot_dir):
if file.startswith("sse_chart_") and file.endswith(".png"):
file_path = os.path.join(self.screenshot_dir, file)
screenshot_files.append((file_path, os.path.getmtime(file_path)))
if not screenshot_files:
return ""
# 按修改时间排序,获取最新的文件
screenshot_files.sort(key=lambda x: x[1], reverse=True)
return screenshot_files[0][0]
except Exception as e:
logger.error(f"获取最新截图失败: {e}")
return ""
def cleanup_old_screenshots(self, keep_count: int = 10):
"""清理旧的截图文件,只保留最新的几个"""
try:
if not os.path.exists(self.screenshot_dir):
return
# 获取所有截图文件
screenshot_files = []
for file in os.listdir(self.screenshot_dir):
if file.startswith("sse_chart_") and file.endswith(".png"):
file_path = os.path.join(self.screenshot_dir, file)
screenshot_files.append((file_path, os.path.getmtime(file_path)))
if len(screenshot_files) <= keep_count:
return
# 按修改时间排序,删除旧的文件
screenshot_files.sort(key=lambda x: x[1])
files_to_delete = screenshot_files[:-keep_count]
for file_path, _ in files_to_delete:
try:
os.remove(file_path)
logger.info(f"删除旧截图: {file_path}")
except Exception as e:
logger.error(f"删除截图文件失败 {file_path}: {e}")
except Exception as e:
logger.error(f"清理旧截图失败: {e}")