Files
webstatus/status/utils.py
2025-09-07 16:47:12 +08:00

113 lines
3.6 KiB
Python

from django.db.models import Count, Case, When, Q, Subquery, OuterRef
from .models import ServiceGroup, Service, ServiceCheckRecord
# 引入loguru库用于日志记录
try:
from loguru import logger
except ImportError:
import logging
logger = logging.getLogger(__name__)
# 工具函数:获取或创建服务
def get_or_create_service(data):
    """Return the service described by a client check-in, creating it if absent.

    The service is looked up by ``service_name``, ``host`` and ``port``. When no
    matching row exists, a new one is created in the "Default" service group
    (which is itself created on demand).

    Args:
        data: Mapping sent by the client. ``service_name``, ``host`` and
            ``check_type`` are required; ``port`` is optional and is looked up
            as ``None`` when missing.

    Returns:
        The existing or newly created ``Service`` instance.

    Raises:
        KeyError: If a required key is missing from ``data``.
    """
    service, created = Service.objects.get_or_create(
        name=data['service_name'],
        host=data['host'],
        port=data.get('port'),
        defaults={
            # Callable default: Django resolves it only when a row is actually
            # being created, so the common "service already exists" path no
            # longer pays for a ServiceGroup lookup on every check-in.
            'group': lambda: ServiceGroup.objects.get_or_create(name="Default")[0],
            'check_type': data['check_type'],
            'description': f"Auto-created from client checkin: {data['service_name']}",
        },
    )
    if created:
        logger.info(f"自动创建新服务: {service.name} ({service.host})")
    return service
# 工具函数:获取状态摘要
def get_status_summary():
    """Return global counts of services by their most recent check status.

    Each service is annotated with the ``status`` of its newest
    ``ServiceCheckRecord`` (ordered by ``checked_at`` descending) and the
    services are then counted per status.

    Services that have never been checked have no latest status (NULL). They
    are counted as ``unknown`` so that the summary agrees with
    ``get_service_latest_status``, which reports 'UNKNOWN' for a service
    without records.

    Returns:
        dict with integer values under the keys ``total``, ``up``, ``down``
        and ``unknown``.
    """
    # Correlated subquery: status of the newest record for the outer service.
    newest_status = (
        ServiceCheckRecord.objects
        .filter(service=OuterRef('pk'))
        .order_by('-checked_at')
        .values('status')[:1]
    )
    services = Service.objects.annotate(latest_status=Subquery(newest_status))

    # A service with no records yields NULL here; treat it like 'UNKNOWN'.
    is_unknown = Q(latest_status='UNKNOWN') | Q(latest_status__isnull=True)

    return {
        'total': services.count(),
        'up': services.filter(latest_status='UP').count(),
        'down': services.filter(latest_status='DOWN').count(),
        'unknown': services.filter(is_unknown).count(),
    }
# 工具函数:获取服务的最新状态
def get_service_latest_status(service):
    """Describe the most recent check of *service*.

    Args:
        service: Object exposing a ``records`` manager whose entries carry
            ``status``, ``checked_at``, ``response_time`` and ``message``.

    Returns:
        dict with the keys ``status``, ``check_time``, ``response_time`` and
        ``message``. When the service has no records, ``status`` is
        'UNKNOWN' and the other values are ``None``.
    """
    # Start from the "never checked" payload and overwrite it if a record exists.
    summary = {
        'status': 'UNKNOWN',
        'check_time': None,
        'response_time': None,
        'message': None,
    }
    newest = service.records.order_by('-checked_at').first()
    if newest:
        summary.update(
            status=newest.status,
            check_time=newest.checked_at,
            response_time=newest.response_time,
            message=newest.message,
        )
    return summary
# 工具函数:获取服务的状态变化时间轴
def get_service_status_timeline(service, limit=20):
    """Return the most recent status changes of *service*, newest first.

    Records are walked from newest to oldest and collapsed into runs of
    consecutive records sharing the same ``status``. One record is returned
    per run: the first one encountered, i.e. the newest record of that run
    (not the record at which the status originally changed).

    Args:
        service: Object exposing a ``records`` manager.
        limit: Maximum number of entries to return. Values of zero or less
            yield an empty list.

    Returns:
        list of at most ``limit`` records, ordered newest first.
    """
    from itertools import groupby, islice
    from operator import attrgetter

    if limit <= 0:
        return []

    # Stream rows rather than letting the queryset cache the complete check
    # history; iteration stops as soon as ``limit`` runs have been seen.
    records = service.records.all().order_by('-checked_at').iterator()
    runs = groupby(records, key=attrgetter('status'))
    # Every group holds at least one record, so next() cannot raise here.
    return [next(run) for _, run in islice(runs, limit)]
# 工具函数:获取服务的响应时间趋势数据
def get_service_response_time_chart_data(service, hours=24):
    """Build response-time chart series for *service* over a recent window.

    Only records checked within the last ``hours`` hours that have a
    non-null ``response_time`` are included, in chronological order.

    Args:
        service: Object exposing a ``records`` manager.
        hours: Size of the look-back window in hours, counted back from now.

    Returns:
        dict with two parallel lists: ``labels`` ('HH:MM' strings) and
        ``data`` (the matching ``response_time`` values).
    """
    import datetime

    from django.utils import timezone

    end_time = timezone.now()
    start_time = end_time - datetime.timedelta(hours=hours)

    # Only two columns are needed, so skip building full model instances.
    points = (
        service.records
        .filter(
            checked_at__gte=start_time,
            checked_at__lte=end_time,
            response_time__isnull=False,
        )
        .order_by('checked_at')
        .values_list('checked_at', 'response_time')
    )

    chart_labels = []
    chart_data = []
    for checked_at, response_time in points:
        # Aware datetimes come back from the database in UTC; convert to the
        # active time zone so labels show local wall-clock time. Naive values
        # (USE_TZ disabled) are formatted unchanged.
        if timezone.is_aware(checked_at):
            checked_at = timezone.localtime(checked_at)
        chart_labels.append(checked_at.strftime('%H:%M'))
        chart_data.append(response_time)

    return {
        'labels': chart_labels,
        'data': chart_data,
    }