Files
webstatus/status/views.py

220 lines
7.5 KiB
Python
Raw Normal View History

2025-09-07 16:47:12 +08:00
from django.shortcuts import render, get_object_or_404
2025-07-31 21:21:45 +08:00
from django.http import JsonResponse
2025-09-07 16:47:12 +08:00
from django.db.models import Count, Case, When, Q, Subquery, OuterRef
from django.views.decorators.csrf import csrf_exempt
from django.utils import timezone
import json
from .models import ServiceGroup, Service, ServiceCheckRecord
from .utils import get_or_create_service, get_status_summary, get_service_latest_status, get_service_status_timeline, get_service_response_time_chart_data
# 引入loguru库用于日志记录
try:
from loguru import logger
except ImportError:
import logging
logger = logging.getLogger(__name__)
2025-07-31 21:21:45 +08:00
# 主页视图
def home(request):
2025-09-07 16:47:12 +08:00
"""首页Dashboard"""
summary = get_status_summary()
# 获取最近异常的服务
recent_issues = ServiceCheckRecord.objects.filter(
status__in=['DOWN', 'UNKNOWN']
).order_by('-checked_at')[:10]
context = {
'summary': summary,
'recent_issues': recent_issues
}
return render(request, 'status/index.html', context)
2025-07-31 21:21:45 +08:00
2025-09-07 16:47:12 +08:00
# 服务列表页
def service_list(request):
"""服务列表页"""
group_filter = request.GET.get('group')
status_filter = request.GET.get('status')
search_query = request.GET.get('search', '')
2025-07-31 21:21:45 +08:00
services = Service.objects.all()
2025-09-07 16:47:12 +08:00
if group_filter:
services = services.filter(group__name=group_filter)
if status_filter:
# 根据最新状态筛选
latest_records = ServiceCheckRecord.objects.filter(
service=OuterRef('pk')
).order_by('-checked_at')
services = services.annotate(
latest_status=Subquery(latest_records.values('status')[:1])
).filter(latest_status=status_filter)
if search_query:
services = services.filter(
Q(name__icontains=search_query) |
Q(host__icontains=search_query) |
Q(description__icontains=search_query)
)
# 获取每个服务的最新状态
latest_records = ServiceCheckRecord.objects.filter(
service=OuterRef('pk')
).order_by('-checked_at')
services = services.annotate(
latest_status=Subquery(latest_records.values('status')[:1]),
latest_check_time=Subquery(latest_records.values('checked_at')[:1]),
latest_response_time=Subquery(latest_records.values('response_time')[:1])
)
groups = ServiceGroup.objects.all()
context = {
'services': services,
'groups': groups,
'current_group': group_filter,
'current_status': status_filter,
'search_query': search_query
}
return render(request, 'status/service_list.html', context)
# 服务详情页
def service_detail(request, service_id):
"""服务详情页"""
service = get_object_or_404(Service, pk=service_id)
# 获取最近10条检测记录
recent_records = service.records.all()[:10]
# 获取状态变化时间轴数据
status_changes = get_service_status_timeline(service, limit=20)
# 获取响应时间趋势图数据
chart_data = get_service_response_time_chart_data(service, hours=24)
# 获取服务的最新状态信息
latest_status = get_service_latest_status(service)
service.latest_status = latest_status['status']
service.latest_check_time = latest_status['check_time']
service.latest_response_time = latest_status['response_time']
service.latest_message = latest_status['message']
context = {
'service': service,
'recent_records': recent_records,
'status_changes': status_changes,
'chart_labels': chart_data['labels'],
'chart_data': chart_data['data']
}
return render(request, 'status/service_detail.html', context)
# API视图 - 客户端上报接口
@csrf_exempt
def checkin(request):
"""客户端定期调用此接口上报服务状态"""
if request.method != 'POST':
return JsonResponse({'code': 405, 'message': '只支持POST请求'}, status=405)
try:
data = json.loads(request.body)
logger.info(f"收到服务状态上报: {data.get('service_name')} - {data.get('status')}")
# 验证必要字段
required_fields = ['service_name', 'host', 'check_type', 'status']
for field in required_fields:
if field not in data:
return JsonResponse({'code': 400, 'message': f'缺少必要字段: {field}'}, status=400)
# 获取或创建服务
service = get_or_create_service(data)
# 创建检测记录
record = ServiceCheckRecord.objects.create(
service=service,
status=data['status'],
response_time=data.get('response_time'),
message=data.get('message', '')
)
logger.info(f"服务状态已记录: {service.name} - {record.status}")
return JsonResponse({
'code': 200,
'message': '上报成功',
'service_id': service.id
})
except json.JSONDecodeError:
return JsonResponse({'code': 400, 'message': '无效的JSON数据'}, status=400)
except Exception as e:
logger.error(f"处理上报数据时出错: {str(e)}")
return JsonResponse({'code': 500, 'message': f'服务器内部错误: {str(e)}'}, status=500)
# API视图 - 获取所有服务列表(含最新状态)
def api_services(request):
"""获取所有服务列表(含最新状态)"""
# 获取每个服务的最新状态
latest_records = ServiceCheckRecord.objects.filter(
service=OuterRef('pk')
).order_by('-checked_at')
services = Service.objects.annotate(
latest_status=Subquery(latest_records.values('status')[:1]),
latest_check_time=Subquery(latest_records.values('checked_at')[:1]),
latest_response_time=Subquery(latest_records.values('response_time')[:1])
)
2025-07-31 21:21:45 +08:00
data = []
for service in services:
data.append({
2025-09-07 16:47:12 +08:00
'id': service.id,
2025-07-31 21:21:45 +08:00
'name': service.name,
2025-09-07 16:47:12 +08:00
'group': service.group.name,
'host': service.host,
2025-07-31 21:21:45 +08:00
'port': service.port,
2025-09-07 16:47:12 +08:00
'check_type': service.check_type,
'is_active': service.is_active,
'latest_status': service.latest_status,
'latest_check_time': service.latest_check_time.isoformat() if service.latest_check_time else None,
'latest_response_time': service.latest_response_time
2025-07-31 21:21:45 +08:00
})
2025-09-07 16:47:12 +08:00
2025-07-31 21:21:45 +08:00
return JsonResponse(data, safe=False)
2025-09-07 16:47:12 +08:00
# API视图 - 获取某服务历史记录(分页)
def api_service_history(request, service_id):
"""获取某服务历史记录(分页)"""
service = get_object_or_404(Service, pk=service_id)
# 分页参数
page = int(request.GET.get('page', 1))
page_size = int(request.GET.get('page_size', 20))
# 计算分页范围
start = (page - 1) * page_size
end = start + page_size
records = service.records.all()[start:end]
total = service.records.count()
data = []
for record in records:
data.append({
'id': record.id,
'status': record.status,
'response_time': record.response_time,
'message': record.message,
'checked_at': record.checked_at.isoformat()
})
return JsonResponse({
'total': total,
'page': page,
'page_size': page_size,
'records': data
})
# API视图 - 获取全局状态摘要
def api_status_summary(request):
"""获取全局状态摘要(如:总共服务数、正常数、异常数)"""
summary = get_status_summary()
return JsonResponse(summary)