import datetime

from django.db.models import Count, Case, When, Q, Subquery, OuterRef
from django.utils import timezone

from .models import ServiceGroup, Service, ServiceCheckRecord

# Prefer loguru for logging; fall back to the standard library logger when
# loguru is not installed.
try:
    from loguru import logger
except ImportError:
    import logging

    logger = logging.getLogger(__name__)


# Utility: get or create a service
def get_or_create_service(data):
    """Return the service named in a client check-in, creating it if missing.

    Args:
        data: Mapping describing the check-in. The keys 'service_name',
            'host' and 'check_type' are read with ``data[...]``; 'port' is
            read with ``data.get`` and may be absent.

    Returns:
        The existing or newly created ``Service`` instance.

    Raises:
        KeyError: If 'service_name', 'host' or 'check_type' is missing.
    """
    lookup = {
        'name': data['service_name'],
        'host': data['host'],
        'port': data.get('port'),
    }
    # Read eagerly so an incomplete payload fails the same way whether or not
    # the service already exists.
    check_type = data['check_type']

    # Common path: the service already exists. Looking it up first avoids
    # querying (or creating) the "Default" group on every single check-in.
    try:
        return Service.objects.get(**lookup)
    except Service.DoesNotExist:
        pass

    default_group, _ = ServiceGroup.objects.get_or_create(name="Default")
    # get_or_create (rather than create) is kept here so that a service
    # inserted by someone else since the lookup above is reused.
    service, created = Service.objects.get_or_create(
        defaults={
            'group': default_group,
            'check_type': check_type,
            'description': f"Auto-created from client checkin: {data['service_name']}",
        },
        **lookup,
    )
    if created:
        # An f-string is used because loguru and stdlib logging disagree on
        # the lazy-formatting placeholder syntax.
        logger.info(f"自动创建新服务: {service.name} ({service.host})")
    return service


# Utility: get the status summary
def get_status_summary():
    """Return global counts of services grouped by their latest status.

    The latest status of a service is the ``status`` of its most recent
    ``ServiceCheckRecord`` (ordered by ``checked_at``). A service that has no
    record at all is counted as unknown, matching what
    ``get_service_latest_status`` reports for such a service.

    Returns:
        A dict with the integer keys 'total', 'up', 'down' and 'unknown'.
    """
    # Correlated subquery: status of the newest record of each service.
    newest_status = (
        ServiceCheckRecord.objects
        .filter(service=OuterRef('pk'))
        .order_by('-checked_at')
        .values('status')[:1]
    )
    services = Service.objects.annotate(latest_status=Subquery(newest_status))

    # NULL means the subquery found no record for the service.
    is_unknown = Q(latest_status='UNKNOWN') | Q(latest_status__isnull=True)

    return {
        'total': services.count(),
        'up': services.filter(latest_status='UP').count(),
        'down': services.filter(latest_status='DOWN').count(),
        'unknown': services.filter(is_unknown).count(),
    }


# Utility: get the latest status of a service
def get_service_latest_status(service):
    """Return the latest check result of ``service`` as a dict.

    Args:
        service: Object whose ``records`` manager yields check records.

    Returns:
        A dict with the keys 'status', 'check_time', 'response_time' and
        'message'. When the service has no record, 'status' is 'UNKNOWN' and
        the other values are ``None``.
    """
    newest = service.records.order_by('-checked_at').first()
    if newest is None:
        return {
            'status': 'UNKNOWN',
            'check_time': None,
            'response_time': None,
            'message': None,
        }
    return {
        'status': newest.status,
        'check_time': newest.checked_at,
        'response_time': newest.response_time,
        'message': newest.message,
    }


# Utility: get the status-change timeline of a service
def get_service_status_timeline(service, limit=20):
    """Return the most recent status changes of ``service``.

    Records are walked from newest to oldest; a record is kept when its
    status differs from the record visited just before it (the newest record
    is always kept).

    Args:
        service: Object whose ``records`` manager yields check records.
        limit: Maximum number of records to return. Values of zero or below
            yield an empty list.

    Returns:
        A list of at most ``limit`` check records, newest first.
    """
    if limit <= 0:
        return []

    status_changes = []
    prev_status = None
    # iterator() streams rows instead of caching the whole history, since the
    # loop normally stops long before the oldest record is reached.
    for record in service.records.order_by('-checked_at').iterator():
        if prev_status is None or record.status != prev_status:
            status_changes.append(record)
            prev_status = record.status
            if len(status_changes) >= limit:
                break
    return status_changes


# Utility: get response-time trend data of a service
def get_service_response_time_chart_data(service, hours=24):
    """Return response-time chart data for the last ``hours`` hours.

    Only records with a non-null ``response_time`` are included, ordered from
    oldest to newest.

    Args:
        service: Object whose ``records`` manager yields check records.
        hours: Size of the look-back window, in hours.

    Returns:
        A dict with 'labels' (``HH:MM`` strings) and 'data' (the matching
        response times), both lists of equal length.
    """
    end_time = timezone.now()
    start_time = end_time - datetime.timedelta(hours=hours)

    # Records inside the requested time window.
    chart_records = service.records.filter(
        checked_at__gte=start_time,
        checked_at__lte=end_time,
        response_time__isnull=False,
    ).order_by('checked_at')

    chart_labels = []
    chart_data = []
    for record in chart_records:
        checked_at = record.checked_at
        # Aware datetimes are converted to the current time zone so labels
        # show local wall-clock time; naive ones (USE_TZ=False) are already
        # local and localtime() would reject them.
        if timezone.is_aware(checked_at):
            checked_at = timezone.localtime(checked_at)
        chart_labels.append(checked_at.strftime('%H:%M'))
        chart_data.append(record.response_time)

    return {
        'labels': chart_labels,
        'data': chart_data,
    }