添加Proxmox任务控制脚本:监控命令→启动VM→执行任务→汇报→关闭VM

This commit is contained in:
xiaji
2026-03-30 13:39:17 +08:00
parent fa665b0395
commit d48dc740d6
3 changed files with 319 additions and 0 deletions

64
proxmox_task/README.md Normal file
View File

@@ -0,0 +1,64 @@
# Proxmox任务控制器
一个用于监控命令文件、启动Proxmox虚拟机、执行任务、汇报结果并关闭虚拟机的Python脚本。
## 功能特性
- 监控指定目录中的JSON命令文件
- 通过Proxmox API启动和关闭虚拟机
- 在虚拟机内执行指定命令(支持QEMU Guest Agent)
- 汇报执行结果到结果目录
- 支持凭证传递(用于需要认证的命令)
- 日志记录到文件和控制台
## 使用方法
### 1. 配置环境变量
```bash
export PROXMOX_HOST="https://your-proxmox-host:8006"
export PROXMOX_TOKEN_ID='user@pam!your-token-name' # 格式: 用户@认证域!令牌名;使用单引号避免 ! 被shell展开
export PROXMOX_TOKEN_SECRET="your-token-secret"
export PROXMOX_VERIFY_SSL="false" # 如果使用自签名证书
```
### 2. 准备命令文件
在 `/var/task-commands/` 目录中放置JSON格式的命令文件:
```json
{
"node": "pve-node-1",
"vmid": 100,
"command": "ls -la /tmp && echo 'Hello World'",
"task_id": "my-task-001",
"username": "root",
"password": "your-vm-password",
"keep_running": false
}
```
### 3. 运行控制器
```bash
python3 controller.py
```
### 4. 查看结果
任务执行结果将保存为JSON文件在 `/var/task-results/` 目录中。
## 部署建议
1. 将脚本部署到小主机上
2. 确保小主机能够访问Proxmox服务器的API端口(默认8006)
3. 为小主机创建系统服务以实现开机自启动
4. 定期清理旧的结果文件
5. 考虑使用更安全的凭证管理方式(如HashiCorp Vault或AWS Secrets Manager)
## 注意事项
- 确保目标虚拟机已安装并运行QEMU Guest Agent(agent/exec API依赖它)
- 或者修改脚本使用SSH方式在VM内执行命令
- 生产环境中应使用更安全的方式处理凭证
- 建议在测试环境中先验证功能

254
proxmox_task/controller.py Normal file
View File

@@ -0,0 +1,254 @@
#!/usr/bin/env python3
"""
Proxmox VM task controller.
Watch for command files -> start the VM -> run the task -> report the result -> shut the VM down.
"""
import os
import time
import json
import logging
import requests
import subprocess
import threading
from datetime import datetime
from pathlib import Path
# Configure logging: INFO-and-above records go to both the log file and the
# console, each prefixed with a timestamp and the level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.FileHandler('/var/log/proxmox_task.log'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class ProxmoxTaskController:
    """Run tasks described by JSON command files inside Proxmox VMs.

    The controller watches ``/var/task-commands`` for ``*.json`` files. For
    each file it makes sure the target VM is running, runs the requested
    command through the QEMU guest agent, writes a JSON result file to
    ``/var/task-results`` and (unless ``keep_running`` is set) asks the VM
    to shut down again.
    """

    # Per-request HTTP timeout in seconds. Without one, a stalled API call
    # would block the single-threaded monitor loop forever.
    REQUEST_TIMEOUT = 30
    # Seconds to wait for the VM to report "running" after a start request.
    VM_START_TIMEOUT = 30
    # Seconds to wait for the guest agent to answer a ping. A VM reports
    # "running" as soon as QEMU is up, well before the guest has booted.
    AGENT_READY_TIMEOUT = 120
    # Seconds a guest command may run before it is reported as failed.
    COMMAND_TIMEOUT = 300
    # Seconds between successive status polls.
    POLL_INTERVAL = 1

    def __init__(self):
        """Load settings from the environment and prepare the HTTP session.

        Reads ``PROXMOX_HOST``, ``PROXMOX_TOKEN_ID``, ``PROXMOX_TOKEN_SECRET``
        and ``PROXMOX_VERIFY_SSL``; creates the command and result
        directories if they do not exist yet.
        """
        # A trailing slash on the host would produce "//api2/..." URLs.
        self.proxmox_host = os.getenv(
            'PROXMOX_HOST', 'https://proxmox.example.com:8006'
        ).rstrip('/')
        self.proxmox_token_id = os.getenv('PROXMOX_TOKEN_ID', 'api-token@pam')
        self.proxmox_token_secret = os.getenv('PROXMOX_TOKEN_SECRET', '')
        self.verify_ssl = os.getenv('PROXMOX_VERIFY_SSL', 'false').lower() == 'true'

        # Directory that is watched for incoming command files.
        self.command_dir = Path('/var/task-commands')
        self.command_dir.mkdir(exist_ok=True)
        # Directory that receives one result file per processed task.
        self.result_dir = Path('/var/task-results')
        self.result_dir.mkdir(exist_ok=True)

        self.session = requests.Session()
        self.session.verify = self.verify_ssl
        if self.proxmox_token_id and self.proxmox_token_secret:
            self.session.headers.update({
                'Authorization': f'PVEAPIToken={self.proxmox_token_id}={self.proxmox_token_secret}'
            })
        else:
            # Requests will go out unauthenticated; make that visible.
            logger.warning("未配置API令牌,请求将不带认证信息")
        logger.info("Proxmox任务控制器初始化完成")

    def _vm_url(self, node, vmid, suffix):
        """Return the API URL for ``suffix`` under the given node and VM."""
        return f"{self.proxmox_host}/api2/json/nodes/{node}/qemu/{vmid}/{suffix}"

    def _request(self, method, url, **kwargs):
        """Send one API request and return the ``data`` member of the reply.

        Applies ``REQUEST_TIMEOUT`` unless the caller passes its own
        ``timeout``. Raises whatever the session raises, including the HTTP
        error produced by ``raise_for_status``.
        """
        kwargs.setdefault('timeout', self.REQUEST_TIMEOUT)
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        return response.json().get('data')

    @staticmethod
    def _remove_quietly(path):
        """Delete ``path``, treating an already-missing file as success."""
        try:
            path.unlink()
        except FileNotFoundError:
            pass

    def get_vm_status(self, node, vmid):
        """Return the VM's status string (e.g. ``'running'``), or None on error."""
        try:
            return self._request('GET', self._vm_url(node, vmid, 'status/current'))['status']
        except Exception as e:
            logger.error(f"获取VM状态失败: {e}")
            return None

    def start_vm(self, node, vmid):
        """Ask Proxmox to start the VM; return True if the request was accepted."""
        try:
            self._request('POST', self._vm_url(node, vmid, 'status/start'))
            logger.info(f"VM {vmid} 启动命令已发送")
            return True
        except Exception as e:
            logger.error(f"启动VM失败: {e}")
            return False

    def stop_vm(self, node, vmid):
        """Ask Proxmox to shut the VM down; return True if the request was accepted."""
        try:
            self._request('POST', self._vm_url(node, vmid, 'status/shutdown'))
            logger.info(f"VM {vmid} 关闭命令已发送")
            return True
        except Exception as e:
            logger.error(f"关闭VM失败: {e}")
            return False

    def _ensure_vm_running(self, node, vmid):
        """Start the VM if needed and wait until it reports ``running``.

        Returns True when the VM is running, False when the start request
        failed or the VM did not come up within ``VM_START_TIMEOUT``.
        """
        status = self.get_vm_status(node, vmid)
        logger.info(f"VM {vmid} 当前状态: {status}")
        if status == 'running':
            return True

        logger.info(f"启动VM {vmid}")
        if not self.start_vm(node, vmid):
            logger.error(f"启动VM {vmid} 失败")
            return False

        logger.info("等待VM启动...")
        deadline = time.monotonic() + self.VM_START_TIMEOUT
        while time.monotonic() < deadline:
            time.sleep(self.POLL_INTERVAL)
            if self.get_vm_status(node, vmid) == 'running':
                return True
        logger.error("VM启动超时")
        return False

    def _wait_for_agent(self, node, vmid):
        """Ping the guest agent until it answers or ``AGENT_READY_TIMEOUT`` passes.

        Returns True as soon as a ping succeeds, False on timeout.
        """
        url = self._vm_url(node, vmid, 'agent/ping')
        deadline = time.monotonic() + self.AGENT_READY_TIMEOUT
        while True:
            try:
                self._request('POST', url)
                return True
            except (requests.RequestException, ValueError):
                # The agent is not reachable yet (or the reply was not
                # JSON); keep trying until the deadline.
                if time.monotonic() >= deadline:
                    return False
                time.sleep(self.POLL_INTERVAL)

    def execute_vm_command(self, node, vmid, command, username=None, password=None):
        """Run ``command`` in the VM through the QEMU guest agent.

        Args:
            node: Proxmox node name.
            vmid: VM identifier.
            command: Either a shell command line (str), which is run as
                ``/bin/sh -c <command>`` so operators such as ``&&`` work
                (this assumes a guest that provides ``/bin/sh``), or a
                list of program + arguments that is executed directly.
            username: Accepted for backward compatibility; not sent,
                because the agent exec endpoint defines no such parameter.
            password: Same as ``username``.

        Returns:
            A dict with ``exitcode``, ``out`` and ``err``. Any failure,
            including a timeout after ``COMMAND_TIMEOUT`` seconds, is
            reported as ``exitcode`` -1 with the error text in ``err``.
        """
        if username or password:
            logger.warning("QEMU Guest Agent不支持用户名/密码参数,已忽略")
        try:
            if isinstance(command, str):
                argv = ['/bin/sh', '-c', command]
            else:
                argv = list(command)

            started = self._request(
                'POST', self._vm_url(node, vmid, 'agent/exec'), json={'command': argv}
            )
            pid = started['pid']

            # exec-status is a GET endpoint that takes the pid as a query
            # parameter; poll it until the process reports that it exited.
            status_url = self._vm_url(node, vmid, 'agent/exec-status')
            deadline = time.monotonic() + self.COMMAND_TIMEOUT
            while True:
                status = self._request('GET', status_url, params={'pid': pid}) or {}
                if status.get('exited'):
                    break
                if time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"命令在 {self.COMMAND_TIMEOUT} 秒内未完成 (pid={pid})"
                    )
                time.sleep(self.POLL_INTERVAL)

            return {
                'exitcode': status.get('exitcode', -1),
                'out': status.get('out-data', ''),
                'err': status.get('err-data', ''),
            }
        except Exception as e:
            logger.error(f"在VM内执行命令失败: {e}")
            return {'exitcode': -1, 'out': '', 'err': str(e)}

    def _write_result(self, task_id, node, vmid, task_command, result):
        """Write the task outcome to ``<result_dir>/<task_id>_result.json``."""
        # Keep only the final path component so a task_id such as
        # "../../x" cannot place the file outside the result directory.
        safe_id = Path(str(task_id)).name or f"task_{int(time.time())}"
        result_file = self.result_dir / f"{safe_id}_result.json"
        result_data = {
            'task_id': task_id,
            'timestamp': datetime.now().isoformat(),
            'node': node,
            'vmid': vmid,
            'command': task_command,
            'exitcode': result['exitcode'],
            'output': result['out'],
            'error': result['err'],
            'success': result['exitcode'] == 0
        }
        with open(result_file, 'w', encoding='utf-8') as f:
            json.dump(result_data, f, indent=2, ensure_ascii=False)
        logger.info(f"任务结果已保存到: {result_file}")

    def process_command_file(self, command_file):
        """Process one command file from start to finish.

        The file must hold a JSON object with ``node``, ``vmid`` and
        ``command``; ``task_id``, ``username``, ``password`` and
        ``keep_running`` are optional.

        Returns:
            True when the task was run and its result recorded (whatever
            the command's exit code), False when the file was invalid, the
            VM could not be started, or an unexpected error occurred.
        """
        command_file = Path(command_file)
        try:
            with open(command_file, 'r', encoding='utf-8') as f:
                command_data = json.load(f)
            logger.info(f"处理命令文件: {command_file}")

            if not isinstance(command_data, dict):
                logger.error("命令文件缺少必要参数")
                return False

            node = command_data.get('node')
            vmid = command_data.get('vmid')
            task_command = command_data.get('command')
            task_id = command_data.get('task_id') or f"task_{int(time.time())}"
            username = command_data.get('username')
            password = command_data.get('password')
            if not all([node, vmid, task_command]):
                logger.error("命令文件缺少必要参数")
                return False

            # Steps 1-2: check the VM and start it when it is not running.
            if not self._ensure_vm_running(node, vmid):
                return False

            try:
                # Step 3: run the task once the guest agent is reachable.
                if self._wait_for_agent(node, vmid):
                    logger.info(f"在VM {vmid} 中执行任务: {task_command}")
                    result = self.execute_vm_command(
                        node, vmid, task_command, username, password
                    )
                else:
                    logger.error(f"VM {vmid} 的QEMU Guest Agent未就绪")
                    result = {
                        'exitcode': -1,
                        'out': '',
                        'err': 'QEMU guest agent did not respond',
                    }
                # Step 4: report the outcome.
                self._write_result(task_id, node, vmid, task_command, result)
            finally:
                # Step 5: shut the VM down even if reporting failed, so a
                # failed task never leaves the VM running.
                if not command_data.get('keep_running', False):
                    logger.info(f"关闭VM {vmid}")
                    self.stop_vm(node, vmid)
                else:
                    logger.info(f"VM {vmid} 保持运行状态")

            # Step 6: remove the command file so it is not processed again.
            self._remove_quietly(command_file)
            logger.info(f"命令文件已处理并删除: {command_file}")
            return True
        except Exception as e:
            logger.error(f"处理命令文件时发生错误: {e}")
            return False

    def _claim_and_process(self, command_file):
        """Claim ``command_file`` by renaming it, process it, then clean up."""
        locked_file = command_file.with_suffix('.json.lock')
        try:
            # The rename acts as a simple lock: only one process can win it.
            command_file.rename(locked_file)
        except FileNotFoundError:
            # Another process claimed the file first.
            return
        except OSError as e:
            logger.error(f"处理命令文件 {command_file} 时出错: {e}")
            return

        try:
            self.process_command_file(locked_file)
        except Exception as e:
            logger.error(f"处理命令文件 {command_file} 时出错: {e}")
        finally:
            # A successful run has already deleted the file; a failed run
            # leaves it behind, and it must not stay claimed forever.
            self._remove_quietly(locked_file)

    def monitor_commands(self):
        """Poll the command directory every 5 seconds until interrupted."""
        logger.info(f"开始监控命令目录: {self.command_dir}")
        while True:
            try:
                for command_file in sorted(self.command_dir.glob("*.json")):
                    self._claim_and_process(command_file)
                time.sleep(5)
            except KeyboardInterrupt:
                logger.info("收到中断信号,停止监控")
                break
            except Exception as e:
                logger.error(f"监控循环发生错误: {e}")
                time.sleep(10)  # back off a little longer after an error
def main():
    """Run the controller's monitor loop and return a process exit status.

    Returns 0 when the loop ends normally and 1 when it raises; the error
    is logged before returning.
    """
    task_controller = ProxmoxTaskController()
    try:
        task_controller.monitor_commands()
    except Exception as exc:
        logger.error(f"控制器运行失败: {exc}")
        exit_status = 1
    else:
        exit_status = 0
    return exit_status
if __name__ == "__main__":
    # Raise SystemExit directly rather than calling exit(): the exit()
    # helper is installed by the site module and is not available when the
    # interpreter runs with -S.
    raise SystemExit(main())

View File

@@ -0,0 +1 @@
requests>=2.25.1