Files
webstatus/status/api_views.py

142 lines
5.0 KiB
Python
Raw Permalink Normal View History

from rest_framework import status, generics
from rest_framework.views import APIView
from rest_framework.response import Response
from django.db.models import Count, Q
from .models import Service, ServiceCheckRecord, ServiceGroup
from .serializers import (
ServiceSerializer,
ServiceCheckRecordSerializer,
ServiceCheckInSerializer,
StatusSummarySerializer
)
# 引入loguru库用于日志记录
try:
from loguru import logger
except ImportError:
import logging
logger = logging.getLogger(__name__)
class CheckInAPIView(APIView):
"""客户端上报接口"""
def post(self, request, *args, **kwargs):
"""处理客户端上报请求"""
serializer = ServiceCheckInSerializer(data=request.data)
if serializer.is_valid():
try:
result = serializer.create(serializer.validated_data)
logger.info(f"服务上报成功服务ID: {result['service_id']}")
return Response({
"code": status.HTTP_200_OK,
"message": "上报成功",
"service_id": result['service_id']
})
except Exception as e:
logger.error(f"服务上报失败: {str(e)}")
return Response({
"code": status.HTTP_500_INTERNAL_SERVER_ERROR,
"message": f"上报失败: {str(e)}"
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
else:
logger.warning(f"服务上报数据无效: {serializer.errors}")
return Response({
"code": status.HTTP_400_BAD_REQUEST,
"message": "数据无效",
"errors": serializer.errors
}, status=status.HTTP_400_BAD_REQUEST)
class ServiceListAPIView(generics.ListAPIView):
"""获取所有服务列表(含最新状态)"""
serializer_class = ServiceSerializer
queryset = Service.objects.all().select_related('group')
def get_queryset(self):
queryset = super().get_queryset()
# 支持按分组过滤
group_name = self.request.query_params.get('group')
if group_name:
queryset = queryset.filter(group__name=group_name)
# 支持按状态过滤
status_filter = self.request.query_params.get('status')
if status_filter:
# 获取每个服务的最新状态
service_ids_with_status = []
for service in queryset:
if service.get_latest_status() == status_filter.upper():
service_ids_with_status.append(service.id)
queryset = queryset.filter(id__in=service_ids_with_status)
# 支持按名称搜索
search = self.request.query_params.get('search')
if search:
queryset = queryset.filter(
Q(name__icontains=search) |
Q(host__icontains=search) |
Q(description__icontains=search)
)
logger.info(f"获取服务列表,过滤参数: group={group_name}, status={status_filter}, search={search}")
return queryset
class ServiceHistoryAPIView(generics.ListAPIView):
"""获取某服务历史记录(分页)"""
serializer_class = ServiceCheckRecordSerializer
def get_queryset(self):
service_id = self.kwargs.get('service_id')
try:
service = Service.objects.get(id=service_id)
except Service.DoesNotExist:
logger.warning(f"请求的服务不存在ID: {service_id}")
return ServiceCheckRecord.objects.none()
queryset = ServiceCheckRecord.objects.filter(service=service)
logger.info(f"获取服务历史记录服务ID: {service_id}")
return queryset
class StatusSummaryAPIView(APIView):
"""获取全局状态摘要"""
def get(self, request, *args, **kwargs):
"""获取全局状态摘要"""
services = Service.objects.all()
total_services = services.count()
up_services = 0
down_services = 0
unknown_services = 0
for service in services:
latest_status = service.get_latest_status()
if latest_status == 'UP':
up_services += 1
elif latest_status == 'DOWN':
down_services += 1
else:
unknown_services += 1
# 计算正常运行时间百分比
if total_services > 0:
uptime_percentage = (up_services / total_services) * 100
else:
uptime_percentage = 0
data = {
'total_services': total_services,
'up_services': up_services,
'down_services': down_services,
'unknown_services': unknown_services,
'uptime_percentage': round(uptime_percentage, 2)
}
serializer = StatusSummarySerializer(data)
logger.info(f"获取状态摘要: {data}")
return Response(serializer.data)