#!/usr/bin/env python3
"""
Proxmox VM task controller.

The small host waits in standby and polls a command directory.  For each
command file it starts the VM, runs the task inside the VM over SSH,
reports progress as JSON status files, and then powers the VM off again.
"""
import os
import sys
import time
import json
import logging
import subprocess
import requests
from pathlib import Path
from typing import Dict, Any, Optional
from dataclasses import dataclass


# Configuration
@dataclass
class Config:
    """Runtime settings for ProxmoxController.

    ``proxmox_user`` and ``proxmox_token`` are joined into the header
    ``PVEAPIToken=<proxmox_user>=<proxmox_token>``.  For a Proxmox API token
    ``proxmox_user`` is expected to be the full token id
    (``user@realm!tokenid``) and ``proxmox_token`` the secret.
    """

    proxmox_host: str
    proxmox_user: str
    proxmox_token: str
    vm_id: int
    ssh_user: str
    ssh_key_path: str
    command_dir: str = "/var/task-commands"
    status_dir: str = "/var/task-status"
    check_interval: int = 10  # polling interval (seconds)
    # Proxmox node name used in API paths (previously hard-coded).
    proxmox_node: str = "proxmox"
    # Host the VM is reachable at over SSH.  "localhost" keeps the original
    # behaviour, but it addresses the machine running this controller, not
    # the guest, unless something forwards it; normally set the VM's IP.
    vm_host: str = "localhost"
    # Upper bound (seconds) on a single task's SSH execution.
    task_timeout: int = 120
    # Passed to requests as ``verify``; False accepts self-signed certs.
    verify_ssl: bool = False


class ProxmoxController:
    """Poll a command directory and run each task inside a Proxmox VM."""

    def __init__(self, config: Config):
        self.config = config
        self.base_url = f"https://{config.proxmox_host}:8006/api2/json"
        # Common prefix of every per-VM endpoint used below.
        self.vm_url = (
            f"{self.base_url}/nodes/{config.proxmox_node}/qemu/{config.vm_id}"
        )
        self.headers = {
            # The field is ``proxmox_token``; ``config.token`` did not exist
            # and made construction raise AttributeError.
            "Authorization": (
                f"PVEAPIToken={config.proxmox_user}={config.proxmox_token}"
            )
        }
        self.logger = logging.getLogger(__name__)

    def vm_status(self) -> Dict[str, Any]:
        """Return the VM's current status record, or {} on any failure.

        The Proxmox API wraps every payload in a top-level ``"data"`` key;
        this returns that inner dict (its ``"status"`` field is what
        wait_for_vm_ready() checks for ``"running"``).
        """
        url = f"{self.vm_url}/status/current"
        try:
            resp = requests.get(
                url, headers=self.headers, verify=self.config.verify_ssl, timeout=10
            )
            resp.raise_for_status()
            payload = resp.json()
        except (requests.RequestException, ValueError) as e:
            self.logger.error(f"获取VM状态失败: {e}")
            return {}
        # Unwrap "data"; returning the raw payload meant "status" was never found.
        data = payload.get("data") if isinstance(payload, dict) else None
        return data if isinstance(data, dict) else {}

    def start_vm(self) -> bool:
        """Send the start request for the VM.

        Returns True when Proxmox answered HTTP 200 (the start was queued,
        not necessarily finished -- see wait_for_vm_ready()), else False.
        """
        self.logger.info(f"正在启动VM {self.config.vm_id}")
        url = f"{self.vm_url}/status/start"
        try:
            resp = requests.post(
                url, headers=self.headers, verify=self.config.verify_ssl, timeout=30
            )
        except requests.RequestException as e:
            self.logger.error(f"启动VM失败: {e}")
            return False
        if resp.status_code == 200:
            self.logger.info("VM启动命令已发送")
            return True
        # A non-200 reply used to fail silently; record what the API said.
        self.logger.error(f"启动VM失败: HTTP {resp.status_code} {resp.text}")
        return False

    def stop_vm(self, graceful: bool = False) -> bool:
        """Send a power-off request for the VM.

        Args:
            graceful: when True use ``status/shutdown`` (asks the guest OS to
                shut down) instead of the default ``status/stop`` (immediate
                stop of the VM process).

        Returns True when Proxmox answered HTTP 200, else False.
        """
        self.logger.info(f"正在关闭VM {self.config.vm_id}")
        action = "shutdown" if graceful else "stop"
        url = f"{self.vm_url}/status/{action}"
        try:
            resp = requests.post(
                url, headers=self.headers, verify=self.config.verify_ssl, timeout=30
            )
        except requests.RequestException as e:
            self.logger.error(f"关闭VM失败: {e}")
            return False
        if resp.status_code == 200:
            self.logger.info("VM关闭命令已发送")
            return True
        self.logger.error(f"关闭VM失败: HTTP {resp.status_code} {resp.text}")
        return False

    def ssh_execute(self, command: str, timeout: int = 120) -> tuple[int, str, str]:
        """Run *command* inside the VM over SSH.

        Args:
            command: shell command line executed by the remote login shell.
            timeout: seconds before the local ssh process is killed.

        Returns:
            (returncode, stdout, stderr); returncode is -1 when ssh could not
            be started or exceeded *timeout*.
        """
        ssh_cmd = [
            "ssh",
            "-i", os.path.expanduser(self.config.ssh_key_path),
            "-o", "StrictHostKeyChecking=no",
            "-o", "ConnectTimeout=30",
            # Never fall back to interactive password/passphrase prompts,
            # which would just block until the timeout.
            "-o", "BatchMode=yes",
            f"{self.config.ssh_user}@{self.config.vm_host}",
            command,
        ]
        try:
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                # Keep the remote side from reading the daemon's own stdin.
                stdin=subprocess.DEVNULL,
            )
            return result.returncode, result.stdout, result.stderr
        except subprocess.TimeoutExpired:
            return -1, "", "命令执行超时"
        except Exception as e:
            return -1, "", str(e)

    def wait_for_vm_ready(self, timeout: int = 120) -> bool:
        """Wait until the VM reports "running" and accepts an SSH command.

        Polls every 5 s; returns True once both checks pass, False after
        roughly *timeout* seconds.
        """
        deadline = time.monotonic() + timeout  # immune to wall-clock jumps
        while time.monotonic() < deadline:
            status = self.vm_status()
            if status.get("status") == "running":
                # Running is not enough: sshd inside the guest must be up too.
                ret, _, _ = self.ssh_execute("echo 'VM ready'")
                if ret == 0:
                    self.logger.info("VM已就绪")
                    return True
            time.sleep(5)
        self.logger.error("VM启动超时")
        return False

    @staticmethod
    def _archive(cmd_file: Path) -> None:
        """Move a handled command file into the sibling ``done`` directory."""
        done_dir = cmd_file.parent / "done"
        done_dir.mkdir(exist_ok=True)
        # replace() overwrites an older file of the same name on every OS.
        cmd_file.replace(done_dir / cmd_file.name)

    def process_command(self, cmd_file: Path) -> bool:
        """Run the task described by one command file.

        The file must hold a JSON object with ``"command"`` (run in the VM
        via ssh_execute()) and optionally ``"task_id"`` (defaults to the
        file name stem).  Progress is reported through update_status().

        Returns True only if the command ran and exited 0.  Files that were
        executed or rejected as invalid are moved to ``done``; if the file
        cannot be read/parsed or the VM cannot be started/reached, it is
        left in place so the next poll retries it.
        """
        self.logger.info(f"处理命令文件: {cmd_file}")

        # Read the command.  A parse failure may be a file still being
        # written by the producer, so it is left for the next poll.
        try:
            with open(cmd_file, 'r', encoding="utf-8") as f:
                task_data = json.load(f)
        except (OSError, ValueError) as e:
            self.logger.error(f"读取命令文件失败: {e}")
            return False
        if not isinstance(task_data, dict):
            # Complete but malformed: retrying cannot help, so archive it.
            self.logger.error(f"读取命令文件失败: {cmd_file} 不是JSON对象")
            self.update_status(cmd_file.stem, "failed", "命令文件格式错误")
            self._archive(cmd_file)
            return False

        task_id = task_data.get("task_id") or cmd_file.stem
        command = task_data.get("command", "")
        if not isinstance(command, str) or not command.strip():
            # An empty command would make ssh open a remote shell session.
            self.update_status(task_id, "failed", "命令为空")
            self._archive(cmd_file)
            return False

        self.update_status(task_id, "starting", "准备启动VM")

        if not self.start_vm():
            self.update_status(task_id, "failed", "启动VM失败")
            return False

        # From here on the VM may be running: always power it off, even if a
        # status write or the ssh call raises.
        try:
            if not self.wait_for_vm_ready():
                self.update_status(task_id, "failed", "VM启动超时")
                return False

            self.update_status(task_id, "running", "VM已启动,开始执行任务")

            ret, stdout, stderr = self.ssh_execute(
                command, timeout=self.config.task_timeout
            )
            if ret == 0:
                self.update_status(task_id, "success", "任务执行成功", stdout)
            else:
                self.update_status(task_id, "failed", "任务执行失败", stderr)
        finally:
            self.stop_vm()

        self._archive(cmd_file)
        return ret == 0

    def update_status(self, task_id: str, status: str, message: str, output: str = ""):
        """Write ``<status_dir>/<task_id>.json`` describing a task's state.

        The JSON is written to a temporary file and renamed over the target,
        so a concurrent reader never sees a half-written file.
        """
        # Use only the final path component so a task_id such as "../x"
        # cannot place files outside status_dir.
        safe_id = Path(str(task_id)).name or "unknown"
        status_dir = Path(self.config.status_dir)
        status_dir.mkdir(parents=True, exist_ok=True)
        status_file = status_dir / f"{safe_id}.json"
        tmp_file = status_file.with_name(status_file.name + ".tmp")

        status_data = {
            "task_id": task_id,
            "status": status,
            "message": message,
            "output": output,
            "timestamp": time.time(),
        }
        with open(tmp_file, 'w', encoding="utf-8") as f:
            json.dump(status_data, f, indent=2)
        os.replace(tmp_file, status_file)

    def run(self):
        """Poll command_dir forever, handling ``*.json`` files in name order.

        Exits on KeyboardInterrupt; any other exception is logged with its
        traceback and polling resumes after check_interval seconds.
        """
        self.logger.info("Proxmox任务控制器启动")

        # Create required directories.
        Path(self.config.command_dir).mkdir(parents=True, exist_ok=True)
        Path(self.config.status_dir).mkdir(parents=True, exist_ok=True)

        while True:
            try:
                for cmd_file in sorted(Path(self.config.command_dir).glob("*.json")):
                    self.process_command(cmd_file)
                time.sleep(self.config.check_interval)
            except KeyboardInterrupt:
                self.logger.info("收到终止信号,退出")
                break
            except Exception as e:
                self.logger.exception(f"主循环错误: {e}")
                time.sleep(self.config.check_interval)


def main():
    """Build a Config from environment variables and run the controller."""
    # Configure logging here rather than in ProxmoxController.__init__, so
    # constructing a controller has no global side effects.
    logging.basicConfig(level=logging.INFO)

    config = Config(
        proxmox_host=os.getenv("PROXMOX_HOST", "localhost"),
        proxmox_user=os.getenv("PROXMOX_USER", "root@pam"),
        proxmox_token=os.getenv("PROXMOX_TOKEN", ""),
        vm_id=int(os.getenv("VM_ID", "100")),
        ssh_user=os.getenv("SSH_USER", "ubuntu"),
        ssh_key_path=os.getenv("SSH_KEY_PATH", "~/.ssh/id_rsa"),
        proxmox_node=os.getenv("PROXMOX_NODE", "proxmox"),
        vm_host=os.getenv("VM_HOST", "localhost"),
        task_timeout=int(os.getenv("TASK_TIMEOUT", "120")),
        verify_ssl=os.getenv("PROXMOX_VERIFY_SSL", "0").lower() in ("1", "true", "yes"),
    )

    if not config.proxmox_token:
        print("错误: 请设置 PROXMOX_TOKEN 环境变量", file=sys.stderr)
        sys.exit(1)

    controller = ProxmoxController(config)
    controller.run()


if __name__ == "__main__":
    main()