217 lines
9.7 KiB
Python
217 lines
9.7 KiB
Python
import os
|
|
from datetime import date
|
|
from django.shortcuts import get_object_or_404, render
|
|
from django.http import HttpResponse
|
|
from django_filters import rest_framework as filters
|
|
from rest_framework import viewsets, status, parsers, renderers
|
|
from rest_framework.decorators import action, api_view, parser_classes
|
|
from rest_framework.response import Response
|
|
from rest_framework.permissions import IsAuthenticatedOrReadOnly, IsAuthenticated
|
|
from drf_yasg.utils import swagger_auto_schema
|
|
from drf_yasg import openapi as swagger_openapi
|
|
from openpyxl import Workbook, load_workbook
|
|
from openpyxl.styles import Font
|
|
import pandas as pd
|
|
from loguru import logger
|
|
|
|
|
|
def home_page(request):
    """Render the device-management landing page.

    Args:
        request: The incoming Django ``HttpRequest``.

    Returns:
        The ``HttpResponse`` produced by rendering
        ``device_management/index.html`` with no extra context.
    """
    template_name = 'device_management/index.html'
    return render(request, template_name)
|
|
|
|
from .models import (
|
|
Device, DeviceSerial, DeviceIP, MaintenanceRecord, DeviceAttachment
|
|
)
|
|
from .serializers import (
|
|
DeviceSerializer, DeviceListSerializer, DeviceSerialSerializer,
|
|
DeviceIPSerializer, MaintenanceRecordSerializer, DeviceAttachmentSerializer
|
|
)
|
|
|
|
|
|
# Imported at module scope: importing inside the class body would bind
# ``DeviceStatus`` as an attribute of the FilterSet class itself.
from .models import DeviceStatus


class DeviceFilter(filters.FilterSet):
    """Query-string filters available on the device list endpoint.

    Text fields use case-insensitive substring matching, ``status`` is
    restricted to the ``DeviceStatus`` choices, ``enable_date_from`` /
    ``enable_date_to`` bound ``enable_date`` inclusively, and
    ``warranty_expire_soon`` selects devices whose warranty ends within
    the next ``WARRANTY_SOON_DAYS`` days.
    """

    # Size (in days) of the look-ahead window used by ``warranty_expire_soon``.
    WARRANTY_SOON_DAYS = 30

    location = filters.CharFilter(lookup_expr='icontains')
    building = filters.CharFilter(lookup_expr='icontains')
    status = filters.ChoiceFilter(choices=DeviceStatus.choices)
    device_name = filters.CharFilter(lookup_expr='icontains')
    brand = filters.CharFilter(lookup_expr='icontains')
    enable_date_from = filters.DateFilter(field_name='enable_date', lookup_expr='gte')
    enable_date_to = filters.DateFilter(field_name='enable_date', lookup_expr='lte')
    warranty_expire_soon = filters.BooleanFilter(method='filter_warranty_expire_soon')

    class Meta:
        model = Device
        fields = ['location', 'building', 'status', 'device_name', 'brand']

    def filter_warranty_expire_soon(self, queryset, name, value):
        """Restrict ``queryset`` to devices whose warranty expires soon.

        Args:
            queryset: The device queryset being filtered.
            name: Name of the filter field (unused; required by django-filter).
            value: Parsed boolean from the query string.

        Returns:
            ``queryset`` unchanged when ``value`` is falsy; otherwise the
            devices with ``warranty_expire`` between today and today plus
            ``WARRANTY_SOON_DAYS`` days, both ends inclusive.
        """
        if not value:
            return queryset

        from datetime import timedelta

        # Read the clock once so both bounds refer to the same day even if
        # the request straddles midnight.
        today = date.today()
        window_end = today + timedelta(days=self.WARRANTY_SOON_DAYS)
        return queryset.filter(warranty_expire__lte=window_end, warranty_expire__gte=today)
|
|
|
|
|
|
class DeviceViewSet(viewsets.ModelViewSet):
    # CRUD endpoints for Device plus three extra actions: attachment upload,
    # Excel export of the (filtered) device list, and Excel import.
    #
    # The class and its actions are documented with comments rather than
    # docstrings on purpose: DRF and schema generators render view docstrings
    # into the public API description, so adding them would change output.
    queryset = Device.objects.all().prefetch_related('serials', 'ips', 'maintenance_records')
    serializer_class = DeviceSerializer
    filterset_class = DeviceFilter
    search_fields = ['device_name', 'location', 'building', 'brand', 'model']
    ordering_fields = ['id', 'device_name', 'location', 'enable_date', 'created_at']
    permission_classes = [IsAuthenticatedOrReadOnly]

    def get_serializer_class(self):
        # The list action uses the list serializer; every other action uses
        # the full DeviceSerializer.
        if self.action == 'list':
            return DeviceListSerializer
        return DeviceSerializer

    # POST <detail>/upload_attachment/ (multipart).
    # Stores the uploaded ``file`` as a DeviceAttachment of this device and,
    # for images, asks the device to regenerate its thumbnail.
    # Responds 400 when no file is supplied, otherwise 201 with the
    # serialized attachment.
    @action(detail=True, methods=['post'], parser_classes=[parsers.MultiPartParser])
    def upload_attachment(self, request, pk=None):
        device = self.get_object()
        file_obj = request.FILES.get('file')

        if not file_obj:
            return Response({'error': 'No file provided'}, status=status.HTTP_400_BAD_REQUEST)

        file_type = request.data.get('file_type', '')
        attachment = DeviceAttachment.objects.create(
            device=device,
            file=file_obj,
            file_name=file_obj.name,
            file_type=file_type,
        )

        image_suffixes = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')
        if file_type.startswith('image/') or file_obj.name.lower().endswith(image_suffixes):
            # Saving the attachment above has already read the upload to its
            # end; rewind so the thumbnail step sees the whole file.
            # NOTE(review): generate_thumbnail is defined on the model and not
            # visible here - confirm it does not rely on the stream position.
            file_obj.seek(0)
            device.generate_thumbnail(file_obj)
            device.save()

        logger.info(f'Uploaded attachment {attachment.id} for device {device.id}')
        return Response(DeviceAttachmentSerializer(attachment).data, status=status.HTTP_201_CREATED)

    @staticmethod
    def _primary_of(related_manager):
        """Return the first related object flagged ``is_primary``, or None.

        Iterates ``.all()`` so the rows already loaded by the viewset's
        ``prefetch_related`` are reused; calling ``.filter()`` instead would
        run one extra query per device.
        NOTE(review): assumes at most one primary row per device, otherwise
        the pick follows the prefetch ordering - confirm against the models.
        """
        return next((obj for obj in related_manager.all() if obj.is_primary), None)

    @classmethod
    def _export_row(cls, device):
        """Build the list of cell values for one device, in header order."""
        primary_serial = cls._primary_of(device.serials)
        primary_ip = cls._primary_of(device.ips)
        # "Latest" is whatever the related manager yields first.
        # NOTE(review): presumably the model orders records newest-first.
        latest_maintenance = device.maintenance_records.first()

        return [
            device.id,
            device.location,
            device.building or '',
            device.device_name,
            device.model or '',
            device.brand or '',
            device.get_status_display(),
            device.responsible_person or '',
            str(device.enable_date) if device.enable_date else '',
            primary_serial.serial_number if primary_serial else '',
            primary_ip.ip_address if primary_ip else '',
            str(latest_maintenance) if latest_maintenance else '',
            device.service_duration_days,
        ]

    # GET export_excel/
    # Writes the filtered device list to an .xlsx workbook (bold header row,
    # one row per device) and returns it as a file download named
    # devices_<today>.xlsx.
    @action(detail=False, methods=['get'])
    def export_excel(self, request):
        queryset = self.filter_queryset(self.get_queryset())

        wb = Workbook()
        ws = wb.active
        ws.title = '设备列表'

        headers = ['ID', '地点', '楼栋', '设备名称', '型号', '品牌', '状态',
                   '负责人', '启用日期', '主序列号', '主IP', '最近维修简述', '服役天数']
        for col, header in enumerate(headers, 1):
            cell = ws.cell(row=1, column=col, value=header)
            cell.font = Font(bold=True)

        # Data rows start at row 2, directly below the header.
        for row, device in enumerate(queryset, 2):
            for col, value in enumerate(self._export_row(device), 1):
                ws.cell(row=row, column=col, value=value)

        response = HttpResponse(content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
        response['Content-Disposition'] = f'attachment; filename="devices_{date.today()}.xlsx"'
        wb.save(response)

        logger.info(f'Exported {queryset.count()} devices to Excel')
        return response

    @staticmethod
    def _cell_text(row, column, default=None):
        """Return the cell at ``column`` as text, or ``default`` when empty.

        A missing column and an empty cell (which pandas reports as NaN) both
        yield ``default``, so an empty cell never becomes the string 'nan'.
        """
        value = row.get(column)
        if value is None or pd.isna(value):
            return default
        return str(value)

    @classmethod
    def _import_row(cls, row):
        """Create one Device (and its primary serial / IP, if given) from a sheet row."""
        device = Device.objects.create(
            location=cls._cell_text(row, '地点', ''),
            building=cls._cell_text(row, '楼栋'),
            device_name=cls._cell_text(row, '设备名称', ''),
            model=cls._cell_text(row, '型号'),
            brand=cls._cell_text(row, '品牌'),
            status=cls._cell_text(row, '状态', 'normal'),
            responsible_person=cls._cell_text(row, '负责人'),
        )

        serial_number = cls._cell_text(row, '主序列号')
        if serial_number is not None:
            DeviceSerial.objects.create(
                device=device,
                serial_number=serial_number,
                serial_type='main',
                is_primary=True,
            )

        ip_address = cls._cell_text(row, '主IP')
        if ip_address is not None:
            DeviceIP.objects.create(
                device=device,
                ip_address=ip_address,
                ip_type='management',
                is_primary=True,
            )

        return device

    # POST import_excel/ (multipart).
    # Reads the uploaded workbook with pandas and creates one device per row.
    # The import is all-or-nothing: any failure rolls back every row and the
    # response is 400 with the error text. On success the response carries
    # the number of devices created.
    @action(detail=False, methods=['post'], parser_classes=[parsers.MultiPartParser])
    def import_excel(self, request):
        from django.db import transaction

        file_obj = request.FILES.get('file')
        if not file_obj:
            return Response({'error': 'No file provided'}, status=status.HTTP_400_BAD_REQUEST)

        try:
            df = pd.read_excel(file_obj)
            created_count = 0

            # One transaction for the whole sheet, so an error part-way
            # through does not leave a partial import that a retry would
            # then duplicate.
            with transaction.atomic():
                for _, row in df.iterrows():
                    self._import_row(row)
                    created_count += 1
        except Exception as e:
            # Request boundary: report the failure to the client instead of
            # a 500, and keep the traceback in the log.
            logger.exception(f'Excel import failed: {e}')
            return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

        logger.info(f'Imported {created_count} devices from Excel')
        return Response({'message': f'成功导入 {created_count} 台设备', 'count': created_count})
|
|
|
|
|
|
class DeviceSerialViewSet(viewsets.ModelViewSet):
    # CRUD endpoints for device serial numbers.
    # Uses IsAuthenticatedOrReadOnly; searchable by serial number and type.
    permission_classes = [IsAuthenticatedOrReadOnly]
    serializer_class = DeviceSerialSerializer
    queryset = DeviceSerial.objects.all()
    search_fields = ['serial_number', 'serial_type']
|
|
|
|
|
|
class DeviceIPViewSet(viewsets.ModelViewSet):
    # CRUD endpoints for device IP addresses.
    # Uses IsAuthenticatedOrReadOnly; searchable by IP address and type.
    permission_classes = [IsAuthenticatedOrReadOnly]
    serializer_class = DeviceIPSerializer
    queryset = DeviceIP.objects.all()
    search_fields = ['ip_address', 'ip_type']
|
|
|
|
|
|
class MaintenanceRecordViewSet(viewsets.ModelViewSet):
    # CRUD endpoints for maintenance records.
    # Uses IsAuthenticatedOrReadOnly. Exact-match filtering on device_id and
    # maintenance_by; free-text search over the fault description, repair
    # content and maintainer fields.
    permission_classes = [IsAuthenticatedOrReadOnly]
    serializer_class = MaintenanceRecordSerializer
    queryset = MaintenanceRecord.objects.all()
    filterset_fields = ['device_id', 'maintenance_by']
    search_fields = ['fault_description', 'repair_content', 'maintenance_by']
|
|
|
|
|
|
class DeviceAttachmentViewSet(viewsets.ModelViewSet):
    # CRUD endpoints for device attachments.
    # Accepts multipart and form-encoded bodies; uses IsAuthenticatedOrReadOnly.
    permission_classes = [IsAuthenticatedOrReadOnly]
    parser_classes = [parsers.MultiPartParser, parsers.FormParser]
    serializer_class = DeviceAttachmentSerializer
    queryset = DeviceAttachment.objects.all()

    def perform_create(self, serializer):
        # Resolve the target device from the ``device`` request field (404 if
        # no such device), then save the attachment with the uploaded file's
        # name, or an empty name when no file was sent.
        request = self.request
        target_device = get_object_or_404(Device, id=request.data.get('device'))
        uploaded = request.FILES.get('file')

        if uploaded:
            original_name = uploaded.name
        else:
            original_name = ''

        saved = serializer.save(device=target_device, file_name=original_name)
        logger.info(f'Created attachment {saved.id}')
|