Files
proxmox-task/proxmox_task_controller.py

220 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Proxmox VM 任务控制器
小主机待机监听 → 接受命令 → 启动VM → 执行任务 → 汇报进展 → 关闭VM
"""
import os
import sys
import time
import json
import logging
import subprocess
import requests
from pathlib import Path
from typing import Dict, Any, Optional
from dataclasses import dataclass
# Configuration
@dataclass
class Config:
    """Settings consumed by ProxmoxController.

    The first six fields have no default and must be supplied by the caller;
    the directory paths and polling interval fall back to the defaults below.
    """

    # --- Proxmox API access -------------------------------------------------
    # Host name / address of the Proxmox API (port 8006 is added by the controller).
    proxmox_host: str
    # User part of the PVEAPIToken header value.
    proxmox_user: str
    # Secret part of the PVEAPIToken header value.
    proxmox_token: str
    # Numeric id of the QEMU VM to start, use and stop.
    vm_id: int

    # --- SSH access into the VM --------------------------------------------
    ssh_user: str
    ssh_key_path: str

    # --- Task queue locations and polling ----------------------------------
    # Directory polled for *.json command files.
    command_dir: str = "/var/task-commands"
    # Directory where per-task status JSON files are written.
    status_dir: str = "/var/task-status"
    # Seconds to sleep between polls of command_dir.
    check_interval: int = 10
class ProxmoxController:
    """Drive one Proxmox VM through a file-based task queue.

    Each ``*.json`` file in ``config.command_dir`` describes one task.
    For each task the controller:

    1. starts the VM;
    2. runs the task's ``command`` inside it over SSH;
    3. writes a status JSON to ``config.status_dir`` at each step;
    4. stops the VM again.
    """

    def __init__(
        self,
        config: "Config",
        *,
        node: str = "proxmox",
        ssh_host: str = "localhost",
        task_timeout: int = 120,
    ):
        """Prepare API/SSH settings.

        Args:
            config: Connection, VM and directory settings.
            node: Proxmox node name used in API paths.
            ssh_host: Host the SSH client connects to when running commands.
            task_timeout: Seconds a task command may run before it is killed.
        """
        self.config = config
        self.node = node
        self.ssh_host = ssh_host
        self.task_timeout = task_timeout
        self.base_url = f"https://{config.proxmox_host}:8006/api2/json"
        # Proxmox expects "PVEAPIToken=USER@REALM!TOKENID=SECRET", so
        # proxmox_user must carry the "!tokenid" suffix.
        self.headers = {
            "Authorization": f"PVEAPIToken={config.proxmox_user}={config.proxmox_token}"
        }
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def _vm_url(self, action: str) -> str:
        """Return the API URL for ``.../qemu/<vm_id>/status/<action>``."""
        return f"{self.base_url}/nodes/{self.node}/qemu/{self.config.vm_id}/status/{action}"

    def vm_status(self) -> Dict[str, Any]:
        """Return the raw JSON body of ``status/current``, or ``{}`` on error.

        The Proxmox API nests the fields under ``"data"``; use ``_vm_state``
        to read the power state.
        """
        try:
            # verify=False: TLS certificate is not checked (Proxmox commonly
            # uses a self-signed cert). TODO(review): consider verifying.
            resp = requests.get(
                self._vm_url("current"), headers=self.headers, verify=False, timeout=10
            )
            return resp.json()
        except (requests.RequestException, ValueError) as e:
            self.logger.error(f"获取VM状态失败: {e}")
            return {}

    def _vm_state(self) -> Optional[str]:
        """Return the VM power state (e.g. ``"running"``), or None if unknown."""
        status = self.vm_status()
        if not isinstance(status, dict):
            return None
        data = status.get("data")
        if isinstance(data, dict):
            return data.get("status")
        # Tolerate a payload that is already unwrapped.
        return status.get("status")

    def _post_action(self, action: str, ok_message: str, error_prefix: str) -> bool:
        """POST ``status/<action>`` for the VM; return True on HTTP 200."""
        try:
            resp = requests.post(
                self._vm_url(action), headers=self.headers, verify=False, timeout=30
            )
        except requests.RequestException as e:
            self.logger.error(f"{error_prefix}: {e}")
            return False
        if resp.status_code == 200:
            self.logger.info(ok_message)
            return True
        # Log non-200 replies so auth/permission problems are visible.
        self.logger.error(f"{error_prefix}: HTTP {resp.status_code} {resp.text}")
        return False

    def start_vm(self) -> bool:
        """Start the VM.

        Returns True if the VM is already running or the start request was
        accepted (the start itself completes asynchronously).
        """
        self.logger.info(f"正在启动VM {self.config.vm_id}")
        if self._vm_state() == "running":
            # e.g. left running after a crash; a second start is reported as an error.
            self.logger.info("VM已在运行")
            return True
        return self._post_action("start", "VM启动命令已发送", "启动VM失败")

    def stop_vm(self) -> bool:
        """Stop the VM via ``status/stop``; return True on HTTP 200.

        NOTE: "stop" is an immediate power-off; "shutdown" would instead ask
        the guest to shut down cleanly.
        """
        self.logger.info(f"正在关闭VM {self.config.vm_id}")
        return self._post_action("stop", "VM关闭命令已发送", "关闭VM失败")

    def ssh_execute(self, command: str, timeout: int = 120) -> tuple[int, str, str]:
        """Run *command* in the VM over SSH.

        Args:
            command: Shell command executed by the remote login shell.
            timeout: Seconds before the local ssh process is killed.

        Returns:
            ``(returncode, stdout, stderr)``; returncode is -1 when the command
            timed out or ssh could not be launched.
        """
        ssh_cmd = [
            "ssh",
            "-i", os.path.expanduser(self.config.ssh_key_path),
            "-o", "StrictHostKeyChecking=no",
            # Fail fast instead of prompting for a password if key auth fails.
            "-o", "BatchMode=yes",
            "-o", "ConnectTimeout=30",
            f"{self.config.ssh_user}@{self.ssh_host}",
            command,
        ]
        try:
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                stdin=subprocess.DEVNULL,  # never block on the controller's stdin
            )
        except subprocess.TimeoutExpired:
            return -1, "", "命令执行超时"
        except (OSError, ValueError) as e:
            return -1, "", str(e)
        return result.returncode, result.stdout, result.stderr

    def wait_for_vm_ready(self, timeout: int = 120) -> bool:
        """Poll every 5 s until the VM is running and answers SSH.

        Gives up after roughly *timeout* seconds. Returns True when the VM is
        ready, False on timeout.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self._vm_state() == "running":
                # Running is not enough: also wait until sshd accepts a login.
                ret, _, _ = self.ssh_execute("echo 'VM ready'")
                if ret == 0:
                    self.logger.info("VM已就绪")
                    return True
            time.sleep(5)
        self.logger.error("VM启动超时")
        return False

    def _archive(self, cmd_file: Path) -> None:
        """Move a finished command file into the sibling ``done/`` directory."""
        done_dir = cmd_file.parent / "done"
        done_dir.mkdir(exist_ok=True)
        cmd_file.replace(done_dir / cmd_file.name)

    def process_command(self, cmd_file: Path) -> bool:
        """Run one command file end to end.

        The file must hold a JSON object ``{"task_id": ..., "command": ...}``.
        Returns True only if the command exited with status 0.

        Outcomes:

        * Unparseable JSON: the file is left in place and retried on the next
          poll (e.g. a producer may not have finished writing it).
        * Not an object, or missing/empty ``command``: status "failed", and
          the file is moved to ``done/`` without starting the VM.
        * VM fails to start or to become ready: status "failed", and the file
          is left in place, so the task is retried on the next poll.
        * Otherwise: the command runs, status is "success" or "failed", and
          the file is moved to ``done/``.

        Once the VM has been started it is always stopped again, even if an
        exception (or Ctrl-C) escapes.
        """
        self.logger.info(f"处理命令文件: {cmd_file}")
        try:
            with open(cmd_file, "r", encoding="utf-8") as f:
                task_data = json.load(f)
        except (OSError, ValueError) as e:
            self.logger.error(f"读取命令文件失败: {e}")
            return False

        if not isinstance(task_data, dict):
            self.logger.error(f"命令文件格式无效: {cmd_file}")
            self.update_status(cmd_file.stem, "failed", "命令文件格式无效")
            self._archive(cmd_file)
            return False

        # Fall back to the file name so id-less tasks don't overwrite each other.
        task_id = str(task_data.get("task_id") or cmd_file.stem)
        command = task_data.get("command", "")
        if not isinstance(command, str) or not command.strip():
            # An empty command would boot the VM only to open a bare SSH session.
            self.logger.error(f"命令为空或无效: {cmd_file}")
            self.update_status(task_id, "failed", "命令为空或无效")
            self._archive(cmd_file)
            return False

        self.update_status(task_id, "starting", "准备启动VM")
        if not self.start_vm():
            self.update_status(task_id, "failed", "启动VM失败")
            return False

        try:
            if not self.wait_for_vm_ready():
                self.update_status(task_id, "failed", "VM启动超时")
                return False
            self.update_status(task_id, "running", "VM已启动开始执行任务")
            ret, stdout, stderr = self.ssh_execute(command, timeout=self.task_timeout)
            if ret == 0:
                self.update_status(task_id, "success", "任务执行成功", stdout)
            else:
                self.update_status(task_id, "failed", "任务执行失败", stderr)
        finally:
            self.stop_vm()

        self._archive(cmd_file)
        return ret == 0

    def update_status(self, task_id: str, status: str, message: str, output: str = "") -> None:
        """Write ``<status_dir>/<task_id>.json`` with the task's current state.

        The JSON is first written to a temporary file in the same directory and
        then renamed over the target with ``os.replace``, so a reader never sees
        a half-written file.
        """
        status_file = Path(self.config.status_dir) / f"{task_id}.json"
        status_file.parent.mkdir(parents=True, exist_ok=True)
        status_data = {
            "task_id": task_id,
            "status": status,
            "message": message,
            "output": output,
            "timestamp": time.time(),
        }
        tmp_file = status_file.with_name(f".{status_file.name}.tmp")
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(status_data, f, indent=2)
        os.replace(tmp_file, status_file)

    def run(self) -> None:
        """Poll the command directory forever; stop on KeyboardInterrupt."""
        self.logger.info("Proxmox任务控制器启动")
        command_dir = Path(self.config.command_dir)
        command_dir.mkdir(parents=True, exist_ok=True)
        Path(self.config.status_dir).mkdir(parents=True, exist_ok=True)
        while True:
            try:
                # Non-recursive glob: archived files in done/ are not picked up.
                for cmd_file in sorted(command_dir.glob("*.json")):
                    self.process_command(cmd_file)
                time.sleep(self.config.check_interval)
            except KeyboardInterrupt:
                self.logger.info("收到终止信号,退出")
                break
            except Exception as e:
                # Top-level boundary: keep the daemon alive but keep the traceback.
                self.logger.exception(f"主循环错误: {e}")
                time.sleep(self.config.check_interval)
def main():
    """Build a Config from environment variables and run the controller.

    Required:
        PROXMOX_TOKEN

    Optional (default in parentheses):
        PROXMOX_HOST (localhost), PROXMOX_USER (root@pam), VM_ID (100),
        SSH_USER (ubuntu), SSH_KEY_PATH (~/.ssh/id_rsa).

    Optional, falling back to the Config defaults when unset:
        COMMAND_DIR, STATUS_DIR, CHECK_INTERVAL.

    Exits with status 1, with a message on stderr, if a setting is missing or
    invalid.
    """

    def int_env(name: str, default: str) -> int:
        """Parse integer env var *name*; exit 1 with a message if it is not an int."""
        raw = os.getenv(name, default)
        try:
            return int(raw)
        except ValueError:
            print(f"错误: {name} 必须是整数, 当前值: {raw!r}", file=sys.stderr)
            sys.exit(1)

    # Check the token first so a missing secret fails cleanly before anything else.
    token = os.getenv("PROXMOX_TOKEN", "")
    if not token:
        print("错误: 请设置 PROXMOX_TOKEN 环境变量", file=sys.stderr)
        sys.exit(1)

    vm_id = int_env("VM_ID", "100")

    overrides = {}
    for env_name, field_name in (("COMMAND_DIR", "command_dir"), ("STATUS_DIR", "status_dir")):
        value = os.getenv(env_name)
        if value:
            overrides[field_name] = value
    if os.getenv("CHECK_INTERVAL"):
        interval = int_env("CHECK_INTERVAL", "")
        if interval <= 0:
            # 0 would busy-loop; a negative value makes time.sleep raise.
            print(f"错误: CHECK_INTERVAL 必须大于0, 当前值: {interval}", file=sys.stderr)
            sys.exit(1)
        overrides["check_interval"] = interval

    config = Config(
        proxmox_host=os.getenv("PROXMOX_HOST", "localhost"),
        proxmox_user=os.getenv("PROXMOX_USER", "root@pam"),
        proxmox_token=token,
        vm_id=vm_id,
        ssh_user=os.getenv("SSH_USER", "ubuntu"),
        ssh_key_path=os.path.expanduser(os.getenv("SSH_KEY_PATH", "~/.ssh/id_rsa")),
        **overrides,
    )
    controller = ProxmoxController(config)
    controller.run()


if __name__ == "__main__":
    main()