364 lines
16 KiB
Python
364 lines
16 KiB
Python
import os
|
||
import sys
|
||
import time
|
||
from PySide6.QtCore import QThread, Signal
|
||
from loguru import logger
|
||
|
||
class DjangoInstallThread(QThread):
|
||
"""安装Django的线程"""
|
||
result_ready = Signal(bool, str)
|
||
progress_updated = Signal(int)
|
||
|
||
def __init__(self, ssh_client, password):
|
||
super().__init__()
|
||
self.ssh_client = ssh_client
|
||
self.password = password
|
||
|
||
def run(self):
|
||
try:
|
||
self.progress_updated.emit(10)
|
||
|
||
# 检查Django是否已安装
|
||
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
|
||
django_version = stdout.read().decode().strip()
|
||
|
||
if django_version:
|
||
self.result_ready.emit(True, f"Django已安装: {django_version}")
|
||
logger.info(f"Django已安装: {django_version}")
|
||
return
|
||
|
||
self.progress_updated.emit(30)
|
||
|
||
# 尝试使用pip安装
|
||
stdin, stdout, stderr = self.ssh_client.exec_command("pip3 install --break-system-packages django")
|
||
exit_status = stdout.channel.recv_exit_status()
|
||
|
||
if exit_status == 0:
|
||
self.progress_updated.emit(90)
|
||
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
|
||
django_version = stdout.read().decode().strip()
|
||
self.result_ready.emit(True, f"Django安装成功: {django_version}")
|
||
logger.info(f"Django安装成功: {django_version}")
|
||
return
|
||
|
||
self.progress_updated.emit(50)
|
||
|
||
# 如果pip安装失败,尝试使用apt安装,使用-S选项从标准输入读取密码
|
||
stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S apt update && echo '{self.password}' | sudo -S apt install -y python3-django")
|
||
exit_status = stdout.channel.recv_exit_status()
|
||
|
||
if exit_status == 0:
|
||
self.progress_updated.emit(90)
|
||
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
|
||
django_version = stdout.read().decode().strip()
|
||
self.result_ready.emit(True, f"Django安装成功: {django_version}")
|
||
logger.info(f"Django安装成功: {django_version}")
|
||
else:
|
||
error = stderr.read().decode()
|
||
self.result_ready.emit(False, f"Django安装失败: {error}")
|
||
logger.error(f"Django安装失败: {error}")
|
||
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
self.result_ready.emit(False, error_msg)
|
||
logger.error(f"Django安装异常: {error_msg}")
|
||
|
||
class DjangoCommandThread(QThread):
|
||
"""执行Django相关命令的线程"""
|
||
output_signal = Signal(str)
|
||
finished_signal = Signal()
|
||
password_request_signal = Signal() # 请求密码的信号
|
||
|
||
def __init__(self, ssh_client, command, log_file=None):
|
||
super().__init__()
|
||
self.ssh_client = ssh_client
|
||
self.command = command
|
||
self.password = None
|
||
self.waiting_for_password = False
|
||
self.log_file = log_file # 日志文件路径,如果提供则将输出重定向到该文件
|
||
|
||
def set_password(self, password):
|
||
self.password = password
|
||
self.waiting_for_password = False
|
||
|
||
def run(self):
|
||
try:
|
||
logger.info(f"执行Django命令: {self.command}")
|
||
|
||
# 如果提供了日志文件路径,修改命令以重定向输出
|
||
if self.log_file:
|
||
self.command = f"{self.command} > {self.log_file} 2>&1"
|
||
self.output_signal.emit(f"命令输出将重定向到日志文件: {self.log_file}")
|
||
|
||
# 如果命令包含sudo,修改为使用-S选项从标准输入读取密码
|
||
if "sudo" in self.command:
|
||
command_with_sudo = self.command.replace("sudo", "sudo -S")
|
||
stdin, stdout, stderr = self.ssh_client.exec_command(command_with_sudo)
|
||
|
||
# 检查是否需要密码
|
||
password_prompt = False
|
||
for line in iter(stderr.readline, ""):
|
||
line_text = line.strip()
|
||
self.output_signal.emit(line_text)
|
||
if "password for" in line_text.lower() or "密码" in line_text:
|
||
password_prompt = True
|
||
break
|
||
|
||
# 如果需要密码,请求用户输入
|
||
if password_prompt:
|
||
self.waiting_for_password = True
|
||
self.password_request_signal.emit()
|
||
|
||
# 等待密码输入
|
||
while self.waiting_for_password:
|
||
self.msleep(100)
|
||
|
||
# 发送密码
|
||
if self.password:
|
||
stdin.write(self.password + "\n")
|
||
stdin.flush()
|
||
self.output_signal.emit("密码已发送")
|
||
else:
|
||
self.output_signal.emit("未提供密码,命令可能失败")
|
||
else:
|
||
stdin, stdout, stderr = self.ssh_client.exec_command(self.command)
|
||
|
||
# 对于apt命令,使用特殊处理以显示进度
|
||
if "apt install" in self.command:
|
||
# 使用非交互模式并显示进度
|
||
self.output_signal.emit("正在安装软件包,请稍候...")
|
||
|
||
# 读取输出和错误,实时显示
|
||
import select
|
||
import time
|
||
|
||
# 检查是否需要密码
|
||
password_sent = False
|
||
|
||
while not stdout.channel.exit_status_ready():
|
||
# 检查是否有数据可读
|
||
r, w, e = select.select([stdout.channel, stderr.channel], [], [], 0.1)
|
||
|
||
if stdout.channel in r:
|
||
output = stdout.channel.recv(1024).decode('utf-8', errors='replace')
|
||
if output:
|
||
self.output_signal.emit(output.strip())
|
||
logger.info(f"命令输出: {output.strip()}")
|
||
|
||
if stderr.channel in r:
|
||
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
|
||
if error:
|
||
# 检查是否需要密码
|
||
if ("password for" in error.lower() or "密码" in error) and not password_sent:
|
||
self.output_signal.emit("检测到需要输入密码")
|
||
self.waiting_for_password = True
|
||
self.password_request_signal.emit()
|
||
|
||
# 等待密码输入
|
||
while self.waiting_for_password:
|
||
self.msleep(100)
|
||
|
||
# 发送密码
|
||
if self.password:
|
||
stdin.write(self.password + "\n")
|
||
stdin.flush()
|
||
password_sent = True
|
||
self.output_signal.emit("密码已发送")
|
||
else:
|
||
self.output_signal.emit("未提供密码,命令可能失败")
|
||
else:
|
||
self.output_signal.emit(error.strip())
|
||
logger.error(f"命令错误: {error.strip()}")
|
||
|
||
# 短暂休眠以避免过度占用CPU
|
||
time.sleep(0.01)
|
||
|
||
# 读取剩余输出
|
||
while True:
|
||
r, w, e = select.select([stdout.channel, stderr.channel], [], [], 0.1)
|
||
if not r:
|
||
break
|
||
|
||
if stdout.channel in r:
|
||
output = stdout.channel.recv(1024).decode('utf-8', errors='replace')
|
||
if output:
|
||
self.output_signal.emit(output.strip())
|
||
logger.info(f"命令输出: {output.strip()}")
|
||
|
||
if stderr.channel in r:
|
||
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
|
||
if error:
|
||
self.output_signal.emit(error.strip())
|
||
logger.error(f"命令错误: {error.strip()}")
|
||
else:
|
||
# 对于非apt命令,使用原有的行读取方式
|
||
# 读取输出
|
||
for line in iter(stdout.readline, ""):
|
||
self.output_signal.emit(line.strip())
|
||
logger.info(f"命令输出: {line.strip()}")
|
||
|
||
# 读取错误
|
||
for line in iter(stderr.readline, ""):
|
||
line_text = line.strip()
|
||
if "password for" not in line_text.lower() and "密码" not in line_text: # 避免重复显示密码提示
|
||
self.output_signal.emit(f"错误: {line_text}")
|
||
logger.error(f"命令错误: {line_text}")
|
||
|
||
# 检查退出状态
|
||
exit_status = stdout.channel.recv_exit_status()
|
||
if exit_status == 0:
|
||
self.output_signal.emit("命令执行成功")
|
||
logger.info(f"命令执行成功: {self.command}")
|
||
else:
|
||
self.output_signal.emit(f"命令执行失败,退出状态: {exit_status}")
|
||
logger.error(f"命令执行失败,退出状态: {exit_status}")
|
||
except Exception as e:
|
||
error_msg = f"执行命令时出错: {str(e)}"
|
||
self.output_signal.emit(error_msg)
|
||
logger.error(error_msg)
|
||
finally:
|
||
self.finished_signal.emit()
|
||
|
||
class UploadSettingsThread(QThread):
|
||
"""上传settings.py文件的线程"""
|
||
result_ready = Signal(bool, str)
|
||
password_request_signal = Signal() # 请求密码的信号
|
||
|
||
def __init__(self, ssh_client, django_path, settings_content):
|
||
super().__init__()
|
||
self.ssh_client = ssh_client
|
||
self.django_path = django_path
|
||
self.settings_content = settings_content
|
||
self.password = None
|
||
self.waiting_for_password = False
|
||
|
||
def set_password(self, password):
|
||
self.password = password
|
||
self.waiting_for_password = False
|
||
|
||
def run(self):
|
||
try:
|
||
# 查找settings.py文件
|
||
stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py")
|
||
exit_status = stdout.channel.recv_exit_status()
|
||
|
||
if exit_status != 0:
|
||
self.result_ready.emit(False, "未找到settings.py文件")
|
||
logger.error("未找到settings.py文件")
|
||
return
|
||
|
||
settings_path = stdout.read().decode().strip()
|
||
|
||
# 创建临时文件
|
||
temp_file = "/tmp/settings_upload.py"
|
||
sftp = self.ssh_client.open_sftp()
|
||
|
||
with sftp.file(temp_file, 'w') as f:
|
||
f.write(self.settings_content)
|
||
|
||
# 在覆盖前备份原文件(带时间戳)
|
||
import datetime
|
||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
backup_path = f"{settings_path}.backup_{timestamp}"
|
||
|
||
# 备份原文件
|
||
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S cp {settings_path} {backup_path}")
|
||
|
||
# 检查是否需要密码
|
||
password_sent = False
|
||
while True:
|
||
if stderr.channel.recv_ready():
|
||
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
|
||
if error:
|
||
# 检查是否需要密码
|
||
if ("password for" in error.lower() or "密码" in error) and not password_sent:
|
||
self.waiting_for_password = True
|
||
self.password_request_signal.emit()
|
||
|
||
# 等待密码输入
|
||
while self.waiting_for_password:
|
||
self.msleep(100)
|
||
|
||
# 发送密码
|
||
if self.password:
|
||
stdin.write(self.password + "\n")
|
||
stdin.flush()
|
||
password_sent = True
|
||
else:
|
||
self.result_ready.emit(False, "未提供密码,备份失败")
|
||
logger.error("未提供密码,备份失败")
|
||
sftp.close()
|
||
return
|
||
else:
|
||
logger.error(f"备份错误: {error.strip()}")
|
||
|
||
if stdout.channel.exit_status_ready():
|
||
break
|
||
|
||
# 短暂休眠以避免过度占用CPU
|
||
time.sleep(0.01)
|
||
|
||
exit_status = stdout.channel.recv_exit_status()
|
||
|
||
if exit_status != 0:
|
||
error = stderr.read().decode()
|
||
self.result_ready.emit(False, f"备份原文件失败: {error}")
|
||
logger.error(f"备份原文件失败: {error}")
|
||
sftp.close()
|
||
return
|
||
|
||
logger.info(f"原文件已备份至: {backup_path}")
|
||
|
||
# 移动临时文件到目标位置
|
||
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S mv {temp_file} {settings_path}")
|
||
|
||
# 检查是否需要密码
|
||
password_sent = False
|
||
while True:
|
||
if stderr.channel.recv_ready():
|
||
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
|
||
if error:
|
||
# 检查是否需要密码
|
||
if ("password for" in error.lower() or "密码" in error) and not password_sent:
|
||
self.waiting_for_password = True
|
||
self.password_request_signal.emit()
|
||
|
||
# 等待密码输入
|
||
while self.waiting_for_password:
|
||
self.msleep(100)
|
||
|
||
# 发送密码
|
||
if self.password:
|
||
stdin.write(self.password + "\n")
|
||
stdin.flush()
|
||
password_sent = True
|
||
else:
|
||
self.result_ready.emit(False, "未提供密码,移动文件失败")
|
||
logger.error("未提供密码,移动文件失败")
|
||
sftp.close()
|
||
return
|
||
else:
|
||
logger.error(f"移动文件错误: {error.strip()}")
|
||
|
||
if stdout.channel.exit_status_ready():
|
||
break
|
||
|
||
# 短暂休眠以避免过度占用CPU
|
||
time.sleep(0.01)
|
||
|
||
exit_status = stdout.channel.recv_exit_status()
|
||
|
||
if exit_status == 0:
|
||
self.result_ready.emit(True, f"settings.py上传成功,原文件已备份为: {os.path.basename(backup_path)}")
|
||
logger.info(f"settings.py上传成功,原文件已备份为: {os.path.basename(backup_path)}")
|
||
else:
|
||
error = stderr.read().decode()
|
||
self.result_ready.emit(False, error)
|
||
logger.error(f"settings.py上传失败: {error}")
|
||
|
||
sftp.close()
|
||
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
self.result_ready.emit(False, error_msg)
|
||
logger.error(f"settings.py上传异常: {error_msg}") |