Files
proxmox-task/proxmox_task/controller.py

254 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
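Proxmox VM task controller.

Watches a directory for command files, starts the target VM, runs the task
inside it, reports the result, and shuts the VM down again.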
"""
import os
import time
import json
import logging
import requests
import subprocess
import threading
from datetime import datetime
from pathlib import Path
# Configure logging once at import time: every record goes both to a
# persistent log file and to the console stream.
logging.basicConfig(
    handlers=[
        logging.FileHandler('/var/log/proxmox_task.log'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by the controller class and main().
logger = logging.getLogger(__name__)
class ProxmoxTaskController:
    """Run one-shot tasks inside Proxmox VMs, driven by JSON command files.

    The controller polls ``command_dir`` for ``*.json`` files. For each file
    it makes sure the target VM is running, executes the requested command
    through the QEMU guest agent REST endpoints, writes a JSON result file
    into ``result_dir`` and (unless ``keep_running`` is set) asks the VM to
    shut down.
    """

    # Seconds allowed for any single HTTP request to the Proxmox API, so a
    # stalled connection cannot block the monitor loop forever.
    REQUEST_TIMEOUT = 30
    # Maximum seconds to wait for a guest command to finish (5 minutes).
    COMMAND_TIMEOUT = 300
    # Seconds between two exec-status polls.
    COMMAND_POLL_INTERVAL = 2
    # Maximum seconds to wait for a started VM to report "running".
    VM_START_TIMEOUT = 30
    # Maximum seconds to wait for the guest agent of a freshly started VM.
    AGENT_READY_TIMEOUT = 60
    # Seconds between two scans of the command directory.
    SCAN_INTERVAL = 5
    # Seconds to back off after an unexpected error in the monitor loop.
    ERROR_BACKOFF = 10

    def __init__(self):
        """Read configuration from the environment and prepare the session."""
        # Configuration comes from environment variables (a real deployment
        # should read it from a protected config file or a secret manager).
        self.proxmox_host = os.getenv('PROXMOX_HOST', 'https://proxmox.example.com:8006')
        self.proxmox_token_id = os.getenv('PROXMOX_TOKEN_ID', 'api-token@pam')
        self.proxmox_token_secret = os.getenv('PROXMOX_TOKEN_SECRET', '')
        self.verify_ssl = os.getenv('PROXMOX_VERIFY_SSL', 'false').lower() == 'true'

        # Directory watched for incoming command files.
        self.command_dir = Path('/var/task-commands')
        self.command_dir.mkdir(parents=True, exist_ok=True)

        # Directory that receives one result file per executed task.
        self.result_dir = Path('/var/task-results')
        self.result_dir.mkdir(parents=True, exist_ok=True)

        # One shared HTTP session carries the TLS setting and auth header.
        self.session = requests.Session()
        self.session.verify = self.verify_ssl
        if not self.verify_ssl:
            logger.warning("TLS证书校验已禁用 (PROXMOX_VERIFY_SSL 未设置为 true)")

        # API token authentication header.
        if self.proxmox_token_id and self.proxmox_token_secret:
            self.session.headers.update({
                'Authorization': f'PVEAPIToken={self.proxmox_token_id}={self.proxmox_token_secret}'
            })

        logger.info("Proxmox任务控制器初始化完成")

    def _vm_url(self, node, vmid, suffix):
        """Build the API URL of a per-VM endpoint such as ``status/current``."""
        base = self.proxmox_host.rstrip('/')
        return f"{base}/api2/json/nodes/{node}/qemu/{vmid}/{suffix}"

    def get_vm_status(self, node, vmid):
        """Return the VM status string (e.g. ``'running'``), or None on error."""
        try:
            response = self.session.get(
                self._vm_url(node, vmid, 'status/current'),
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()['data']['status']
        except Exception as e:
            logger.error(f"获取VM状态失败: {e}")
            return None

    def start_vm(self, node, vmid) -> bool:
        """Send the start request for the VM; return True if it was accepted."""
        try:
            response = self.session.post(
                self._vm_url(node, vmid, 'status/start'),
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            logger.info(f"VM {vmid} 启动命令已发送")
            return True
        except Exception as e:
            logger.error(f"启动VM失败: {e}")
            return False

    def stop_vm(self, node, vmid) -> bool:
        """Send the shutdown request for the VM; return True if accepted."""
        try:
            response = self.session.post(
                self._vm_url(node, vmid, 'status/shutdown'),
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            logger.info(f"VM {vmid} 关闭命令已发送")
            return True
        except Exception as e:
            logger.error(f"关闭VM失败: {e}")
            return False

    def _wait_for_guest_agent(self, node, vmid) -> bool:
        """Ping the guest agent until it answers or AGENT_READY_TIMEOUT passes.

        Returns True as soon as one ping succeeds, False on timeout.
        """
        ping_url = self._vm_url(node, vmid, 'agent/ping')
        deadline = time.monotonic() + self.AGENT_READY_TIMEOUT
        while time.monotonic() < deadline:
            try:
                response = self.session.post(ping_url, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                return True
            except Exception:
                # Agent not up yet (or API error): retry until the deadline.
                time.sleep(self.COMMAND_POLL_INTERVAL)
        return False

    def execute_vm_command(self, node, vmid, command, username=None, password=None) -> dict:
        """Run ``command`` in the VM via the QEMU guest agent and wait for it.

        The command is started with ``agent/exec`` and then ``agent/exec-status``
        is polled until the process has exited or COMMAND_TIMEOUT seconds
        have elapsed.

        Returns:
            A dict with keys ``exitcode`` (int, -1 on any failure or when the
            agent reports no exit code), ``out`` and ``err`` (captured output;
            on failure ``err`` holds the error text). Never raises.
        """
        try:
            # The time limit is enforced client-side by the polling deadline
            # below rather than sent as a request field: the documented
            # parameters of agent/exec are `command` and `input-data`.
            payload = {'command': command}
            if username and password:
                # NOTE(review): credentials are forwarded as before, but they
                # are not among the documented agent/exec parameters; confirm
                # the target API version accepts them.
                payload['username'] = username
                payload['password'] = password

            response = self.session.post(
                self._vm_url(node, vmid, 'agent/exec'),
                json=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            pid = response.json()['data']['pid']

            # exec-status is a GET endpoint that takes the pid as a query
            # parameter; poll it until the guest process has exited.
            status_url = self._vm_url(node, vmid, 'agent/exec-status')
            deadline = time.monotonic() + self.COMMAND_TIMEOUT
            while True:
                status_response = self.session.get(
                    status_url,
                    params={'pid': pid},
                    timeout=self.REQUEST_TIMEOUT,
                )
                status_response.raise_for_status()
                status = status_response.json()['data']
                if status.get('exited'):
                    return {
                        'exitcode': status.get('exitcode', -1),
                        # The agent reports output as out-data/err-data; the
                        # plain keys are kept as a fallback.
                        'out': status.get('out-data', status.get('out', '')),
                        'err': status.get('err-data', status.get('err', '')),
                    }
                if time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"命令在 {self.COMMAND_TIMEOUT} 秒内未完成 (pid={pid})"
                    )
                time.sleep(self.COMMAND_POLL_INTERVAL)
        except Exception as e:
            logger.error(f"在VM内执行命令失败: {e}")
            return {'exitcode': -1, 'out': '', 'err': str(e)}

    def process_command_file(self, command_file) -> bool:
        """Execute the task described by one JSON command file.

        Required keys: ``node``, ``vmid``, ``command``. Optional keys:
        ``task_id`` (defaults to ``task_<unix time>``), ``username``,
        ``password`` and ``keep_running``.

        On success the result JSON is written to ``result_dir``, the VM is
        shut down unless ``keep_running`` is true, and the command file is
        deleted.

        Returns:
            True if the task ran and its result was saved (whatever the
            command's exit code), False on any failure before that.
        """
        try:
            command_file = Path(command_file)
            with open(command_file, 'r', encoding='utf-8') as f:
                command_data = json.load(f)
            logger.info(f"处理命令文件: {command_file}")

            # Extract the task parameters.
            node = command_data.get('node')
            vmid = command_data.get('vmid')
            task_command = command_data.get('command')
            task_id = command_data.get('task_id', f"task_{int(time.time())}")
            username = command_data.get('username')
            password = command_data.get('password')
            if not all([node, vmid, task_command]):
                logger.error("命令文件缺少必要参数")
                return False

            # 1. Check the current VM state.
            status = self.get_vm_status(node, vmid)
            logger.info(f"VM {vmid} 当前状态: {status}")

            # 2. Start the VM if it is not running yet.
            if status != 'running':
                logger.info(f"启动VM {vmid}")
                if not self.start_vm(node, vmid):
                    logger.error(f"启动VM {vmid} 失败")
                    return False
                logger.info("等待VM启动...")
                for _ in range(self.VM_START_TIMEOUT):
                    time.sleep(1)
                    status = self.get_vm_status(node, vmid)
                    if status == 'running':
                        break
                else:
                    logger.error("VM启动超时")
                    return False
                # "running" only means the VM process is up; the guest agent
                # needs the guest OS to boot before it can execute anything.
                if not self._wait_for_guest_agent(node, vmid):
                    logger.warning(f"VM {vmid} 的Guest Agent尚未就绪,仍将尝试执行命令")

            # 3. Run the task command inside the VM.
            logger.info(f"在VM {vmid} 中执行任务: {task_command}")
            result = self.execute_vm_command(node, vmid, task_command, username, password)

            # 4. Report the result. task_id comes from the command file, so
            # path separators are neutralised to keep the result file inside
            # result_dir.
            safe_task_id = str(task_id).replace('/', '_').replace('\\', '_')
            result_file = self.result_dir / f"{safe_task_id}_result.json"
            result_data = {
                'task_id': task_id,
                'timestamp': datetime.now().isoformat(),
                'node': node,
                'vmid': vmid,
                'command': task_command,
                'exitcode': result['exitcode'],
                'output': result['out'],
                'error': result['err'],
                'success': result['exitcode'] == 0
            }
            with open(result_file, 'w', encoding='utf-8') as f:
                json.dump(result_data, f, indent=2, ensure_ascii=False)
            logger.info(f"任务结果已保存到: {result_file}")

            # 5. Shut the VM down unless the command asks to keep it running.
            if not command_data.get('keep_running', False):
                logger.info(f"关闭VM {vmid}")
                self.stop_vm(node, vmid)
            else:
                logger.info(f"VM {vmid} 保持运行状态")

            # 6. Remove the command file so it is not processed again.
            command_file.unlink()
            logger.info(f"命令文件已处理并删除: {command_file}")
            return True
        except Exception as e:
            logger.error(f"处理命令文件时发生错误: {e}")
            return False

    def _claim_and_process(self, command_file) -> None:
        """Claim one command file by renaming it, process it, then clean up.

        The rename to ``*.json.lock`` is the lock: whichever process renames
        the file first owns it, and the lock name no longer matches the
        ``*.json`` scan pattern.
        """
        locked_file = command_file.with_suffix('.json.lock')
        try:
            command_file.rename(locked_file)
        except FileNotFoundError:
            # Another process claimed (or removed) the file first.
            return
        except OSError as e:
            logger.error(f"无法锁定命令文件 {command_file}: {e}")
            return

        try:
            self.process_command_file(locked_file)
        except Exception as e:
            logger.error(f"处理命令文件 {command_file} 时出错: {e}")
        finally:
            # On success process_command_file has already deleted the lock
            # file; on failure it is removed here so it does not pile up.
            try:
                locked_file.unlink()
            except FileNotFoundError:
                pass

    def monitor_commands(self) -> None:
        """Poll the command directory forever and process each new file.

        Returns only when interrupted with Ctrl-C (KeyboardInterrupt).
        """
        logger.info(f"开始监控命令目录: {self.command_dir}")
        while True:
            try:
                # Sorted so files are handled in a deterministic order.
                for command_file in sorted(self.command_dir.glob("*.json")):
                    self._claim_and_process(command_file)
                # Sleep before the next scan.
                time.sleep(self.SCAN_INTERVAL)
            except KeyboardInterrupt:
                logger.info("收到中断信号,停止监控")
                break
            except Exception as e:
                logger.error(f"监控循环发生错误: {e}")
                # Back off a little longer after an unexpected error.
                time.sleep(self.ERROR_BACKOFF)
def main():
    """Create the controller and run its monitor loop.

    Returns:
        0 when the monitor loop ends normally, 1 if the controller could not
        be created or the loop raised an unexpected exception.
    """
    try:
        # Construction happens inside the try block: __init__ creates
        # directories and can fail, and that failure should be logged and
        # turned into exit code 1 like any other fatal error.
        controller = ProxmoxTaskController()
        controller.monitor_commands()
    except Exception as e:
        logger.error(f"控制器运行失败: {e}")
        return 1
    return 0
if __name__ == "__main__":
    # Use SystemExit directly: the bare exit() helper is injected by the
    # `site` module for interactive use and is not guaranteed to exist
    # (e.g. under `python -S` or in frozen builds).
    raise SystemExit(main())