import os import sys import time from PySide6.QtCore import QThread, Signal from loguru import logger class DjangoInstallThread(QThread): """安装Django的线程""" result_ready = Signal(bool, str) progress_updated = Signal(int) def __init__(self, ssh_client, password): super().__init__() self.ssh_client = ssh_client self.password = password def run(self): try: self.progress_updated.emit(10) # 检查Django是否已安装 stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version") django_version = stdout.read().decode().strip() if django_version: self.result_ready.emit(True, f"Django已安装: {django_version}") logger.info(f"Django已安装: {django_version}") return self.progress_updated.emit(30) # 尝试使用pip安装 stdin, stdout, stderr = self.ssh_client.exec_command("pip3 install --break-system-packages django") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.progress_updated.emit(90) stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version") django_version = stdout.read().decode().strip() self.result_ready.emit(True, f"Django安装成功: {django_version}") logger.info(f"Django安装成功: {django_version}") return self.progress_updated.emit(50) # 如果pip安装失败,尝试使用apt安装,使用-S选项从标准输入读取密码 stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S apt update && echo '{self.password}' | sudo -S apt install -y python3-django") exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.progress_updated.emit(90) stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version") django_version = stdout.read().decode().strip() self.result_ready.emit(True, f"Django安装成功: {django_version}") logger.info(f"Django安装成功: {django_version}") else: error = stderr.read().decode() self.result_ready.emit(False, f"Django安装失败: {error}") logger.error(f"Django安装失败: {error}") except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"Django安装异常: {error_msg}") class DjangoCommandThread(QThread): """执行Django相关命令的线程""" output_signal = Signal(str) finished_signal = Signal() password_request_signal = Signal() # 请求密码的信号 def __init__(self, ssh_client, command, log_file=None): super().__init__() self.ssh_client = ssh_client self.command = command self.password = None self.waiting_for_password = False self.log_file = log_file # 日志文件路径,如果提供则将输出重定向到该文件 def set_password(self, password): self.password = password self.waiting_for_password = False def run(self): try: logger.info(f"执行Django命令: {self.command}") # 如果提供了日志文件路径,修改命令以重定向输出 if self.log_file: self.command = f"{self.command} > {self.log_file} 2>&1" self.output_signal.emit(f"命令输出将重定向到日志文件: {self.log_file}") # 如果命令包含sudo,修改为使用-S选项从标准输入读取密码 if "sudo" in self.command: command_with_sudo = self.command.replace("sudo", "sudo -S") stdin, stdout, stderr = self.ssh_client.exec_command(command_with_sudo) # 检查是否需要密码 password_prompt = False for line in iter(stderr.readline, ""): line_text = line.strip() self.output_signal.emit(line_text) if "password for" in line_text.lower() or "密码" in line_text: password_prompt = True break # 如果需要密码,请求用户输入 if password_prompt: self.waiting_for_password = True self.password_request_signal.emit() # 等待密码输入 while self.waiting_for_password: self.msleep(100) # 发送密码 if self.password: stdin.write(self.password + "\n") stdin.flush() self.output_signal.emit("密码已发送") else: self.output_signal.emit("未提供密码,命令可能失败") else: stdin, stdout, stderr = self.ssh_client.exec_command(self.command) # 对于apt命令,使用特殊处理以显示进度 if "apt install" in self.command: # 使用非交互模式并显示进度 self.output_signal.emit("正在安装软件包,请稍候...") # 读取输出和错误,实时显示 import select import time # 检查是否需要密码 password_sent = False while not stdout.channel.exit_status_ready(): # 检查是否有数据可读 r, w, e = select.select([stdout.channel, stderr.channel], [], [], 0.1) if stdout.channel in r: output = stdout.channel.recv(1024).decode('utf-8', errors='replace') if output: self.output_signal.emit(output.strip()) logger.info(f"命令输出: {output.strip()}") if stderr.channel in r: error = stderr.channel.recv(1024).decode('utf-8', errors='replace') if error: # 检查是否需要密码 if ("password for" in error.lower() or "密码" in error) and not password_sent: self.output_signal.emit("检测到需要输入密码") self.waiting_for_password = True self.password_request_signal.emit() # 等待密码输入 while self.waiting_for_password: self.msleep(100) # 发送密码 if self.password: stdin.write(self.password + "\n") stdin.flush() password_sent = True self.output_signal.emit("密码已发送") else: self.output_signal.emit("未提供密码,命令可能失败") else: self.output_signal.emit(error.strip()) logger.error(f"命令错误: {error.strip()}") # 短暂休眠以避免过度占用CPU time.sleep(0.01) # 读取剩余输出 while True: r, w, e = select.select([stdout.channel, stderr.channel], [], [], 0.1) if not r: break if stdout.channel in r: output = stdout.channel.recv(1024).decode('utf-8', errors='replace') if output: self.output_signal.emit(output.strip()) logger.info(f"命令输出: {output.strip()}") if stderr.channel in r: error = stderr.channel.recv(1024).decode('utf-8', errors='replace') if error: self.output_signal.emit(error.strip()) logger.error(f"命令错误: {error.strip()}") else: # 对于非apt命令,使用原有的行读取方式 # 读取输出 for line in iter(stdout.readline, ""): self.output_signal.emit(line.strip()) logger.info(f"命令输出: {line.strip()}") # 读取错误 for line in iter(stderr.readline, ""): line_text = line.strip() if "password for" not in line_text.lower() and "密码" not in line_text: # 避免重复显示密码提示 self.output_signal.emit(f"错误: {line_text}") logger.error(f"命令错误: {line_text}") # 检查退出状态 exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.output_signal.emit("命令执行成功") logger.info(f"命令执行成功: {self.command}") else: self.output_signal.emit(f"命令执行失败,退出状态: {exit_status}") logger.error(f"命令执行失败,退出状态: {exit_status}") except Exception as e: error_msg = f"执行命令时出错: {str(e)}" self.output_signal.emit(error_msg) logger.error(error_msg) finally: self.finished_signal.emit() class UploadSettingsThread(QThread): """上传settings.py文件的线程""" result_ready = Signal(bool, str) password_request_signal = Signal() # 请求密码的信号 def __init__(self, ssh_client, django_path, settings_content): super().__init__() self.ssh_client = ssh_client self.django_path = django_path self.settings_content = settings_content self.password = None self.waiting_for_password = False def set_password(self, password): self.password = password self.waiting_for_password = False def run(self): try: # 查找settings.py文件 stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py") exit_status = stdout.channel.recv_exit_status() if exit_status != 0: self.result_ready.emit(False, "未找到settings.py文件") logger.error("未找到settings.py文件") return settings_path = stdout.read().decode().strip() # 创建临时文件 temp_file = "/tmp/settings_upload.py" sftp = self.ssh_client.open_sftp() with sftp.file(temp_file, 'w') as f: f.write(self.settings_content) # 在覆盖前备份原文件(带时间戳) import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = f"{settings_path}.backup_{timestamp}" # 备份原文件 stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S cp {settings_path} {backup_path}") # 检查是否需要密码 password_sent = False while True: if stderr.channel.recv_ready(): error = stderr.channel.recv(1024).decode('utf-8', errors='replace') if error: # 检查是否需要密码 if ("password for" in error.lower() or "密码" in error) and not password_sent: self.waiting_for_password = True self.password_request_signal.emit() # 等待密码输入 while self.waiting_for_password: self.msleep(100) # 发送密码 if self.password: stdin.write(self.password + "\n") stdin.flush() password_sent = True else: self.result_ready.emit(False, "未提供密码,备份失败") logger.error("未提供密码,备份失败") sftp.close() return else: logger.error(f"备份错误: {error.strip()}") if stdout.channel.exit_status_ready(): break # 短暂休眠以避免过度占用CPU time.sleep(0.01) exit_status = stdout.channel.recv_exit_status() if exit_status != 0: error = stderr.read().decode() self.result_ready.emit(False, f"备份原文件失败: {error}") logger.error(f"备份原文件失败: {error}") sftp.close() return logger.info(f"原文件已备份至: {backup_path}") # 移动临时文件到目标位置 stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S mv {temp_file} {settings_path}") # 检查是否需要密码 password_sent = False while True: if stderr.channel.recv_ready(): error = stderr.channel.recv(1024).decode('utf-8', errors='replace') if error: # 检查是否需要密码 if ("password for" in error.lower() or "密码" in error) and not password_sent: self.waiting_for_password = True self.password_request_signal.emit() # 等待密码输入 while self.waiting_for_password: self.msleep(100) # 发送密码 if self.password: stdin.write(self.password + "\n") stdin.flush() password_sent = True else: self.result_ready.emit(False, "未提供密码,移动文件失败") logger.error("未提供密码,移动文件失败") sftp.close() return else: logger.error(f"移动文件错误: {error.strip()}") if stdout.channel.exit_status_ready(): break # 短暂休眠以避免过度占用CPU time.sleep(0.01) exit_status = stdout.channel.recv_exit_status() if exit_status == 0: self.result_ready.emit(True, f"settings.py上传成功,原文件已备份为: {os.path.basename(backup_path)}") logger.info(f"settings.py上传成功,原文件已备份为: {os.path.basename(backup_path)}") else: error = stderr.read().decode() self.result_ready.emit(False, error) logger.error(f"settings.py上传失败: {error}") sftp.close() except Exception as e: error_msg = str(e) self.result_ready.emit(False, error_msg) logger.error(f"settings.py上传异常: {error_msg}")