From 9082cc5e657611820d0338e79762b2dffed27942 Mon Sep 17 00:00:00 2001 From: xiaji Date: Mon, 30 Mar 2026 14:17:50 +0800 Subject: [PATCH] Add Proxmox task controller script and configuration files --- .env.template | 16 ++ README.md | 105 ++++++++++++ proxmox-task.service | 19 +++ proxmox_task_controller.py | 329 +++++++++++++++++++++++++++++++++++++ 4 files changed, 469 insertions(+) create mode 100644 .env.template create mode 100644 proxmox-task.service create mode 100644 proxmox_task_controller.py diff --git a/.env.template b/.env.template new file mode 100644 index 0000000..490ce3b --- /dev/null +++ b/.env.template @@ -0,0 +1,16 @@ +# Proxmox 配置 +PROXMOX_HOST=https://your-proxmox-host:8006 +PROXMOX_API_TOKEN=your-user@pve!your-token + +# 虚拟机配置 +VM_ID=100 +VM_SSH_HOST=your-vm-ip +VM_SSH_PORT=22 +VM_SSH_USER=root +VM_SSH_KEY_PATH=~/.ssh/id_rsa + +# 命令监听配置 +COMMAND_DIR=/var/task-commands + +# 状态汇报URL(可选) +STATUS_REPORT_URL=http://your-report-server/status \ No newline at end of file diff --git a/README.md b/README.md index e69de29..6bf8a05 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,105 @@ +# Proxmox 虚拟机任务控制器 + +一个Python脚本系统,用于自动化控制Proxmox虚拟机执行任务。小主机平时待机监听,接收命令后启动VM执行任务,完成后自动关闭VM。 + +## 功能特性 + +- 📡 **命令监听**:监控指定目录的JSON命令文件 +- 🖥️ **VM管理**:通过Proxmox API启动/关闭虚拟机 +- 🔧 **任务执行**:通过SSH在VM内执行自定义命令 +- 📊 **状态汇报**:实时汇报任务执行状态 +- 🔒 **安全认证**:支持API Token和SSH密钥认证 +- 📝 **日志记录**:完整的操作日志和错误追踪 + +## 快速开始 + +### 1. 安装依赖 + +```bash +pip install requests paramiko python-dotenv +``` + +### 2. 配置环境 + +复制环境模板并填写配置: + +```bash +cp .env.template .env +# 编辑 .env 文件,填写Proxmox和VM信息 +``` + +配置说明: +- `PROXMOX_HOST`: Proxmox服务器地址 +- `PROXMOX_API_TOKEN`: Proxmox API Token +- `VM_ID`: 目标虚拟机ID +- `VM_SSH_*`: 虚拟机SSH连接信息 +- `COMMAND_DIR`: 命令监听目录 + +### 3. 运行脚本 + +```bash +python3 proxmox_task_controller.py +``` + +### 4. 作为系统服务(推荐) + +```bash +# 复制文件到系统目录 +sudo cp proxmox_task_controller.py /opt/proxmox-task/ +sudo cp .env /opt/proxmox-task/ +sudo cp proxmox-task.service /etc/systemd/system/ + +# 启用并启动服务 +sudo systemctl enable proxmox-task +sudo systemctl start proxmox-task +``` + +## 使用示例 + +### 创建任务命令 + +在 `COMMAND_DIR` 目录(默认 `/var/task-commands`)创建JSON文件: + +```json +{ + "task_id": "backup-2024-01-15", + "command": "cd /opt/backup && ./run_backup.sh" +} +``` + +文件名建议使用唯一标识,如 `task-backup-2024-01-15.json`。 + +### 查看日志 + +```bash +# 实时日志 +tail -f /var/log/proxmox_task_controller.log + +# 系统服务日志 +journalctl -u proxmox-task -f +``` + +## 系统要求 + +- Python 3.8+ +- Proxmox VE 6.0+ +- 网络访问权限 +- SSH密钥认证配置 + +## 安全建议 + +- 使用API Token而非密码 +- 限制Proxmox API访问IP +- 使用专用SSH密钥 +- 定期更新依赖库 + +## 故障排查 + +1. **Proxmox连接失败**:检查API Token和主机地址 +2. **SSH连接失败**:验证密钥路径和VM SSH配置 +3. **命令执行超时**:调整命令超时时间 +4. **VM启动慢**:增加等待就绪时间 + +## 许可证 + +MIT License \ No newline at end of file diff --git a/proxmox-task.service b/proxmox-task.service new file mode 100644 index 0000000..119fb8a --- /dev/null +++ b/proxmox-task.service @@ -0,0 +1,19 @@ +[Unit] +Description=Proxmox Task Controller +After=network.target + +[Service] +Type=simple +User=root +WorkingDirectory=/opt/proxmox-task +ExecStart=/usr/bin/python3 /opt/proxmox-task/proxmox_task_controller.py +Restart=always +RestartSec=10 +StandardOutput=journal +StandardError=journal + +# 环境变量文件 +EnvironmentFile=/opt/proxmox-task/.env + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/proxmox_task_controller.py b/proxmox_task_controller.py new file mode 100644 index 0000000..68e7b4b --- /dev/null +++ b/proxmox_task_controller.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +""" +Proxmox 虚拟机任务控制器 +小主机待机监听 -> 接收命令 -> 启动VM -> 执行任务 -> 汇报进展 -> 关闭VM +""" + +import os +import sys +import time +import json +import logging +import subprocess +from pathlib import Path +from typing import Dict, Any, Optional +from dataclasses import dataclass + +import requests +import paramiko +from dotenv import load_dotenv + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('/var/log/proxmox_task_controller.log'), + logging.StreamHandler(sys.stdout) + ] +) +logger = logging.getLogger(__name__) + +@dataclass +class Config: + """配置类""" + proxmox_host: str + proxmox_api_token: str + vm_id: int + vm_ssh_host: str + vm_ssh_port: int + vm_ssh_user: str + vm_ssh_key_path: str + command_dir: str + status_report_url: Optional[str] = None + +class ConfigLoader: + """配置加载器""" + @staticmethod + def load() -> Config: + load_dotenv() + return Config( + proxmox_host=os.getenv('PROXMOX_HOST', 'https://localhost:8006'), + proxmox_api_token=os.getenv('PROXMOX_API_TOKEN', ''), + vm_id=int(os.getenv('VM_ID', '100')), + vm_ssh_host=os.getenv('VM_SSH_HOST', 'localhost'), + vm_ssh_port=int(os.getenv('VM_SSH_PORT', '22')), + vm_ssh_user=os.getenv('VM_SSH_USER', 'root'), + vm_ssh_key_path=os.getenv('VM_SSH_KEY_PATH', '~/.ssh/id_rsa'), + command_dir=os.getenv('COMMAND_DIR', '/var/task-commands'), + status_report_url=os.getenv('STATUS_REPORT_URL') + ) + +class ProxmoxAPI: + """Proxmox API 客户端""" + def __init__(self, config: Config): + self.config = config + self.base_url = f"{config.proxmox_host}/api2/json" + self.headers = { + 'Authorization': f'PVEAPIToken={config.proxmox_api_token}' + } + + def start_vm(self) -> bool: + """启动虚拟机""" + try: + url = f"{self.base_url}/nodes/pve/qemu/{self.config.vm_id}/status/start" + response = requests.post(url, headers=self.headers, verify=False, timeout=30) + if response.status_code == 200: + logger.info(f"✅ VM {self.config.vm_id} 启动命令已发送") + return True + else: + logger.error(f"❌ 启动VM失败: {response.status_code} - {response.text}") + return False + except Exception as e: + logger.error(f"❌ 启动VM异常: {e}") + return False + + def shutdown_vm(self) -> bool: + """关闭虚拟机""" + try: + url = f"{self.base_url}/nodes/pve/qemu/{self.config.vm_id}/status/shutdown" + response = requests.post(url, headers=self.headers, verify=False, timeout=60) + if response.status_code == 200: + logger.info(f"✅ VM {self.config.vm_id} 关闭命令已发送") + return True + else: + logger.error(f"❌ 关闭VM失败: {response.status_code} - {response.text}") + return False + except Exception as e: + logger.error(f"❌ 关闭VM异常: {e}") + return False + + def get_vm_status(self) -> Dict[str, Any]: + """获取虚拟机状态""" + try: + url = f"{self.base_url}/nodes/pve/qemu/{self.config.vm_id}/status/current" + response = requests.get(url, headers=self.headers, verify=False, timeout=10) + if response.status_code == 200: + return response.json().get('data', {}) + return {} + except Exception as e: + logger.error(f"❌ 获取VM状态异常: {e}") + return {} + +class SSHExecutor: + """SSH 执行器""" + def __init__(self, config: Config): + self.config = config + + def execute(self, command: str, timeout: int = 300) -> tuple: + """在VM中执行命令""" + try: + key_path = os.path.expanduser(self.config.vm_ssh_key_path) + if not os.path.exists(key_path): + logger.error(f"❌ SSH密钥不存在: {key_path}") + return False, f"SSH key not found: {key_path}" + + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect( + hostname=self.config.vm_ssh_host, + port=self.config.vm_ssh_port, + username=self.config.vm_ssh_user, + key_filename=key_path, + timeout=30 + ) + + logger.info(f"🔌 SSH连接到VM成功") + stdin, stdout, stderr = client.exec_command(command, timeout=timeout) + + exit_code = stdout.channel.recv_exit_status() + output = stdout.read().decode('utf-8') + error = stderr.read().decode('utf-8') + + client.close() + + if exit_code == 0: + logger.info(f"✅ 命令执行成功: {command[:50]}...") + return True, output + else: + logger.error(f"❌ 命令执行失败: {command[:50]}... (exit code: {exit_code})") + return False, error + + except Exception as e: + logger.error(f"❌ SSH执行异常: {e}") + return False, str(e) + +class CommandProcessor: + """命令处理器""" + def __init__(self, config: Config): + self.config = config + self.proxmox = ProxmoxAPI(config) + self.ssh = SSHExecutor(config) + self.processed_commands = set() + + def load_processed_commands(self): + """加载已处理的命令记录""" + record_file = Path(self.config.command_dir) / '.processed_commands' + if record_file.exists(): + with open(record_file, 'r') as f: + self.processed_commands = set(json.load(f)) + + def save_processed_commands(self): + """保存已处理的命令记录""" + record_file = Path(self.config.command_dir) / '.processed_commands' + with open(record_file, 'w') as f: + json.dump(list(self.processed_commands), f) + + def report_status(self, status: str, details: str = ""): + """汇报状态""" + if self.config.status_report_url: + try: + payload = { + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), + 'status': status, + 'details': details + } + requests.post(self.config.status_report_url, json=payload, timeout=10) + except Exception as e: + logger.error(f"❌ 状态汇报失败: {e}") + + logger.info(f"📊 状态更新: {status} - {details}") + + def wait_for_vm_ready(self, max_wait: int = 120) -> bool: + """等待VM就绪""" + logger.info(f"⏳ 等待VM {self.config.vm_id} 就绪...") + start_time = time.time() + + while time.time() - start_time < max_wait: + status = self.proxmox.get_vm_status() + if status.get('status') == 'running': + # 等待SSH可用 + time.sleep(10) + try: + self.ssh.execute('echo "VM ready"', timeout=10) + logger.info("✅ VM已就绪") + return True + except: + logger.info("⏳ VM运行中,等待SSH...") + time.sleep(5) + + logger.error("❌ 等待VM就绪超时") + return False + + def process_command(self, command_file: Path): + """处理单个命令文件""" + if command_file.name in self.processed_commands: + return + + logger.info(f"📋 处理命令文件: {command_file.name}") + + try: + with open(command_file, 'r') as f: + command_data = json.load(f) + + task_id = command_data.get('task_id', 'unknown') + command = command_data.get('command', '') + + self.report_status('starting_vm', f'Task {task_id}') + + # 1. 启动VM + if not self.proxmox.start_vm(): + self.report_status('vm_start_failed', f'Task {task_id}') + return + + # 2. 等待VM就绪 + if not self.wait_for_vm_ready(): + self.report_status('vm_ready_timeout', f'Task {task_id}') + self.proxmox.shutdown_vm() + return + + self.report_status('executing_task', f'Task {task_id}') + + # 3. 执行命令 + success, output = self.ssh.execute(command, timeout=300) + + if success: + self.report_status('task_completed', f'Task {task_id}') + logger.info(f"✅ 任务完成: {task_id}") + else: + self.report_status('task_failed', f'Task {task_id}: {output}') + logger.error(f"❌ 任务失败: {task_id}") + + # 4. 关闭VM + self.report_status('shutting_down_vm', f'Task {task_id}') + self.proxmox.shutdown_vm() + + # 5. 移动命令文件到完成目录 + done_dir = Path(self.config.command_dir) / 'done' + done_dir.mkdir(exist_ok=True) + command_file.rename(done_dir / command_file.name) + + # 6. 记录已处理 + self.processed_commands.add(command_file.name) + self.save_processed_commands() + + except Exception as e: + logger.error(f"❌ 处理命令异常: {e}") + self.report_status('command_error', str(e)) + +class TaskController: + """主任务控制器""" + def __init__(self): + self.config = ConfigLoader.load() + self.processor = CommandProcessor(self.config) + self.running = True + + def setup(self): + """初始化设置""" + logger.info("🚀 Proxmox任务控制器启动") + logger.info(f"📁 监控命令目录: {self.config.command_dir}") + + # 创建命令目录 + Path(self.config.command_dir).mkdir(parents=True, exist_ok=True) + + # 加载已处理命令记录 + self.processor.load_processed_commands() + + # 检查Proxmox连接 + try: + status = self.processor.proxmox.get_vm_status() + if status: + logger.info(f"✅ Proxmox连接正常,VM ID: {self.config.vm_id}") + else: + logger.error("❌ Proxmox连接失败") + except Exception as e: + logger.error(f"❌ Proxmox连接异常: {e}") + + def run(self): + """主运行循环""" + self.setup() + + logger.info("⏳ 进入监听模式,等待命令...") + + while self.running: + try: + command_dir = Path(self.config.command_dir) + command_files = list(command_dir.glob('*.json')) + + for cmd_file in command_files: + if cmd_file.name not in self.processor.processed_commands: + self.processor.process_command(cmd_file) + + time.sleep(5) + + except KeyboardInterrupt: + logger.info("🛑 收到停止信号") + self.running = False + except Exception as e: + logger.error(f"❌ 主循环异常: {e}") + time.sleep(10) + + logger.info("👋 Proxmox任务控制器停止") + +def main(): + """主函数""" + controller = TaskController() + controller.run() + +if __name__ == '__main__': + main() \ No newline at end of file