#!/usr/bin/env python3 """ Proxmox虚拟机任务控制器 监控命令文件,启动VM,执行任务,汇报结果,关闭VM """ import os import time import json import logging import requests import subprocess import threading from datetime import datetime from pathlib import Path # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('/var/log/proxmox_task.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class ProxmoxTaskController: def __init__(self): # 从环境变量读取配置(实际部署中应从安全配置文件或密钥管理系统读取) self.proxmox_host = os.getenv('PROXMOX_HOST', 'https://proxmox.example.com:8006') self.proxmox_token_id = os.getenv('PROXMOX_TOKEN_ID', 'api-token@pam') self.proxmox_token_secret = os.getenv('PROXMOX_TOKEN_SECRET', '') self.verify_ssl = os.getenv('PROXMOX_VERIFY_SSL', 'false').lower() == 'true' # 命令监控目录 self.command_dir = Path('/var/task-commands') self.command_dir.mkdir(exist_ok=True) # 结果目录 self.result_dir = Path('/var/task-results') self.result_dir.mkdir(exist_ok=True) # 会话对象 self.session = requests.Session() self.session.verify = self.verify_ssl # 设置API认证头 if self.proxmox_token_id and self.proxmox_token_secret: self.session.headers.update({ 'Authorization': f'PVEAPIToken={self.proxmox_token_id}={self.proxmox_token_secret}' }) logger.info("Proxmox任务控制器初始化完成") def get_vm_status(self, node, vmid): """获取VM状态""" try: url = f"{self.proxmox_host}/api2/json/nodes/{node}/qemu/{vmid}/status/current" response = self.session.get(url) response.raise_for_status() return response.json()['data']['status'] except Exception as e: logger.error(f"获取VM状态失败: {e}") return None def start_vm(self, node, vmid): """启动VM""" try: url = f"{self.proxmox_host}/api2/json/nodes/{node}/qemu/{vmid}/status/start" response = self.session.post(url) response.raise_for_status() logger.info(f"VM {vmid} 启动命令已发送") return True except Exception as e: logger.error(f"启动VM失败: {e}") return False def stop_vm(self, node, vmid): """关闭VM""" try: url = f"{self.proxmox_host}/api2/json/nodes/{node}/qemu/{vmid}/status/shutdown" response = self.session.post(url) response.raise_for_status() logger.info(f"VM {vmid} 关闭命令已发送") return True except Exception as e: logger.error(f"关闭VM失败: {e}") return False def execute_vm_command(self, node, vmid, command, username=None, password=None): """在VM内执行命令(使用QEMU Guest Agent或SSH)""" # 这里简化处理,实际应根据环境选择QGA或SSH try: # 使用QEMU Guest Agent执行命令 url = f"{self.proxmox_host}/api2/json/nodes/{node}/qemu/{vmid}/agent/exec" payload = { 'command': command, 'timeout': 300 # 5分钟超时 } if username and password: payload['username'] = username payload['password'] = password response = self.session.post(url, json=payload) response.raise_for_status() result = response.json() pid = result['data']['pid'] # 等待命令完成并获取输出 time.sleep(2) # 简单等待,实际应轮询直到完成 # 获取执行结果 out_url = f"{self.proxmox_host}/api2/json/nodes/{node}/qemu/{vmid}/agent/exec-status" out_payload = {'pid': pid} out_response = self.session.post(out_url, json=out_payload) out_response.raise_for_status() out_data = out_response.json()['data'] return { 'exitcode': out_data.get('exitcode', -1), 'out': out_data.get('out', ''), 'err': out_data.get('err', '') } except Exception as e: logger.error(f"在VM内执行命令失败: {e}") return {'exitcode': -1, 'out': '', 'err': str(e)} def process_command_file(self, command_file): """处理命令文件""" try: with open(command_file, 'r', encoding='utf-8') as f: command_data = json.load(f) logger.info(f"处理命令文件: {command_file}") # 提取命令参数 node = command_data.get('node') vmid = command_data.get('vmid') task_command = command_data.get('command') task_id = command_data.get('task_id', f"task_{int(time.time())}") username = command_data.get('username') password = command_data.get('password') if not all([node, vmid, task_command]): logger.error("命令文件缺少必要参数") return False # 1. 检查VM状态 status = self.get_vm_status(node, vmid) logger.info(f"VM {vmid} 当前状态: {status}") # 2. 如果VM未运行,则启动它 if status != 'running': logger.info(f"启动VM {vmid}") if not self.start_vm(node, vmid): logger.error(f"启动VM {vmid} 失败") return False # 等待VM完全启动 logger.info("等待VM启动...") for _ in range(30): # 最多等待30秒 time.sleep(1) status = self.get_vm_status(node, vmid) if status == 'running': break else: logger.error("VM启动超时") return False # 3. 执行任务命令 logger.info(f"在VM {vmid} 中执行任务: {task_command}") result = self.execute_vm_command(node, vmid, task_command, username, password) # 4. 汇报结果 result_file = self.result_dir / f"{task_id}_result.json" result_data = { 'task_id': task_id, 'timestamp': datetime.now().isoformat(), 'node': node, 'vmid': vmid, 'command': task_command, 'exitcode': result['exitcode'], 'output': result['out'], 'error': result['err'], 'success': result['exitcode'] == 0 } with open(result_file, 'w', encoding='utf-8') as f: json.dump(result_data, f, indent=2, ensure_ascii=False) logger.info(f"任务结果已保存到: {result_file}") # 5. 关闭VM(除非配置为保持运行) if not command_data.get('keep_running', False): logger.info(f"关闭VM {vmid}") self.stop_vm(node, vmid) else: logger.info(f"VM {vmid} 保持运行状态") # 6. 移除命令文件(避免重复处理) command_file.unlink() logger.info(f"命令文件已处理并删除: {command_file}") return True except Exception as e: logger.error(f"处理命令文件时发生错误: {e}") return False def monitor_commands(self): """监控命令目录""" logger.info(f"开始监控命令目录: {self.command_dir}") while True: try: # 查找新的命令文件 command_files = list(self.command_dir.glob("*.json")) for command_file in command_files: # 简单的文件锁机制:尝试重命名 try: locked_file = command_file.with_suffix('.json.lock') command_file.rename(locked_file) self.process_command_file(locked_file) locked_file.unlink() # 处理完成后删除锁文件 except FileNotFoundError: # 文件可能已被其他进程处理 pass except Exception as e: logger.error(f"处理命令文件 {command_file} 时出错: {e}") # 休眠一段时间再检查 time.sleep(5) except KeyboardInterrupt: logger.info("收到中断信号,停止监控") break except Exception as e: logger.error(f"监控循环发生错误: {e}") time.sleep(10) # 出错后稍长时间再试 def main(): """主函数""" controller = ProxmoxTaskController() try: controller.monitor_commands() except Exception as e: logger.error(f"控制器运行失败: {e}") return 1 return 0 if __name__ == "__main__": exit(main())