Files
djangohelper/threads.py

1387 lines
66 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import time
from loguru import logger
from PySide6.QtCore import QThread, Signal
import paramiko
class PasswordDialog:
def __init__(self, parent=None):
self.parent = parent
self.password = None
def get_password(self, prompt="请输入sudo密码:"):
# 这里简化实现实际应该使用QInputDialog
if self.parent:
from PySide6.QtWidgets import QInputDialog
password, ok = QInputDialog.getText(self.parent, "sudo密码", prompt,
echo=QInputDialog.Password)
if ok:
self.password = password
return password
return None
class SSHConnectionThread(QThread):
connection_status = Signal(bool, str)
def __init__(self, host, username, password, port=22):
super().__init__()
self.host = host
self.username = username
self.password = password
self.port = port
def run(self):
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.host, port=self.port, username=self.username, password=self.password, timeout=10)
# 测试连接
stdin, stdout, stderr = ssh.exec_command("echo 'Connection test'")
output = stdout.read().decode().strip()
if output == "Connection test":
self.connection_status.emit(True, f"成功连接到 {self.host}")
logger.info(f"SSH连接成功: {self.host}")
else:
self.connection_status.emit(False, "连接测试失败")
logger.error("SSH连接测试失败")
ssh.close()
except Exception as e:
error_msg = str(e)
self.connection_status.emit(False, error_msg)
logger.error(f"SSH连接失败: {error_msg}")
class GitInstallThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client):
super().__init__()
self.ssh_client = ssh_client
def run(self):
try:
# 检查Git是否已安装
stdin, stdout, stderr = self.ssh_client.exec_command("git --version")
git_version = stdout.read().decode().strip()
if git_version:
self.result_ready.emit(True, f"Git已安装: {git_version}")
logger.info(f"Git已安装: {git_version}")
return
# 安装Git
stdin, stdout, stderr = self.ssh_client.exec_command("sudo apt update && sudo apt install -y git")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
stdin, stdout, stderr = self.ssh_client.exec_command("git --version")
git_version = stdout.read().decode().strip()
self.result_ready.emit(True, f"Git安装成功: {git_version}")
logger.info(f"Git安装成功: {git_version}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, f"Git安装失败: {error}")
logger.error(f"Git安装失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Git安装异常: {error_msg}")
class GitCloneThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client, git_url, project_path):
super().__init__()
self.ssh_client = ssh_client
self.git_url = git_url
self.project_path = project_path
def run(self):
try:
# 检查目录是否存在
stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.project_path}")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.result_ready.emit(False, f"目录 {self.project_path} 已存在")
logger.warning(f"目录已存在: {self.project_path}")
return
# 创建父目录
parent_dir = os.path.dirname(self.project_path)
if parent_dir:
stdin, stdout, stderr = self.ssh_client.exec_command(f"mkdir -p {parent_dir}")
# 克隆仓库
stdin, stdout, stderr = self.ssh_client.exec_command(f"cd {parent_dir} && git clone {self.git_url} {os.path.basename(self.project_path)}")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.result_ready.emit(True, f"Git克隆成功: {self.project_path}")
logger.info(f"Git克隆成功: {self.project_path}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, f"Git克隆失败: {error}")
logger.error(f"Git克隆失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Git克隆异常: {error_msg}")
class ListDirectoryThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client, path):
super().__init__()
self.ssh_client = ssh_client
self.path = path
def run(self):
try:
stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.path}")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
output = stdout.read().decode()
self.result_ready.emit(True, output)
logger.info(f"目录列表成功: {self.path}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, error)
logger.error(f"目录列表失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"目录列表异常: {error_msg}")
class DeleteDirectoryThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client, path):
super().__init__()
self.ssh_client = ssh_client
self.path = path
def run(self):
try:
stdin, stdout, stderr = self.ssh_client.exec_command(f"rm -rf {self.path}")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.result_ready.emit(True, f"目录删除成功: {self.path}")
logger.info(f"目录删除成功: {self.path}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, error)
logger.error(f"目录删除失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"目录删除异常: {error_msg}")
class SetTimezoneAndRestartThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client, password):
super().__init__()
self.ssh_client = ssh_client
self.password = password
def run(self):
try:
# 设置时区,使用-S选项从标准输入读取密码
stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S timedatectl set-timezone Asia/Shanghai")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"设置时区失败: {error}")
logger.error(f"设置时区失败: {error}")
return
# 重启服务器,使用-S选项从标准输入读取密码
stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S reboot")
self.result_ready.emit(True, "时区设置成功,服务器正在重启")
logger.info("时区设置成功,服务器正在重启")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"时区设置异常: {error_msg}")
class CheckFirewallThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client, password):
super().__init__()
self.ssh_client = ssh_client
self.password = password
def run(self):
try:
# 检查UFW状态使用-S选项从标准输入读取密码
stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S ufw status")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
output = stdout.read().decode()
self.result_ready.emit(True, output)
logger.info("防火墙状态检查成功")
else:
error = stderr.read().decode()
self.result_ready.emit(False, error)
logger.error(f"防火墙状态检查失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"防火墙状态检查异常: {error_msg}")
class OpenPortThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client, port, password):
super().__init__()
self.ssh_client = ssh_client
self.port = port
self.password = password
def run(self):
try:
# 开放端口,使用-S选项从标准输入读取密码
stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S ufw allow {self.port}")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
# 重新加载防火墙,使用-S选项从标准输入读取密码
stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S ufw reload")
self.result_ready.emit(True, f"端口 {self.port} 开放成功")
logger.info(f"端口 {self.port} 开放成功")
else:
error = stderr.read().decode()
self.result_ready.emit(False, error)
logger.error(f"端口开放失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"端口开放异常: {error_msg}")
class DjangoInstallThread(QThread):
result_ready = Signal(bool, str)
progress_updated = Signal(int)
def __init__(self, ssh_client, password):
super().__init__()
self.ssh_client = ssh_client
self.password = password
def run(self):
try:
self.progress_updated.emit(10)
# 检查Django是否已安装
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
if django_version:
self.result_ready.emit(True, f"Django已安装: {django_version}")
logger.info(f"Django已安装: {django_version}")
return
self.progress_updated.emit(30)
# 尝试使用pip安装
stdin, stdout, stderr = self.ssh_client.exec_command("pip3 install --break-system-packages django")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.progress_updated.emit(90)
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
self.result_ready.emit(True, f"Django安装成功: {django_version}")
logger.info(f"Django安装成功: {django_version}")
return
self.progress_updated.emit(50)
# 如果pip安装失败尝试使用apt安装使用-S选项从标准输入读取密码
stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S apt update && echo '{self.password}' | sudo -S apt install -y python3-django")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.progress_updated.emit(90)
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
self.result_ready.emit(True, f"Django安装成功: {django_version}")
logger.info(f"Django安装成功: {django_version}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, f"Django安装失败: {error}")
logger.error(f"Django安装失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Django安装异常: {error_msg}")
class DjangoTestThread(QThread):
result_ready = Signal(bool, str)
progress_updated = Signal(int)
def __init__(self, ssh_client, django_path):
super().__init__()
self.ssh_client = ssh_client
self.django_path = django_path
def run(self):
try:
self.progress_updated.emit(10)
# 检查Django项目是否存在
stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/manage.py")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
self.result_ready.emit(False, f"Django项目不存在: {self.django_path}")
logger.error(f"Django项目不存在: {self.django_path}")
return
self.progress_updated.emit(30)
# 检查requirements.txt
stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/requirements.txt")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.progress_updated.emit(50)
# 安装依赖
stdin, stdout, stderr = self.ssh_client.exec_command(f"cd {self.django_path} && pip3 install --break-system-packages -r requirements.txt")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"依赖安装失败: {error}")
logger.error(f"依赖安装失败: {error}")
return
self.progress_updated.emit(70)
# 运行Django测试服务器
stdin, stdout, stderr = self.ssh_client.exec_command(f"cd {self.django_path} && timeout 10 python3 manage.py runserver 0.0.0.0:8000 --noreload &")
time.sleep(3) # 等待服务器启动
# 检查服务器是否运行
stdin, stdout, stderr = self.ssh_client.exec_command("ps aux | grep 'manage.py runserver'")
output = stdout.read().decode()
if "manage.py runserver" in output:
self.progress_updated.emit(80)
logger.info("Django测试服务器启动成功开始检查端口访问情况")
# 检查8000端口是否真的在监听
self.progress_updated.emit(85)
stdin, stdout, stderr = self.ssh_client.exec_command("ss -tulnp | grep 8000")
ss_output = stdout.read().decode()
logger.info(f"ss命令检查端口8000结果: {ss_output}")
# 如果ss命令没有结果尝试使用netstat
if not ss_output:
stdin, stdout, stderr = self.ssh_client.exec_command("netstat -tulnp | grep 8000")
netstat_output = stdout.read().decode()
logger.info(f"netstat命令检查端口8000结果: {netstat_output}")
port_check_output = netstat_output
else:
port_check_output = ss_output
# 检查端口是否在LISTEN状态
if "LISTEN" in port_check_output and "8000" in port_check_output:
self.progress_updated.emit(90)
logger.info("端口8000处于LISTEN状态开始本地请求测试")
# 本地发起请求测试
stdin, stdout, stderr = self.ssh_client.exec_command("curl -s http://127.0.0.1:8000")
curl_output = stdout.read().decode()
curl_exit_status = stdout.channel.recv_exit_status()
logger.info(f"curl本地请求测试退出状态: {curl_exit_status}")
logger.info(f"curl本地请求测试输出: {curl_output[:200]}..." if len(curl_output) > 200 else f"curl本地请求测试输出: {curl_output}")
if curl_exit_status == 0:
self.progress_updated.emit(100)
self.result_ready.emit(True, "Django测试服务器启动成功端口访问正常")
logger.info("Django测试服务器启动成功端口访问正常")
else:
# 检查错误类型
if "Failed to connect" in curl_output:
error_msg = "Django测试服务器启动成功但本地请求超时Gunicorn进程可能未正常响应"
elif "Bad Request (400)" in curl_output:
error_msg = "Django测试服务器启动成功但返回400错误可能是ALLOWED_HOSTS配置问题"
elif "500" in curl_output:
error_msg = "Django测试服务器启动成功但返回500错误可能是应用内部错误"
else:
error_msg = f"Django测试服务器启动成功但本地请求失败: {curl_output[:100]}"
self.result_ready.emit(False, error_msg)
logger.error(error_msg)
else:
self.progress_updated.emit(100)
self.result_ready.emit(False, "Django测试服务器启动成功但端口8000未处于LISTEN状态")
logger.error("Django测试服务器启动成功但端口8000未处于LISTEN状态")
else:
self.result_ready.emit(False, "Django测试服务器启动失败")
logger.error("Django测试服务器启动失败")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Django测试异常: {error_msg}")
class DownloadSettingsThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client, django_path):
super().__init__()
self.ssh_client = ssh_client
self.django_path = django_path
def run(self):
try:
# 查找settings.py文件
stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
self.result_ready.emit(False, "未找到settings.py文件")
logger.error("未找到settings.py文件")
return
settings_path = stdout.read().decode().strip()
# 下载settings.py内容
stdin, stdout, stderr = self.ssh_client.exec_command(f"cat {settings_path}")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
settings_content = stdout.read().decode()
self.result_ready.emit(True, settings_content)
logger.info("settings.py下载成功")
else:
error = stderr.read().decode()
self.result_ready.emit(False, error)
logger.error(f"settings.py下载失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"settings.py下载异常: {error_msg}")
class UploadSettingsThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client, django_path, settings_content):
super().__init__()
self.ssh_client = ssh_client
self.django_path = django_path
self.settings_content = settings_content
def run(self):
try:
# 查找settings.py文件
stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
self.result_ready.emit(False, "未找到settings.py文件")
logger.error("未找到settings.py文件")
return
settings_path = stdout.read().decode().strip()
# 创建临时文件
temp_file = "/tmp/settings_upload.py"
sftp = self.ssh_client.open_sftp()
with sftp.file(temp_file, 'w') as f:
f.write(self.settings_content)
# 移动到目标位置
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo mv {temp_file} {settings_path}")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.result_ready.emit(True, "settings.py上传成功")
logger.info("settings.py上传成功")
else:
error = stderr.read().decode()
self.result_ready.emit(False, error)
logger.error(f"settings.py上传失败: {error}")
sftp.close()
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"settings.py上传异常: {error_msg}")
class CollectStaticThread(QThread):
result_ready = Signal(bool, str)
progress_updated = Signal(int)
def __init__(self, ssh_client, django_path):
super().__init__()
self.ssh_client = ssh_client
self.django_path = django_path
def run(self):
try:
self.progress_updated.emit(10)
# 检查Django项目是否存在
stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/manage.py")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
self.result_ready.emit(False, f"Django项目不存在: {self.django_path}")
logger.error(f"Django项目不存在: {self.django_path}")
return
self.progress_updated.emit(30)
# 收集静态文件
stdin, stdout, stderr = self.ssh_client.exec_command(f"cd {self.django_path} && python3 manage.py collectstatic --noinput")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.progress_updated.emit(100)
output = stdout.read().decode()
self.result_ready.emit(True, f"静态文件收集成功: {output}")
logger.info("静态文件收集成功")
else:
error = stderr.read().decode()
self.result_ready.emit(False, error)
logger.error(f"静态文件收集失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"静态文件收集异常: {error_msg}")
class CheckDjangoStatusThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client, django_path):
super().__init__()
self.ssh_client = ssh_client
self.django_path = django_path
def run(self):
try:
status_info = []
# 检查Django项目是否存在
stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/manage.py")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
self.result_ready.emit(False, f"Django项目不存在: {self.django_path}")
logger.error(f"Django项目不存在: {self.django_path}")
return
status_info.append("✓ Django项目存在")
# 检查Django版本
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
status_info.append(f"✓ Django版本: {django_version}")
# 检查requirements.txt
stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/requirements.txt")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
status_info.append("✓ requirements.txt存在")
else:
status_info.append("✗ requirements.txt不存在")
# 检查settings.py
stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
status_info.append("✓ settings.py存在")
else:
status_info.append("✗ settings.py不存在")
# 检查静态文件目录
stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/static")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
status_info.append("✓ static目录存在")
else:
status_info.append("✗ static目录不存在")
status_text = "\n".join(status_info)
self.result_ready.emit(True, status_text)
logger.info("Django状态检查完成")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Django状态检查异常: {error_msg}")
class GunicornInstallThread(QThread):
result_ready = Signal(bool, str)
progress_updated = Signal(int)
def __init__(self, ssh_client, password=None):
super().__init__()
self.ssh_client = ssh_client
self.password = password
def run(self):
try:
self.progress_updated.emit(10)
# 检查Gunicorn是否已安装
logger.info("检查Gunicorn是否已安装")
stdin, stdout, stderr = self.ssh_client.exec_command("gunicorn --version")
exit_status = stdout.channel.recv_exit_status()
gunicorn_version = stdout.read().decode().strip()
version_error = stderr.read().decode()
logger.info(f"Gunicorn版本检查状态: {exit_status}")
logger.info(f"Gunicorn版本信息: {gunicorn_version}")
if version_error:
logger.error(f"Gunicorn版本检查错误: {version_error}")
if gunicorn_version:
self.result_ready.emit(True, f"Gunicorn已安装: {gunicorn_version}")
logger.info(f"Gunicorn已安装: {gunicorn_version}")
return
self.progress_updated.emit(30)
# 尝试使用pip安装
logger.info("开始使用pip安装Gunicorn")
stdin, stdout, stderr = self.ssh_client.exec_command("pip3 install gunicorn")
exit_status = stdout.channel.recv_exit_status()
install_output = stdout.read().decode()
install_error = stderr.read().decode()
logger.info(f"Gunicorn pip安装命令执行状态: {exit_status}")
logger.info(f"Gunicorn pip安装输出: {install_output}")
if install_error:
logger.error(f"Gunicorn pip安装错误: {install_error}")
if exit_status == 0:
self.progress_updated.emit(90)
# 验证安装
logger.info("验证Gunicorn安装")
stdin, stdout, stderr = self.ssh_client.exec_command("gunicorn --version")
version_exit_status = stdout.channel.recv_exit_status()
gunicorn_version = stdout.read().decode().strip()
version_error = stderr.read().decode()
logger.info(f"Gunicorn版本检查状态: {version_exit_status}")
logger.info(f"Gunicorn版本信息: {gunicorn_version}")
if version_error:
logger.error(f"Gunicorn版本检查错误: {version_error}")
if gunicorn_version:
self.result_ready.emit(True, f"Gunicorn安装成功: {gunicorn_version}")
logger.info(f"Gunicorn安装成功: {gunicorn_version}")
else:
self.result_ready.emit(False, "Gunicorn安装后无法获取版本信息")
logger.error("Gunicorn安装后无法获取版本信息")
return
self.progress_updated.emit(50)
# 如果pip安装失败尝试使用apt安装
logger.info("pip安装失败尝试使用apt安装Gunicorn")
if self.password:
logger.info("使用密码进行sudo apt安装")
stdin, stdout, stderr = self.ssh_client.exec_command("sudo -S apt update && sudo -S apt install -y gunicorn")
stdin.write(f"{self.password}\n")
stdin.flush()
else:
logger.info("无密码进行sudo apt安装")
stdin, stdout, stderr = self.ssh_client.exec_command("sudo apt update && sudo apt install -y gunicorn")
exit_status = stdout.channel.recv_exit_status()
apt_output = stdout.read().decode()
apt_error = stderr.read().decode()
logger.info(f"Gunicorn apt安装命令执行状态: {exit_status}")
logger.info(f"Gunicorn apt安装输出: {apt_output}")
if apt_error:
logger.error(f"Gunicorn apt安装错误: {apt_error}")
if exit_status == 0:
self.progress_updated.emit(90)
# 验证安装
logger.info("验证apt安装的Gunicorn")
stdin, stdout, stderr = self.ssh_client.exec_command("gunicorn --version")
version_exit_status = stdout.channel.recv_exit_status()
gunicorn_version = stdout.read().decode().strip()
version_error = stderr.read().decode()
logger.info(f"Gunicorn版本检查状态: {version_exit_status}")
logger.info(f"Gunicorn版本信息: {gunicorn_version}")
if version_error:
logger.error(f"Gunicorn版本检查错误: {version_error}")
if gunicorn_version:
self.result_ready.emit(True, f"Gunicorn安装成功: {gunicorn_version}")
logger.info(f"Gunicorn安装成功: {gunicorn_version}")
else:
self.result_ready.emit(False, "Gunicorn安装后无法获取版本信息")
logger.error("Gunicorn安装后无法获取版本信息")
else:
error = stderr.read().decode()
self.result_ready.emit(False, f"Gunicorn安装失败: {error}")
logger.error(f"Gunicorn安装失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Gunicorn安装异常: {error_msg}")
class GunicornTestThread(QThread):
result_ready = Signal(bool, str)
progress_updated = Signal(int)
def __init__(self, ssh_client, django_path, port="8000", host="127.0.0.1"):
super().__init__()
self.ssh_client = ssh_client
self.django_path = django_path
self.port = port
self.host = host
logger.info(f"GunicornTestThread初始化 - Django路径: {django_path}, 端口: {port}, 主机: {host}")
def run(self):
try:
self.progress_updated.emit(10)
# 检查Django项目是否存在
stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/manage.py")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
self.result_ready.emit(False, f"Django项目不存在: {self.django_path}")
logger.error(f"Django项目不存在: {self.django_path}")
return
self.progress_updated.emit(30)
# 检查wsgi.py文件 - 先尝试项目根目录
stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name wsgi.py -type f")
wsgi_files = stdout.read().decode().strip().split('\n')
if not wsgi_files or wsgi_files == ['']:
self.result_ready.emit(False, f"未找到wsgi.py文件")
logger.error(f"未找到wsgi.py文件")
return
# 使用第一个找到的wsgi.py文件
wsgi_file = wsgi_files[0].strip()
logger.info(f"找到wsgi.py文件: {wsgi_file}")
# 获取项目目录的相对路径
project_dir = os.path.dirname(wsgi_file)
project_name = os.path.basename(project_dir)
# 如果wsgi.py在子目录中调整项目名
if project_dir != self.django_path.rstrip('/'):
# wsgi.py在子目录中使用子目录名作为项目名
project_name = os.path.basename(project_dir)
else:
# wsgi.py在项目根目录中使用manage.py所在目录名作为项目名
project_name = os.path.basename(self.django_path.rstrip('/'))
logger.info(f"使用项目名: {project_name}, 工作目录: {self.django_path}")
self.progress_updated.emit(50)
# 在启动Gunicorn前先检查端口是否被占用
logger.info(f"检查端口{self.port}是否被其他进程占用")
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo lsof -i :{self.port}")
lsof_output = stdout.read().decode()
lsof_exit_status = stdout.channel.recv_exit_status()
if lsof_exit_status == 0 and lsof_output:
logger.warning(f"端口{self.port}已被其他进程占用: {lsof_output}")
# 尝试杀死占用端口的进程
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo lsof -t -i :{self.port} | xargs sudo kill -9")
time.sleep(1) # 等待进程被杀死
logger.info(f"已尝试杀死占用端口{self.port}的进程")
else:
logger.info(f"端口{self.port}未被占用")
# 测试Gunicorn启动使用构造函数中传入的端口参数指定4个worker
test_command = f"cd {self.django_path} && gunicorn --workers 4 --bind 0.0.0.0:{self.port} {project_name}.wsgi:application --timeout 5 --log-level debug --access-logfile - --error-logfile -"
logger.info(f"执行Gunicorn测试命令: {test_command}")
stdin, stdout, stderr = self.ssh_client.exec_command(test_command)
time.sleep(3) # 等待Gunicorn启动
# 捕获Gunicorn启动日志
gunicorn_stdout = stdout.read().decode()
gunicorn_stderr = stderr.read().decode()
# 记录Gunicorn启动日志
if gunicorn_stdout:
logger.info(f"Gunicorn标准输出: {gunicorn_stdout[:500]}..." if len(gunicorn_stdout) > 500 else f"Gunicorn标准输出: {gunicorn_stdout}")
if gunicorn_stderr:
logger.error(f"Gunicorn错误输出: {gunicorn_stderr[:500]}..." if len(gunicorn_stderr) > 500 else f"Gunicorn错误输出: {gunicorn_stderr}")
# 检查错误日志中的关键词
error_keywords = ["ImportError", "ModuleNotFoundError", "DatabaseError", "ConnectionError", "OperationalError", "ProgrammingError", "Exception", "Error", "Failed"]
found_errors = []
for keyword in error_keywords:
if keyword in gunicorn_stderr:
found_errors.append(keyword)
logger.error(f"在Gunicorn错误日志中发现关键词: {keyword}")
if found_errors:
error_msg = f"Gunicorn启动失败发现错误关键词: {', '.join(found_errors)}"
logger.error(error_msg)
self.result_ready.emit(False, error_msg)
return
# 检查Gunicorn进程状态
stdin, stdout, stderr = self.ssh_client.exec_command("ps aux | grep gunicorn")
output = stdout.read().decode()
logger.info(f"Gunicorn进程检查结果: {output}")
# 检查Worker进程状态
worker_count = output.count('gunicorn') - 1 # 减去grep进程本身
logger.info(f"检测到Gunicorn Worker进程数量: {worker_count}")
# 详细分析进程结构
lines = output.strip().split('\n')
master_process = None
worker_processes = []
for line in lines:
if 'grep' not in line and 'gunicorn' in line:
parts = line.split()
if len(parts) >= 11:
pid = parts[1]
command = ' '.join(parts[10:])
if 'master' in command or 'gunicorn: master' in command:
master_process = (pid, command)
elif 'worker' in command:
worker_processes.append((pid, command))
if master_process:
logger.info(f"检测到Gunicorn主进程 - PID: {master_process[0]}, 命令: {master_process[1]}")
else:
logger.warning(f"未检测到Gunicorn主进程")
if worker_processes:
logger.info(f"检测到{len(worker_processes)}个Gunicorn Worker进程:")
for pid, command in worker_processes:
logger.info(f" - Worker PID: {pid}, 命令: {command}")
else:
logger.warning(f"未检测到Gunicorn Worker进程")
# 检查进程数量是否符合预期1个主进程+4个worker进程
if master_process and len(worker_processes) == 4:
logger.info(f"Gunicorn进程结构正常: 1个主进程 + 4个worker进程")
elif master_process and len(worker_processes) > 0:
logger.warning(f"Gunicorn进程结构部分正常: 1个主进程 + {len(worker_processes)}个worker进程预期4个")
else:
logger.error(f"Gunicorn进程结构异常: 未检测到完整的进程结构")
if worker_count < 1:
logger.error(f"Gunicorn主进程存在但Worker进程未启动可能是Django代码/配置错误")
# 尝试手动启动Gunicorn并获取详细错误日志
logger.info(f"尝试手动启动Gunicorn以获取详细错误日志")
stdin, stdout, stderr = self.ssh_client.exec_command(f"cd {self.django_path} && gunicorn --workers 4 --bind 0.0.0.0:{self.port} {project_name}.wsgi:application --timeout 5 --log-level debug --access-logfile - --error-logfile -")
time.sleep(2)
error_output = stderr.read().decode()
logger.error(f"手动启动Gunicorn错误日志: {error_output}")
# 检查错误日志中的关键词
error_keywords = ["ImportError", "ModuleNotFoundError", "DatabaseError", "ConnectionError", "OperationalError", "ProgrammingError", "Exception", "Error", "Failed"]
found_errors = []
for keyword in error_keywords:
if keyword in error_output:
found_errors.append(keyword)
logger.error(f"在手动启动Gunicorn错误日志中发现关键词: {keyword}")
if found_errors:
error_msg = f"Gunicorn Worker进程未启动发现错误关键词: {', '.join(found_errors)}"
logger.error(error_msg)
self.result_ready.emit(False, error_msg)
return
# 清理测试进程
stdin, stdout, stderr = self.ssh_client.exec_command(f"pkill -f 'gunicorn.*{self.port}'")
if "gunicorn" in output and f":{self.port}" in output:
self.progress_updated.emit(80)
logger.info(f"Gunicorn进程运行正常开始检查端口{self.port}访问情况")
# 检查端口是否真的在监听
self.progress_updated.emit(85)
stdin, stdout, stderr = self.ssh_client.exec_command(f"ss -tulnp | grep {self.port}")
ss_output = stdout.read().decode()
logger.info(f"ss命令检查端口{self.port}结果: {ss_output}")
# 如果ss命令没有结果尝试使用netstat
if not ss_output:
stdin, stdout, stderr = self.ssh_client.exec_command(f"netstat -tulnp | grep {self.port}")
netstat_output = stdout.read().decode()
logger.info(f"netstat命令检查端口{self.port}结果: {netstat_output}")
port_check_output = netstat_output
else:
port_check_output = ss_output
# 检查端口是否在LISTEN状态
if "LISTEN" in port_check_output and f":{self.port}" in port_check_output:
self.progress_updated.emit(90)
logger.info(f"端口{self.port}处于LISTEN状态开始本地请求测试")
# 本地发起请求测试
stdin, stdout, stderr = self.ssh_client.exec_command(f"curl -s http://{self.host}:{self.port}")
curl_output = stdout.read().decode()
curl_exit_status = stdout.channel.recv_exit_status()
logger.info(f"curl本地请求测试退出状态: {curl_exit_status}")
logger.info(f"curl本地请求测试输出: {curl_output[:200]}..." if len(curl_output) > 200 else f"curl本地请求测试输出: {curl_output}")
if curl_exit_status == 0:
self.progress_updated.emit(100)
self.result_ready.emit(True, f"Gunicorn测试成功 - 项目: {project_name}, 端口: {self.port}, 访问正常")
logger.info(f"Gunicorn测试成功 - 项目: {project_name}, 端口: {self.port}, 访问正常")
else:
# 检查错误类型
if "Failed to connect" in curl_output:
error_msg = f"Gunicorn进程运行正常但本地请求超时端口{self.port}未响应"
elif "Bad Request (400)" in curl_output:
error_msg = f"Gunicorn进程运行正常但返回400错误可能是ALLOWED_HOSTS配置问题"
elif "500" in curl_output:
error_msg = f"Gunicorn进程运行正常但返回500错误可能是应用内部错误"
else:
error_msg = f"Gunicorn进程运行正常但本地请求失败: {curl_output[:100]}"
self.result_ready.emit(False, error_msg)
logger.error(error_msg)
else:
self.progress_updated.emit(100)
self.result_ready.emit(False, f"Gunicorn进程运行正常但端口{self.port}未处于LISTEN状态")
logger.error(f"Gunicorn进程运行正常但端口{self.port}未处于LISTEN状态")
else:
# 尝试更简单的测试方式
self.progress_updated.emit(80)
test_command2 = f"cd {self.django_path} && python3 -c \"import {project_name}.wsgi; print('WSGI导入成功')\""
logger.info(f"执行WSGI导入测试: {test_command2}")
stdin, stdout, stderr = self.ssh_client.exec_command(test_command2)
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
wsgi_output = stdout.read().decode()
logger.info(f"WSGI导入测试成功: {wsgi_output}")
self.result_ready.emit(True, f"WSGI配置验证成功 - 项目: {project_name}")
logger.info(f"WSGI配置验证成功 - 项目: {project_name}")
else:
error_output = stderr.read().decode()
logger.error(f"Gunicorn测试失败: {error_output}")
self.result_ready.emit(False, f"Gunicorn测试失败: {error_output}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Gunicorn测试异常: {error_msg}")
class UploadGunicornServiceThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client, service_name, service_content, password=None):
super().__init__()
self.ssh_client = ssh_client
self.service_name = service_name
self.service_content = service_content
self.password = password
def run(self):
try:
# 创建服务文件
service_file = f"/etc/systemd/system/{self.service_name}.service"
temp_file = f"/tmp/{self.service_name}.service"
logger.info(f"准备上传服务文件: {service_file}")
sftp = self.ssh_client.open_sftp()
with sftp.file(temp_file, 'w') as f:
f.write(self.service_content)
logger.info(f"临时服务文件创建成功: {temp_file}")
# 移动到systemd目录
if self.password:
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S mv {temp_file} {service_file}")
stdin.write(f"{self.password}\n")
stdin.flush()
else:
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo mv {temp_file} {service_file}")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
logger.error(f"服务文件移动失败: {error}")
self.result_ready.emit(False, f"服务文件移动失败: {error}")
sftp.close()
return
logger.info(f"服务文件移动成功: {service_file}")
# 设置权限
if self.password:
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S chmod 644 {service_file}")
stdin.write(f"{self.password}\n")
stdin.flush()
else:
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo chmod 644 {service_file}")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
logger.error(f"权限设置失败: {error}")
self.result_ready.emit(False, f"权限设置失败: {error}")
sftp.close()
return
logger.info(f"服务文件权限设置成功: {service_file}")
# 验证服务文件是否存在
stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {service_file}")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
file_info = stdout.read().decode().strip()
logger.info(f"服务文件验证成功: {file_info}")
else:
error = stderr.read().decode()
logger.error(f"服务文件验证失败: {error}")
self.result_ready.emit(False, f"服务文件验证失败: {error}")
sftp.close()
return
# 重新加载systemd
if self.password:
stdin, stdout, stderr = self.ssh_client.exec_command("sudo -S systemctl daemon-reload")
stdin.write(f"{self.password}\n")
stdin.flush()
else:
stdin, stdout, stderr = self.ssh_client.exec_command("sudo systemctl daemon-reload")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
logger.error(f"systemd重新加载失败: {error}")
self.result_ready.emit(False, f"systemd重新加载失败: {error}")
sftp.close()
return
logger.info(f"systemd重新加载成功")
# 验证服务是否被systemd识别
stdin, stdout, stderr = self.ssh_client.exec_command(f"systemctl list-unit-files | grep {self.service_name}")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
service_info = stdout.read().decode().strip()
logger.info(f"服务被systemd识别: {service_info}")
else:
error = stderr.read().decode()
logger.warning(f"服务未被systemd识别: {error}")
sftp.close()
self.result_ready.emit(True, f"服务文件上传成功: {service_file}")
logger.info(f"服务文件上传成功: {service_file}")
except Exception as e:
error_msg = str(e)
logger.error(f"服务文件上传异常: {error_msg}")
self.result_ready.emit(False, error_msg)
class ManageGunicornServiceThread(QThread):
result_ready = Signal(bool, str)
def __init__(self, ssh_client, service_name, action, password=None, port="8000"):
super().__init__()
self.ssh_client = ssh_client
self.service_name = service_name
self.action = action
self.password = password
self.port = port
logger.info(f"ManageGunicornServiceThread初始化 - 服务名: {service_name}, 操作: {action}, 端口: {port}")
def run(self):
try:
# 构建systemd命令
# 注意systemd命令中不需要.service后缀
service_name_for_systemd = self.service_name
if service_name_for_systemd.endswith('.service'):
service_name_for_systemd = service_name_for_systemd[:-8] # 移除.service后缀
cmd = f"systemctl {self.action} {service_name_for_systemd}"
logger.info(f"执行服务管理命令: {cmd}")
# 首先检查服务文件是否存在
service_file = f"/etc/systemd/system/{self.service_name}.service"
stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {service_file}")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
logger.error(f"服务文件不存在: {service_file}, 错误: {error}")
self.result_ready.emit(False, f"服务文件不存在: {self.service_name}.service")
return
file_info = stdout.read().decode().strip()
logger.info(f"服务文件存在: {file_info}")
if self.action == "status":
# 状态命令需要特殊处理
if self.password:
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S {cmd}")
# 立即写入密码并关闭stdin
stdin.write(f"{self.password}\n")
stdin.close()
# 等待命令执行完成
exit_status = stdout.channel.recv_exit_status()
logger.info(f"服务状态查询命令执行完成,退出状态: {exit_status}")
# 读取输出和错误
output = stdout.read().decode()
error = stderr.read().decode()
# 检查是否仍然提示输入密码
if "password for" in error.lower() and exit_status != 0:
logger.error(f"密码传递失败,仍然提示输入密码: {error}")
self.result_ready.emit(False, error)
return
else:
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo {cmd}")
exit_status = stdout.channel.recv_exit_status()
# 读取输出和错误
output = stdout.read().decode()
error = stderr.read().decode()
if exit_status == 0:
logger.info(f"服务状态查询成功: {service_name_for_systemd}")
self.result_ready.emit(True, output)
else:
logger.error(f"服务状态查询失败: {error}")
self.result_ready.emit(False, error)
else:
# 其他操作start, stop, restart, enable, disable
if self.password:
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S {cmd}")
# 立即写入密码并关闭stdin
stdin.write(f"{self.password}\n")
stdin.close()
# 等待命令执行完成
exit_status = stdout.channel.recv_exit_status()
logger.info(f"服务{self.action}命令执行完成,退出状态: {exit_status}")
# 读取错误输出
error = stderr.read().decode()
# 检查是否仍然提示输入密码
if "password for" in error.lower() and exit_status != 0:
logger.error(f"密码传递失败,仍然提示输入密码: {error}")
self.result_ready.emit(False, error)
return
else:
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo {cmd}")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
logger.info(f"服务{self.action}成功: {service_name_for_systemd}")
# 如果是启动操作,额外检查服务状态
if self.action == "start":
time.sleep(2) # 等待服务完全启动
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo systemctl status {service_name_for_systemd}")
status_exit_status = stdout.channel.recv_exit_status()
status_output = stdout.read().decode()
status_error = stderr.read().decode()
if status_exit_status == 0:
logger.info(f"服务启动后状态检查成功: {status_output}")
# 检查端口是否被监听,从服务名称中提取端口
port = "8000" # 默认端口
if hasattr(self, 'port') and self.port:
port = self.port
# 在检查端口监听前,先检查端口占用情况
logger.info(f"检查端口{port}占用情况")
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo lsof -i :{port}")
lsof_output = stdout.read().decode()
lsof_exit_status = stdout.channel.recv_exit_status()
if lsof_exit_status == 0 and lsof_output:
logger.warning(f"端口{port}被占用: {lsof_output}")
else:
logger.info(f"端口{port}未被占用")
# 检查Gunicorn进程状态
stdin, stdout, stderr = self.ssh_client.exec_command("ps aux | grep gunicorn")
ps_output = stdout.read().decode()
logger.info(f"Gunicorn进程状态: {ps_output}")
# 详细分析进程结构
lines = ps_output.strip().split('\n')
master_process = None
worker_processes = []
for line in lines:
if 'grep' not in line and 'gunicorn' in line:
parts = line.split()
if len(parts) >= 11:
pid = parts[1]
command = ' '.join(parts[10:])
if 'master' in command or 'gunicorn: master' in command:
master_process = (pid, command)
elif 'worker' in command:
worker_processes.append((pid, command))
if master_process:
logger.info(f"检测到Gunicorn主进程 - PID: {master_process[0]}, 命令: {master_process[1]}")
else:
logger.warning(f"未检测到Gunicorn主进程")
if worker_processes:
logger.info(f"检测到{len(worker_processes)}个Gunicorn Worker进程:")
for pid, command in worker_processes:
logger.info(f" - Worker PID: {pid}, 命令: {command}")
else:
logger.warning(f"未检测到Gunicorn Worker进程")
# 检查进程数量是否符合预期1个主进程+4个worker进程
if master_process and len(worker_processes) == 4:
logger.info(f"Gunicorn进程结构正常: 1个主进程 + 4个worker进程")
elif master_process and len(worker_processes) > 0:
logger.warning(f"Gunicorn进程结构部分正常: 1个主进程 + {len(worker_processes)}个worker进程预期4个")
else:
logger.error(f"Gunicorn进程结构异常: 未检测到完整的进程结构")
# 检查Worker进程状态
worker_count = ps_output.count('gunicorn') - 1 # 减去grep进程本身
logger.info(f"检测到Gunicorn Worker进程数量: {worker_count}")
if worker_count < 1:
logger.error(f"Gunicorn主进程存在但Worker进程未启动可能是Django代码/配置错误")
# 使用ss命令检查端口监听状态
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo ss -tulnp | grep :{port}")
ss_output = stdout.read().decode()
logger.info(f"ss命令检查端口{port}监听状态: {ss_output}")
# 如果ss命令没有结果尝试使用netstat
if not ss_output:
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo netstat -tlnp | grep :{port}")
netstat_output = stdout.read().decode()
logger.info(f"netstat命令检查端口{port}监听状态: {netstat_output}")
port_check_output = netstat_output
else:
port_check_output = ss_output
# 检查端口是否在LISTEN状态
if "LISTEN" in port_check_output and f":{port}" in port_check_output:
logger.info(f"端口{port}处于LISTEN状态开始本地请求测试")
# 本地发起请求测试
stdin, stdout, stderr = self.ssh_client.exec_command(f"curl -s http://127.0.0.1:{port}")
curl_output = stdout.read().decode()
curl_exit_status = stdout.channel.recv_exit_status()
logger.info(f"curl本地请求测试退出状态: {curl_exit_status}")
logger.info(f"curl本地请求测试输出: {curl_output[:200]}..." if len(curl_output) > 200 else f"curl本地请求测试输出: {curl_output}")
if curl_exit_status == 0:
logger.info(f"服务启动成功,端口{port}访问正常")
else:
# 检查错误类型
if "Failed to connect" in curl_output:
logger.warning(f"服务启动成功,但本地请求超时,端口{port}未响应")
elif "Bad Request (400)" in curl_output:
logger.warning(f"服务启动成功但返回400错误可能是ALLOWED_HOSTS配置问题")
elif "500" in curl_output:
logger.warning(f"服务启动成功但返回500错误可能是应用内部错误")
else:
logger.warning(f"服务启动成功,但本地请求失败: {curl_output[:100]}")
else:
logger.warning(f"服务启动成功,但端口{port}未处于LISTEN状态")
# 检查SELinux状态
logger.info(f"检查SELinux状态")
stdin, stdout, stderr = self.ssh_client.exec_command("getenforce")
selinux_output = stdout.read().decode().strip()
logger.info(f"SELinux状态: {selinux_output}")
if selinux_output == "Enforcing":
logger.warning(f"SELinux处于Enforcing状态可能限制了端口绑定")
# 检查项目目录权限
logger.info(f"检查项目目录权限")
stdin, stdout, stderr = self.ssh_client.exec_command("ls -ld /home/xiaji/")
home_dir_output = stdout.read().decode().strip()
logger.info(f"/home/xiaji/目录权限: {home_dir_output}")
# 尝试更换端口测试
logger.info(f"尝试更换端口测试")
test_port = "8001"
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo ss -tulnp | grep :{test_port}")
test_port_output = stdout.read().decode()
if not test_port_output:
logger.info(f"端口{test_port}未被占用尝试用该端口启动Gunicorn")
# 这里只是记录日志,不实际更换端口
else:
logger.warning(f"服务启动后状态检查失败: {status_error}")
self.result_ready.emit(True, f"服务{self.action}成功: {service_name_for_systemd}")
else:
error = stderr.read().decode()
logger.error(f"服务{self.action}失败: {error}")
self.result_ready.emit(False, error)
except Exception as e:
error_msg = str(e)
logger.error(f"服务管理异常: {error_msg}")
self.result_ready.emit(False, error_msg)