Files
djangohelper/threads.py

948 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import time
from loguru import logger
from PySide6.QtCore import QThread, Signal
import paramiko
class PasswordDialog:
    """Ask the user for a sudo password and remember the last confirmed answer."""

    def __init__(self, parent=None):
        # Widget used as the dialog parent; when falsy, no dialog is shown.
        self.parent = parent
        # Last password the user confirmed, or None if nothing was entered yet.
        self.password = None

    def get_password(self, prompt="请输入sudo密码:"):
        """Show a masked text-input dialog and return what the user typed.

        Args:
            prompt: Label text displayed inside the dialog.

        Returns:
            The entered text when the dialog is confirmed; ``None`` when there
            is no parent widget or the user cancels.
        """
        if not self.parent:
            return None

        # Imported lazily: only needed when a dialog is actually shown.
        from PySide6.QtWidgets import QInputDialog, QLineEdit

        # The echo-mode enum is defined on QLineEdit; QInputDialog has no
        # ``Password`` attribute, so looking it up there raises AttributeError.
        password, ok = QInputDialog.getText(
            self.parent, "sudo密码", prompt, echo=QLineEdit.EchoMode.Password
        )
        if not ok:
            return None
        self.password = password
        return password
class SSHConnectionThread(QThread):
    """Background thread that checks whether an SSH password login works.

    Opens its own connection, runs a trivial ``echo`` command and reports the
    outcome through ``connection_status``. The connection is always closed
    before the thread finishes.
    """

    # Emitted once per run: (success flag, human-readable message).
    connection_status = Signal(bool, str)

    def __init__(self, host, username, password, port=22):
        super().__init__()
        self.host = host
        self.username = username
        self.password = password
        self.port = port

    def run(self):
        """Connect, run the echo probe, emit the result, and close the client."""
        ssh = None
        try:
            ssh = paramiko.SSHClient()
            # NOTE(review): AutoAddPolicy accepts unknown host keys without
            # verification — confirm this is acceptable for the deployment.
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(
                self.host,
                port=self.port,
                username=self.username,
                password=self.password,
                timeout=10,
            )
            # Probe the session with a command whose output is known.
            stdin, stdout, stderr = ssh.exec_command("echo 'Connection test'")
            output = stdout.read().decode().strip()
            if output == "Connection test":
                self.connection_status.emit(True, f"成功连接到 {self.host}")
                logger.info(f"SSH连接成功: {self.host}")
            else:
                self.connection_status.emit(False, "连接测试失败")
                logger.error("SSH连接测试失败")
        except Exception as e:
            error_msg = str(e)
            self.connection_status.emit(False, error_msg)
            logger.error(f"SSH连接失败: {error_msg}")
        finally:
            # Close on every path; previously an exception after connect()
            # skipped the close call and leaked the transport.
            if ssh is not None:
                ssh.close()
class GitInstallThread(QThread):
    """Worker thread that makes sure Git is available on the remote host.

    Reports through ``result_ready`` whether Git was already present, was
    installed via apt, or could not be installed.
    """

    # (success flag, message)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client):
        super().__init__()
        self.ssh_client = ssh_client

    def _git_version(self):
        """Return the stripped stdout of ``git --version`` run remotely."""
        chan_in, chan_out, chan_err = self.ssh_client.exec_command("git --version")
        return chan_out.read().decode().strip()

    def run(self):
        """Check for Git and install it with apt when the check prints nothing."""
        try:
            # Empty stdout is treated as "Git not installed".
            present = self._git_version()
            if present:
                self.result_ready.emit(True, f"Git已安装: {present}")
                logger.info(f"Git已安装: {present}")
                return

            # Install through apt.
            chan_in, chan_out, chan_err = self.ssh_client.exec_command(
                "sudo apt update && sudo apt install -y git"
            )
            install_failed = chan_out.channel.recv_exit_status() != 0
            if install_failed:
                failure = chan_err.read().decode()
                self.result_ready.emit(False, f"Git安装失败: {failure}")
                logger.error(f"Git安装失败: {failure}")
                return

            installed = self._git_version()
            self.result_ready.emit(True, f"Git安装成功: {installed}")
            logger.info(f"Git安装成功: {installed}")
        except Exception as exc:
            reason = str(exc)
            self.result_ready.emit(False, reason)
            logger.error(f"Git安装异常: {reason}")
class GitCloneThread(QThread):
    """Worker thread that clones a Git repository into a remote directory.

    Refuses to clone when the target path already exists. The parent directory
    is created first and the clone only starts once that has succeeded.
    """

    # (success flag, message)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client, git_url, project_path):
        super().__init__()
        self.ssh_client = ssh_client
        self.git_url = git_url
        self.project_path = project_path

    def run(self):
        """Check the target, create its parent directory, then run ``git clone``."""
        try:
            # A successful ``ls`` means the target already exists.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.project_path}")
            if stdout.channel.recv_exit_status() == 0:
                self.result_ready.emit(False, f"目录 {self.project_path} 已存在")
                logger.warning(f"目录已存在: {self.project_path}")
                return

            parent_dir = os.path.dirname(self.project_path)
            clone_cmd = f"git clone {self.git_url} {os.path.basename(self.project_path)}"

            if parent_dir:
                # Wait for mkdir to finish: exec_command returns immediately, so
                # without this the clone could start before the directory exists,
                # and a mkdir failure would go unnoticed.
                stdin, stdout, stderr = self.ssh_client.exec_command(f"mkdir -p {parent_dir}")
                if stdout.channel.recv_exit_status() != 0:
                    error = stderr.read().decode()
                    self.result_ready.emit(False, f"创建目录失败: {error}")
                    logger.error(f"创建目录失败: {error}")
                    return
                clone_cmd = f"cd {parent_dir} && {clone_cmd}"

            # Clone the repository.
            stdin, stdout, stderr = self.ssh_client.exec_command(clone_cmd)
            exit_status = stdout.channel.recv_exit_status()
            if exit_status == 0:
                self.result_ready.emit(True, f"Git克隆成功: {self.project_path}")
                logger.info(f"Git克隆成功: {self.project_path}")
            else:
                error = stderr.read().decode()
                self.result_ready.emit(False, f"Git克隆失败: {error}")
                logger.error(f"Git克隆失败: {error}")
        except Exception as e:
            error_msg = str(e)
            self.result_ready.emit(False, error_msg)
            logger.error(f"Git克隆异常: {error_msg}")
class ListDirectoryThread(QThread):
    """Worker thread that runs ``ls -la`` on a remote path and reports the output."""

    # (success flag, listing text on success / error text on failure)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client, path):
        super().__init__()
        self.ssh_client = ssh_client
        self.path = path

    def run(self):
        """List ``self.path`` remotely and emit either the listing or the error."""
        try:
            chan_in, chan_out, chan_err = self.ssh_client.exec_command(f"ls -la {self.path}")
            listing_failed = chan_out.channel.recv_exit_status() != 0
            if listing_failed:
                failure = chan_err.read().decode()
                self.result_ready.emit(False, failure)
                logger.error(f"目录列表失败: {failure}")
                return

            listing = chan_out.read().decode()
            self.result_ready.emit(True, listing)
            logger.info(f"目录列表成功: {self.path}")
        except Exception as exc:
            reason = str(exc)
            self.result_ready.emit(False, reason)
            logger.error(f"目录列表异常: {reason}")
class DeleteDirectoryThread(QThread):
    """Worker thread that recursively deletes a remote path with ``rm -rf``.

    The path is passed to the shell as a single quoted argument, and an empty
    path is rejected instead of being reported as a successful deletion.
    """

    # (success flag, message)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client, path):
        super().__init__()
        self.ssh_client = ssh_client
        self.path = path

    @staticmethod
    def _quote_remote_path(path):
        """Shell-quote *path* while keeping a leading ``~`` expandable remotely."""
        import shlex

        if path == "~":
            return path
        if path.startswith("~/"):
            return "~/" + shlex.quote(path[2:])
        return shlex.quote(path)

    def run(self):
        """Validate the path, delete it remotely, and emit the outcome."""
        try:
            # ``rm -rf`` with no operand exits 0, which would be reported as a
            # successful deletion of nothing; reject blank paths up front.
            if not self.path or not str(self.path).strip():
                error = "目录删除失败: 路径为空"
                self.result_ready.emit(False, error)
                logger.error(error)
                return

            # Quote so whitespace cannot split the path into several targets;
            # ``--`` stops a path starting with '-' from being read as an option.
            target = self._quote_remote_path(str(self.path))
            stdin, stdout, stderr = self.ssh_client.exec_command(f"rm -rf -- {target}")
            exit_status = stdout.channel.recv_exit_status()
            if exit_status == 0:
                self.result_ready.emit(True, f"目录删除成功: {self.path}")
                logger.info(f"目录删除成功: {self.path}")
            else:
                error = stderr.read().decode()
                self.result_ready.emit(False, error)
                logger.error(f"目录删除失败: {error}")
        except Exception as e:
            error_msg = str(e)
            self.result_ready.emit(False, error_msg)
            logger.error(f"目录删除异常: {error_msg}")
class SetTimezoneAndRestartThread(QThread):
    """Worker thread that sets the remote timezone to Asia/Shanghai and reboots."""

    # (success flag, message)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client):
        super().__init__()
        self.ssh_client = ssh_client

    def run(self):
        """Set the timezone; on success issue the reboot and report immediately."""
        try:
            chan_in, chan_out, chan_err = self.ssh_client.exec_command(
                "sudo timedatectl set-timezone Asia/Shanghai"
            )
            timezone_set = chan_out.channel.recv_exit_status() == 0
            if timezone_set:
                # The reboot is issued without waiting for its exit status.
                chan_in, chan_out, chan_err = self.ssh_client.exec_command("sudo reboot")
                self.result_ready.emit(True, "时区设置成功,服务器正在重启")
                logger.info("时区设置成功,服务器正在重启")
            else:
                failure = chan_err.read().decode()
                self.result_ready.emit(False, f"设置时区失败: {failure}")
                logger.error(f"设置时区失败: {failure}")
        except Exception as exc:
            reason = str(exc)
            self.result_ready.emit(False, reason)
            logger.error(f"时区设置异常: {reason}")
class CheckFirewallThread(QThread):
    """Worker thread that reports the output of ``sudo ufw status``."""

    # (success flag, status text on success / error text on failure)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client):
        super().__init__()
        self.ssh_client = ssh_client

    def run(self):
        """Query the UFW status remotely and emit the result."""
        try:
            chan_in, chan_out, chan_err = self.ssh_client.exec_command("sudo ufw status")
            query_failed = chan_out.channel.recv_exit_status() != 0
            if query_failed:
                failure = chan_err.read().decode()
                self.result_ready.emit(False, failure)
                logger.error(f"防火墙状态检查失败: {failure}")
                return

            status_text = chan_out.read().decode()
            self.result_ready.emit(True, status_text)
            logger.info("防火墙状态检查成功")
        except Exception as exc:
            reason = str(exc)
            self.result_ready.emit(False, reason)
            logger.error(f"防火墙状态检查异常: {reason}")
class OpenPortThread(QThread):
    """Worker thread that opens a port in UFW and reloads the firewall."""

    # (success flag, message)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client, port):
        super().__init__()
        self.ssh_client = ssh_client
        self.port = port

    def run(self):
        """Run ``ufw allow`` for the port, then ``ufw reload``, and emit the result."""
        try:
            # Open the port.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo ufw allow {self.port}")
            exit_status = stdout.channel.recv_exit_status()
            if exit_status == 0:
                # Reload the firewall and wait for it to finish. Previously the
                # reload was fired and forgotten, so success was reported while
                # the command might still be running or fail unnoticed.
                stdin, stdout, stderr = self.ssh_client.exec_command("sudo ufw reload")
                if stdout.channel.recv_exit_status() != 0:
                    # The allow rule was already accepted, so a failed reload is
                    # logged but does not turn the result into a failure.
                    logger.warning(f"防火墙重新加载失败: {stderr.read().decode()}")
                self.result_ready.emit(True, f"端口 {self.port} 开放成功")
                logger.info(f"端口 {self.port} 开放成功")
            else:
                error = stderr.read().decode()
                self.result_ready.emit(False, error)
                logger.error(f"端口开放失败: {error}")
        except Exception as e:
            error_msg = str(e)
            self.result_ready.emit(False, error_msg)
            logger.error(f"端口开放异常: {error_msg}")
class DjangoInstallThread(QThread):
    """Worker thread that installs Django on the remote host when it is missing.

    Tries ``pip3`` first and falls back to the ``python3-django`` apt package.
    Progress is reported through ``progress_updated`` and reaches 100 on every
    successful path.
    """

    # (success flag, message)
    result_ready = Signal(bool, str)
    # Progress percentage as emitted by run().
    progress_updated = Signal(int)

    def __init__(self, ssh_client):
        super().__init__()
        self.ssh_client = ssh_client

    def _django_version(self):
        """Return the stripped stdout of ``python3 -m django --version``."""
        stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
        return stdout.read().decode().strip()

    def _report_success(self, message):
        """Complete the progress signal, then emit and log a success message."""
        self.progress_updated.emit(100)
        self.result_ready.emit(True, message)
        logger.info(message)

    def run(self):
        """Check for Django, then install via pip3 and, failing that, via apt."""
        try:
            self.progress_updated.emit(10)
            # Empty stdout is treated as "Django not installed".
            django_version = self._django_version()
            if django_version:
                self._report_success(f"Django已安装: {django_version}")
                return

            self.progress_updated.emit(30)
            # First attempt: pip.
            stdin, stdout, stderr = self.ssh_client.exec_command(
                "pip3 install --break-system-packages django"
            )
            if stdout.channel.recv_exit_status() == 0:
                self.progress_updated.emit(90)
                self._report_success(f"Django安装成功: {self._django_version()}")
                return

            self.progress_updated.emit(50)
            # Fallback when pip fails: apt.
            stdin, stdout, stderr = self.ssh_client.exec_command(
                "sudo apt update && sudo apt install -y python3-django"
            )
            if stdout.channel.recv_exit_status() == 0:
                self.progress_updated.emit(90)
                self._report_success(f"Django安装成功: {self._django_version()}")
            else:
                error = stderr.read().decode()
                self.result_ready.emit(False, f"Django安装失败: {error}")
                logger.error(f"Django安装失败: {error}")
        except Exception as e:
            error_msg = str(e)
            self.result_ready.emit(False, error_msg)
            logger.error(f"Django安装异常: {error_msg}")
class DjangoTestThread(QThread):
    """Worker thread that smoke-tests a remote Django project with ``runserver``.

    Verifies ``manage.py`` exists, installs ``requirements.txt`` when present,
    starts the development server for at most 10 seconds on port 8000 and
    checks that the server process is actually running.
    """

    # (success flag, message)
    result_ready = Signal(bool, str)
    # Progress percentage as emitted by run().
    progress_updated = Signal(int)

    def __init__(self, ssh_client, django_path):
        super().__init__()
        self.ssh_client = ssh_client
        self.django_path = django_path

    def run(self):
        """Run the project checks and the short-lived development server test."""
        try:
            self.progress_updated.emit(10)
            # Make sure the Django project exists.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/manage.py")
            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0:
                self.result_ready.emit(False, f"Django项目不存在: {self.django_path}")
                logger.error(f"Django项目不存在: {self.django_path}")
                return

            self.progress_updated.emit(30)
            # Look for requirements.txt.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/requirements.txt")
            exit_status = stdout.channel.recv_exit_status()
            if exit_status == 0:
                self.progress_updated.emit(50)
                # Install the dependencies.
                stdin, stdout, stderr = self.ssh_client.exec_command(
                    f"cd {self.django_path} && pip3 install --break-system-packages -r requirements.txt"
                )
                exit_status = stdout.channel.recv_exit_status()
                if exit_status != 0:
                    error = stderr.read().decode()
                    self.result_ready.emit(False, f"依赖安装失败: {error}")
                    logger.error(f"依赖安装失败: {error}")
                    return

            self.progress_updated.emit(70)
            # Start the development server in the background, capped at 10 s.
            stdin, stdout, stderr = self.ssh_client.exec_command(
                f"cd {self.django_path} && timeout 10 python3 manage.py runserver 0.0.0.0:8000 --noreload &"
            )
            time.sleep(3)  # give the server time to start

            # Check whether the server is running. The bracketed first letter
            # keeps the pattern from matching the grep process (and the shell
            # running this pipeline), whose command lines contain the literal
            # "[m]anage.py"; a plain pattern always matched itself and made
            # the test succeed unconditionally.
            stdin, stdout, stderr = self.ssh_client.exec_command("ps aux | grep '[m]anage.py runserver'")
            output = stdout.read().decode()
            if "manage.py runserver" in output:
                self.progress_updated.emit(100)
                self.result_ready.emit(True, "Django测试服务器启动成功")
                logger.info("Django测试服务器启动成功")
            else:
                self.result_ready.emit(False, "Django测试服务器启动失败")
                logger.error("Django测试服务器启动失败")
        except Exception as e:
            error_msg = str(e)
            self.result_ready.emit(False, error_msg)
            logger.error(f"Django测试异常: {error_msg}")
class DownloadSettingsThread(QThread):
    """Worker thread that fetches the content of a project's ``settings.py``.

    The file is located with ``find`` below ``django_path``; the first match
    is read with ``cat`` and its text is emitted through ``result_ready``.
    """

    # (success flag, file content on success / error text on failure)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client, django_path):
        super().__init__()
        self.ssh_client = ssh_client
        self.django_path = django_path

    def run(self):
        """Locate settings.py remotely and emit its content."""
        try:
            # Locate settings.py.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py")
            exit_status = stdout.channel.recv_exit_status()
            matches = [
                line.strip()
                for line in stdout.read().decode().splitlines()
                if line.strip()
            ]
            # ``find`` exits 0 even when nothing matches, so the output must be
            # checked too; an empty path would otherwise produce a bare ``cat``
            # that blocks waiting on stdin.
            if exit_status != 0 or not matches:
                self.result_ready.emit(False, "未找到settings.py文件")
                logger.error("未找到settings.py文件")
                return

            # Use the first match only; several newline-separated paths would
            # break the shell command below.
            settings_path = matches[0]
            if len(matches) > 1:
                logger.warning(f"找到多个settings.py文件, 使用: {settings_path}")

            # Read the file content.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"cat {settings_path}")
            exit_status = stdout.channel.recv_exit_status()
            if exit_status == 0:
                settings_content = stdout.read().decode()
                self.result_ready.emit(True, settings_content)
                logger.info("settings.py下载成功")
            else:
                error = stderr.read().decode()
                self.result_ready.emit(False, error)
                logger.error(f"settings.py下载失败: {error}")
        except Exception as e:
            error_msg = str(e)
            self.result_ready.emit(False, error_msg)
            logger.error(f"settings.py下载异常: {error_msg}")
class UploadSettingsThread(QThread):
    """Worker thread that overwrites a project's ``settings.py`` with new content.

    The content is written over SFTP to a temporary file and then moved onto
    the first ``settings.py`` found below ``django_path`` using ``sudo mv``.
    """

    # (success flag, message)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client, django_path, settings_content):
        super().__init__()
        self.ssh_client = ssh_client
        self.django_path = django_path
        self.settings_content = settings_content

    def run(self):
        """Locate settings.py, upload the new content, and move it into place."""
        try:
            # Locate settings.py.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py")
            exit_status = stdout.channel.recv_exit_status()
            matches = [
                line.strip()
                for line in stdout.read().decode().splitlines()
                if line.strip()
            ]
            # ``find`` exits 0 even with no match; without this check the move
            # below would run with an empty destination.
            if exit_status != 0 or not matches:
                self.result_ready.emit(False, "未找到settings.py文件")
                logger.error("未找到settings.py文件")
                return
            # Use the first match only so the command stays on a single line.
            settings_path = matches[0]

            # Write the content to a temporary file; the SFTP session is closed
            # even when the write raises.
            temp_file = "/tmp/settings_upload.py"
            sftp = self.ssh_client.open_sftp()
            try:
                with sftp.file(temp_file, 'w') as f:
                    f.write(self.settings_content)
            finally:
                sftp.close()

            # Move the temporary file onto the target.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo mv {temp_file} {settings_path}")
            exit_status = stdout.channel.recv_exit_status()
            if exit_status == 0:
                self.result_ready.emit(True, "settings.py上传成功")
                logger.info("settings.py上传成功")
            else:
                error = stderr.read().decode()
                self.result_ready.emit(False, error)
                logger.error(f"settings.py上传失败: {error}")
        except Exception as e:
            error_msg = str(e)
            self.result_ready.emit(False, error_msg)
            logger.error(f"settings.py上传异常: {error_msg}")
class CollectStaticThread(QThread):
    """Worker thread that runs ``manage.py collectstatic --noinput`` remotely."""

    # (success flag, message)
    result_ready = Signal(bool, str)
    # Progress percentage as emitted by run().
    progress_updated = Signal(int)

    def __init__(self, ssh_client, django_path):
        super().__init__()
        self.ssh_client = ssh_client
        self.django_path = django_path

    def run(self):
        """Verify the project exists, collect static files, and emit the result."""
        try:
            self.progress_updated.emit(10)

            # The project counts as present when manage.py can be listed.
            chan_in, chan_out, chan_err = self.ssh_client.exec_command(
                f"ls -la {self.django_path}/manage.py"
            )
            project_missing = chan_out.channel.recv_exit_status() != 0
            if project_missing:
                self.result_ready.emit(False, f"Django项目不存在: {self.django_path}")
                logger.error(f"Django项目不存在: {self.django_path}")
                return

            self.progress_updated.emit(30)

            # Collect the static files.
            chan_in, chan_out, chan_err = self.ssh_client.exec_command(
                f"cd {self.django_path} && python3 manage.py collectstatic --noinput"
            )
            collect_failed = chan_out.channel.recv_exit_status() != 0
            if collect_failed:
                failure = chan_err.read().decode()
                self.result_ready.emit(False, failure)
                logger.error(f"静态文件收集失败: {failure}")
                return

            self.progress_updated.emit(100)
            collected = chan_out.read().decode()
            self.result_ready.emit(True, f"静态文件收集成功: {collected}")
            logger.info("静态文件收集成功")
        except Exception as exc:
            reason = str(exc)
            self.result_ready.emit(False, reason)
            logger.error(f"静态文件收集异常: {reason}")
class CheckDjangoStatusThread(QThread):
    """Worker thread that builds a ✓/✗ checklist for a remote Django project.

    Checks manage.py, the installed Django version, requirements.txt,
    settings.py and the ``static`` directory, and emits the checklist as one
    newline-joined string.
    """

    # (success flag, checklist text on success / error text on failure)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client, django_path):
        super().__init__()
        self.ssh_client = ssh_client
        self.django_path = django_path

    def _exists(self, remote_path):
        """Return True when ``ls -la`` on *remote_path* exits with status 0."""
        stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {remote_path}")
        return stdout.channel.recv_exit_status() == 0

    def run(self):
        """Run every check in order and emit the combined status text."""
        try:
            status_info = []

            # The project must exist; nothing else is checked without it.
            if not self._exists(f"{self.django_path}/manage.py"):
                self.result_ready.emit(False, f"Django项目不存在: {self.django_path}")
                logger.error(f"Django项目不存在: {self.django_path}")
                return
            status_info.append("✓ Django项目存在")

            # Django version: empty output means the module could not report
            # one, so it must not be shown with a check mark.
            stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
            django_version = stdout.read().decode().strip()
            if django_version:
                status_info.append(f"✓ Django版本: {django_version}")
            else:
                status_info.append("✗ Django版本: 未检测到")

            # requirements.txt
            if self._exists(f"{self.django_path}/requirements.txt"):
                status_info.append("✓ requirements.txt存在")
            else:
                status_info.append("✗ requirements.txt不存在")

            # settings.py: ``find`` exits 0 even when nothing matches, so the
            # decision is based on whether it printed any path.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py")
            exit_status = stdout.channel.recv_exit_status()
            found = stdout.read().decode().strip()
            if exit_status == 0 and found:
                status_info.append("✓ settings.py存在")
            else:
                status_info.append("✗ settings.py不存在")

            # Static files directory.
            if self._exists(f"{self.django_path}/static"):
                status_info.append("✓ static目录存在")
            else:
                status_info.append("✗ static目录不存在")

            status_text = "\n".join(status_info)
            self.result_ready.emit(True, status_text)
            logger.info("Django状态检查完成")
        except Exception as e:
            error_msg = str(e)
            self.result_ready.emit(False, error_msg)
            logger.error(f"Django状态检查异常: {error_msg}")
class GunicornInstallThread(QThread):
    """Worker thread that installs Gunicorn on the remote host when it is missing.

    Tries ``pip3`` first and falls back to the ``gunicorn`` apt package. When a
    password was supplied it is written to sudo's stdin for the apt fallback.
    Progress reaches 100 on every successful path.
    """

    # (success flag, message)
    result_ready = Signal(bool, str)
    # Progress percentage as emitted by run().
    progress_updated = Signal(int)

    def __init__(self, ssh_client, password=None):
        super().__init__()
        self.ssh_client = ssh_client
        self.password = password

    def _gunicorn_version(self):
        """Return the stripped stdout of ``gunicorn --version``."""
        stdin, stdout, stderr = self.ssh_client.exec_command("gunicorn --version")
        return stdout.read().decode().strip()

    def _report_success(self, message):
        """Complete the progress signal, then emit and log a success message."""
        self.progress_updated.emit(100)
        self.result_ready.emit(True, message)
        logger.info(message)

    def run(self):
        """Check for Gunicorn, then install via pip3 and, failing that, via apt."""
        try:
            self.progress_updated.emit(10)
            # Empty stdout is treated as "Gunicorn not installed".
            gunicorn_version = self._gunicorn_version()
            if gunicorn_version:
                self._report_success(f"Gunicorn已安装: {gunicorn_version}")
                return

            self.progress_updated.emit(30)
            # First attempt: pip.
            stdin, stdout, stderr = self.ssh_client.exec_command("pip3 install gunicorn")
            if stdout.channel.recv_exit_status() == 0:
                self.progress_updated.emit(90)
                self._report_success(f"Gunicorn安装成功: {self._gunicorn_version()}")
                return

            self.progress_updated.emit(50)
            # Fallback when pip fails: apt.
            if self.password:
                # NOTE(review): the password is written once although sudo is
                # invoked twice; presumably the second call relies on sudo's
                # credential cache — confirm on the target hosts.
                stdin, stdout, stderr = self.ssh_client.exec_command(
                    "sudo -S apt update && sudo -S apt install -y gunicorn"
                )
                stdin.write(f"{self.password}\n")
                stdin.flush()
            else:
                stdin, stdout, stderr = self.ssh_client.exec_command(
                    "sudo apt update && sudo apt install -y gunicorn"
                )
            if stdout.channel.recv_exit_status() == 0:
                self.progress_updated.emit(90)
                self._report_success(f"Gunicorn安装成功: {self._gunicorn_version()}")
            else:
                error = stderr.read().decode()
                self.result_ready.emit(False, f"Gunicorn安装失败: {error}")
                logger.error(f"Gunicorn安装失败: {error}")
        except Exception as e:
            error_msg = str(e)
            self.result_ready.emit(False, error_msg)
            logger.error(f"Gunicorn安装异常: {error_msg}")
class GunicornTestThread(QThread):
    """Worker thread that checks a remote Django project can be served by Gunicorn.

    Locates ``wsgi.py``, starts Gunicorn on port 8001 for a few seconds and
    looks for the process. When no such process is found it falls back to
    importing the WSGI module with ``python3 -c``.
    """

    # (success flag, message)
    result_ready = Signal(bool, str)
    # Progress percentage as emitted by run().
    progress_updated = Signal(int)

    def __init__(self, ssh_client, django_path):
        super().__init__()
        self.ssh_client = ssh_client
        self.django_path = django_path

    def _wsgi_module(self, project_dir, project_name):
        """Return the dotted module path of wsgi.py relative to ``django_path``.

        Gunicorn is started from ``django_path``, so a wsgi.py directly in that
        directory is the top-level module ``wsgi`` and one in a subdirectory is
        ``<sub.dirs>.wsgi``. When *project_dir* is not below ``django_path``
        textually, this falls back to ``<project_name>.wsgi``.
        """
        project_root = self.django_path.rstrip('/')
        if project_dir == project_root:
            return "wsgi"
        prefix = project_root + "/"
        if project_dir.startswith(prefix):
            parts = [part for part in project_dir[len(prefix):].split("/") if part]
            return ".".join(parts + ["wsgi"])
        return f"{project_name}.wsgi"

    def run(self):
        """Locate wsgi.py, try a short Gunicorn start, and emit the outcome."""
        try:
            self.progress_updated.emit(10)
            # Make sure the Django project exists.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/manage.py")
            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0:
                self.result_ready.emit(False, f"Django项目不存在: {self.django_path}")
                logger.error(f"Django项目不存在: {self.django_path}")
                return

            self.progress_updated.emit(30)
            # Search the project tree for wsgi.py files.
            stdin, stdout, stderr = self.ssh_client.exec_command(
                f"find {self.django_path} -name wsgi.py -type f"
            )
            wsgi_files = [
                line.strip()
                for line in stdout.read().decode().splitlines()
                if line.strip()
            ]
            if not wsgi_files:
                self.result_ready.emit(False, "未找到wsgi.py文件")
                logger.error("未找到wsgi.py文件")
                return

            # Use the first wsgi.py that was found.
            wsgi_file = wsgi_files[0]
            logger.info(f"找到wsgi.py文件: {wsgi_file}")

            # The directory holding wsgi.py names the project in messages; when
            # that directory is django_path itself, its own name is used.
            project_dir = os.path.dirname(wsgi_file)
            if project_dir != self.django_path.rstrip('/'):
                project_name = os.path.basename(project_dir)
            else:
                project_name = os.path.basename(self.django_path.rstrip('/'))
            # Previously ``<project_name>.wsgi`` was always used, which cannot
            # be imported when wsgi.py sits directly in django_path.
            wsgi_module = self._wsgi_module(project_dir, project_name)

            self.progress_updated.emit(50)
            # Try starting Gunicorn, capped at 10 s by ``timeout``.
            test_command = (
                f"cd {self.django_path} && timeout 10 gunicorn --workers 1 "
                f"--bind 0.0.0.0:8001 {wsgi_module}:application --timeout 5"
            )
            stdin, stdout, stderr = self.ssh_client.exec_command(test_command)
            time.sleep(3)  # give Gunicorn time to start

            # Look for a gunicorn process bound to 8001. The bracketed first
            # letter keeps grep (and the shell running this pipeline) out of
            # the result, and matching the port in the same pattern avoids
            # false hits on unrelated "8001" digits elsewhere in the ps output.
            stdin, stdout, stderr = self.ssh_client.exec_command("ps aux | grep '[g]unicorn.*8001'")
            output = stdout.read().decode()

            # Clean up the test processes.
            stdin, stdout, stderr = self.ssh_client.exec_command("pkill -f 'gunicorn.*8001'")

            if output.strip():
                self.progress_updated.emit(100)
                self.result_ready.emit(True, f"Gunicorn测试成功 - 项目: {project_name}")
                logger.info(f"Gunicorn测试成功 - 项目: {project_name}")
            else:
                # Fall back to a simpler check: import the WSGI module.
                self.progress_updated.emit(80)
                test_command2 = (
                    f"cd {self.django_path} && python3 -c "
                    f"\"import {wsgi_module}; print('WSGI导入成功')\""
                )
                stdin, stdout, stderr = self.ssh_client.exec_command(test_command2)
                exit_status = stdout.channel.recv_exit_status()
                if exit_status == 0:
                    self.result_ready.emit(True, f"WSGI配置验证成功 - 项目: {project_name}")
                    logger.info(f"WSGI配置验证成功 - 项目: {project_name}")
                else:
                    error_output = stderr.read().decode()
                    self.result_ready.emit(False, f"Gunicorn测试失败: {error_output}")
                    logger.error(f"Gunicorn测试失败: {error_output}")
        except Exception as e:
            error_msg = str(e)
            self.result_ready.emit(False, error_msg)
            logger.error(f"Gunicorn测试异常: {error_msg}")
class UploadGunicornServiceThread(QThread):
    """Worker thread that installs a systemd unit file for a Gunicorn service.

    The unit text is written over SFTP to ``/tmp``, moved into
    ``/etc/systemd/system`` with sudo, given mode 644, verified with ``ls``,
    and systemd is reloaded. When a password was supplied it is written to
    sudo's stdin for each privileged command.
    """

    # (success flag, message)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client, service_name, service_content, password=None):
        super().__init__()
        self.ssh_client = ssh_client
        self.service_name = service_name
        self.service_content = service_content
        self.password = password

    def _run_sudo(self, command):
        """Run *command* under sudo and return ``(exit_status, stderr_text)``."""
        if self.password:
            # ``-S`` makes sudo read the password from stdin.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S {command}")
            stdin.write(f"{self.password}\n")
            stdin.flush()
        else:
            stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo {command}")
        exit_status = stdout.channel.recv_exit_status()
        return exit_status, stderr.read().decode()

    def _fail(self, message):
        """Log *message* as an error and emit it as a failed result."""
        logger.error(message)
        self.result_ready.emit(False, message)

    def run(self):
        """Upload, install, verify and register the service file."""
        try:
            service_file = f"/etc/systemd/system/{self.service_name}.service"
            temp_file = f"/tmp/{self.service_name}.service"
            logger.info(f"准备上传服务文件: {service_file}")

            # Write the unit text to a temporary file. The SFTP session is
            # closed as soon as the write is done, including when it raises;
            # none of the later steps use it.
            sftp = self.ssh_client.open_sftp()
            try:
                with sftp.file(temp_file, 'w') as f:
                    f.write(self.service_content)
            finally:
                sftp.close()
            logger.info(f"临时服务文件创建成功: {temp_file}")

            # Move it into the systemd directory.
            exit_status, error = self._run_sudo(f"mv {temp_file} {service_file}")
            if exit_status != 0:
                self._fail(f"服务文件移动失败: {error}")
                return
            logger.info(f"服务文件移动成功: {service_file}")

            # Set the permissions.
            exit_status, error = self._run_sudo(f"chmod 644 {service_file}")
            if exit_status != 0:
                self._fail(f"权限设置失败: {error}")
                return
            logger.info(f"服务文件权限设置成功: {service_file}")

            # Verify the file is really there.
            stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {service_file}")
            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0:
                self._fail(f"服务文件验证失败: {stderr.read().decode()}")
                return
            file_info = stdout.read().decode().strip()
            logger.info(f"服务文件验证成功: {file_info}")

            # Reload systemd so it picks up the new unit.
            exit_status, error = self._run_sudo("systemctl daemon-reload")
            if exit_status != 0:
                self._fail(f"systemd重新加载失败: {error}")
                return
            logger.info("systemd重新加载成功")

            # Check that systemd lists the unit; a miss is only a warning.
            stdin, stdout, stderr = self.ssh_client.exec_command(
                f"systemctl list-unit-files | grep {self.service_name}"
            )
            exit_status = stdout.channel.recv_exit_status()
            if exit_status == 0:
                service_info = stdout.read().decode().strip()
                logger.info(f"服务被systemd识别: {service_info}")
            else:
                error = stderr.read().decode()
                logger.warning(f"服务未被systemd识别: {error}")

            self.result_ready.emit(True, f"服务文件上传成功: {service_file}")
            logger.info(f"服务文件上传成功: {service_file}")
        except Exception as e:
            error_msg = str(e)
            logger.error(f"服务文件上传异常: {error_msg}")
            self.result_ready.emit(False, error_msg)
class ManageGunicornServiceThread(QThread):
    """Worker thread that runs a ``systemctl`` action on a Gunicorn service.

    ``service_name`` may be given with or without the ``.service`` suffix.
    ``action`` is inserted verbatim into ``systemctl <action> <unit>``. When a
    password was supplied it is written to sudo's stdin.
    """

    # (success flag, message; for a successful "status" the command output)
    result_ready = Signal(bool, str)

    def __init__(self, ssh_client, service_name, action, password=None):
        super().__init__()
        self.ssh_client = ssh_client
        self.service_name = service_name
        self.action = action
        self.password = password

    def run(self):
        """Check the unit file exists, run the action, and emit the outcome."""
        try:
            # Normalize the unit name: drop a trailing ".service" if present.
            suffix = ".service"
            service_name_for_systemd = self.service_name
            if service_name_for_systemd.endswith(suffix):
                service_name_for_systemd = service_name_for_systemd[:-len(suffix)]
            cmd = f"systemctl {self.action} {service_name_for_systemd}"
            logger.info(f"执行服务管理命令: {cmd}")

            # Check the unit file first. The normalized name is used here so a
            # name passed as "x.service" is not looked up as "x.service.service".
            service_file = f"/etc/systemd/system/{service_name_for_systemd}.service"
            stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {service_file}")
            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0:
                error = stderr.read().decode()
                logger.error(f"服务文件不存在: {service_file}, 错误: {error}")
                self.result_ready.emit(False, f"服务文件不存在: {service_name_for_systemd}.service")
                return
            file_info = stdout.read().decode().strip()
            logger.info(f"服务文件存在: {file_info}")

            # Run the action under sudo; ``-S`` reads the password from stdin.
            if self.password:
                stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S {cmd}")
                stdin.write(f"{self.password}\n")
                stdin.flush()
            else:
                stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo {cmd}")
            exit_status = stdout.channel.recv_exit_status()

            if self.action == "status":
                output = stdout.read().decode()
                if exit_status == 0:
                    logger.info(f"服务状态查询成功: {service_name_for_systemd}")
                    self.result_ready.emit(True, output)
                else:
                    # ``systemctl status`` exits non-zero for units that are
                    # not running while still printing the status on stdout,
                    # so prefer that text over stderr when it is available.
                    error = stderr.read().decode()
                    detail = output if output.strip() else error
                    logger.error(f"服务状态查询失败: {detail}")
                    self.result_ready.emit(False, detail)
            else:
                # Other actions: start, stop, restart, enable, disable.
                if exit_status == 0:
                    logger.info(f"服务{self.action}成功: {service_name_for_systemd}")
                    self.result_ready.emit(True, f"服务{self.action}成功: {service_name_for_systemd}")
                else:
                    error = stderr.read().decode()
                    logger.error(f"服务{self.action}失败: {error}")
                    self.result_ready.emit(False, error)
        except Exception as e:
            error_msg = str(e)
            logger.error(f"服务管理异常: {error_msg}")
            self.result_ready.emit(False, error_msg)