diff --git a/README.md b/README.md index 6bf8a05..d151269 100644 --- a/README.md +++ b/README.md @@ -1,104 +1,87 @@ -# Proxmox 虚拟机任务控制器 +# Proxmox VM 任务控制器 -一个Python脚本系统,用于自动化控制Proxmox虚拟机执行任务。小主机平时待机监听,接收命令后启动VM执行任务,完成后自动关闭VM。 +一个Python脚本,用于自动化控制Proxmox虚拟机执行任务。小主机平时待机监听,接受命令后启动VM执行任务,完成后关闭VM。 -## 功能特性 +## 功能特点 -- 📡 **命令监听**:监控指定目录的JSON命令文件 -- 🖥️ **VM管理**:通过Proxmox API启动/关闭虚拟机 -- 🔧 **任务执行**:通过SSH在VM内执行自定义命令 -- 📊 **状态汇报**:实时汇报任务执行状态 -- 🔒 **安全认证**:支持API Token和SSH密钥认证 -- 📝 **日志记录**:完整的操作日志和错误追踪 +- **待机监听**:持续监控命令目录,平时保持低功耗状态 +- **自动启停**:按需启动/关闭Proxmox虚拟机 +- **任务执行**:通过SSH在VM内执行命令 +- **状态汇报**:实时记录任务执行状态和结果 +- **错误处理**:完善的错误处理和超时机制 ## 快速开始 -### 1. 安装依赖 +### 环境要求 +- Python 3.8+ +- Proxmox VE 6.x+ +- SSH访问权限 + +### 安装配置 + +1. 克隆仓库 ```bash -pip install requests paramiko python-dotenv +git clone http://124.223.26.33:3000/xiaji/proxmox-task.git +cd proxmox-task ``` -### 2. 配置环境 - -复制环境模板并填写配置: - +2. 设置环境变量 ```bash -cp .env.template .env -# 编辑 .env 文件,填写Proxmox和VM信息 +export PROXMOX_HOST="your-proxmox-host" +export PROXMOX_USER="root@pam" +export PROXMOX_TOKEN="your-api-token" +export VM_ID="100" +export SSH_USER="ubuntu" +export SSH_KEY_PATH="/path/to/ssh/key" ``` -配置说明: -- `PROXMOX_HOST`: Proxmox服务器地址 -- `PROXMOX_API_TOKEN`: Proxmox API Token -- `VM_ID`: 目标虚拟机ID -- `VM_SSH_*`: 虚拟机SSH连接信息 -- `COMMAND_DIR`: 命令监听目录 - -### 3. 运行脚本 - +3. 运行脚本 ```bash python3 proxmox_task_controller.py ``` -### 4. 作为系统服务(推荐) - -```bash -# 复制文件到系统目录 -sudo cp proxmox_task_controller.py /opt/proxmox-task/ -sudo cp .env /opt/proxmox-task/ -sudo cp proxmox-task.service /etc/systemd/system/ - -# 启用并启动服务 -sudo systemctl enable proxmox-task -sudo systemctl start proxmox-task -``` - -## 使用示例 - -### 创建任务命令 - -在 `COMMAND_DIR` 目录(默认 `/var/task-commands`)创建JSON文件: +### 使用示例 +1. 创建任务命令文件 `/var/task-commands/task-001.json`: ```json { - "task_id": "backup-2024-01-15", - "command": "cd /opt/backup && ./run_backup.sh" + "task_id": "task-001", + "command": "cd /opt/myapp && python3 run_task.py" } ``` -文件名建议使用唯一标识,如 `task-backup-2024-01-15.json`。 - -### 查看日志 - -```bash -# 实时日志 -tail -f /var/log/proxmox_task_controller.log - -# 系统服务日志 -journalctl -u proxmox-task -f +2. 查看状态文件 `/var/task-status/task-001.json`: +```json +{ + "task_id": "task-001", + "status": "success", + "message": "任务执行成功", + "output": "Task completed successfully...", + "timestamp": 1234567890 +} ``` -## 系统要求 +## 配置选项 -- Python 3.8+ -- Proxmox VE 6.0+ -- 网络访问权限 -- SSH密钥认证配置 +| 变量名 | 说明 | 默认值 | +|--------|------|--------| +| PROXMOX_HOST | Proxmox主机地址 | localhost | +| PROXMOX_USER | API用户 | root@pam | +| PROXMOX_TOKEN | API Token | (必需) | +| VM_ID | 虚拟机ID | 100 | +| SSH_USER | SSH用户名 | ubuntu | +| SSH_KEY_PATH | SSH私钥路径 | ~/.ssh/id_rsa | +| COMMAND_DIR | 命令目录 | /var/task-commands | +| STATUS_DIR | 状态目录 | /var/task-status | +| CHECK_INTERVAL | 检查间隔(秒) | 10 | ## 安全建议 -- 使用API Token而非密码 -- 限制Proxmox API访问IP -- 使用专用SSH密钥 -- 定期更新依赖库 - -## 故障排查 - -1. **Proxmox连接失败**:检查API Token和主机地址 -2. **SSH连接失败**:验证密钥路径和VM SSH配置 -3. **命令执行超时**:调整命令超时时间 -4. **VM启动慢**:增加等待就绪时间 +- 使用Proxmox API Token而非密码 +- 限制API Token权限 +- 使用SSH密钥认证 +- 在受信任网络中运行 ## 许可证 diff --git a/proxmox_task_controller.py b/proxmox_task_controller.py index 68e7b4b..aa9a145 100644 --- a/proxmox_task_controller.py +++ b/proxmox_task_controller.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -Proxmox 虚拟机任务控制器 -小主机待机监听 -> 接收命令 -> 启动VM -> 执行任务 -> 汇报进展 -> 关闭VM +Proxmox VM 任务控制器 +小主机待机监听 → 接受命令 → 启动VM → 执行任务 → 汇报进展 → 关闭VM """ import os @@ -10,320 +10,211 @@ import time import json import logging import subprocess +import requests from pathlib import Path from typing import Dict, Any, Optional from dataclasses import dataclass -import requests -import paramiko -from dotenv import load_dotenv - -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler('/var/log/proxmox_task_controller.log'), - logging.StreamHandler(sys.stdout) - ] -) -logger = logging.getLogger(__name__) - +# 配置 @dataclass class Config: - """配置类""" proxmox_host: str - proxmox_api_token: str + proxmox_user: str + proxmox_token: str vm_id: int - vm_ssh_host: str - vm_ssh_port: int - vm_ssh_user: str - vm_ssh_key_path: str - command_dir: str - status_report_url: Optional[str] = None + ssh_user: str + ssh_key_path: str + command_dir: str = "/var/task-commands" + status_dir: str = "/var/task-status" + check_interval: int = 10 # 检查间隔(秒) -class ConfigLoader: - """配置加载器""" - @staticmethod - def load() -> Config: - load_dotenv() - return Config( - proxmox_host=os.getenv('PROXMOX_HOST', 'https://localhost:8006'), - proxmox_api_token=os.getenv('PROXMOX_API_TOKEN', ''), - vm_id=int(os.getenv('VM_ID', '100')), - vm_ssh_host=os.getenv('VM_SSH_HOST', 'localhost'), - vm_ssh_port=int(os.getenv('VM_SSH_PORT', '22')), - vm_ssh_user=os.getenv('VM_SSH_USER', 'root'), - vm_ssh_key_path=os.getenv('VM_SSH_KEY_PATH', '~/.ssh/id_rsa'), - command_dir=os.getenv('COMMAND_DIR', '/var/task-commands'), - status_report_url=os.getenv('STATUS_REPORT_URL') - ) - -class ProxmoxAPI: - """Proxmox API 客户端""" +class ProxmoxController: def __init__(self, config: Config): self.config = config - self.base_url = f"{config.proxmox_host}/api2/json" + self.base_url = f"https://{config.proxmox_host}:8006/api2/json" self.headers = { - 'Authorization': f'PVEAPIToken={config.proxmox_api_token}' + "Authorization": f"PVEAPIToken={config.proxmox_user}={config.token}" } - + logging.basicConfig(level=logging.INFO) + self.logger = logging.getLogger(__name__) + + def vm_status(self) -> Dict[str, Any]: + """获取VM状态""" + url = f"{self.base_url}/nodes/proxmox/qemu/{self.config.vm_id}/status/current" + try: + resp = requests.get(url, headers=self.headers, verify=False, timeout=10) + return resp.json() + except Exception as e: + self.logger.error(f"获取VM状态失败: {e}") + return {} + def start_vm(self) -> bool: - """启动虚拟机""" + """启动VM""" + self.logger.info(f"正在启动VM {self.config.vm_id}") + url = f"{self.base_url}/nodes/proxmox/qemu/{self.config.vm_id}/status/start" try: - url = f"{self.base_url}/nodes/pve/qemu/{self.config.vm_id}/status/start" - response = requests.post(url, headers=self.headers, verify=False, timeout=30) - if response.status_code == 200: - logger.info(f"✅ VM {self.config.vm_id} 启动命令已发送") + resp = requests.post(url, headers=self.headers, verify=False, timeout=30) + if resp.status_code == 200: + self.logger.info("VM启动命令已发送") return True - else: - logger.error(f"❌ 启动VM失败: {response.status_code} - {response.text}") - return False except Exception as e: - logger.error(f"❌ 启动VM异常: {e}") - return False - - def shutdown_vm(self) -> bool: - """关闭虚拟机""" - try: - url = f"{self.base_url}/nodes/pve/qemu/{self.config.vm_id}/status/shutdown" - response = requests.post(url, headers=self.headers, verify=False, timeout=60) - if response.status_code == 200: - logger.info(f"✅ VM {self.config.vm_id} 关闭命令已发送") - return True - else: - logger.error(f"❌ 关闭VM失败: {response.status_code} - {response.text}") - return False - except Exception as e: - logger.error(f"❌ 关闭VM异常: {e}") - return False - - def get_vm_status(self) -> Dict[str, Any]: - """获取虚拟机状态""" - try: - url = f"{self.base_url}/nodes/pve/qemu/{self.config.vm_id}/status/current" - response = requests.get(url, headers=self.headers, verify=False, timeout=10) - if response.status_code == 200: - return response.json().get('data', {}) - return {} - except Exception as e: - logger.error(f"❌ 获取VM状态异常: {e}") - return {} - -class SSHExecutor: - """SSH 执行器""" - def __init__(self, config: Config): - self.config = config - - def execute(self, command: str, timeout: int = 300) -> tuple: - """在VM中执行命令""" - try: - key_path = os.path.expanduser(self.config.vm_ssh_key_path) - if not os.path.exists(key_path): - logger.error(f"❌ SSH密钥不存在: {key_path}") - return False, f"SSH key not found: {key_path}" - - client = paramiko.SSHClient() - client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - client.connect( - hostname=self.config.vm_ssh_host, - port=self.config.vm_ssh_port, - username=self.config.vm_ssh_user, - key_filename=key_path, - timeout=30 - ) - - logger.info(f"🔌 SSH连接到VM成功") - stdin, stdout, stderr = client.exec_command(command, timeout=timeout) - - exit_code = stdout.channel.recv_exit_status() - output = stdout.read().decode('utf-8') - error = stderr.read().decode('utf-8') - - client.close() - - if exit_code == 0: - logger.info(f"✅ 命令执行成功: {command[:50]}...") - return True, output - else: - logger.error(f"❌ 命令执行失败: {command[:50]}... (exit code: {exit_code})") - return False, error - - except Exception as e: - logger.error(f"❌ SSH执行异常: {e}") - return False, str(e) - -class CommandProcessor: - """命令处理器""" - def __init__(self, config: Config): - self.config = config - self.proxmox = ProxmoxAPI(config) - self.ssh = SSHExecutor(config) - self.processed_commands = set() - - def load_processed_commands(self): - """加载已处理的命令记录""" - record_file = Path(self.config.command_dir) / '.processed_commands' - if record_file.exists(): - with open(record_file, 'r') as f: - self.processed_commands = set(json.load(f)) - - def save_processed_commands(self): - """保存已处理的命令记录""" - record_file = Path(self.config.command_dir) / '.processed_commands' - with open(record_file, 'w') as f: - json.dump(list(self.processed_commands), f) - - def report_status(self, status: str, details: str = ""): - """汇报状态""" - if self.config.status_report_url: - try: - payload = { - 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), - 'status': status, - 'details': details - } - requests.post(self.config.status_report_url, json=payload, timeout=10) - except Exception as e: - logger.error(f"❌ 状态汇报失败: {e}") - - logger.info(f"📊 状态更新: {status} - {details}") - - def wait_for_vm_ready(self, max_wait: int = 120) -> bool: - """等待VM就绪""" - logger.info(f"⏳ 等待VM {self.config.vm_id} 就绪...") - start_time = time.time() - - while time.time() - start_time < max_wait: - status = self.proxmox.get_vm_status() - if status.get('status') == 'running': - # 等待SSH可用 - time.sleep(10) - try: - self.ssh.execute('echo "VM ready"', timeout=10) - logger.info("✅ VM已就绪") - return True - except: - logger.info("⏳ VM运行中,等待SSH...") - time.sleep(5) - - logger.error("❌ 等待VM就绪超时") + self.logger.error(f"启动VM失败: {e}") return False - - def process_command(self, command_file: Path): - """处理单个命令文件""" - if command_file.name in self.processed_commands: - return - - logger.info(f"📋 处理命令文件: {command_file.name}") - - try: - with open(command_file, 'r') as f: - command_data = json.load(f) - - task_id = command_data.get('task_id', 'unknown') - command = command_data.get('command', '') - - self.report_status('starting_vm', f'Task {task_id}') - - # 1. 启动VM - if not self.proxmox.start_vm(): - self.report_status('vm_start_failed', f'Task {task_id}') - return - - # 2. 等待VM就绪 - if not self.wait_for_vm_ready(): - self.report_status('vm_ready_timeout', f'Task {task_id}') - self.proxmox.shutdown_vm() - return - - self.report_status('executing_task', f'Task {task_id}') - - # 3. 执行命令 - success, output = self.ssh.execute(command, timeout=300) - - if success: - self.report_status('task_completed', f'Task {task_id}') - logger.info(f"✅ 任务完成: {task_id}") - else: - self.report_status('task_failed', f'Task {task_id}: {output}') - logger.error(f"❌ 任务失败: {task_id}") - - # 4. 关闭VM - self.report_status('shutting_down_vm', f'Task {task_id}') - self.proxmox.shutdown_vm() - - # 5. 移动命令文件到完成目录 - done_dir = Path(self.config.command_dir) / 'done' - done_dir.mkdir(exist_ok=True) - command_file.rename(done_dir / command_file.name) - - # 6. 记录已处理 - self.processed_commands.add(command_file.name) - self.save_processed_commands() - - except Exception as e: - logger.error(f"❌ 处理命令异常: {e}") - self.report_status('command_error', str(e)) -class TaskController: - """主任务控制器""" - def __init__(self): - self.config = ConfigLoader.load() - self.processor = CommandProcessor(self.config) - self.running = True - - def setup(self): - """初始化设置""" - logger.info("🚀 Proxmox任务控制器启动") - logger.info(f"📁 监控命令目录: {self.config.command_dir}") - - # 创建命令目录 - Path(self.config.command_dir).mkdir(parents=True, exist_ok=True) - - # 加载已处理命令记录 - self.processor.load_processed_commands() - - # 检查Proxmox连接 + def stop_vm(self) -> bool: + """关闭VM""" + self.logger.info(f"正在关闭VM {self.config.vm_id}") + url = f"{self.base_url}/nodes/proxmox/qemu/{self.config.vm_id}/status/stop" try: - status = self.processor.proxmox.get_vm_status() - if status: - logger.info(f"✅ Proxmox连接正常,VM ID: {self.config.vm_id}") - else: - logger.error("❌ Proxmox连接失败") + resp = requests.post(url, headers=self.headers, verify=False, timeout=30) + if resp.status_code == 200: + self.logger.info("VM关闭命令已发送") + return True except Exception as e: - logger.error(f"❌ Proxmox连接异常: {e}") - + self.logger.error(f"关闭VM失败: {e}") + return False + + def ssh_execute(self, command: str) -> tuple[int, str, str]: + """在VM内执行命令""" + ssh_cmd = [ + "ssh", + "-i", self.config.ssh_key_path, + "-o", "StrictHostKeyChecking=no", + "-o", "ConnectTimeout=30", + f"{self.config.ssh_user}@localhost", + command + ] + try: + result = subprocess.run(ssh_cmd, capture_output=True, text=True, timeout=120) + return result.returncode, result.stdout, result.stderr + except subprocess.TimeoutExpired: + return -1, "", "命令执行超时" + except Exception as e: + return -1, "", str(e) + + def wait_for_vm_ready(self, timeout: int = 120) -> bool: + """等待VM启动完成并可以SSH连接""" + start_time = time.time() + while time.time() - start_time < timeout: + status = self.vm_status() + if status.get("status") == "running": + # 测试SSH连接 + ret, _, _ = self.ssh_execute("echo 'VM ready'") + if ret == 0: + self.logger.info("VM已就绪") + return True + time.sleep(5) + self.logger.error("VM启动超时") + return False + + def process_command(self, cmd_file: Path) -> bool: + """处理单个命令文件""" + self.logger.info(f"处理命令文件: {cmd_file}") + + # 读取命令 + try: + with open(cmd_file, 'r') as f: + task_data = json.load(f) + except Exception as e: + self.logger.error(f"读取命令文件失败: {e}") + return False + + task_id = task_data.get("task_id", "unknown") + command = task_data.get("command", "") + + # 更新状态 + self.update_status(task_id, "starting", "准备启动VM") + + # 启动VM + if not self.start_vm(): + self.update_status(task_id, "failed", "启动VM失败") + return False + + # 等待VM就绪 + if not self.wait_for_vm_ready(): + self.update_status(task_id, "failed", "VM启动超时") + self.stop_vm() + return False + + self.update_status(task_id, "running", "VM已启动,开始执行任务") + + # 执行命令 + ret, stdout, stderr = self.ssh_execute(command) + + if ret == 0: + self.update_status(task_id, "success", "任务执行成功", stdout) + else: + self.update_status(task_id, "failed", "任务执行失败", stderr) + + # 关闭VM + self.stop_vm() + + # 移动命令文件到完成目录 + done_dir = cmd_file.parent / "done" + done_dir.mkdir(exist_ok=True) + cmd_file.rename(done_dir / cmd_file.name) + + return ret == 0 + + def update_status(self, task_id: str, status: str, message: str, output: str = ""): + """更新任务状态""" + status_file = Path(self.config.status_dir) / f"{task_id}.json" + status_file.parent.mkdir(exist_ok=True) + + status_data = { + "task_id": task_id, + "status": status, + "message": message, + "output": output, + "timestamp": time.time() + } + + with open(status_file, 'w') as f: + json.dump(status_data, f, indent=2) + def run(self): """主运行循环""" - self.setup() + self.logger.info("Proxmox任务控制器启动") - logger.info("⏳ 进入监听模式,等待命令...") + # 创建必要目录 + Path(self.config.command_dir).mkdir(exist_ok=True) + Path(self.config.status_dir).mkdir(exist_ok=True) - while self.running: + while True: try: - command_dir = Path(self.config.command_dir) - command_files = list(command_dir.glob('*.json')) + # 检查新命令文件 + cmd_files = sorted(Path(self.config.command_dir).glob("*.json")) - for cmd_file in command_files: - if cmd_file.name not in self.processor.processed_commands: - self.processor.process_command(cmd_file) + for cmd_file in cmd_files: + self.process_command(cmd_file) - time.sleep(5) + # 等待下次检查 + time.sleep(self.config.check_interval) except KeyboardInterrupt: - logger.info("🛑 收到停止信号") - self.running = False + self.logger.info("收到终止信号,退出") + break except Exception as e: - logger.error(f"❌ 主循环异常: {e}") - time.sleep(10) - - logger.info("👋 Proxmox任务控制器停止") + self.logger.error(f"主循环错误: {e}") + time.sleep(self.config.check_interval) def main(): - """主函数""" - controller = TaskController() + # 从环境变量加载配置 + config = Config( + proxmox_host=os.getenv("PROXMOX_HOST", "localhost"), + proxmox_user=os.getenv("PROXMOX_USER", "root@pam"), + proxmox_token=os.getenv("PROXMOX_TOKEN", ""), + vm_id=int(os.getenv("VM_ID", "100")), + ssh_user=os.getenv("SSH_USER", "ubuntu"), + ssh_key_path=os.getenv("SSH_KEY_PATH", "~/.ssh/id_rsa"), + ) + + if not config.proxmox_token: + print("错误: 请设置 PROXMOX_TOKEN 环境变量") + sys.exit(1) + + controller = ProxmoxController(config) controller.run() -if __name__ == '__main__': +if __name__ == "__main__": main() \ No newline at end of file