完成Django的标签的功能

This commit is contained in:
2025-08-31 13:08:06 +08:00
parent 9854a00542
commit d20ddfed59
10 changed files with 1993 additions and 4 deletions

364
django_threads.py Normal file
View File

@@ -0,0 +1,364 @@
import os
import sys
import time
from PySide6.QtCore import QThread, Signal
from loguru import logger
class DjangoInstallThread(QThread):
"""安装Django的线程"""
result_ready = Signal(bool, str)
progress_updated = Signal(int)
def __init__(self, ssh_client, password):
super().__init__()
self.ssh_client = ssh_client
self.password = password
def run(self):
try:
self.progress_updated.emit(10)
# 检查Django是否已安装
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
if django_version:
self.result_ready.emit(True, f"Django已安装: {django_version}")
logger.info(f"Django已安装: {django_version}")
return
self.progress_updated.emit(30)
# 尝试使用pip安装
stdin, stdout, stderr = self.ssh_client.exec_command("pip3 install --break-system-packages django")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.progress_updated.emit(90)
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
self.result_ready.emit(True, f"Django安装成功: {django_version}")
logger.info(f"Django安装成功: {django_version}")
return
self.progress_updated.emit(50)
# 如果pip安装失败尝试使用apt安装使用-S选项从标准输入读取密码
stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S apt update && echo '{self.password}' | sudo -S apt install -y python3-django")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.progress_updated.emit(90)
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
self.result_ready.emit(True, f"Django安装成功: {django_version}")
logger.info(f"Django安装成功: {django_version}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, f"Django安装失败: {error}")
logger.error(f"Django安装失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Django安装异常: {error_msg}")
class DjangoCommandThread(QThread):
"""执行Django相关命令的线程"""
output_signal = Signal(str)
finished_signal = Signal()
password_request_signal = Signal() # 请求密码的信号
def __init__(self, ssh_client, command, log_file=None):
super().__init__()
self.ssh_client = ssh_client
self.command = command
self.password = None
self.waiting_for_password = False
self.log_file = log_file # 日志文件路径,如果提供则将输出重定向到该文件
def set_password(self, password):
self.password = password
self.waiting_for_password = False
def run(self):
try:
logger.info(f"执行Django命令: {self.command}")
# 如果提供了日志文件路径,修改命令以重定向输出
if self.log_file:
self.command = f"{self.command} > {self.log_file} 2>&1"
self.output_signal.emit(f"命令输出将重定向到日志文件: {self.log_file}")
# 如果命令包含sudo修改为使用-S选项从标准输入读取密码
if "sudo" in self.command:
command_with_sudo = self.command.replace("sudo", "sudo -S")
stdin, stdout, stderr = self.ssh_client.exec_command(command_with_sudo)
# 检查是否需要密码
password_prompt = False
for line in iter(stderr.readline, ""):
line_text = line.strip()
self.output_signal.emit(line_text)
if "password for" in line_text.lower() or "密码" in line_text:
password_prompt = True
break
# 如果需要密码,请求用户输入
if password_prompt:
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
self.output_signal.emit("密码已发送")
else:
self.output_signal.emit("未提供密码,命令可能失败")
else:
stdin, stdout, stderr = self.ssh_client.exec_command(self.command)
# 对于apt命令使用特殊处理以显示进度
if "apt install" in self.command:
# 使用非交互模式并显示进度
self.output_signal.emit("正在安装软件包,请稍候...")
# 读取输出和错误,实时显示
import select
import time
# 检查是否需要密码
password_sent = False
while not stdout.channel.exit_status_ready():
# 检查是否有数据可读
r, w, e = select.select([stdout.channel, stderr.channel], [], [], 0.1)
if stdout.channel in r:
output = stdout.channel.recv(1024).decode('utf-8', errors='replace')
if output:
self.output_signal.emit(output.strip())
logger.info(f"命令输出: {output.strip()}")
if stderr.channel in r:
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
# 检查是否需要密码
if ("password for" in error.lower() or "密码" in error) and not password_sent:
self.output_signal.emit("检测到需要输入密码")
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
password_sent = True
self.output_signal.emit("密码已发送")
else:
self.output_signal.emit("未提供密码,命令可能失败")
else:
self.output_signal.emit(error.strip())
logger.error(f"命令错误: {error.strip()}")
# 短暂休眠以避免过度占用CPU
time.sleep(0.01)
# 读取剩余输出
while True:
r, w, e = select.select([stdout.channel, stderr.channel], [], [], 0.1)
if not r:
break
if stdout.channel in r:
output = stdout.channel.recv(1024).decode('utf-8', errors='replace')
if output:
self.output_signal.emit(output.strip())
logger.info(f"命令输出: {output.strip()}")
if stderr.channel in r:
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
self.output_signal.emit(error.strip())
logger.error(f"命令错误: {error.strip()}")
else:
# 对于非apt命令使用原有的行读取方式
# 读取输出
for line in iter(stdout.readline, ""):
self.output_signal.emit(line.strip())
logger.info(f"命令输出: {line.strip()}")
# 读取错误
for line in iter(stderr.readline, ""):
line_text = line.strip()
if "password for" not in line_text.lower() and "密码" not in line_text: # 避免重复显示密码提示
self.output_signal.emit(f"错误: {line_text}")
logger.error(f"命令错误: {line_text}")
# 检查退出状态
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.output_signal.emit("命令执行成功")
logger.info(f"命令执行成功: {self.command}")
else:
self.output_signal.emit(f"命令执行失败,退出状态: {exit_status}")
logger.error(f"命令执行失败,退出状态: {exit_status}")
except Exception as e:
error_msg = f"执行命令时出错: {str(e)}"
self.output_signal.emit(error_msg)
logger.error(error_msg)
finally:
self.finished_signal.emit()
class UploadSettingsThread(QThread):
"""上传settings.py文件的线程"""
result_ready = Signal(bool, str)
password_request_signal = Signal() # 请求密码的信号
def __init__(self, ssh_client, django_path, settings_content):
super().__init__()
self.ssh_client = ssh_client
self.django_path = django_path
self.settings_content = settings_content
self.password = None
self.waiting_for_password = False
def set_password(self, password):
self.password = password
self.waiting_for_password = False
def run(self):
try:
# 查找settings.py文件
stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
self.result_ready.emit(False, "未找到settings.py文件")
logger.error("未找到settings.py文件")
return
settings_path = stdout.read().decode().strip()
# 创建临时文件
temp_file = "/tmp/settings_upload.py"
sftp = self.ssh_client.open_sftp()
with sftp.file(temp_file, 'w') as f:
f.write(self.settings_content)
# 在覆盖前备份原文件(带时间戳)
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = f"{settings_path}.backup_{timestamp}"
# 备份原文件
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S cp {settings_path} {backup_path}")
# 检查是否需要密码
password_sent = False
while True:
if stderr.channel.recv_ready():
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
# 检查是否需要密码
if ("password for" in error.lower() or "密码" in error) and not password_sent:
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
password_sent = True
else:
self.result_ready.emit(False, "未提供密码,备份失败")
logger.error("未提供密码,备份失败")
sftp.close()
return
else:
logger.error(f"备份错误: {error.strip()}")
if stdout.channel.exit_status_ready():
break
# 短暂休眠以避免过度占用CPU
time.sleep(0.01)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"备份原文件失败: {error}")
logger.error(f"备份原文件失败: {error}")
sftp.close()
return
logger.info(f"原文件已备份至: {backup_path}")
# 移动临时文件到目标位置
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S mv {temp_file} {settings_path}")
# 检查是否需要密码
password_sent = False
while True:
if stderr.channel.recv_ready():
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
# 检查是否需要密码
if ("password for" in error.lower() or "密码" in error) and not password_sent:
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
password_sent = True
else:
self.result_ready.emit(False, "未提供密码,移动文件失败")
logger.error("未提供密码,移动文件失败")
sftp.close()
return
else:
logger.error(f"移动文件错误: {error.strip()}")
if stdout.channel.exit_status_ready():
break
# 短暂休眠以避免过度占用CPU
time.sleep(0.01)
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.result_ready.emit(True, f"settings.py上传成功原文件已备份为: {os.path.basename(backup_path)}")
logger.info(f"settings.py上传成功原文件已备份为: {os.path.basename(backup_path)}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, error)
logger.error(f"settings.py上传失败: {error}")
sftp.close()
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"settings.py上传异常: {error_msg}")