Files
guba-indicator/waveform_widget.py

384 lines
14 KiB
Python
Raw Normal View History

"""
波形图组件 - 用于绘制股票数据波形图
"""
import math
import os
from datetime import datetime, time
from PySide6.QtWidgets import QWidget
from PySide6.QtGui import QPainter, QPen, QColor, QBrush, QPixmap
from PySide6.QtCore import QPointF, QTimer
from loguru import logger
# 尝试导入截图管理器
try:
from screenshot_manager import ScreenshotManager
except ImportError:
ScreenshotManager = None
logger.warning("截图管理器导入失败,截图功能将不可用")
class WaveformWidget(QWidget):
"""波形图控件"""
def __init__(self, parent=None):
super().__init__(parent)
self.data_points = [] # 存储数据点 [(time, value)]
self.base_value = 0 # 基准值
self.screenshot_manager = None
self.latest_screenshot_path = ""
self.setMinimumSize(600, 300)
# 设置背景色
self.setStyleSheet("background-color: #1e1e1e;")
# 初始化截图管理器
if ScreenshotManager:
self.screenshot_manager = ScreenshotManager()
logger.info("截图管理器初始化完成")
logger.info("WaveformWidget 初始化完成")
def time_to_x_position(self, time_str: str, total_width: int) -> float:
"""
将时间转换为X轴位置
时间折算关系
- 9:30 -> 最左侧 (x=0)
- 11:30 -> 中间 (x=total_width/2)
- 13:00 -> 中间 (x=total_width/2)
- 15:00 -> 最右侧 (x=total_width)
- 10:30 -> 左侧四分之一 (x=total_width/4)
- 14:00 -> 右侧四分之一 (x=total_width*3/4)
"""
try:
# 解析时间字符串
current_time = datetime.strptime(time_str, "%H:%M").time()
# 定义关键时间点
market_start = time(9, 30) # 9:30
market_mid1 = time(11, 30) # 11:30
market_mid2 = time(13, 0) # 13:00
market_end = time(15, 0) # 15:00
# 计算总交易时间(分钟)
morning_duration = (market_mid1.hour - market_start.hour) * 60 + \
(market_mid1.minute - market_start.minute)
afternoon_duration = (market_end.hour - market_mid2.hour) * 60 + \
(market_end.minute - market_mid2.minute)
total_duration = morning_duration + afternoon_duration
# 计算当前时间相对于开盘时间的分钟数
if current_time <= market_mid1:
# 上午交易时段
minutes_from_start = (current_time.hour - market_start.hour) * 60 + \
(current_time.minute - market_start.minute)
# 上午时段占一半宽度
x_ratio = minutes_from_start / morning_duration * 0.5
elif current_time >= market_mid2:
# 下午交易时段
minutes_from_start = (current_time.hour - market_mid2.hour) * 60 + \
(current_time.minute - market_mid2.minute)
# 下午时段占一半宽度,从中间开始
x_ratio = 0.5 + minutes_from_start / afternoon_duration * 0.5
else:
# 午休时间,统一放在中间
x_ratio = 0.5
return x_ratio * total_width
except Exception as e:
logger.error(f"时间转换错误: {time_str}, 错误: {e}")
return total_width / 2 # 默认返回中间位置
def add_data_point(self, time_str: str, value: float):
"""添加数据点"""
# 如果是第一个数据点,设置基准值
if not self.data_points:
self.base_value = value
logger.info(f"设置基准值: {self.base_value}")
self.data_points.append((time_str, value))
logger.info(f"添加数据点: 时间={time_str}, 值={value}")
# 限制数据点数量,避免内存过大
if len(self.data_points) > 100:
self.data_points = self.data_points[-100:]
# 触发重绘
self.update()
def clear_data(self):
"""清除所有数据"""
self.data_points.clear()
self.base_value = 0
self.update()
logger.info("波形图数据已清除")
def paintEvent(self, event):
"""绘制波形图"""
painter = QPainter(self)
painter.setRenderHint(QPainter.Antialiasing)
width = self.width()
height = self.height()
# 如果没有数据点,显示提示信息
if not self.data_points:
self._draw_no_data_message(painter, width, height)
return
# 绘制网格和坐标轴
self._draw_grid(painter, width, height)
# 绘制波形线
self._draw_waveform(painter, width, height)
# 绘制数据点
self._draw_data_points(painter, width, height)
def _draw_grid(self, painter: QPainter, width: int, height: int):
"""绘制网格和坐标轴"""
# 设置网格颜色
grid_color = QColor(100, 100, 100)
painter.setPen(QPen(grid_color, 1, Qt.DashLine))
# 绘制水平网格线
for i in range(1, 5):
y = height * i // 5
painter.drawLine(0, y, width, y)
# 绘制垂直网格线(时间刻度)
time_points = ["9:30", "10:30", "11:30", "13:00", "14:00", "15:00"]
for time_str in time_points:
x = self.time_to_x_position(time_str, width)
painter.drawLine(x, 0, x, height)
# 绘制坐标轴
axis_color = QColor(200, 200, 200)
painter.setPen(QPen(axis_color, 2))
painter.drawLine(0, height // 2, width, height // 2) # X轴
painter.drawLine(0, 0, 0, height) # Y轴
# 绘制时间标签
painter.setPen(QPen(QColor(150, 150, 150), 1))
for time_str in time_points:
x = self.time_to_x_position(time_str, width)
painter.drawText(int(x) - 20, height - 5, time_str)
def _draw_waveform(self, painter: QPainter, width: int, height: int):
"""绘制波形线"""
if len(self.data_points) < 2:
return
# 设置波形线样式
waveform_color = QColor(0, 200, 255)
painter.setPen(QPen(waveform_color, 3))
points = []
# 计算Y轴范围基准值±100点
y_min = self.base_value - 100
y_max = self.base_value + 100
y_range = y_max - y_min
for time_str, value in self.data_points:
x = self.time_to_x_position(time_str, width)
# 计算Y坐标从底部到顶部
y_ratio = (value - y_min) / y_range
y = height - (y_ratio * height)
points.append(QPointF(x, y))
# 绘制折线
for i in range(len(points) - 1):
painter.drawLine(points[i], points[i + 1])
def _draw_data_points(self, painter: QPainter, width: int, height: int):
"""绘制数据点"""
point_color = QColor(255, 100, 100)
painter.setPen(QPen(point_color, 1))
painter.setBrush(QBrush(point_color))
# 计算Y轴范围
y_min = self.base_value - 100
y_max = self.base_value + 100
y_range = y_max - y_min
for time_str, value in self.data_points:
x = self.time_to_x_position(time_str, width)
y_ratio = (value - y_min) / y_range
y = height - (y_ratio * height)
# 绘制数据点圆圈
painter.drawEllipse(int(x) - 3, int(y) - 3, 6, 6)
# 显示数值标签
painter.setPen(QPen(QColor(200, 200, 200), 1))
painter.drawText(int(x) + 5, int(y) - 5, f"{value:.2f}")
def _draw_no_data_message(self, painter: QPainter, width: int, height: int):
"""绘制无数据提示信息"""
# 设置提示信息颜色
message_color = QColor(150, 150, 150)
painter.setPen(QPen(message_color, 2))
# 设置字体
font = painter.font()
font.setPointSize(14)
font.setBold(True)
painter.setFont(font)
# 绘制提示信息
message = "等待数据..."
text_rect = painter.fontMetrics().boundingRect(message)
x = (width - text_rect.width()) // 2
y = height // 2
painter.drawText(x, y, message)
def _draw_non_trading_message(self, painter: QPainter, width: int, height: int):
"""绘制非交易时间提示信息或截图"""
# 如果有截图管理器,尝试显示截图
if self.screenshot_manager:
screenshot_path = self._get_or_capture_screenshot()
if screenshot_path:
self._draw_screenshot(painter, screenshot_path, width, height)
return
# 如果没有截图或截图失败,显示文本提示
self._draw_text_message(painter, width, height)
def _get_or_capture_screenshot(self) -> str:
"""获取或捕获最新的截图"""
try:
# 检查是否有最新的截图
latest_screenshot = self.screenshot_manager.get_latest_screenshot()
# 如果截图文件存在且较新5分钟内使用现有截图
if latest_screenshot and os.path.exists(latest_screenshot):
file_time = datetime.fromtimestamp(os.path.getmtime(latest_screenshot))
current_time = datetime.now()
# 如果截图在5分钟内直接使用
if (current_time - file_time).total_seconds() < 300: # 5分钟
return latest_screenshot
# 否则捕获新的截图
logger.info("开始捕获上海证券交易所网站截图")
new_screenshot = self.screenshot_manager.capture_chart_screenshot()
if new_screenshot:
# 清理旧截图
self.screenshot_manager.cleanup_old_screenshots()
return new_screenshot
except Exception as e:
logger.error(f"获取截图失败: {e}")
return ""
def _draw_screenshot(self, painter: QPainter, screenshot_path: str, width: int, height: int):
"""绘制截图"""
try:
# 加载截图
pixmap = QPixmap(screenshot_path)
if pixmap.isNull():
logger.warning(f"截图加载失败: {screenshot_path}")
self._draw_text_message(painter, width, height)
return
# 计算缩放比例,保持宽高比
pixmap_width = pixmap.width()
pixmap_height = pixmap.height()
# 计算缩放比例,使截图适应显示区域
scale_x = width / pixmap_width
scale_y = height / pixmap_height
scale = min(scale_x, scale_y) * 0.8 # 留出边距
# 计算显示尺寸
display_width = int(pixmap_width * scale)
display_height = int(pixmap_height * scale)
# 计算显示位置(居中)
x = (width - display_width) // 2
y = (height - display_height) // 2
# 绘制截图
scaled_pixmap = pixmap.scaled(display_width, display_height)
painter.drawPixmap(x, y, scaled_pixmap)
# 绘制标题
font = painter.font()
font.setPointSize(12)
font.setBold(True)
painter.setFont(font)
title_color = QColor(255, 255, 255)
painter.setPen(QPen(title_color, 2))
title = "上海证券交易所实时图表"
title_rect = painter.fontMetrics().boundingRect(title)
title_x = (width - title_rect.width()) // 2
title_y = y - 10
painter.drawText(title_x, title_y, title)
# 绘制更新时间
font.setPointSize(8)
font.setBold(False)
painter.setFont(font)
update_time = datetime.now().strftime("更新时间: %H:%M:%S")
time_rect = painter.fontMetrics().boundingRect(update_time)
time_x = (width - time_rect.width()) // 2
time_y = y + display_height + 20
painter.drawText(time_x, time_y, update_time)
except Exception as e:
logger.error(f"绘制截图失败: {e}")
self._draw_text_message(painter, width, height)
def _draw_text_message(self, painter: QPainter, width: int, height: int):
"""绘制文本提示信息"""
# 设置提示信息颜色
message_color = QColor(200, 100, 100)
painter.setPen(QPen(message_color, 2))
# 设置字体
font = painter.font()
font.setPointSize(14)
font.setBold(True)
painter.setFont(font)
# 绘制提示信息
message = "非交易时间"
text_rect = painter.fontMetrics().boundingRect(message)
x = (width - text_rect.width()) // 2
y = height // 2
painter.drawText(x, y, message)
# 绘制交易时间说明
font.setPointSize(10)
font.setBold(False)
painter.setFont(font)
info = "交易时间: 9:30-11:30, 13:00-15:00"
info_rect = painter.fontMetrics().boundingRect(info)
x_info = (width - info_rect.width()) // 2
y_info = y + 30
painter.drawText(x_info, y_info, info)
# 绘制当前时间
current_time = datetime.now().strftime("%H:%M")
time_info = f"当前时间: {current_time}"
time_rect = painter.fontMetrics().boundingRect(time_info)
x_time = (width - time_rect.width()) // 2
y_time = y_info + 25
painter.drawText(x_time, y_time, time_info)