#!/usr/bin/env python3
"""
Proxmox virtual machine task controller.

Runs on a small always-on host and loops through:
wait for a command file -> start the VM -> run the task over SSH ->
report progress -> shut the VM down.
"""

import os
import sys
import time
import json
import logging
import subprocess
from pathlib import Path
from typing import Dict, Any, Optional, Tuple
from dataclasses import dataclass

import requests
import paramiko
from dotenv import load_dotenv

# Configure logging: everything goes both to a log file and to stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/proxmox_task_controller.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)


@dataclass
class Config:
    """Runtime settings for the controller (see ``ConfigLoader.load``)."""

    # Base URL of the Proxmox host, e.g. "https://localhost:8006".
    proxmox_host: str
    # Value placed after "PVEAPIToken=" in the Authorization header.
    proxmox_api_token: str
    # Numeric id of the VM that is started/stopped for each task.
    vm_id: int
    # SSH endpoint and credentials used to run the task inside the VM.
    vm_ssh_host: str
    vm_ssh_port: int
    vm_ssh_user: str
    vm_ssh_key_path: str
    # Directory that is polled for ``*.json`` command files.
    command_dir: str
    # Optional HTTP endpoint that receives status updates as JSON POSTs.
    status_report_url: Optional[str] = None
    # Proxmox node name used in the API path (previously hard-coded).
    proxmox_node: str = 'pve'
    # Whether to verify the Proxmox TLS certificate. Defaults to False to
    # keep the previous behaviour; set to True when the host has a
    # certificate the system trusts.
    proxmox_verify_ssl: bool = False


class ConfigLoader:
    """Builds a ``Config`` from environment variables / a ``.env`` file."""

    @staticmethod
    def _env_flag(name: str, default: bool = False) -> bool:
        """Interpret environment variable *name* as a boolean switch."""
        raw = os.getenv(name)
        if raw is None:
            return default
        return raw.strip().lower() in ('1', 'true', 'yes', 'on')

    @staticmethod
    def load() -> Config:
        """Load ``.env`` (if present) and return the resulting ``Config``.

        Raises:
            ValueError: if ``VM_ID`` or ``VM_SSH_PORT`` is not an integer.
        """
        load_dotenv()
        return Config(
            proxmox_host=os.getenv('PROXMOX_HOST', 'https://localhost:8006'),
            proxmox_api_token=os.getenv('PROXMOX_API_TOKEN', ''),
            vm_id=int(os.getenv('VM_ID', '100')),
            vm_ssh_host=os.getenv('VM_SSH_HOST', 'localhost'),
            vm_ssh_port=int(os.getenv('VM_SSH_PORT', '22')),
            vm_ssh_user=os.getenv('VM_SSH_USER', 'root'),
            vm_ssh_key_path=os.getenv('VM_SSH_KEY_PATH', '~/.ssh/id_rsa'),
            command_dir=os.getenv('COMMAND_DIR', '/var/task-commands'),
            status_report_url=os.getenv('STATUS_REPORT_URL'),
            proxmox_node=os.getenv('PROXMOX_NODE', 'pve'),
            proxmox_verify_ssl=ConfigLoader._env_flag('PROXMOX_VERIFY_SSL'),
        )


class ProxmoxAPI:
    """Minimal Proxmox REST client for one VM (start / shutdown / status)."""

    def __init__(self, config: Config):
        self.config = config
        # Strip a trailing slash so the joined URL never contains "//api2".
        self.base_url = f"{config.proxmox_host.rstrip('/')}/api2/json"
        self.headers = {
            'Authorization': f'PVEAPIToken={config.proxmox_api_token}'
        }

    def _status_url(self, action: str) -> str:
        """Return the ``.../qemu/<vmid>/status/<action>`` URL for the VM."""
        return (
            f"{self.base_url}/nodes/{self.config.proxmox_node}"
            f"/qemu/{self.config.vm_id}/status/{action}"
        )

    def _post_power_action(self, action: str, label: str, timeout: int) -> bool:
        """POST a power action and log the outcome.

        Args:
            action: API path segment, ``"start"`` or ``"shutdown"``.
            label: Word inserted into the log messages for this action.
            timeout: Request timeout in seconds.

        Returns:
            True when the API answered HTTP 200, False otherwise. Never
            raises; request errors are logged and reported as False.
        """
        try:
            # NOTE(security): certificate verification follows
            # ``config.proxmox_verify_ssl`` and is off by default.
            response = requests.post(
                self._status_url(action),
                headers=self.headers,
                verify=self.config.proxmox_verify_ssl,
                timeout=timeout,
            )
            if response.status_code == 200:
                logger.info(f"✅ VM {self.config.vm_id} {label}命令已发送")
                return True
            logger.error(
                f"❌ {label}VM失败: {response.status_code} - {response.text}"
            )
            return False
        except Exception as e:
            logger.error(f"❌ {label}VM异常: {e}")
            return False

    def start_vm(self) -> bool:
        """Ask Proxmox to start the VM; True if the request was accepted."""
        return self._post_power_action('start', '启动', timeout=30)

    def shutdown_vm(self) -> bool:
        """Ask Proxmox to shut the VM down; True if the request was accepted."""
        return self._post_power_action('shutdown', '关闭', timeout=60)

    def get_vm_status(self) -> Dict[str, Any]:
        """Return the ``data`` object of the VM's current status.

        Returns:
            The status dict, or an empty dict on any error, non-200 reply,
            or a reply without usable ``data``.
        """
        try:
            response = requests.get(
                self._status_url('current'),
                headers=self.headers,
                verify=self.config.proxmox_verify_ssl,
                timeout=10,
            )
            if response.status_code == 200:
                # ``or {}`` also covers an explicit ``"data": null`` reply,
                # so callers can always use ``.get`` on the result.
                return response.json().get('data') or {}
            return {}
        except Exception as e:
            logger.error(f"❌ 获取VM状态异常: {e}")
            return {}


class SSHExecutor:
    """Runs shell commands inside the VM over SSH."""

    def __init__(self, config: Config):
        self.config = config

    def execute(self, command: str, timeout: int = 300) -> Tuple[bool, str]:
        """Run *command* in the VM.

        Args:
            command: Shell command line to execute remotely.
            timeout: Channel timeout in seconds passed to ``exec_command``.

        Returns:
            ``(True, stdout)`` when the command exits with status 0,
            ``(False, stderr)`` on a non-zero exit status, and
            ``(False, message)`` when the key is missing or SSH fails.
            Never raises.
        """
        try:
            key_path = os.path.expanduser(self.config.vm_ssh_key_path)
            if not os.path.exists(key_path):
                logger.error(f"❌ SSH密钥不存在: {key_path}")
                return False, f"SSH key not found: {key_path}"

            client = paramiko.SSHClient()
            # NOTE(security): AutoAddPolicy accepts any host key without
            # checking it. Kept to preserve existing behaviour.
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            try:
                client.connect(
                    hostname=self.config.vm_ssh_host,
                    port=self.config.vm_ssh_port,
                    username=self.config.vm_ssh_user,
                    key_filename=key_path,
                    timeout=30
                )
                logger.info("🔌 SSH连接到VM成功")

                _stdin, stdout, stderr = client.exec_command(
                    command, timeout=timeout
                )
                # Drain the output streams before asking for the exit
                # status: paramiko documents that recv_exit_status() can
                # hang when large output has not been received yet.
                # Undecodable bytes are replaced rather than turning a
                # successful command into a failure.
                output = stdout.read().decode('utf-8', errors='replace')
                error = stderr.read().decode('utf-8', errors='replace')
                exit_code = stdout.channel.recv_exit_status()
            finally:
                # Always release the connection, even if connect/exec failed.
                client.close()

            if exit_code == 0:
                logger.info(f"✅ 命令执行成功: {command[:50]}...")
                return True, output
            logger.error(
                f"❌ 命令执行失败: {command[:50]}... (exit code: {exit_code})"
            )
            return False, error
        except Exception as e:
            logger.error(f"❌ SSH执行异常: {e}")
            return False, str(e)


class CommandProcessor:
    """Processes command files: start VM, run task, report, shut down."""

    def __init__(self, config: Config):
        self.config = config
        self.proxmox = ProxmoxAPI(config)
        self.ssh = SSHExecutor(config)
        # File names of command files that have already been handled.
        self.processed_commands = set()

    def load_processed_commands(self):
        """Load the set of already-processed command file names from disk.

        An unreadable or corrupt record file is logged and treated as
        empty instead of aborting startup.
        """
        record_file = Path(self.config.command_dir) / '.processed_commands'
        if not record_file.exists():
            return
        try:
            with open(record_file, 'r', encoding='utf-8') as f:
                self.processed_commands = set(json.load(f))
        except (OSError, ValueError, TypeError) as e:
            logger.warning("Could not read %s, starting empty: %s", record_file, e)
            self.processed_commands = set()

    def save_processed_commands(self):
        """Persist the set of processed command file names as a JSON list."""
        record_file = Path(self.config.command_dir) / '.processed_commands'
        with open(record_file, 'w', encoding='utf-8') as f:
            # Sorted so the file content is stable between runs.
            json.dump(sorted(self.processed_commands), f)

    def report_status(self, status: str, details: str = ""):
        """Log a status update and, if configured, POST it to the report URL.

        Reporting is best effort: a failed POST is logged and ignored.
        """
        if self.config.status_report_url:
            try:
                payload = {
                    'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
                    'status': status,
                    'details': details
                }
                requests.post(
                    self.config.status_report_url, json=payload, timeout=10
                )
            except Exception as e:
                logger.error(f"❌ 状态汇报失败: {e}")
        logger.info(f"📊 状态更新: {status} - {details}")

    def wait_for_vm_ready(self, max_wait: int = 120) -> bool:
        """Wait until the VM is running and accepts an SSH command.

        Args:
            max_wait: Seconds after which no new attempt is started.

        Returns:
            True once a test command succeeds over SSH, False on timeout.
        """
        logger.info(f"⏳ 等待VM {self.config.vm_id} 就绪...")
        # Monotonic clock: unaffected by wall-clock adjustments.
        deadline = time.monotonic() + max_wait

        while time.monotonic() < deadline:
            status = self.proxmox.get_vm_status()
            if status.get('status') == 'running':
                # Give the guest time to bring up its SSH daemon.
                time.sleep(10)
                # execute() reports failure via its return value and never
                # raises, so the flag must be checked explicitly.
                ready, _ = self.ssh.execute('echo "VM ready"', timeout=10)
                if ready:
                    logger.info("✅ VM已就绪")
                    return True
                logger.info("⏳ VM运行中,等待SSH...")
            time.sleep(5)

        logger.error("❌ 等待VM就绪超时")
        return False

    def process_command(self, command_file: Path):
        """Handle one command file end to end.

        The file is JSON; this method reads its ``task_id`` and ``command``
        keys. On start failure or readiness timeout the file is left in
        place, so it is picked up again on the next poll.
        """
        if command_file.name in self.processed_commands:
            return

        logger.info(f"📋 处理命令文件: {command_file.name}")

        # Tracks whether a started VM still needs to be shut down.
        vm_started = False
        try:
            with open(command_file, 'r', encoding='utf-8') as f:
                command_data = json.load(f)

            task_id = command_data.get('task_id', 'unknown')
            command = command_data.get('command', '')

            self.report_status('starting_vm', f'Task {task_id}')

            # 1. Start the VM
            if not self.proxmox.start_vm():
                self.report_status('vm_start_failed', f'Task {task_id}')
                return
            vm_started = True

            # 2. Wait for the VM to become ready
            if not self.wait_for_vm_ready():
                self.report_status('vm_ready_timeout', f'Task {task_id}')
                return  # the ``finally`` clause shuts the VM down

            self.report_status('executing_task', f'Task {task_id}')

            # 3. Run the command
            success, output = self.ssh.execute(command, timeout=300)

            if success:
                self.report_status('task_completed', f'Task {task_id}')
                logger.info(f"✅ 任务完成: {task_id}")
            else:
                self.report_status('task_failed', f'Task {task_id}: {output}')
                logger.error(f"❌ 任务失败: {task_id}")

            # 4. Shut the VM down
            self.report_status('shutting_down_vm', f'Task {task_id}')
            self.proxmox.shutdown_vm()
            vm_started = False

            # 5. Move the command file into the "done" directory
            done_dir = Path(self.config.command_dir) / 'done'
            done_dir.mkdir(exist_ok=True)
            command_file.rename(done_dir / command_file.name)

            # 6. Record it as processed
            self.processed_commands.add(command_file.name)
            self.save_processed_commands()

        except Exception as e:
            logger.error(f"❌ 处理命令异常: {e}")
            self.report_status('command_error', str(e))
        finally:
            # Never leave the VM running after an early return, an
            # unexpected error, or an interrupt.
            if vm_started:
                self.proxmox.shutdown_vm()


class TaskController:
    """Top-level controller: polls the command directory and dispatches."""

    def __init__(self):
        self.config = ConfigLoader.load()
        self.processor = CommandProcessor(self.config)
        self.running = True

    def setup(self):
        """Create the command directory, load state, and probe Proxmox."""
        logger.info("🚀 Proxmox任务控制器启动")
        logger.info(f"📁 监控命令目录: {self.config.command_dir}")

        # Create the command directory
        Path(self.config.command_dir).mkdir(parents=True, exist_ok=True)

        # Load the record of already-processed commands
        self.processor.load_processed_commands()

        # Check the Proxmox connection (informational only)
        try:
            status = self.processor.proxmox.get_vm_status()
            if status:
                logger.info(f"✅ Proxmox连接正常,VM ID: {self.config.vm_id}")
            else:
                logger.error("❌ Proxmox连接失败")
        except Exception as e:
            logger.error(f"❌ Proxmox连接异常: {e}")

    def run(self):
        """Main loop: poll for ``*.json`` command files every 5 seconds."""
        self.setup()

        logger.info("⏳ 进入监听模式,等待命令...")

        while self.running:
            try:
                command_dir = Path(self.config.command_dir)
                # Sorted so files are handled in a deterministic order.
                for cmd_file in sorted(command_dir.glob('*.json')):
                    if cmd_file.name not in self.processor.processed_commands:
                        self.processor.process_command(cmd_file)

                time.sleep(5)

            except KeyboardInterrupt:
                logger.info("🛑 收到停止信号")
                self.running = False
            except Exception as e:
                logger.error(f"❌ 主循环异常: {e}")
                time.sleep(10)

        logger.info("👋 Proxmox任务控制器停止")


def main():
    """Script entry point."""
    controller = TaskController()
    controller.run()


if __name__ == '__main__':
    main()