from proxmoxer import ProxmoxAPI from typing import List, Dict, Any, Optional import json import os from datetime import datetime CONFIG_FILE = "config.json" class VMManager: def __init__(self): self.proxmox: Optional[ProxmoxAPI] = None self.config: Dict[str, Any] = {} self.load_config() def load_config(self) -> Dict[str, Any]: if os.path.exists(CONFIG_FILE): try: with open(CONFIG_FILE, 'r', encoding='utf-8') as f: self.config = json.load(f) except Exception: self.config = {} return self.config def save_config(self, config: Dict[str, Any]) -> None: self.config = config with open(CONFIG_FILE, 'w', encoding='utf-8') as f: json.dump(config, f, indent=4, ensure_ascii=False) def get_config(self) -> Dict[str, Any]: return self.config def connect(self, host: str, user: str, password: str, node: str = "pve", verify_ssl: bool = False) -> bool: try: self.proxmox = ProxmoxAPI( host, user=user, password=password, verify_ssl=verify_ssl ) self.proxmox.version.get() return True except Exception as e: print(f"连接失败: {e}") return False def is_connected(self) -> bool: return self.proxmox is not None def get_vms(self) -> List[Dict[str, Any]]: if not self.proxmox: return [] try: resources = self.proxmox.cluster.resources.get() vms = [r for r in resources if r['type'] == 'qemu'] return vms except Exception as e: print(f"获取虚拟机列表失败: {e}") return [] def get_vm_stats(self, vms: List[Dict]) -> Dict[str, int]: stats = {"total": len(vms), "running": 0, "stopped": 0, "paused": 0} for vm in vms: status = vm.get('status', '').lower() if status == 'running': stats['running'] += 1 elif status == 'stopped': stats['stopped'] += 1 elif status == 'paused': stats['paused'] += 1 return stats def format_memory(self, bytes_value: int) -> str: gb = bytes_value / (1024 ** 3) return f"{gb:.1f} GB" def format_uptime(self, seconds: int) -> str: if seconds <= 0: return "-" days = seconds // 86400 hours = (seconds % 86400) // 3600 if days > 0: return f"{days}d {hours}h" return f"{hours}h" def get_vm_info(self, vm: Dict) -> Dict[str, Any]: status = vm.get('status', 'unknown').lower() status_text = '运行中' if status == 'running' else '已停止' if status == 'stopped' else '已暂停' if status == 'paused' else status maxcpu = vm.get('maxcpu', 0) cpu = vm.get('cpu', 0) cpu_percent = f"{int(cpu * 100)}%" if maxcpu > 0 else "0%" mem = vm.get('mem', 0) maxmem = vm.get('maxmem', 0) memory_text = f"{self.format_memory(mem)}/{self.format_memory(maxmem)}" if maxmem > 0 else "-" disk = vm.get('disk', 0) maxdisk = vm.get('maxdisk', 0) disk_text = f"{self.format_memory(disk)}/{self.format_memory(maxdisk)}" if maxdisk > 0 else "-" uptime_seconds = vm.get('uptime', 0) uptime_text = self.format_uptime(uptime_seconds) return { 'vmid': vm.get('vmid', 0), 'name': vm.get('name', '无名称'), 'status': status, 'status_text': status_text, 'cpu_percent': cpu_percent, 'memory': memory_text, 'disk': disk_text, 'uptime': uptime_text } def start_vm(self, vmid: int, node: str) -> bool: return self._vm_operation(vmid, node, 'start') def shutdown_vm(self, vmid: int, node: str) -> bool: return self._vm_operation(vmid, node, 'shutdown') def reboot_vm(self, vmid: int, node: str) -> bool: return self._vm_operation(vmid, node, 'reboot') def stop_vm(self, vmid: int, node: str) -> bool: return self._vm_operation(vmid, node, 'stop') def suspend_vm(self, vmid: int, node: str) -> bool: return self._vm_operation(vmid, node, 'suspend') def _vm_operation(self, vmid: int, node: str, operation: str) -> bool: if not self.proxmox: return False try: endpoint = self.proxmox.nodes(node).qemu(vmid).status if hasattr(endpoint, operation): getattr(endpoint, operation).post() return True return False except Exception as e: print(f"操作失败 ({operation}): {e}") return False