143 lines
4.8 KiB
Python
143 lines
4.8 KiB
Python
|
|
from proxmoxer import ProxmoxAPI
|
||
|
|
from typing import List, Dict, Any, Optional
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
from datetime import datetime
|
||
|
|
|
||
|
|
# Path of the JSON file that VMManager.load_config() reads and
# VMManager.save_config() writes. It is a relative path, so it is resolved
# against the process's current working directory.
CONFIG_FILE = "config.json"
|
||
|
|
|
||
|
|
class VMManager:
    """Manage QEMU virtual machines through a Proxmox API client.

    The manager keeps two pieces of state:

    * ``proxmox`` -- the API client, or ``None`` while not connected.
    * ``config`` -- a dict mirrored to/from the JSON file ``CONFIG_FILE``.

    Failures while talking to the API are reported with ``print`` and turned
    into a falsy return value (``False`` or ``[]``) instead of propagating.
    """

    # Display labels for the VM states that have a dedicated translation;
    # any other state is shown verbatim.
    _STATUS_LABELS = {
        'running': '运行中',
        'stopped': '已停止',
        'paused': '已暂停',
    }

    # States that get their own counter in get_vm_stats().
    _COUNTED_STATES = ('running', 'stopped', 'paused')

    def __init__(self):
        """Create a disconnected manager and load the saved configuration."""
        self.proxmox: Optional[ProxmoxAPI] = None
        self.config: Dict[str, Any] = {}
        self.load_config()

    def load_config(self) -> Dict[str, Any]:
        """Load ``CONFIG_FILE`` into ``self.config`` and return it.

        If the file does not exist, ``self.config`` is left untouched. If it
        cannot be read, is not valid JSON, or does not contain a JSON object,
        the configuration falls back to an empty dict (best effort: this
        method never raises for a bad file).
        """
        if os.path.exists(CONFIG_FILE):
            loaded = None
            try:
                with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                print(f"加载配置失败: {e}")
            # Only a JSON object satisfies the Dict[str, Any] contract.
            self.config = loaded if isinstance(loaded, dict) else {}
        return self.config

    def save_config(self, config: Dict[str, Any]) -> None:
        """Make ``config`` the current configuration and write it to disk.

        Raises:
            TypeError / ValueError: if ``config`` is not JSON-serialisable.
            OSError: if the file cannot be written.
        """
        # Serialise before opening the file: opening with 'w' truncates it, so
        # a serialisation error must surface while the old file is intact.
        payload = json.dumps(config, indent=4, ensure_ascii=False)
        self.config = config
        with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
            f.write(payload)

    def get_config(self) -> Dict[str, Any]:
        """Return the in-memory configuration dict (not a copy)."""
        return self.config

    def connect(self, host: str, user: str, password: str,
                node: str = "pve", verify_ssl: bool = False) -> bool:
        """Create an API client for ``host`` and verify it with a version call.

        ``node`` is accepted for interface compatibility but is not used by
        this method.

        Returns:
            True when the client was created and the version request
            succeeded; False otherwise. On failure the error is printed and
            ``self.proxmox`` keeps whatever value it had before the call, so
            a half-initialised client is never stored.
        """
        try:
            client = ProxmoxAPI(
                host,
                user=user,
                password=password,
                verify_ssl=verify_ssl,
            )
            # Probe the API so bad credentials/hosts are detected right away.
            client.version.get()
        except Exception as e:
            print(f"连接失败: {e}")
            return False
        # Commit the client only after it has proven usable.
        self.proxmox = client
        return True

    def is_connected(self) -> bool:
        """Return True when an API client is currently stored."""
        return self.proxmox is not None

    def get_vms(self) -> List[Dict[str, Any]]:
        """Return the cluster resource records whose ``type`` is ``'qemu'``.

        Returns an empty list when not connected or when the request fails
        (the failure is printed). Records without a ``type`` key are skipped.
        """
        if not self.proxmox:
            return []
        try:
            resources = self.proxmox.cluster.resources.get()
            return [r for r in resources if r.get('type') == 'qemu']
        except Exception as e:
            print(f"获取虚拟机列表失败: {e}")
            return []

    def get_vm_stats(self, vms: List[Dict]) -> Dict[str, int]:
        """Count VMs per state.

        Returns a dict with the keys ``total``, ``running``, ``stopped`` and
        ``paused``. The comparison is case-insensitive; VMs in any other
        state (or with a missing/None status) only contribute to ``total``.
        """
        stats = {"total": len(vms), "running": 0, "stopped": 0, "paused": 0}
        for vm in vms:
            status = str(vm.get('status') or '').lower()
            if status in self._COUNTED_STATES:
                stats[status] += 1
        return stats

    def format_memory(self, bytes_value: int) -> str:
        """Format a byte count as gibibytes with one decimal, e.g. ``"2.0 GB"``.

        get_vm_info() uses this for disk sizes as well as memory.
        """
        gb = bytes_value / (1024 ** 3)
        return f"{gb:.1f} GB"

    def format_uptime(self, seconds: int) -> str:
        """Format an uptime in seconds using its two most useful units.

        Returns ``"-"`` for a missing or non-positive value, ``"Xd Yh"`` from
        one day up, ``"Yh"`` from one hour up, ``"Zm"`` from one minute up and
        ``"Ns"`` below that (so short uptimes are not all shown as ``"0h"``).
        """
        if not seconds or seconds <= 0:
            return "-"
        days, remainder = divmod(int(seconds), 86400)
        hours, remainder = divmod(remainder, 3600)
        minutes, secs = divmod(remainder, 60)
        if days > 0:
            return f"{days}d {hours}h"
        if hours > 0:
            return f"{hours}h"
        if minutes > 0:
            return f"{minutes}m"
        return f"{secs}s"

    def get_vm_info(self, vm: Dict) -> Dict[str, Any]:
        """Turn a raw VM record into display-ready fields.

        Missing or None numeric fields are treated as 0, and a missing status
        becomes ``'unknown'``.

        Returns a dict with the keys ``vmid``, ``name``, ``status`` (lower
        case), ``status_text`` (translated label, or the raw status when no
        label exists), ``cpu_percent``, ``memory``, ``disk`` and ``uptime``.
        """
        status = str(vm.get('status') or 'unknown').lower()
        status_text = self._STATUS_LABELS.get(status, status)

        maxcpu = vm.get('maxcpu') or 0
        cpu = vm.get('cpu') or 0
        # 'cpu' is scaled by 100 and truncated; without CPU info show 0%.
        cpu_percent = f"{int(cpu * 100)}%" if maxcpu > 0 else "0%"

        mem = vm.get('mem') or 0
        maxmem = vm.get('maxmem') or 0
        if maxmem > 0:
            memory_text = f"{self.format_memory(mem)}/{self.format_memory(maxmem)}"
        else:
            memory_text = "-"

        disk = vm.get('disk') or 0
        maxdisk = vm.get('maxdisk') or 0
        if maxdisk > 0:
            disk_text = f"{self.format_memory(disk)}/{self.format_memory(maxdisk)}"
        else:
            disk_text = "-"

        uptime_text = self.format_uptime(vm.get('uptime') or 0)

        return {
            'vmid': vm.get('vmid', 0),
            'name': vm.get('name', '无名称'),
            'status': status,
            'status_text': status_text,
            'cpu_percent': cpu_percent,
            'memory': memory_text,
            'disk': disk_text,
            'uptime': uptime_text,
        }

    def start_vm(self, vmid: int, node: str) -> bool:
        """Post the ``start`` status operation for VM ``vmid`` on ``node``."""
        return self._vm_operation(vmid, node, 'start')

    def shutdown_vm(self, vmid: int, node: str) -> bool:
        """Post the ``shutdown`` status operation for VM ``vmid`` on ``node``."""
        return self._vm_operation(vmid, node, 'shutdown')

    def reboot_vm(self, vmid: int, node: str) -> bool:
        """Post the ``reboot`` status operation for VM ``vmid`` on ``node``."""
        return self._vm_operation(vmid, node, 'reboot')

    def stop_vm(self, vmid: int, node: str) -> bool:
        """Post the ``stop`` status operation for VM ``vmid`` on ``node``."""
        return self._vm_operation(vmid, node, 'stop')

    def suspend_vm(self, vmid: int, node: str) -> bool:
        """Post the ``suspend`` status operation for VM ``vmid`` on ``node``."""
        return self._vm_operation(vmid, node, 'suspend')

    def _vm_operation(self, vmid: int, node: str, operation: str) -> bool:
        """POST ``operation`` to the status endpoint of VM ``vmid`` on ``node``.

        Returns True when the request was issued without raising. Returns
        False when not connected, when the endpoint does not expose
        ``operation``, or when the request raises (the error is printed).
        """
        if not self.proxmox:
            return False
        try:
            endpoint = self.proxmox.nodes(node).qemu(vmid).status
            action = getattr(endpoint, operation, None)
            # NOTE(review): the real client may resolve any attribute name
            # dynamically, in which case this guard never triggers -- confirm.
            if action is None:
                return False
            action.post()
            return True
        except Exception as e:
            print(f"操作失败 ({operation}): {e}")
            return False
|