Add Proxmox VM task controller script and README

This commit is contained in:
xiaji
2026-03-30 15:41:08 +08:00
parent 9082cc5e65
commit a6ab2395b9
2 changed files with 229 additions and 355 deletions

127
README.md
View File

@@ -1,104 +1,87 @@
# Proxmox 虚拟机任务控制器 # Proxmox VM 任务控制器
一个Python脚本系统用于自动化控制Proxmox虚拟机执行任务。小主机平时待机监听命令后启动VM执行任务完成后自动关闭VM。 一个Python脚本用于自动化控制Proxmox虚拟机执行任务。小主机平时待机监听命令后启动VM执行任务完成后关闭VM。
## 功能特 ## 功能特
- 📡 **命令监听**监控指定目录的JSON命令文件 - **待机监听**:持续监控命令目录,平时保持低功耗状态
- 🖥️ **VM管理**通过Proxmox API启动/关闭虚拟机 - **自动启停**:按需启动/关闭Proxmox虚拟机
- 🔧 **任务执行**通过SSH在VM内执行自定义命令 - **任务执行**通过SSH在VM内执行命令
- 📊 **状态汇报**:实时汇报任务执行状态 - **状态汇报**:实时记录任务执行状态和结果
- 🔒 **安全认证**支持API Token和SSH密钥认证 - **错误处理**:完善的错误处理和超时机制
- 📝 **日志记录**:完整的操作日志和错误追踪
## 快速开始 ## 快速开始
### 1. 安装依赖 ### 环境要求
- Python 3.8+
- Proxmox VE 6.x+
- SSH访问权限
### 安装配置
1. 克隆仓库
```bash ```bash
pip install requests paramiko python-dotenv git clone http://124.223.26.33:3000/xiaji/proxmox-task.git
cd proxmox-task
``` ```
### 2. 配置环境 2. 设置环境变量
复制环境模板并填写配置:
```bash ```bash
cp .env.template .env export PROXMOX_HOST="your-proxmox-host"
# 编辑 .env 文件填写Proxmox和VM信息 export PROXMOX_USER="root@pam"
export PROXMOX_TOKEN="your-api-token"
export VM_ID="100"
export SSH_USER="ubuntu"
export SSH_KEY_PATH="/path/to/ssh/key"
``` ```
配置说明: 3. 运行脚本
- `PROXMOX_HOST`: Proxmox服务器地址
- `PROXMOX_API_TOKEN`: Proxmox API Token
- `VM_ID`: 目标虚拟机ID
- `VM_SSH_*`: 虚拟机SSH连接信息
- `COMMAND_DIR`: 命令监听目录
### 3. 运行脚本
```bash ```bash
python3 proxmox_task_controller.py python3 proxmox_task_controller.py
``` ```
### 4. 作为系统服务(推荐) ### 使用示例
```bash
# 复制文件到系统目录
sudo cp proxmox_task_controller.py /opt/proxmox-task/
sudo cp .env /opt/proxmox-task/
sudo cp proxmox-task.service /etc/systemd/system/
# 启用并启动服务
sudo systemctl enable proxmox-task
sudo systemctl start proxmox-task
```
## 使用示例
### 创建任务命令
`COMMAND_DIR` 目录(默认 `/var/task-commands`创建JSON文件
1. 创建任务命令文件 `/var/task-commands/task-001.json`:
```json ```json
{ {
"task_id": "backup-2024-01-15", "task_id": "task-001",
"command": "cd /opt/backup && ./run_backup.sh" "command": "cd /opt/myapp && python3 run_task.py"
} }
``` ```
文件名建议使用唯一标识,如 `task-backup-2024-01-15.json` 2. 查看状态文件 `/var/task-status/task-001.json`:
```json
### 查看日志 {
"task_id": "task-001",
```bash "status": "success",
# 实时日志 "message": "任务执行成功",
tail -f /var/log/proxmox_task_controller.log "output": "Task completed successfully...",
"timestamp": 1234567890
# 系统服务日志 }
journalctl -u proxmox-task -f
``` ```
## 系统要求 ## 配置选项
- Python 3.8+ | 变量名 | 说明 | 默认值 |
- Proxmox VE 6.0+ |--------|------|--------|
- 网络访问权限 | PROXMOX_HOST | Proxmox主机地址 | localhost |
- SSH密钥认证配置 | PROXMOX_USER | API用户 | root@pam |
| PROXMOX_TOKEN | API Token | (必需) |
| VM_ID | 虚拟机ID | 100 |
| SSH_USER | SSH用户名 | ubuntu |
| SSH_KEY_PATH | SSH私钥路径 | ~/.ssh/id_rsa |
| COMMAND_DIR | 命令目录 | /var/task-commands |
| STATUS_DIR | 状态目录 | /var/task-status |
| CHECK_INTERVAL | 检查间隔(秒) | 10 |
## 安全建议 ## 安全建议
- 使用API Token而非密码 - 使用Proxmox API Token而非密码
- 限制Proxmox API访问IP - 限制API Token权限
- 使用专用SSH密钥 - 使用SSH密钥认证
- 定期更新依赖库 - 在受信任网络中运行
## 故障排查
1. **Proxmox连接失败**检查API Token和主机地址
2. **SSH连接失败**验证密钥路径和VM SSH配置
3. **命令执行超时**:调整命令超时时间
4. **VM启动慢**:增加等待就绪时间
## 许可证 ## 许可证

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Proxmox 虚拟机任务控制器 Proxmox VM 任务控制器
小主机待机监听 ->命令 -> 启动VM -> 执行任务 -> 汇报进展 -> 关闭VM 小主机待机监听 命令 启动VM 执行任务 汇报进展 关闭VM
""" """
import os import os
@@ -10,320 +10,211 @@ import time
import json import json
import logging import logging
import subprocess import subprocess
import requests
from pathlib import Path from pathlib import Path
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
from dataclasses import dataclass from dataclasses import dataclass
import requests # 配置
import paramiko
from dotenv import load_dotenv
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/proxmox_task_controller.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
@dataclass @dataclass
class Config: class Config:
"""配置类"""
proxmox_host: str proxmox_host: str
proxmox_api_token: str proxmox_user: str
proxmox_token: str
vm_id: int vm_id: int
vm_ssh_host: str ssh_user: str
vm_ssh_port: int ssh_key_path: str
vm_ssh_user: str command_dir: str = "/var/task-commands"
vm_ssh_key_path: str status_dir: str = "/var/task-status"
command_dir: str check_interval: int = 10 # 检查间隔(秒)
status_report_url: Optional[str] = None
class ConfigLoader: class ProxmoxController:
"""配置加载器"""
@staticmethod
def load() -> Config:
load_dotenv()
return Config(
proxmox_host=os.getenv('PROXMOX_HOST', 'https://localhost:8006'),
proxmox_api_token=os.getenv('PROXMOX_API_TOKEN', ''),
vm_id=int(os.getenv('VM_ID', '100')),
vm_ssh_host=os.getenv('VM_SSH_HOST', 'localhost'),
vm_ssh_port=int(os.getenv('VM_SSH_PORT', '22')),
vm_ssh_user=os.getenv('VM_SSH_USER', 'root'),
vm_ssh_key_path=os.getenv('VM_SSH_KEY_PATH', '~/.ssh/id_rsa'),
command_dir=os.getenv('COMMAND_DIR', '/var/task-commands'),
status_report_url=os.getenv('STATUS_REPORT_URL')
)
class ProxmoxAPI:
"""Proxmox API 客户端"""
def __init__(self, config: Config): def __init__(self, config: Config):
self.config = config self.config = config
self.base_url = f"{config.proxmox_host}/api2/json" self.base_url = f"https://{config.proxmox_host}:8006/api2/json"
self.headers = { self.headers = {
'Authorization': f'PVEAPIToken={config.proxmox_api_token}' "Authorization": f"PVEAPIToken={config.proxmox_user}={config.token}"
} }
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def vm_status(self) -> Dict[str, Any]:
"""获取VM状态"""
url = f"{self.base_url}/nodes/proxmox/qemu/{self.config.vm_id}/status/current"
try:
resp = requests.get(url, headers=self.headers, verify=False, timeout=10)
return resp.json()
except Exception as e:
self.logger.error(f"获取VM状态失败: {e}")
return {}
def start_vm(self) -> bool: def start_vm(self) -> bool:
"""启动虚拟机""" """启动VM"""
self.logger.info(f"正在启动VM {self.config.vm_id}")
url = f"{self.base_url}/nodes/proxmox/qemu/{self.config.vm_id}/status/start"
try: try:
url = f"{self.base_url}/nodes/pve/qemu/{self.config.vm_id}/status/start" resp = requests.post(url, headers=self.headers, verify=False, timeout=30)
response = requests.post(url, headers=self.headers, verify=False, timeout=30) if resp.status_code == 200:
if response.status_code == 200: self.logger.info("VM启动命令已发送")
logger.info(f"✅ VM {self.config.vm_id} 启动命令已发送")
return True return True
else:
logger.error(f"❌ 启动VM失败: {response.status_code} - {response.text}")
return False
except Exception as e: except Exception as e:
logger.error(f"启动VM异常: {e}") self.logger.error(f"启动VM失败: {e}")
return False
def shutdown_vm(self) -> bool:
"""关闭虚拟机"""
try:
url = f"{self.base_url}/nodes/pve/qemu/{self.config.vm_id}/status/shutdown"
response = requests.post(url, headers=self.headers, verify=False, timeout=60)
if response.status_code == 200:
logger.info(f"✅ VM {self.config.vm_id} 关闭命令已发送")
return True
else:
logger.error(f"❌ 关闭VM失败: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"❌ 关闭VM异常: {e}")
return False
def get_vm_status(self) -> Dict[str, Any]:
"""获取虚拟机状态"""
try:
url = f"{self.base_url}/nodes/pve/qemu/{self.config.vm_id}/status/current"
response = requests.get(url, headers=self.headers, verify=False, timeout=10)
if response.status_code == 200:
return response.json().get('data', {})
return {}
except Exception as e:
logger.error(f"❌ 获取VM状态异常: {e}")
return {}
class SSHExecutor:
"""SSH 执行器"""
def __init__(self, config: Config):
self.config = config
def execute(self, command: str, timeout: int = 300) -> tuple:
"""在VM中执行命令"""
try:
key_path = os.path.expanduser(self.config.vm_ssh_key_path)
if not os.path.exists(key_path):
logger.error(f"❌ SSH密钥不存在: {key_path}")
return False, f"SSH key not found: {key_path}"
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(
hostname=self.config.vm_ssh_host,
port=self.config.vm_ssh_port,
username=self.config.vm_ssh_user,
key_filename=key_path,
timeout=30
)
logger.info(f"🔌 SSH连接到VM成功")
stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
exit_code = stdout.channel.recv_exit_status()
output = stdout.read().decode('utf-8')
error = stderr.read().decode('utf-8')
client.close()
if exit_code == 0:
logger.info(f"✅ 命令执行成功: {command[:50]}...")
return True, output
else:
logger.error(f"❌ 命令执行失败: {command[:50]}... (exit code: {exit_code})")
return False, error
except Exception as e:
logger.error(f"❌ SSH执行异常: {e}")
return False, str(e)
class CommandProcessor:
"""命令处理器"""
def __init__(self, config: Config):
self.config = config
self.proxmox = ProxmoxAPI(config)
self.ssh = SSHExecutor(config)
self.processed_commands = set()
def load_processed_commands(self):
"""加载已处理的命令记录"""
record_file = Path(self.config.command_dir) / '.processed_commands'
if record_file.exists():
with open(record_file, 'r') as f:
self.processed_commands = set(json.load(f))
def save_processed_commands(self):
"""保存已处理的命令记录"""
record_file = Path(self.config.command_dir) / '.processed_commands'
with open(record_file, 'w') as f:
json.dump(list(self.processed_commands), f)
def report_status(self, status: str, details: str = ""):
"""汇报状态"""
if self.config.status_report_url:
try:
payload = {
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
'status': status,
'details': details
}
requests.post(self.config.status_report_url, json=payload, timeout=10)
except Exception as e:
logger.error(f"❌ 状态汇报失败: {e}")
logger.info(f"📊 状态更新: {status} - {details}")
def wait_for_vm_ready(self, max_wait: int = 120) -> bool:
"""等待VM就绪"""
logger.info(f"⏳ 等待VM {self.config.vm_id} 就绪...")
start_time = time.time()
while time.time() - start_time < max_wait:
status = self.proxmox.get_vm_status()
if status.get('status') == 'running':
# 等待SSH可用
time.sleep(10)
try:
self.ssh.execute('echo "VM ready"', timeout=10)
logger.info("✅ VM已就绪")
return True
except:
logger.info("⏳ VM运行中等待SSH...")
time.sleep(5)
logger.error("❌ 等待VM就绪超时")
return False return False
def process_command(self, command_file: Path): def stop_vm(self) -> bool:
"""关闭VM"""
self.logger.info(f"正在关闭VM {self.config.vm_id}")
url = f"{self.base_url}/nodes/proxmox/qemu/{self.config.vm_id}/status/stop"
try:
resp = requests.post(url, headers=self.headers, verify=False, timeout=30)
if resp.status_code == 200:
self.logger.info("VM关闭命令已发送")
return True
except Exception as e:
self.logger.error(f"关闭VM失败: {e}")
return False
def ssh_execute(self, command: str) -> tuple[int, str, str]:
"""在VM内执行命令"""
ssh_cmd = [
"ssh",
"-i", self.config.ssh_key_path,
"-o", "StrictHostKeyChecking=no",
"-o", "ConnectTimeout=30",
f"{self.config.ssh_user}@localhost",
command
]
try:
result = subprocess.run(ssh_cmd, capture_output=True, text=True, timeout=120)
return result.returncode, result.stdout, result.stderr
except subprocess.TimeoutExpired:
return -1, "", "命令执行超时"
except Exception as e:
return -1, "", str(e)
def wait_for_vm_ready(self, timeout: int = 120) -> bool:
"""等待VM启动完成并可以SSH连接"""
start_time = time.time()
while time.time() - start_time < timeout:
status = self.vm_status()
if status.get("status") == "running":
# 测试SSH连接
ret, _, _ = self.ssh_execute("echo 'VM ready'")
if ret == 0:
self.logger.info("VM已就绪")
return True
time.sleep(5)
self.logger.error("VM启动超时")
return False
def process_command(self, cmd_file: Path) -> bool:
"""处理单个命令文件""" """处理单个命令文件"""
if command_file.name in self.processed_commands: self.logger.info(f"处理命令文件: {cmd_file}")
return
logger.info(f"📋 处理命令文件: {command_file.name}")
# 读取命令
try: try:
with open(command_file, 'r') as f: with open(cmd_file, 'r') as f:
command_data = json.load(f) task_data = json.load(f)
task_id = command_data.get('task_id', 'unknown')
command = command_data.get('command', '')
self.report_status('starting_vm', f'Task {task_id}')
# 1. 启动VM
if not self.proxmox.start_vm():
self.report_status('vm_start_failed', f'Task {task_id}')
return
# 2. 等待VM就绪
if not self.wait_for_vm_ready():
self.report_status('vm_ready_timeout', f'Task {task_id}')
self.proxmox.shutdown_vm()
return
self.report_status('executing_task', f'Task {task_id}')
# 3. 执行命令
success, output = self.ssh.execute(command, timeout=300)
if success:
self.report_status('task_completed', f'Task {task_id}')
logger.info(f"✅ 任务完成: {task_id}")
else:
self.report_status('task_failed', f'Task {task_id}: {output}')
logger.error(f"❌ 任务失败: {task_id}")
# 4. 关闭VM
self.report_status('shutting_down_vm', f'Task {task_id}')
self.proxmox.shutdown_vm()
# 5. 移动命令文件到完成目录
done_dir = Path(self.config.command_dir) / 'done'
done_dir.mkdir(exist_ok=True)
command_file.rename(done_dir / command_file.name)
# 6. 记录已处理
self.processed_commands.add(command_file.name)
self.save_processed_commands()
except Exception as e: except Exception as e:
logger.error(f"❌ 处理命令异常: {e}") self.logger.error(f"读取命令文件失败: {e}")
self.report_status('command_error', str(e)) return False
class TaskController: task_id = task_data.get("task_id", "unknown")
"""主任务控制器""" command = task_data.get("command", "")
def __init__(self):
self.config = ConfigLoader.load()
self.processor = CommandProcessor(self.config)
self.running = True
def setup(self): # 更新状态
"""初始化设置""" self.update_status(task_id, "starting", "准备启动VM")
logger.info("🚀 Proxmox任务控制器启动")
logger.info(f"📁 监控命令目录: {self.config.command_dir}")
# 创建命令目录 # 启动VM
Path(self.config.command_dir).mkdir(parents=True, exist_ok=True) if not self.start_vm():
self.update_status(task_id, "failed", "启动VM失败")
return False
# 加载已处理命令记录 # 等待VM就绪
self.processor.load_processed_commands() if not self.wait_for_vm_ready():
self.update_status(task_id, "failed", "VM启动超时")
self.stop_vm()
return False
# 检查Proxmox连接 self.update_status(task_id, "running", "VM已启动开始执行任务")
try:
status = self.processor.proxmox.get_vm_status() # 执行命令
if status: ret, stdout, stderr = self.ssh_execute(command)
logger.info(f"✅ Proxmox连接正常VM ID: {self.config.vm_id}")
else: if ret == 0:
logger.error("❌ Proxmox连接失败") self.update_status(task_id, "success", "任务执行成功", stdout)
except Exception as e: else:
logger.error(f"❌ Proxmox连接异常: {e}") self.update_status(task_id, "failed", "任务执行失败", stderr)
# 关闭VM
self.stop_vm()
# 移动命令文件到完成目录
done_dir = cmd_file.parent / "done"
done_dir.mkdir(exist_ok=True)
cmd_file.rename(done_dir / cmd_file.name)
return ret == 0
def update_status(self, task_id: str, status: str, message: str, output: str = ""):
"""更新任务状态"""
status_file = Path(self.config.status_dir) / f"{task_id}.json"
status_file.parent.mkdir(exist_ok=True)
status_data = {
"task_id": task_id,
"status": status,
"message": message,
"output": output,
"timestamp": time.time()
}
with open(status_file, 'w') as f:
json.dump(status_data, f, indent=2)
def run(self): def run(self):
"""主运行循环""" """主运行循环"""
self.setup() self.logger.info("Proxmox任务控制器启动")
logger.info("⏳ 进入监听模式,等待命令...") # 创建必要目录
Path(self.config.command_dir).mkdir(exist_ok=True)
Path(self.config.status_dir).mkdir(exist_ok=True)
while self.running: while True:
try: try:
command_dir = Path(self.config.command_dir) # 检查新命令文件
command_files = list(command_dir.glob('*.json')) cmd_files = sorted(Path(self.config.command_dir).glob("*.json"))
for cmd_file in command_files: for cmd_file in cmd_files:
if cmd_file.name not in self.processor.processed_commands: self.process_command(cmd_file)
self.processor.process_command(cmd_file)
time.sleep(5) # 等待下次检查
time.sleep(self.config.check_interval)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info("🛑 收到止信号") self.logger.info("收到止信号,退出")
self.running = False break
except Exception as e: except Exception as e:
logger.error(f"主循环异常: {e}") self.logger.error(f"主循环错误: {e}")
time.sleep(10) time.sleep(self.config.check_interval)
logger.info("👋 Proxmox任务控制器停止")
def main(): def main():
"""主函数""" # 从环境变量加载配置
controller = TaskController() config = Config(
proxmox_host=os.getenv("PROXMOX_HOST", "localhost"),
proxmox_user=os.getenv("PROXMOX_USER", "root@pam"),
proxmox_token=os.getenv("PROXMOX_TOKEN", ""),
vm_id=int(os.getenv("VM_ID", "100")),
ssh_user=os.getenv("SSH_USER", "ubuntu"),
ssh_key_path=os.getenv("SSH_KEY_PATH", "~/.ssh/id_rsa"),
)
if not config.proxmox_token:
print("错误: 请设置 PROXMOX_TOKEN 环境变量")
sys.exit(1)
controller = ProxmoxController(config)
controller.run() controller.run()
if __name__ == '__main__': if __name__ == "__main__":
main() main()