From d48dc740d6aabd239a9056508b9e29231c424ce4 Mon Sep 17 00:00:00 2001 From: xiaji Date: Mon, 30 Mar 2026 13:39:17 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0Proxmox=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E8=84=9A=E6=9C=AC=EF=BC=9A=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E5=91=BD=E4=BB=A4=E2=86=92=E5=90=AF=E5=8A=A8VM=E2=86=92?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E4=BB=BB=E5=8A=A1=E2=86=92=E6=B1=87=E6=8A=A5?= =?UTF-8?q?=E2=86=92=E5=85=B3=E9=97=ADVM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- proxmox_task/README.md | 64 +++++++++ proxmox_task/controller.py | 254 ++++++++++++++++++++++++++++++++++ proxmox_task/requirements.txt | 1 + 3 files changed, 319 insertions(+) create mode 100644 proxmox_task/README.md create mode 100644 proxmox_task/controller.py create mode 100644 proxmox_task/requirements.txt diff --git a/proxmox_task/README.md b/proxmox_task/README.md new file mode 100644 index 0000000..b41bca8 --- /dev/null +++ b/proxmox_task/README.md @@ -0,0 +1,64 @@ +# Proxmox任务控制器 + +一个用于监控命令文件、启动Proxmox虚拟机、执行任务、汇报结果并关闭虚拟机的Python脚本。 + +## 功能特性 + +- 监控指定目录中的JSON命令文件 +- 通过Proxmox API启动和关闭虚拟机 +- 在虚拟机内执行指定命令(支持QEMU Guest Agent) +- 汇报执行结果到结果目录 +- 支持凭证传递(用于需要认证的命令) +- 日志记录到文件和控制台 + +## 使用方法 + +### 1. 配置环境变量 + +```bash +export PROXMOX_HOST="https://your-proxmox-host:8006" +export PROXMOX_TOKEN_ID="your-token-id@pam" +export PROXMOX_TOKEN_SECRET="your-token-secret" +export PROXMOX_VERIFY_SSL="false" # 如果使用自签名证书 +``` + +### 2. 准备命令文件 + +在 `/var/task-commands/` 目录中放置JSON格式的命令文件: + +```json +{ + "node": "pve-node-1", + "vmid": 100, + "command": "ls -la /tmp && echo 'Hello World'", + "task_id": "my-task-001", + "username": "root", + "password": "your-vm-password", + "keep_running": false +} +``` + +### 3. 运行控制器 + +```bash +python3 controller.py +``` + +### 4. 查看结果 + +任务执行结果将保存为JSON文件在 `/var/task-results/` 目录中。 + +## 部署建议 + +1. 将脚本部署到小主机上 +2. 确保小主机能够访问Proxmox服务器的API端口(默认8006) +3. 为小主机创建系统服务以实现开机自启动 +4. 定期清理旧的结果文件 +5. 考虑使用更安全的凭证管理方式(如HashiCorp Vault或AWS Secrets Manager) + +## 注意事项 + +- 确保目标虚拟机已安装并运行QEMU Guest Agent(为了使用agent/exec API) +- 或者修改脚本使用SSH方式在VM内执行命令 +- 生产环境中应使用更安全的方式处理凭证 +- 建议在测试环境中先验证功能 \ No newline at end of file diff --git a/proxmox_task/controller.py b/proxmox_task/controller.py new file mode 100644 index 0000000..3fa53cf --- /dev/null +++ b/proxmox_task/controller.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python3 +""" +Proxmox虚拟机任务控制器 +监控命令文件,启动VM,执行任务,汇报结果,关闭VM +""" + +import os +import time +import json +import logging +import requests +import subprocess +import threading +from datetime import datetime +from pathlib import Path + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('/var/log/proxmox_task.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + +class ProxmoxTaskController: + def __init__(self): + # 从环境变量读取配置(实际部署中应从安全配置文件或密钥管理系统读取) + self.proxmox_host = os.getenv('PROXMOX_HOST', 'https://proxmox.example.com:8006') + self.proxmox_token_id = os.getenv('PROXMOX_TOKEN_ID', 'api-token@pam') + self.proxmox_token_secret = os.getenv('PROXMOX_TOKEN_SECRET', '') + self.verify_ssl = os.getenv('PROXMOX_VERIFY_SSL', 'false').lower() == 'true' + + # 命令监控目录 + self.command_dir = Path('/var/task-commands') + self.command_dir.mkdir(exist_ok=True) + + # 结果目录 + self.result_dir = Path('/var/task-results') + self.result_dir.mkdir(exist_ok=True) + + # 会话对象 + self.session = requests.Session() + self.session.verify = self.verify_ssl + + # 设置API认证头 + if self.proxmox_token_id and self.proxmox_token_secret: + self.session.headers.update({ + 'Authorization': f'PVEAPIToken={self.proxmox_token_id}={self.proxmox_token_secret}' + }) + + logger.info("Proxmox任务控制器初始化完成") + + def get_vm_status(self, node, vmid): + """获取VM状态""" + try: + url = f"{self.proxmox_host}/api2/json/nodes/{node}/qemu/{vmid}/status/current" + response = self.session.get(url) + response.raise_for_status() + return response.json()['data']['status'] + except Exception as e: + logger.error(f"获取VM状态失败: {e}") + return None + + def start_vm(self, node, vmid): + """启动VM""" + try: + url = f"{self.proxmox_host}/api2/json/nodes/{node}/qemu/{vmid}/status/start" + response = self.session.post(url) + response.raise_for_status() + logger.info(f"VM {vmid} 启动命令已发送") + return True + except Exception as e: + logger.error(f"启动VM失败: {e}") + return False + + def stop_vm(self, node, vmid): + """关闭VM""" + try: + url = f"{self.proxmox_host}/api2/json/nodes/{node}/qemu/{vmid}/status/shutdown" + response = self.session.post(url) + response.raise_for_status() + logger.info(f"VM {vmid} 关闭命令已发送") + return True + except Exception as e: + logger.error(f"关闭VM失败: {e}") + return False + + def execute_vm_command(self, node, vmid, command, username=None, password=None): + """在VM内执行命令(使用QEMU Guest Agent或SSH)""" + # 这里简化处理,实际应根据环境选择QGA或SSH + try: + # 使用QEMU Guest Agent执行命令 + url = f"{self.proxmox_host}/api2/json/nodes/{node}/qemu/{vmid}/agent/exec" + payload = { + 'command': command, + 'timeout': 300 # 5分钟超时 + } + if username and password: + payload['username'] = username + payload['password'] = password + + response = self.session.post(url, json=payload) + response.raise_for_status() + result = response.json() + pid = result['data']['pid'] + + # 等待命令完成并获取输出 + time.sleep(2) # 简单等待,实际应轮询直到完成 + + # 获取执行结果 + out_url = f"{self.proxmox_host}/api2/json/nodes/{node}/qemu/{vmid}/agent/exec-status" + out_payload = {'pid': pid} + out_response = self.session.post(out_url, json=out_payload) + out_response.raise_for_status() + out_data = out_response.json()['data'] + + return { + 'exitcode': out_data.get('exitcode', -1), + 'out': out_data.get('out', ''), + 'err': out_data.get('err', '') + } + except Exception as e: + logger.error(f"在VM内执行命令失败: {e}") + return {'exitcode': -1, 'out': '', 'err': str(e)} + + def process_command_file(self, command_file): + """处理命令文件""" + try: + with open(command_file, 'r', encoding='utf-8') as f: + command_data = json.load(f) + + logger.info(f"处理命令文件: {command_file}") + + # 提取命令参数 + node = command_data.get('node') + vmid = command_data.get('vmid') + task_command = command_data.get('command') + task_id = command_data.get('task_id', f"task_{int(time.time())}") + username = command_data.get('username') + password = command_data.get('password') + + if not all([node, vmid, task_command]): + logger.error("命令文件缺少必要参数") + return False + + # 1. 检查VM状态 + status = self.get_vm_status(node, vmid) + logger.info(f"VM {vmid} 当前状态: {status}") + + # 2. 如果VM未运行,则启动它 + if status != 'running': + logger.info(f"启动VM {vmid}") + if not self.start_vm(node, vmid): + logger.error(f"启动VM {vmid} 失败") + return False + + # 等待VM完全启动 + logger.info("等待VM启动...") + for _ in range(30): # 最多等待30秒 + time.sleep(1) + status = self.get_vm_status(node, vmid) + if status == 'running': + break + else: + logger.error("VM启动超时") + return False + + # 3. 执行任务命令 + logger.info(f"在VM {vmid} 中执行任务: {task_command}") + result = self.execute_vm_command(node, vmid, task_command, username, password) + + # 4. 汇报结果 + result_file = self.result_dir / f"{task_id}_result.json" + result_data = { + 'task_id': task_id, + 'timestamp': datetime.now().isoformat(), + 'node': node, + 'vmid': vmid, + 'command': task_command, + 'exitcode': result['exitcode'], + 'output': result['out'], + 'error': result['err'], + 'success': result['exitcode'] == 0 + } + + with open(result_file, 'w', encoding='utf-8') as f: + json.dump(result_data, f, indent=2, ensure_ascii=False) + + logger.info(f"任务结果已保存到: {result_file}") + + # 5. 关闭VM(除非配置为保持运行) + if not command_data.get('keep_running', False): + logger.info(f"关闭VM {vmid}") + self.stop_vm(node, vmid) + else: + logger.info(f"VM {vmid} 保持运行状态") + + # 6. 移除命令文件(避免重复处理) + command_file.unlink() + logger.info(f"命令文件已处理并删除: {command_file}") + + return True + + except Exception as e: + logger.error(f"处理命令文件时发生错误: {e}") + return False + + def monitor_commands(self): + """监控命令目录""" + logger.info(f"开始监控命令目录: {self.command_dir}") + + while True: + try: + # 查找新的命令文件 + command_files = list(self.command_dir.glob("*.json")) + + for command_file in command_files: + # 简单的文件锁机制:尝试重命名 + try: + locked_file = command_file.with_suffix('.json.lock') + command_file.rename(locked_file) + self.process_command_file(locked_file) + locked_file.unlink() # 处理完成后删除锁文件 + except FileNotFoundError: + # 文件可能已被其他进程处理 + pass + except Exception as e: + logger.error(f"处理命令文件 {command_file} 时出错: {e}") + + # 休眠一段时间再检查 + time.sleep(5) + + except KeyboardInterrupt: + logger.info("收到中断信号,停止监控") + break + except Exception as e: + logger.error(f"监控循环发生错误: {e}") + time.sleep(10) # 出错后稍长时间再试 + +def main(): + """主函数""" + controller = ProxmoxTaskController() + try: + controller.monitor_commands() + except Exception as e: + logger.error(f"控制器运行失败: {e}") + return 1 + return 0 + +if __name__ == "__main__": + exit(main()) \ No newline at end of file diff --git a/proxmox_task/requirements.txt b/proxmox_task/requirements.txt new file mode 100644 index 0000000..9e4b84c --- /dev/null +++ b/proxmox_task/requirements.txt @@ -0,0 +1 @@ +requests>=2.25.1 \ No newline at end of file