"""Views for the device-management app.

Contains the server-rendered dashboard (``home_page``) and the REST API
viewsets for devices, serial numbers, IP addresses, maintenance records and
attachments, including Excel export/import of the device list.
"""
import os
from collections import OrderedDict
from datetime import date, timedelta

import pandas as pd
from django.db import transaction
from django.db.models import Count, Q
from django.http import HttpResponse
from django.shortcuts import get_object_or_404, render
from django_filters import rest_framework as filters
from loguru import logger
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Font
from rest_framework import viewsets, status, parsers, renderers
from rest_framework.decorators import action, api_view, parser_classes
from rest_framework.permissions import IsAuthenticatedOrReadOnly, IsAuthenticated
from rest_framework.response import Response

from .models import (
    Device,
    DeviceSerial,
    DeviceIP,
    MaintenanceRecord,
    DeviceAttachment,
    DeviceStatus,
)
from .serializers import (
    DeviceSerializer,
    DeviceListSerializer,
    DeviceSerialSerializer,
    DeviceIPSerializer,
    MaintenanceRecordSerializer,
    DeviceAttachmentSerializer,
)

# Number of days ahead within which a warranty counts as "expiring soon".
WARRANTY_SOON_DAYS = 30

# Status values reported on the dashboard, in display order.
_DASHBOARD_STATUSES = ('normal', 'warning', 'offline', 'repair', 'scrap')

# File-name suffixes treated as images when the client sends no MIME type.
_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')

# Column titles of the exported workbook, in column order.
_EXPORT_HEADERS = [
    'ID', '地点', '楼栋', '设备名称', '型号', '品牌', '状态', '运维人员',
    '启用日期', '主序列号', '主IP', '最近维修简述', '服役天数',
]


def _warranty_window():
    """Return ``(today, today + WARRANTY_SOON_DAYS)``.

    ``date.today()`` is read exactly once so both bounds refer to the same
    day even when the call happens around midnight.
    """
    today = date.today()
    return today, today + timedelta(days=WARRANTY_SOON_DAYS)


def _cell_text(row, column, default=None):
    """Return the text of ``row[column]``, or ``default`` for a blank cell.

    A cell is blank when the column is absent or the value is NaN/None.
    Whole-number floats are rendered without the trailing ``.0`` because
    pandas promotes a numeric column containing blanks to float, which would
    otherwise turn a serial number ``12345`` into ``'12345.0'``.
    """
    value = row.get(column)
    if value is None or pd.isna(value):
        return default
    if isinstance(value, float) and value.is_integer():
        value = int(value)
    return str(value)


def home_page(request):
    """Render the dashboard: devices grouped by location plus summary counts."""
    # select_related(None) clears any select_related set on the manager.
    all_devices = Device.objects.select_related(None).prefetch_related(
        'serials', 'ips', 'maintenance_records'
    ).order_by('location', 'building', 'cabinet')

    # Iterating evaluates (and caches) the queryset; insertion order follows
    # the ORDER BY above, so locations appear sorted.
    devices_by_location = OrderedDict()
    for device in all_devices:
        devices_by_location.setdefault(device.location or '未分类', []).append(device)

    # One conditional-aggregate query instead of one COUNT per status.
    # Aliases are prefixed so they cannot collide with a model field name.
    totals = Device.objects.aggregate(**{
        f'n_{key}': Count('pk', filter=Q(status=key))
        for key in _DASHBOARD_STATUSES
    })
    status_counts = {key: totals[f'n_{key}'] for key in _DASHBOARD_STATUSES}

    today, soon_date = _warranty_window()
    warranty_expiring = all_devices.filter(
        warranty_expire__lte=soon_date,
        warranty_expire__gte=today,
    ).count()

    context = {
        'devices_by_location': devices_by_location,
        'total_count': all_devices.count(),
        'status_counts': status_counts,
        'warranty_expiring': warranty_expiring,
        'location_count': len(devices_by_location),
    }
    return render(request, 'device_management/index.html', context)


class DeviceFilter(filters.FilterSet):
    """Query-string filters for the device list endpoint."""

    location = filters.CharFilter(lookup_expr='icontains')
    building = filters.CharFilter(lookup_expr='icontains')
    status = filters.ChoiceFilter(choices=DeviceStatus.choices)
    device_name = filters.CharFilter(lookup_expr='icontains')
    brand = filters.CharFilter(lookup_expr='icontains')
    enable_date_from = filters.DateFilter(field_name='enable_date', lookup_expr='gte')
    enable_date_to = filters.DateFilter(field_name='enable_date', lookup_expr='lte')
    warranty_expire_soon = filters.BooleanFilter(method='filter_warranty_expire_soon')

    class Meta:
        model = Device
        fields = ['location', 'building', 'status', 'device_name', 'brand']

    def filter_warranty_expire_soon(self, queryset, name, value):
        """Keep devices whose warranty ends within ``WARRANTY_SOON_DAYS`` days.

        A falsy ``value`` leaves the queryset untouched.
        """
        if not value:
            return queryset
        today, soon_date = _warranty_window()
        return queryset.filter(warranty_expire__lte=soon_date, warranty_expire__gte=today)


class DeviceViewSet(viewsets.ModelViewSet):
    """CRUD API for devices, plus attachment upload and Excel export/import."""

    queryset = Device.objects.all().prefetch_related('serials', 'ips', 'maintenance_records')
    serializer_class = DeviceSerializer
    filterset_class = DeviceFilter
    search_fields = ['device_name', 'location', 'building', 'brand', 'model']
    ordering_fields = ['id', 'device_name', 'location', 'enable_date', 'created_at']
    permission_classes = [IsAuthenticatedOrReadOnly]

    def get_serializer_class(self):
        """Use the list serializer for ``list``; the full one otherwise."""
        if self.action == 'list':
            return DeviceListSerializer
        return DeviceSerializer

    @action(detail=True, methods=['post'], parser_classes=[parsers.MultiPartParser])
    def upload_attachment(self, request, pk=None):
        """Attach the uploaded ``file`` to this device.

        Responds 400 when no file is sent, otherwise 201 with the serialized
        attachment. Image uploads also trigger thumbnail generation.
        """
        device = self.get_object()
        file_obj = request.FILES.get('file')
        if not file_obj:
            return Response({'error': 'No file provided'}, status=status.HTTP_400_BAD_REQUEST)

        file_type = request.data.get('file_type', '')
        attachment = DeviceAttachment.objects.create(
            device=device,
            file=file_obj,
            file_name=file_obj.name,
            file_type=file_type,
        )

        is_image = (
            file_type.startswith('image/')
            or file_obj.name.lower().endswith(_IMAGE_EXTENSIONS)
        )
        if is_image:
            # NOTE(review): file_obj was already consumed by the create()
            # above; confirm generate_thumbnail rewinds it before reading.
            device.generate_thumbnail(file_obj)
            device.save()

        logger.info(f'Uploaded attachment {attachment.id} for device {device.id}')
        return Response(DeviceAttachmentSerializer(attachment).data, status=status.HTTP_201_CREATED)

    @action(detail=False, methods=['get'])
    def export_excel(self, request):
        """Return the filtered device list as an ``.xlsx`` download."""
        queryset = self.filter_queryset(self.get_queryset())

        wb = Workbook()
        ws = wb.active
        ws.title = '设备列表'

        for col, header in enumerate(_EXPORT_HEADERS, 1):
            ws.cell(row=1, column=col, value=header).font = Font(bold=True)

        for row, device in enumerate(queryset, 2):
            # NOTE(review): .filter() bypasses the prefetch cache, so these
            # two lookups cost one query each per device.
            primary_serial = device.serials.filter(is_primary=True).first()
            primary_ip = device.ips.filter(is_primary=True).first()
            latest_maintenance = device.maintenance_records.first()

            values = [
                device.id,
                device.location,
                device.building or '',
                device.device_name,
                device.model or '',
                device.brand or '',
                device.get_status_display(),
                device.responsible_person or '',
                str(device.enable_date) if device.enable_date else '',
                primary_serial.serial_number if primary_serial else '',
                primary_ip.ip_address if primary_ip else '',
                str(latest_maintenance) if latest_maintenance else '',
                device.service_duration_days,
            ]
            for col, value in enumerate(values, 1):
                ws.cell(row=row, column=col, value=value)

        response = HttpResponse(
            content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
        response['Content-Disposition'] = f'attachment; filename="devices_{date.today()}.xlsx"'
        wb.save(response)

        logger.info(f'Exported {queryset.count()} devices to Excel')
        return response

    @action(detail=False, methods=['post'], parser_classes=[parsers.MultiPartParser])
    def import_excel(self, request):
        """Create devices from the rows of an uploaded Excel ``file``.

        Columns are matched by the same titles ``export_excel`` writes. The
        import is all-or-nothing: any failure rolls back every row and yields
        a 400 response carrying the error text. On success the response
        reports how many devices were created.
        """
        file_obj = request.FILES.get('file')
        if not file_obj:
            return Response({'error': 'No file provided'}, status=status.HTTP_400_BAD_REQUEST)

        try:
            df = pd.read_excel(file_obj)

            # export_excel writes the human-readable status label, so accept
            # a label as well as a raw value to let exports round-trip.
            label_to_value = {str(label): value for value, label in DeviceStatus.choices}

            created_count = 0
            # Atomic so a bad row cannot leave a half-imported sheet behind
            # while the client is told the import failed.
            with transaction.atomic():
                for _, row in df.iterrows():
                    raw_status = _cell_text(row, '状态', 'normal')
                    device = Device.objects.create(
                        location=_cell_text(row, '地点', ''),
                        building=_cell_text(row, '楼栋'),
                        device_name=_cell_text(row, '设备名称', ''),
                        model=_cell_text(row, '型号'),
                        brand=_cell_text(row, '品牌'),
                        status=label_to_value.get(raw_status, raw_status),
                        responsible_person=_cell_text(row, '运维人员'),
                    )

                    serial_number = _cell_text(row, '主序列号')
                    if serial_number is not None:
                        DeviceSerial.objects.create(
                            device=device,
                            serial_number=serial_number,
                            serial_type='main',
                            is_primary=True,
                        )

                    ip_address = _cell_text(row, '主IP')
                    if ip_address is not None:
                        DeviceIP.objects.create(
                            device=device,
                            ip_address=ip_address,
                            ip_type='management',
                            is_primary=True,
                        )

                    created_count += 1
        except Exception as e:
            # API boundary: report any parse/DB failure to the client as 400,
            # but keep the traceback in the log.
            logger.exception(f'Excel import failed: {e}')
            return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

        logger.info(f'Imported {created_count} devices from Excel')
        return Response({'message': f'成功导入 {created_count} 台设备', 'count': created_count})


class DeviceSerialViewSet(viewsets.ModelViewSet):
    """CRUD API for device serial numbers."""

    queryset = DeviceSerial.objects.all()
    serializer_class = DeviceSerialSerializer
    search_fields = ['serial_number', 'serial_type']
    permission_classes = [IsAuthenticatedOrReadOnly]


class DeviceIPViewSet(viewsets.ModelViewSet):
    """CRUD API for device IP addresses."""

    queryset = DeviceIP.objects.all()
    serializer_class = DeviceIPSerializer
    search_fields = ['ip_address', 'ip_type']
    permission_classes = [IsAuthenticatedOrReadOnly]


class MaintenanceRecordViewSet(viewsets.ModelViewSet):
    """CRUD API for maintenance records."""

    queryset = MaintenanceRecord.objects.all()
    serializer_class = MaintenanceRecordSerializer
    filterset_fields = ['device_id', 'maintenance_by']
    search_fields = ['fault_description', 'repair_content', 'maintenance_by']
    permission_classes = [IsAuthenticatedOrReadOnly]


class DeviceAttachmentViewSet(viewsets.ModelViewSet):
    """CRUD API for device attachments (multipart or form uploads)."""

    queryset = DeviceAttachment.objects.all()
    serializer_class = DeviceAttachmentSerializer
    parser_classes = [parsers.MultiPartParser, parsers.FormParser]
    permission_classes = [IsAuthenticatedOrReadOnly]

    def perform_create(self, serializer):
        """Save the attachment against the device named by ``device``.

        Raises Http404 when no such device exists. ``file_name`` is taken
        from the uploaded file, or left empty when no file was sent.
        """
        device_id = self.request.data.get('device')
        device = get_object_or_404(Device, id=device_id)
        file_obj = self.request.FILES.get('file')
        attachment = serializer.save(
            device=device,
            file_name=file_obj.name if file_obj else '',
        )
        logger.info(f'Created attachment {attachment.id}')