初始化项目:添加Proxmox GUI管理平台
This commit is contained in:
142
vm_manager.py
Normal file
142
vm_manager.py
Normal file
@@ -0,0 +1,142 @@
|
||||
from proxmoxer import ProxmoxAPI
|
||||
from typing import List, Dict, Any, Optional
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
CONFIG_FILE = "config.json"
|
||||
|
||||
class VMManager:
|
||||
def __init__(self):
|
||||
self.proxmox: Optional[ProxmoxAPI] = None
|
||||
self.config: Dict[str, Any] = {}
|
||||
self.load_config()
|
||||
|
||||
def load_config(self) -> Dict[str, Any]:
|
||||
if os.path.exists(CONFIG_FILE):
|
||||
try:
|
||||
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
|
||||
self.config = json.load(f)
|
||||
except Exception:
|
||||
self.config = {}
|
||||
return self.config
|
||||
|
||||
def save_config(self, config: Dict[str, Any]) -> None:
|
||||
self.config = config
|
||||
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
|
||||
json.dump(config, f, indent=4, ensure_ascii=False)
|
||||
|
||||
def get_config(self) -> Dict[str, Any]:
|
||||
return self.config
|
||||
|
||||
def connect(self, host: str, user: str, password: str,
|
||||
node: str = "pve", verify_ssl: bool = False) -> bool:
|
||||
try:
|
||||
self.proxmox = ProxmoxAPI(
|
||||
host,
|
||||
user=user,
|
||||
password=password,
|
||||
verify_ssl=verify_ssl
|
||||
)
|
||||
self.proxmox.version.get()
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"连接失败: {e}")
|
||||
return False
|
||||
|
||||
def is_connected(self) -> bool:
|
||||
return self.proxmox is not None
|
||||
|
||||
def get_vms(self) -> List[Dict[str, Any]]:
|
||||
if not self.proxmox:
|
||||
return []
|
||||
try:
|
||||
resources = self.proxmox.cluster.resources.get()
|
||||
vms = [r for r in resources if r['type'] == 'qemu']
|
||||
return vms
|
||||
except Exception as e:
|
||||
print(f"获取虚拟机列表失败: {e}")
|
||||
return []
|
||||
|
||||
def get_vm_stats(self, vms: List[Dict]) -> Dict[str, int]:
|
||||
stats = {"total": len(vms), "running": 0, "stopped": 0, "paused": 0}
|
||||
for vm in vms:
|
||||
status = vm.get('status', '').lower()
|
||||
if status == 'running':
|
||||
stats['running'] += 1
|
||||
elif status == 'stopped':
|
||||
stats['stopped'] += 1
|
||||
elif status == 'paused':
|
||||
stats['paused'] += 1
|
||||
return stats
|
||||
|
||||
def format_memory(self, bytes_value: int) -> str:
|
||||
gb = bytes_value / (1024 ** 3)
|
||||
return f"{gb:.1f} GB"
|
||||
|
||||
def format_uptime(self, seconds: int) -> str:
|
||||
if seconds <= 0:
|
||||
return "-"
|
||||
days = seconds // 86400
|
||||
hours = (seconds % 86400) // 3600
|
||||
if days > 0:
|
||||
return f"{days}d {hours}h"
|
||||
return f"{hours}h"
|
||||
|
||||
def get_vm_info(self, vm: Dict) -> Dict[str, Any]:
|
||||
status = vm.get('status', 'unknown').lower()
|
||||
status_text = '运行中' if status == 'running' else '已停止' if status == 'stopped' else '已暂停' if status == 'paused' else status
|
||||
|
||||
maxcpu = vm.get('maxcpu', 0)
|
||||
cpu = vm.get('cpu', 0)
|
||||
cpu_percent = f"{int(cpu * 100)}%" if maxcpu > 0 else "0%"
|
||||
|
||||
mem = vm.get('mem', 0)
|
||||
maxmem = vm.get('maxmem', 0)
|
||||
memory_text = f"{self.format_memory(mem)}/{self.format_memory(maxmem)}" if maxmem > 0 else "-"
|
||||
|
||||
disk = vm.get('disk', 0)
|
||||
maxdisk = vm.get('maxdisk', 0)
|
||||
disk_text = f"{self.format_memory(disk)}/{self.format_memory(maxdisk)}" if maxdisk > 0 else "-"
|
||||
|
||||
uptime_seconds = vm.get('uptime', 0)
|
||||
uptime_text = self.format_uptime(uptime_seconds)
|
||||
|
||||
return {
|
||||
'vmid': vm.get('vmid', 0),
|
||||
'name': vm.get('name', '无名称'),
|
||||
'status': status,
|
||||
'status_text': status_text,
|
||||
'cpu_percent': cpu_percent,
|
||||
'memory': memory_text,
|
||||
'disk': disk_text,
|
||||
'uptime': uptime_text
|
||||
}
|
||||
|
||||
def start_vm(self, vmid: int, node: str) -> bool:
|
||||
return self._vm_operation(vmid, node, 'start')
|
||||
|
||||
def shutdown_vm(self, vmid: int, node: str) -> bool:
|
||||
return self._vm_operation(vmid, node, 'shutdown')
|
||||
|
||||
def reboot_vm(self, vmid: int, node: str) -> bool:
|
||||
return self._vm_operation(vmid, node, 'reboot')
|
||||
|
||||
def stop_vm(self, vmid: int, node: str) -> bool:
|
||||
return self._vm_operation(vmid, node, 'stop')
|
||||
|
||||
def suspend_vm(self, vmid: int, node: str) -> bool:
|
||||
return self._vm_operation(vmid, node, 'suspend')
|
||||
|
||||
def _vm_operation(self, vmid: int, node: str, operation: str) -> bool:
|
||||
if not self.proxmox:
|
||||
return False
|
||||
try:
|
||||
endpoint = self.proxmox.nodes(node).qemu(vmid).status
|
||||
if hasattr(endpoint, operation):
|
||||
getattr(endpoint, operation).post()
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"操作失败 ({operation}): {e}")
|
||||
return False
|
||||
Reference in New Issue
Block a user