import os import time from loguru import logger from PySide6.QtCore import QThread, Signal import paramiko class PasswordDialog: def __init__(self, parent=None): self.parent = parent self.password = None def get_password(self, prompt="请输入sudo密码:"): # 这里简化实现,实际应该使用QInputDialog if self.parent: from PySide6.QtWidgets import QInputDialog password, ok = QInputDialog.getText(self.parent, "sudo密码", prompt, echo=QInputDialog.Password) if ok: self.password = password return password return None class SSHConnectionThread(QThread): connection_status = Signal(bool, str) def __init__(self, host, username, password, port=22): super().__init__() self.host = host self.username = username self.password = password self.port = port def run(self): try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(self.host, port=self.port, username=self.username, password=self.password, timeout=10) # 测试连接 stdin, stdout, stderr = ssh.exec_command("echo 'Connection test'") output = stdout.read().decode().strip() if output == "Connection test": self.connection_status.emit(True, f"成功连接到 {self.host}") logger.info(f"SSH连接成功: {self.host}") else: self.connection_status.emit(False, "连接测试失败") logger.error("SSH连接测试失败") ssh.close() except Exception as e: error_msg = str(e) self.connection_status.emit(False, error_msg) logger.error(f"SSH连接失败: {error_msg}") class GitInstallThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client): super().__init__() self.ssh_client = ssh_client def run(self): try: # 检查Git是否已安装 stdin, stdout, stderr = self.ssh_client.exec_command("git --version") git_version = stdout.read().decode().strip() if git_version: self.result_ready.emit(True, f"Git已安装: {git_version}") logger.info(f"Git已安装: {git_version}") return # 安装Git stdin, stdout, stderr = self.ssh_client.exec_command("sudo apt update && sudo apt install -y git") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: stdin, stdout, stderr = self.ssh_client.exec_command("git --version") git_version = stdout.read().decode().strip() self.result_ready.emit(True, f"Git安装成功: {git_version}") logger.info(f"Git安装成功: {git_version}") else: error = stderr.read().decode() self.result_ready.emit(False, f"Git安装失败: {error}") logger.error(f"Git安装失败: {error}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"Git安装异常: {error_msg}") class GitCloneThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client, git_url, project_path): super().__init__() self.ssh_client = ssh_client self.git_url = git_url self.project_path = project_path def run(self): try: # 检查目录是否存在 stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.project_path}") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.result_ready.emit(False, f"目录 {self.project_path} 已存在") logger.warning(f"目录已存在: {self.project_path}") return # 创建父目录 parent_dir = os.path.dirname(self.project_path) if parent_dir: stdin, stdout, stderr = self.ssh_client.exec_command(f"mkdir -p {parent_dir}") # 克隆仓库 stdin, stdout, stderr = self.ssh_client.exec_command(f"cd {parent_dir} && git clone {self.git_url} {os.path.basename(self.project_path)}") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.result_ready.emit(True, f"Git克隆成功: {self.project_path}") logger.info(f"Git克隆成功: {self.project_path}") else: error = stderr.read().decode() self.result_ready.emit(False, f"Git克隆失败: {error}") logger.error(f"Git克隆失败: {error}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"Git克隆异常: {error_msg}") class ListDirectoryThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client, path): super().__init__() self.ssh_client = ssh_client self.path = path def run(self): try: stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.path}") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: output = stdout.read().decode() self.result_ready.emit(True, output) logger.info(f"目录列表成功: {self.path}") else: error = stderr.read().decode() self.result_ready.emit(False, error) logger.error(f"目录列表失败: {error}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"目录列表异常: {error_msg}") class DeleteDirectoryThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client, path): super().__init__() self.ssh_client = ssh_client self.path = path def run(self): try: stdin, stdout, stderr = self.ssh_client.exec_command(f"rm -rf {self.path}") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.result_ready.emit(True, f"目录删除成功: {self.path}") logger.info(f"目录删除成功: {self.path}") else: error = stderr.read().decode() self.result_ready.emit(False, error) logger.error(f"目录删除失败: {error}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"目录删除异常: {error_msg}") class SetTimezoneAndRestartThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client, password): super().__init__() self.ssh_client = ssh_client self.password = password def run(self): try: # 设置时区,使用-S选项从标准输入读取密码 stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S timedatectl set-timezone Asia/Shanghai") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: error = stderr.read().decode() self.result_ready.emit(False, f"设置时区失败: {error}") logger.error(f"设置时区失败: {error}") return # 重启服务器,使用-S选项从标准输入读取密码 stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S reboot") self.result_ready.emit(True, "时区设置成功,服务器正在重启") logger.info("时区设置成功,服务器正在重启") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"时区设置异常: {error_msg}") class CheckFirewallThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client, password): super().__init__() self.ssh_client = ssh_client self.password = password def run(self): try: # 检查UFW状态,使用-S选项从标准输入读取密码 stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S ufw status") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: output = stdout.read().decode() self.result_ready.emit(True, output) logger.info("防火墙状态检查成功") else: error = stderr.read().decode() self.result_ready.emit(False, error) logger.error(f"防火墙状态检查失败: {error}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"防火墙状态检查异常: {error_msg}") class OpenPortThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client, port, password): super().__init__() self.ssh_client = ssh_client self.port = port self.password = password def run(self): try: # 开放端口,使用-S选项从标准输入读取密码 stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S ufw allow {self.port}") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: # 重新加载防火墙,使用-S选项从标准输入读取密码 stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S ufw reload") self.result_ready.emit(True, f"端口 {self.port} 开放成功") logger.info(f"端口 {self.port} 开放成功") else: error = stderr.read().decode() self.result_ready.emit(False, error) logger.error(f"端口开放失败: {error}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"端口开放异常: {error_msg}") class DjangoInstallThread(QThread): result_ready = Signal(bool, str) progress_updated = Signal(int) def __init__(self, ssh_client, password): super().__init__() self.ssh_client = ssh_client self.password = password def run(self): try: self.progress_updated.emit(10) # 检查Django是否已安装 stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version") django_version = stdout.read().decode().strip() if django_version: self.result_ready.emit(True, f"Django已安装: {django_version}") logger.info(f"Django已安装: {django_version}") return self.progress_updated.emit(30) # 尝试使用pip安装 stdin, stdout, stderr = self.ssh_client.exec_command("pip3 install --break-system-packages django") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.progress_updated.emit(90) stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version") django_version = stdout.read().decode().strip() self.result_ready.emit(True, f"Django安装成功: {django_version}") logger.info(f"Django安装成功: {django_version}") return self.progress_updated.emit(50) # 如果pip安装失败,尝试使用apt安装,使用-S选项从标准输入读取密码 stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S apt update && echo '{self.password}' | sudo -S apt install -y python3-django") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.progress_updated.emit(90) stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version") django_version = stdout.read().decode().strip() self.result_ready.emit(True, f"Django安装成功: {django_version}") logger.info(f"Django安装成功: {django_version}") else: error = stderr.read().decode() self.result_ready.emit(False, f"Django安装失败: {error}") logger.error(f"Django安装失败: {error}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"Django安装异常: {error_msg}") class DjangoTestThread(QThread): result_ready = Signal(bool, str) progress_updated = Signal(int) def __init__(self, ssh_client, django_path): super().__init__() self.ssh_client = ssh_client self.django_path = django_path def run(self): try: self.progress_updated.emit(10) # 检查Django项目是否存在 stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/manage.py") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: self.result_ready.emit(False, f"Django项目不存在: {self.django_path}") logger.error(f"Django项目不存在: {self.django_path}") return self.progress_updated.emit(30) # 检查requirements.txt stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/requirements.txt") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.progress_updated.emit(50) # 安装依赖 stdin, stdout, stderr = self.ssh_client.exec_command(f"cd {self.django_path} && pip3 install --break-system-packages -r requirements.txt") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: error = stderr.read().decode() self.result_ready.emit(False, f"依赖安装失败: {error}") logger.error(f"依赖安装失败: {error}") return self.progress_updated.emit(70) # 运行Django测试服务器 stdin, stdout, stderr = self.ssh_client.exec_command(f"cd {self.django_path} && timeout 10 python3 manage.py runserver 0.0.0.0:8000 --noreload &") time.sleep(3) # 等待服务器启动 # 检查服务器是否运行 stdin, stdout, stderr = self.ssh_client.exec_command("ps aux | grep 'manage.py runserver'") output = stdout.read().decode() if "manage.py runserver" in output: self.progress_updated.emit(80) logger.info("Django测试服务器启动成功,开始检查端口访问情况") # 检查8000端口是否真的在监听 self.progress_updated.emit(85) stdin, stdout, stderr = self.ssh_client.exec_command("ss -tulnp | grep 8000") ss_output = stdout.read().decode() logger.info(f"ss命令检查端口8000结果: {ss_output}") # 如果ss命令没有结果,尝试使用netstat if not ss_output: stdin, stdout, stderr = self.ssh_client.exec_command("netstat -tulnp | grep 8000") netstat_output = stdout.read().decode() logger.info(f"netstat命令检查端口8000结果: {netstat_output}") port_check_output = netstat_output else: port_check_output = ss_output # 检查端口是否在LISTEN状态 if "LISTEN" in port_check_output and "8000" in port_check_output: self.progress_updated.emit(90) logger.info("端口8000处于LISTEN状态,开始本地请求测试") # 本地发起请求测试 stdin, stdout, stderr = self.ssh_client.exec_command("curl -s http://127.0.0.1:8000") curl_output = stdout.read().decode() curl_exit_status = stdout.channel.recv_exit_status() logger.info(f"curl本地请求测试退出状态: {curl_exit_status}") logger.info(f"curl本地请求测试输出: {curl_output[:200]}..." if len(curl_output) > 200 else f"curl本地请求测试输出: {curl_output}") if curl_exit_status == 0: self.progress_updated.emit(100) self.result_ready.emit(True, "Django测试服务器启动成功,端口访问正常") logger.info("Django测试服务器启动成功,端口访问正常") else: # 检查错误类型 if "Failed to connect" in curl_output: error_msg = "Django测试服务器启动成功,但本地请求超时,Gunicorn进程可能未正常响应" elif "Bad Request (400)" in curl_output: error_msg = "Django测试服务器启动成功,但返回400错误,可能是ALLOWED_HOSTS配置问题" elif "500" in curl_output: error_msg = "Django测试服务器启动成功,但返回500错误,可能是应用内部错误" else: error_msg = f"Django测试服务器启动成功,但本地请求失败: {curl_output[:100]}" self.result_ready.emit(False, error_msg) logger.error(error_msg) else: self.progress_updated.emit(100) self.result_ready.emit(False, "Django测试服务器启动成功,但端口8000未处于LISTEN状态") logger.error("Django测试服务器启动成功,但端口8000未处于LISTEN状态") else: self.result_ready.emit(False, "Django测试服务器启动失败") logger.error("Django测试服务器启动失败") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"Django测试异常: {error_msg}") class DownloadSettingsThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client, django_path): super().__init__() self.ssh_client = ssh_client self.django_path = django_path def run(self): try: # 查找settings.py文件 stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: self.result_ready.emit(False, "未找到settings.py文件") logger.error("未找到settings.py文件") return settings_path = stdout.read().decode().strip() # 下载settings.py内容 stdin, stdout, stderr = self.ssh_client.exec_command(f"cat {settings_path}") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: settings_content = stdout.read().decode() self.result_ready.emit(True, settings_content) logger.info("settings.py下载成功") else: error = stderr.read().decode() self.result_ready.emit(False, error) logger.error(f"settings.py下载失败: {error}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"settings.py下载异常: {error_msg}") class UploadSettingsThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client, django_path, settings_content): super().__init__() self.ssh_client = ssh_client self.django_path = django_path self.settings_content = settings_content def run(self): try: # 查找settings.py文件 stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: self.result_ready.emit(False, "未找到settings.py文件") logger.error("未找到settings.py文件") return settings_path = stdout.read().decode().strip() # 创建临时文件 temp_file = "/tmp/settings_upload.py" sftp = self.ssh_client.open_sftp() with sftp.file(temp_file, 'w') as f: f.write(self.settings_content) # 移动到目标位置 stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo mv {temp_file} {settings_path}") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.result_ready.emit(True, "settings.py上传成功") logger.info("settings.py上传成功") else: error = stderr.read().decode() self.result_ready.emit(False, error) logger.error(f"settings.py上传失败: {error}") sftp.close() except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"settings.py上传异常: {error_msg}") class CollectStaticThread(QThread): result_ready = Signal(bool, str) progress_updated = Signal(int) def __init__(self, ssh_client, django_path): super().__init__() self.ssh_client = ssh_client self.django_path = django_path def run(self): try: self.progress_updated.emit(10) # 检查Django项目是否存在 stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/manage.py") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: self.result_ready.emit(False, f"Django项目不存在: {self.django_path}") logger.error(f"Django项目不存在: {self.django_path}") return self.progress_updated.emit(30) # 收集静态文件 stdin, stdout, stderr = self.ssh_client.exec_command(f"cd {self.django_path} && python3 manage.py collectstatic --noinput") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.progress_updated.emit(100) output = stdout.read().decode() self.result_ready.emit(True, f"静态文件收集成功: {output}") logger.info("静态文件收集成功") else: error = stderr.read().decode() self.result_ready.emit(False, error) logger.error(f"静态文件收集失败: {error}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"静态文件收集异常: {error_msg}") class CheckDjangoStatusThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client, django_path): super().__init__() self.ssh_client = ssh_client self.django_path = django_path def run(self): try: status_info = [] # 检查Django项目是否存在 stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/manage.py") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: self.result_ready.emit(False, f"Django项目不存在: {self.django_path}") logger.error(f"Django项目不存在: {self.django_path}") return status_info.append("✓ Django项目存在") # 检查Django版本 stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version") django_version = stdout.read().decode().strip() status_info.append(f"✓ Django版本: {django_version}") # 检查requirements.txt stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/requirements.txt") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: status_info.append("✓ requirements.txt存在") else: status_info.append("✗ requirements.txt不存在") # 检查settings.py stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: status_info.append("✓ settings.py存在") else: status_info.append("✗ settings.py不存在") # 检查静态文件目录 stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/static") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: status_info.append("✓ static目录存在") else: status_info.append("✗ static目录不存在") status_text = "\n".join(status_info) self.result_ready.emit(True, status_text) logger.info("Django状态检查完成") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"Django状态检查异常: {error_msg}") class GunicornInstallThread(QThread): result_ready = Signal(bool, str) progress_updated = Signal(int) def __init__(self, ssh_client, password=None): super().__init__() self.ssh_client = ssh_client self.password = password def run(self): try: self.progress_updated.emit(10) # 检查Gunicorn是否已安装 logger.info("检查Gunicorn是否已安装") stdin, stdout, stderr = self.ssh_client.exec_command("gunicorn --version") exit_status = stdout.channel.recv_exit_status() gunicorn_version = stdout.read().decode().strip() version_error = stderr.read().decode() logger.info(f"Gunicorn版本检查状态: {exit_status}") logger.info(f"Gunicorn版本信息: {gunicorn_version}") if version_error: logger.error(f"Gunicorn版本检查错误: {version_error}") if gunicorn_version: self.result_ready.emit(True, f"Gunicorn已安装: {gunicorn_version}") logger.info(f"Gunicorn已安装: {gunicorn_version}") return self.progress_updated.emit(30) # 尝试使用pip安装 logger.info("开始使用pip安装Gunicorn") stdin, stdout, stderr = self.ssh_client.exec_command("pip3 install gunicorn") exit_status = stdout.channel.recv_exit_status() install_output = stdout.read().decode() install_error = stderr.read().decode() logger.info(f"Gunicorn pip安装命令执行状态: {exit_status}") logger.info(f"Gunicorn pip安装输出: {install_output}") if install_error: logger.error(f"Gunicorn pip安装错误: {install_error}") if exit_status == 0: self.progress_updated.emit(90) # 验证安装 logger.info("验证Gunicorn安装") stdin, stdout, stderr = self.ssh_client.exec_command("gunicorn --version") version_exit_status = stdout.channel.recv_exit_status() gunicorn_version = stdout.read().decode().strip() version_error = stderr.read().decode() logger.info(f"Gunicorn版本检查状态: {version_exit_status}") logger.info(f"Gunicorn版本信息: {gunicorn_version}") if version_error: logger.error(f"Gunicorn版本检查错误: {version_error}") if gunicorn_version: self.result_ready.emit(True, f"Gunicorn安装成功: {gunicorn_version}") logger.info(f"Gunicorn安装成功: {gunicorn_version}") else: self.result_ready.emit(False, "Gunicorn安装后无法获取版本信息") logger.error("Gunicorn安装后无法获取版本信息") return self.progress_updated.emit(50) # 如果pip安装失败,尝试使用apt安装 logger.info("pip安装失败,尝试使用apt安装Gunicorn") if self.password: logger.info("使用密码进行sudo apt安装") stdin, stdout, stderr = self.ssh_client.exec_command("sudo -S apt update && sudo -S apt install -y gunicorn") stdin.write(f"{self.password}\n") stdin.flush() else: logger.info("无密码进行sudo apt安装") stdin, stdout, stderr = self.ssh_client.exec_command("sudo apt update && sudo apt install -y gunicorn") exit_status = stdout.channel.recv_exit_status() apt_output = stdout.read().decode() apt_error = stderr.read().decode() logger.info(f"Gunicorn apt安装命令执行状态: {exit_status}") logger.info(f"Gunicorn apt安装输出: {apt_output}") if apt_error: logger.error(f"Gunicorn apt安装错误: {apt_error}") if exit_status == 0: self.progress_updated.emit(90) # 验证安装 logger.info("验证apt安装的Gunicorn") stdin, stdout, stderr = self.ssh_client.exec_command("gunicorn --version") version_exit_status = stdout.channel.recv_exit_status() gunicorn_version = stdout.read().decode().strip() version_error = stderr.read().decode() logger.info(f"Gunicorn版本检查状态: {version_exit_status}") logger.info(f"Gunicorn版本信息: {gunicorn_version}") if version_error: logger.error(f"Gunicorn版本检查错误: {version_error}") if gunicorn_version: self.result_ready.emit(True, f"Gunicorn安装成功: {gunicorn_version}") logger.info(f"Gunicorn安装成功: {gunicorn_version}") else: self.result_ready.emit(False, "Gunicorn安装后无法获取版本信息") logger.error("Gunicorn安装后无法获取版本信息") else: error = stderr.read().decode() self.result_ready.emit(False, f"Gunicorn安装失败: {error}") logger.error(f"Gunicorn安装失败: {error}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"Gunicorn安装异常: {error_msg}") class GunicornTestThread(QThread): result_ready = Signal(bool, str) progress_updated = Signal(int) def __init__(self, ssh_client, django_path, port="8000"): super().__init__() self.ssh_client = ssh_client self.django_path = django_path self.port = port logger.info(f"GunicornTestThread初始化 - Django路径: {django_path}, 端口: {port}") def run(self): try: self.progress_updated.emit(10) # 检查Django项目是否存在 stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {self.django_path}/manage.py") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: self.result_ready.emit(False, f"Django项目不存在: {self.django_path}") logger.error(f"Django项目不存在: {self.django_path}") return self.progress_updated.emit(30) # 检查wsgi.py文件 - 先尝试项目根目录 stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name wsgi.py -type f") wsgi_files = stdout.read().decode().strip().split('\n') if not wsgi_files or wsgi_files == ['']: self.result_ready.emit(False, f"未找到wsgi.py文件") logger.error(f"未找到wsgi.py文件") return # 使用第一个找到的wsgi.py文件 wsgi_file = wsgi_files[0].strip() logger.info(f"找到wsgi.py文件: {wsgi_file}") # 获取项目目录的相对路径 project_dir = os.path.dirname(wsgi_file) project_name = os.path.basename(project_dir) # 如果wsgi.py在子目录中,调整项目名 if project_dir != self.django_path.rstrip('/'): # wsgi.py在子目录中,使用子目录名作为项目名 project_name = os.path.basename(project_dir) else: # wsgi.py在项目根目录中,使用manage.py所在目录名作为项目名 project_name = os.path.basename(self.django_path.rstrip('/')) logger.info(f"使用项目名: {project_name}, 工作目录: {self.django_path}") self.progress_updated.emit(50) # 在启动Gunicorn前,先检查端口是否被占用 logger.info(f"检查端口{self.port}是否被其他进程占用") stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo lsof -i :{self.port}") lsof_output = stdout.read().decode() lsof_exit_status = stdout.channel.recv_exit_status() if lsof_exit_status == 0 and lsof_output: logger.warning(f"端口{self.port}已被其他进程占用: {lsof_output}") # 尝试杀死占用端口的进程 stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo lsof -t -i :{self.port} | xargs sudo kill -9") time.sleep(1) # 等待进程被杀死 logger.info(f"已尝试杀死占用端口{self.port}的进程") else: logger.info(f"端口{self.port}未被占用") # 测试Gunicorn启动,使用构造函数中传入的端口参数,指定4个worker test_command = f"cd {self.django_path} && gunicorn --workers 4 --bind 0.0.0.0:{self.port} {project_name}.wsgi:application --timeout 5" logger.info(f"执行Gunicorn测试命令: {test_command}") stdin, stdout, stderr = self.ssh_client.exec_command(test_command) time.sleep(3) # 等待Gunicorn启动 # 检查Gunicorn进程状态 stdin, stdout, stderr = self.ssh_client.exec_command("ps aux | grep gunicorn") output = stdout.read().decode() logger.info(f"Gunicorn进程检查结果: {output}") # 检查Worker进程状态 worker_count = output.count('gunicorn') - 1 # 减去grep进程本身 logger.info(f"检测到Gunicorn Worker进程数量: {worker_count}") # 详细分析进程结构 lines = output.strip().split('\n') master_process = None worker_processes = [] for line in lines: if 'grep' not in line and 'gunicorn' in line: parts = line.split() if len(parts) >= 11: pid = parts[1] command = ' '.join(parts[10:]) if 'master' in command or 'gunicorn: master' in command: master_process = (pid, command) elif 'worker' in command: worker_processes.append((pid, command)) if master_process: logger.info(f"检测到Gunicorn主进程 - PID: {master_process[0]}, 命令: {master_process[1]}") else: logger.warning(f"未检测到Gunicorn主进程") if worker_processes: logger.info(f"检测到{len(worker_processes)}个Gunicorn Worker进程:") for pid, command in worker_processes: logger.info(f" - Worker PID: {pid}, 命令: {command}") else: logger.warning(f"未检测到Gunicorn Worker进程") # 检查进程数量是否符合预期(1个主进程+4个worker进程) if master_process and len(worker_processes) == 4: logger.info(f"Gunicorn进程结构正常: 1个主进程 + 4个worker进程") elif master_process and len(worker_processes) > 0: logger.warning(f"Gunicorn进程结构部分正常: 1个主进程 + {len(worker_processes)}个worker进程(预期4个)") else: logger.error(f"Gunicorn进程结构异常: 未检测到完整的进程结构") if worker_count < 1: logger.error(f"Gunicorn主进程存在但Worker进程未启动,可能是Django代码/配置错误") # 尝试手动启动Gunicorn并获取详细错误日志 logger.info(f"尝试手动启动Gunicorn以获取详细错误日志") stdin, stdout, stderr = self.ssh_client.exec_command(f"cd {self.django_path} && gunicorn --workers 4 --bind 0.0.0.0:{self.port} {project_name}.wsgi:application --timeout 5 --error-logfile -") time.sleep(2) error_output = stderr.read().decode() logger.error(f"手动启动Gunicorn错误日志: {error_output}") # 清理测试进程 stdin, stdout, stderr = self.ssh_client.exec_command(f"pkill -f 'gunicorn.*{self.port}'") if "gunicorn" in output and f":{self.port}" in output: self.progress_updated.emit(80) logger.info(f"Gunicorn进程运行正常,开始检查端口{self.port}访问情况") # 检查端口是否真的在监听 self.progress_updated.emit(85) stdin, stdout, stderr = self.ssh_client.exec_command(f"ss -tulnp | grep {self.port}") ss_output = stdout.read().decode() logger.info(f"ss命令检查端口{self.port}结果: {ss_output}") # 如果ss命令没有结果,尝试使用netstat if not ss_output: stdin, stdout, stderr = self.ssh_client.exec_command(f"netstat -tulnp | grep {self.port}") netstat_output = stdout.read().decode() logger.info(f"netstat命令检查端口{self.port}结果: {netstat_output}") port_check_output = netstat_output else: port_check_output = ss_output # 检查端口是否在LISTEN状态 if "LISTEN" in port_check_output and f":{self.port}" in port_check_output: self.progress_updated.emit(90) logger.info(f"端口{self.port}处于LISTEN状态,开始本地请求测试") # 本地发起请求测试 stdin, stdout, stderr = self.ssh_client.exec_command(f"curl -s http://127.0.0.1:{self.port}") curl_output = stdout.read().decode() curl_exit_status = stdout.channel.recv_exit_status() logger.info(f"curl本地请求测试退出状态: {curl_exit_status}") logger.info(f"curl本地请求测试输出: {curl_output[:200]}..." if len(curl_output) > 200 else f"curl本地请求测试输出: {curl_output}") if curl_exit_status == 0: self.progress_updated.emit(100) self.result_ready.emit(True, f"Gunicorn测试成功 - 项目: {project_name}, 端口: {self.port}, 访问正常") logger.info(f"Gunicorn测试成功 - 项目: {project_name}, 端口: {self.port}, 访问正常") else: # 检查错误类型 if "Failed to connect" in curl_output: error_msg = f"Gunicorn进程运行正常,但本地请求超时,端口{self.port}未响应" elif "Bad Request (400)" in curl_output: error_msg = f"Gunicorn进程运行正常,但返回400错误,可能是ALLOWED_HOSTS配置问题" elif "500" in curl_output: error_msg = f"Gunicorn进程运行正常,但返回500错误,可能是应用内部错误" else: error_msg = f"Gunicorn进程运行正常,但本地请求失败: {curl_output[:100]}" self.result_ready.emit(False, error_msg) logger.error(error_msg) else: self.progress_updated.emit(100) self.result_ready.emit(False, f"Gunicorn进程运行正常,但端口{self.port}未处于LISTEN状态") logger.error(f"Gunicorn进程运行正常,但端口{self.port}未处于LISTEN状态") else: # 尝试更简单的测试方式 self.progress_updated.emit(80) test_command2 = f"cd {self.django_path} && python3 -c \"import {project_name}.wsgi; print('WSGI导入成功')\"" logger.info(f"执行WSGI导入测试: {test_command2}") stdin, stdout, stderr = self.ssh_client.exec_command(test_command2) exit_status = stdout.channel.recv_exit_status() if exit_status == 0: wsgi_output = stdout.read().decode() logger.info(f"WSGI导入测试成功: {wsgi_output}") self.result_ready.emit(True, f"WSGI配置验证成功 - 项目: {project_name}") logger.info(f"WSGI配置验证成功 - 项目: {project_name}") else: error_output = stderr.read().decode() logger.error(f"Gunicorn测试失败: {error_output}") self.result_ready.emit(False, f"Gunicorn测试失败: {error_output}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"Gunicorn测试异常: {error_msg}") class UploadGunicornServiceThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client, service_name, service_content, password=None): super().__init__() self.ssh_client = ssh_client self.service_name = service_name self.service_content = service_content self.password = password def run(self): try: # 创建服务文件 service_file = f"/etc/systemd/system/{self.service_name}.service" temp_file = f"/tmp/{self.service_name}.service" logger.info(f"准备上传服务文件: {service_file}") sftp = self.ssh_client.open_sftp() with sftp.file(temp_file, 'w') as f: f.write(self.service_content) logger.info(f"临时服务文件创建成功: {temp_file}") # 移动到systemd目录 if self.password: stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S mv {temp_file} {service_file}") stdin.write(f"{self.password}\n") stdin.flush() else: stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo mv {temp_file} {service_file}") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: error = stderr.read().decode() logger.error(f"服务文件移动失败: {error}") self.result_ready.emit(False, f"服务文件移动失败: {error}") sftp.close() return logger.info(f"服务文件移动成功: {service_file}") # 设置权限 if self.password: stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S chmod 644 {service_file}") stdin.write(f"{self.password}\n") stdin.flush() else: stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo chmod 644 {service_file}") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: error = stderr.read().decode() logger.error(f"权限设置失败: {error}") self.result_ready.emit(False, f"权限设置失败: {error}") sftp.close() return logger.info(f"服务文件权限设置成功: {service_file}") # 验证服务文件是否存在 stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {service_file}") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: file_info = stdout.read().decode().strip() logger.info(f"服务文件验证成功: {file_info}") else: error = stderr.read().decode() logger.error(f"服务文件验证失败: {error}") self.result_ready.emit(False, f"服务文件验证失败: {error}") sftp.close() return # 重新加载systemd if self.password: stdin, stdout, stderr = self.ssh_client.exec_command("sudo -S systemctl daemon-reload") stdin.write(f"{self.password}\n") stdin.flush() else: stdin, stdout, stderr = self.ssh_client.exec_command("sudo systemctl daemon-reload") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: error = stderr.read().decode() logger.error(f"systemd重新加载失败: {error}") self.result_ready.emit(False, f"systemd重新加载失败: {error}") sftp.close() return logger.info(f"systemd重新加载成功") # 验证服务是否被systemd识别 stdin, stdout, stderr = self.ssh_client.exec_command(f"systemctl list-unit-files | grep {self.service_name}") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: service_info = stdout.read().decode().strip() logger.info(f"服务被systemd识别: {service_info}") else: error = stderr.read().decode() logger.warning(f"服务未被systemd识别: {error}") sftp.close() self.result_ready.emit(True, f"服务文件上传成功: {service_file}") logger.info(f"服务文件上传成功: {service_file}") except Exception as e: error_msg = str(e) logger.error(f"服务文件上传异常: {error_msg}") self.result_ready.emit(False, error_msg) class ManageGunicornServiceThread(QThread): result_ready = Signal(bool, str) def __init__(self, ssh_client, service_name, action, password=None, port="8000"): super().__init__() self.ssh_client = ssh_client self.service_name = service_name self.action = action self.password = password self.port = port logger.info(f"ManageGunicornServiceThread初始化 - 服务名: {service_name}, 操作: {action}, 端口: {port}") def run(self): try: # 构建systemd命令 # 注意:systemd命令中不需要.service后缀 service_name_for_systemd = self.service_name if service_name_for_systemd.endswith('.service'): service_name_for_systemd = service_name_for_systemd[:-8] # 移除.service后缀 cmd = f"systemctl {self.action} {service_name_for_systemd}" logger.info(f"执行服务管理命令: {cmd}") # 首先检查服务文件是否存在 service_file = f"/etc/systemd/system/{self.service_name}.service" stdin, stdout, stderr = self.ssh_client.exec_command(f"ls -la {service_file}") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: error = stderr.read().decode() logger.error(f"服务文件不存在: {service_file}, 错误: {error}") self.result_ready.emit(False, f"服务文件不存在: {self.service_name}.service") return file_info = stdout.read().decode().strip() logger.info(f"服务文件存在: {file_info}") if self.action == "status": # 状态命令需要特殊处理 if self.password: stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S {cmd}") # 立即写入密码并关闭stdin stdin.write(f"{self.password}\n") stdin.close() # 等待命令执行完成 exit_status = stdout.channel.recv_exit_status() logger.info(f"服务状态查询命令执行完成,退出状态: {exit_status}") # 读取输出和错误 output = stdout.read().decode() error = stderr.read().decode() # 检查是否仍然提示输入密码 if "password for" in error.lower() and exit_status != 0: logger.error(f"密码传递失败,仍然提示输入密码: {error}") self.result_ready.emit(False, error) return else: stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo {cmd}") exit_status = stdout.channel.recv_exit_status() # 读取输出和错误 output = stdout.read().decode() error = stderr.read().decode() if exit_status == 0: logger.info(f"服务状态查询成功: {service_name_for_systemd}") self.result_ready.emit(True, output) else: logger.error(f"服务状态查询失败: {error}") self.result_ready.emit(False, error) else: # 其他操作(start, stop, restart, enable, disable) if self.password: stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S {cmd}") # 立即写入密码并关闭stdin stdin.write(f"{self.password}\n") stdin.close() # 等待命令执行完成 exit_status = stdout.channel.recv_exit_status() logger.info(f"服务{self.action}命令执行完成,退出状态: {exit_status}") # 读取错误输出 error = stderr.read().decode() # 检查是否仍然提示输入密码 if "password for" in error.lower() and exit_status != 0: logger.error(f"密码传递失败,仍然提示输入密码: {error}") self.result_ready.emit(False, error) return else: stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo {cmd}") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: logger.info(f"服务{self.action}成功: {service_name_for_systemd}") # 如果是启动操作,额外检查服务状态 if self.action == "start": time.sleep(2) # 等待服务完全启动 stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo systemctl status {service_name_for_systemd}") status_exit_status = stdout.channel.recv_exit_status() status_output = stdout.read().decode() status_error = stderr.read().decode() if status_exit_status == 0: logger.info(f"服务启动后状态检查成功: {status_output}") # 检查端口是否被监听,从服务名称中提取端口 port = "8000" # 默认端口 if hasattr(self, 'port') and self.port: port = self.port # 在检查端口监听前,先检查端口占用情况 logger.info(f"检查端口{port}占用情况") stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo lsof -i :{port}") lsof_output = stdout.read().decode() lsof_exit_status = stdout.channel.recv_exit_status() if lsof_exit_status == 0 and lsof_output: logger.warning(f"端口{port}被占用: {lsof_output}") else: logger.info(f"端口{port}未被占用") # 检查Gunicorn进程状态 stdin, stdout, stderr = self.ssh_client.exec_command("ps aux | grep gunicorn") ps_output = stdout.read().decode() logger.info(f"Gunicorn进程状态: {ps_output}") # 详细分析进程结构 lines = ps_output.strip().split('\n') master_process = None worker_processes = [] for line in lines: if 'grep' not in line and 'gunicorn' in line: parts = line.split() if len(parts) >= 11: pid = parts[1] command = ' '.join(parts[10:]) if 'master' in command or 'gunicorn: master' in command: master_process = (pid, command) elif 'worker' in command: worker_processes.append((pid, command)) if master_process: logger.info(f"检测到Gunicorn主进程 - PID: {master_process[0]}, 命令: {master_process[1]}") else: logger.warning(f"未检测到Gunicorn主进程") if worker_processes: logger.info(f"检测到{len(worker_processes)}个Gunicorn Worker进程:") for pid, command in worker_processes: logger.info(f" - Worker PID: {pid}, 命令: {command}") else: logger.warning(f"未检测到Gunicorn Worker进程") # 检查进程数量是否符合预期(1个主进程+4个worker进程) if master_process and len(worker_processes) == 4: logger.info(f"Gunicorn进程结构正常: 1个主进程 + 4个worker进程") elif master_process and len(worker_processes) > 0: logger.warning(f"Gunicorn进程结构部分正常: 1个主进程 + {len(worker_processes)}个worker进程(预期4个)") else: logger.error(f"Gunicorn进程结构异常: 未检测到完整的进程结构") # 检查Worker进程状态 worker_count = ps_output.count('gunicorn') - 1 # 减去grep进程本身 logger.info(f"检测到Gunicorn Worker进程数量: {worker_count}") if worker_count < 1: logger.error(f"Gunicorn主进程存在但Worker进程未启动,可能是Django代码/配置错误") # 使用ss命令检查端口监听状态 stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo ss -tulnp | grep :{port}") ss_output = stdout.read().decode() logger.info(f"ss命令检查端口{port}监听状态: {ss_output}") # 如果ss命令没有结果,尝试使用netstat if not ss_output: stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo netstat -tlnp | grep :{port}") netstat_output = stdout.read().decode() logger.info(f"netstat命令检查端口{port}监听状态: {netstat_output}") port_check_output = netstat_output else: port_check_output = ss_output # 检查端口是否在LISTEN状态 if "LISTEN" in port_check_output and f":{port}" in port_check_output: logger.info(f"端口{port}处于LISTEN状态,开始本地请求测试") # 本地发起请求测试 stdin, stdout, stderr = self.ssh_client.exec_command(f"curl -s http://127.0.0.1:{port}") curl_output = stdout.read().decode() curl_exit_status = stdout.channel.recv_exit_status() logger.info(f"curl本地请求测试退出状态: {curl_exit_status}") logger.info(f"curl本地请求测试输出: {curl_output[:200]}..." if len(curl_output) > 200 else f"curl本地请求测试输出: {curl_output}") if curl_exit_status == 0: logger.info(f"服务启动成功,端口{port}访问正常") else: # 检查错误类型 if "Failed to connect" in curl_output: logger.warning(f"服务启动成功,但本地请求超时,端口{port}未响应") elif "Bad Request (400)" in curl_output: logger.warning(f"服务启动成功,但返回400错误,可能是ALLOWED_HOSTS配置问题") elif "500" in curl_output: logger.warning(f"服务启动成功,但返回500错误,可能是应用内部错误") else: logger.warning(f"服务启动成功,但本地请求失败: {curl_output[:100]}") else: logger.warning(f"服务启动成功,但端口{port}未处于LISTEN状态") # 检查SELinux状态 logger.info(f"检查SELinux状态") stdin, stdout, stderr = self.ssh_client.exec_command("getenforce") selinux_output = stdout.read().decode().strip() logger.info(f"SELinux状态: {selinux_output}") if selinux_output == "Enforcing": logger.warning(f"SELinux处于Enforcing状态,可能限制了端口绑定") # 检查项目目录权限 logger.info(f"检查项目目录权限") stdin, stdout, stderr = self.ssh_client.exec_command("ls -ld /home/xiaji/") home_dir_output = stdout.read().decode().strip() logger.info(f"/home/xiaji/目录权限: {home_dir_output}") # 尝试更换端口测试 logger.info(f"尝试更换端口测试") test_port = "8001" stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo ss -tulnp | grep :{test_port}") test_port_output = stdout.read().decode() if not test_port_output: logger.info(f"端口{test_port}未被占用,尝试用该端口启动Gunicorn") # 这里只是记录日志,不实际更换端口 else: logger.warning(f"服务启动后状态检查失败: {status_error}") self.result_ready.emit(True, f"服务{self.action}成功: {service_name_for_systemd}") else: error = stderr.read().decode() logger.error(f"服务{self.action}失败: {error}") self.result_ready.emit(False, error) except Exception as e: error_msg = str(e) logger.error(f"服务管理异常: {error_msg}") self.result_ready.emit(False, error_msg)