2026-01-12 09:19:38 +08:00
|
|
|
|
"""
|
|
|
|
|
|
截图管理器 - 用于在非交易时间截取上海证券交易所网站图表
|
|
|
|
|
|
"""
|
|
|
|
|
|
import os
|
2026-01-27 11:11:24 +08:00
|
|
|
|
import sys
|
2026-01-12 09:19:38 +08:00
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
from loguru import logger
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-01-27 11:11:24 +08:00
|
|
|
|
# 使用Selenium替代Playwright
|
|
|
|
|
|
from selenium import webdriver
|
|
|
|
|
|
from selenium.webdriver.chrome.options import Options
|
|
|
|
|
|
from selenium.webdriver.common.by import By
|
|
|
|
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
|
|
|
|
from selenium.webdriver.support import expected_conditions as EC
|
2026-01-12 09:19:38 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ScreenshotManager:
|
|
|
|
|
|
"""截图管理器"""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, screenshot_dir: str = "screenshots"):
|
2026-01-27 11:11:24 +08:00
|
|
|
|
"""
|
|
|
|
|
|
初始化截图管理器
|
|
|
|
|
|
"""
|
|
|
|
|
|
# 确定截图目录的正确路径
|
|
|
|
|
|
if getattr(sys, 'frozen', False):
|
|
|
|
|
|
# 打包后的环境
|
|
|
|
|
|
current_dir = os.path.dirname(sys.executable)
|
|
|
|
|
|
self.screenshot_dir = os.path.join(current_dir, screenshot_dir)
|
|
|
|
|
|
else:
|
|
|
|
|
|
# 开发环境
|
|
|
|
|
|
self.screenshot_dir = screenshot_dir
|
|
|
|
|
|
|
2026-01-12 09:19:38 +08:00
|
|
|
|
self.target_url = "https://www.sse.com.cn/"
|
|
|
|
|
|
self.chart_xpath_pattern = "//*[@id=\"hq_area\"]"
|
|
|
|
|
|
|
|
|
|
|
|
# 创建截图目录
|
|
|
|
|
|
os.makedirs(self.screenshot_dir, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
|
|
logger.info(f"截图管理器初始化完成,截图目录: {self.screenshot_dir}")
|
|
|
|
|
|
|
|
|
|
|
|
def capture_chart_screenshot(self) -> str:
|
|
|
|
|
|
"""
|
|
|
|
|
|
截取上海证券交易所网站图表
|
|
|
|
|
|
返回截图文件路径,失败时返回空字符串
|
|
|
|
|
|
"""
|
|
|
|
|
|
try:
|
2026-01-27 11:11:24 +08:00
|
|
|
|
# 配置Chrome选项
|
|
|
|
|
|
chrome_options = Options()
|
|
|
|
|
|
chrome_options.add_argument('--headless')
|
|
|
|
|
|
chrome_options.add_argument('--no-sandbox')
|
|
|
|
|
|
chrome_options.add_argument('--disable-dev-shm-usage')
|
|
|
|
|
|
chrome_options.add_argument('--disable-gpu')
|
|
|
|
|
|
chrome_options.add_argument('--window-size=1920,1080')
|
|
|
|
|
|
chrome_options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
|
|
|
|
|
|
|
|
|
|
|
|
# 创建WebDriver
|
|
|
|
|
|
logger.info("初始化Chrome WebDriver...")
|
|
|
|
|
|
driver = webdriver.Chrome(options=chrome_options)
|
|
|
|
|
|
driver.set_page_load_timeout(60)
|
|
|
|
|
|
driver.implicitly_wait(30)
|
|
|
|
|
|
|
|
|
|
|
|
# 访问目标网页
|
|
|
|
|
|
logger.info(f"访问上海证券交易所网站: {self.target_url}")
|
|
|
|
|
|
driver.get(self.target_url)
|
|
|
|
|
|
|
|
|
|
|
|
# 等待页面加载完成
|
|
|
|
|
|
logger.info("等待页面加载完成...")
|
|
|
|
|
|
WebDriverWait(driver, 30).until(
|
|
|
|
|
|
EC.presence_of_element_located((By.XPATH, "//body"))
|
|
|
|
|
|
)
|
|
|
|
|
|
# 额外等待确保数据加载完成
|
|
|
|
|
|
import time
|
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
|
|
|
|
|
|
|
# 等待图表元素出现
|
|
|
|
|
|
logger.info("等待图表元素加载...")
|
|
|
|
|
|
|
|
|
|
|
|
# 使用动态XPath模式查找图表元素
|
|
|
|
|
|
chart_element = None
|
|
|
|
|
|
selectors = [
|
|
|
|
|
|
self.chart_xpath_pattern, # 主要选择器:highcharts-xxxxxxx-0格式
|
|
|
|
|
|
"//*[contains(@class, 'highcharts')]",
|
|
|
|
|
|
"//*[contains(@id, 'highcharts')]",
|
|
|
|
|
|
"//svg",
|
|
|
|
|
|
"//canvas",
|
|
|
|
|
|
"//div[contains(@class, 'chart')]",
|
|
|
|
|
|
"//div[contains(@class, 'graph')]"
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
for selector in selectors:
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 等待选择器出现
|
|
|
|
|
|
element = WebDriverWait(driver, 10).until(
|
|
|
|
|
|
EC.presence_of_element_located((By.XPATH, selector))
|
|
|
|
|
|
)
|
|
|
|
|
|
# 检查元素是否可见
|
|
|
|
|
|
if element.is_displayed():
|
|
|
|
|
|
chart_element = element
|
|
|
|
|
|
logger.info(f"找到图表元素: {selector}")
|
|
|
|
|
|
break
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.debug(f"选择器错误 {selector}: {e}")
|
|
|
|
|
|
continue
|
2026-01-12 09:19:38 +08:00
|
|
|
|
|
2026-01-27 11:11:24 +08:00
|
|
|
|
# 生成截图文件名
|
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
|
|
|
|
|
|
|
|
if not chart_element:
|
|
|
|
|
|
logger.warning("未找到任何图表元素,尝试截取整个页面")
|
|
|
|
|
|
# 如果找不到图表元素,截取整个页面
|
|
|
|
|
|
screenshot_path = os.path.join(self.screenshot_dir, f"sse_page_{timestamp}.png")
|
|
|
|
|
|
driver.save_screenshot(screenshot_path)
|
|
|
|
|
|
logger.info(f"截取整个页面: {screenshot_path}")
|
|
|
|
|
|
driver.quit()
|
2026-01-12 09:19:38 +08:00
|
|
|
|
return screenshot_path
|
2026-01-27 11:11:24 +08:00
|
|
|
|
|
|
|
|
|
|
# 检查元素是否可见
|
|
|
|
|
|
if not chart_element.is_displayed():
|
|
|
|
|
|
logger.warning("图表元素不可见,尝试滚动到元素位置")
|
|
|
|
|
|
driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'center', inline: 'center'});", chart_element)
|
|
|
|
|
|
time.sleep(2)
|
|
|
|
|
|
|
|
|
|
|
|
# 生成截图文件名
|
|
|
|
|
|
screenshot_path = os.path.join(self.screenshot_dir, f"sse_chart_{timestamp}.png")
|
|
|
|
|
|
|
|
|
|
|
|
# 截取图表元素
|
|
|
|
|
|
logger.info("开始截取图表元素")
|
|
|
|
|
|
|
|
|
|
|
|
# 直接使用元素进行截图
|
|
|
|
|
|
chart_element.screenshot(screenshot_path)
|
|
|
|
|
|
|
|
|
|
|
|
logger.info(f"✅ 图表截图完成,保存至: {screenshot_path}")
|
|
|
|
|
|
|
|
|
|
|
|
# 关闭浏览器
|
|
|
|
|
|
driver.quit()
|
|
|
|
|
|
|
|
|
|
|
|
return screenshot_path
|
|
|
|
|
|
|
2026-01-12 09:19:38 +08:00
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"截图过程中发生错误: {e}")
|
2026-01-27 11:11:24 +08:00
|
|
|
|
import traceback
|
|
|
|
|
|
traceback.print_exc()
|
2026-01-12 09:19:38 +08:00
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
|
|
def get_latest_screenshot(self) -> str:
|
|
|
|
|
|
"""获取最新的截图文件路径"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
if not os.path.exists(self.screenshot_dir):
|
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
|
|
# 获取所有截图文件
|
|
|
|
|
|
screenshot_files = []
|
|
|
|
|
|
for file in os.listdir(self.screenshot_dir):
|
|
|
|
|
|
if file.startswith("sse_chart_") and file.endswith(".png"):
|
|
|
|
|
|
file_path = os.path.join(self.screenshot_dir, file)
|
|
|
|
|
|
screenshot_files.append((file_path, os.path.getmtime(file_path)))
|
|
|
|
|
|
|
|
|
|
|
|
if not screenshot_files:
|
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
|
|
# 按修改时间排序,获取最新的文件
|
|
|
|
|
|
screenshot_files.sort(key=lambda x: x[1], reverse=True)
|
|
|
|
|
|
return screenshot_files[0][0]
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"获取最新截图失败: {e}")
|
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
|
|
def cleanup_old_screenshots(self, keep_count: int = 10):
|
|
|
|
|
|
"""清理旧的截图文件,只保留最新的几个"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
if not os.path.exists(self.screenshot_dir):
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
# 获取所有截图文件
|
|
|
|
|
|
screenshot_files = []
|
|
|
|
|
|
for file in os.listdir(self.screenshot_dir):
|
|
|
|
|
|
if file.startswith("sse_chart_") and file.endswith(".png"):
|
|
|
|
|
|
file_path = os.path.join(self.screenshot_dir, file)
|
|
|
|
|
|
screenshot_files.append((file_path, os.path.getmtime(file_path)))
|
|
|
|
|
|
|
|
|
|
|
|
if len(screenshot_files) <= keep_count:
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
# 按修改时间排序,删除旧的文件
|
|
|
|
|
|
screenshot_files.sort(key=lambda x: x[1])
|
|
|
|
|
|
files_to_delete = screenshot_files[:-keep_count]
|
|
|
|
|
|
|
|
|
|
|
|
for file_path, _ in files_to_delete:
|
|
|
|
|
|
try:
|
|
|
|
|
|
os.remove(file_path)
|
|
|
|
|
|
logger.info(f"删除旧截图: {file_path}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"删除截图文件失败 {file_path}: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"清理旧截图失败: {e}")
|