Files
django.remote/django_threads.py
2025-08-31 13:08:06 +08:00

364 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import sys
import time
from PySide6.QtCore import QThread, Signal
from loguru import logger
class DjangoInstallThread(QThread):
"""安装Django的线程"""
result_ready = Signal(bool, str)
progress_updated = Signal(int)
def __init__(self, ssh_client, password):
super().__init__()
self.ssh_client = ssh_client
self.password = password
def run(self):
try:
self.progress_updated.emit(10)
# 检查Django是否已安装
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
if django_version:
self.result_ready.emit(True, f"Django已安装: {django_version}")
logger.info(f"Django已安装: {django_version}")
return
self.progress_updated.emit(30)
# 尝试使用pip安装
stdin, stdout, stderr = self.ssh_client.exec_command("pip3 install --break-system-packages django")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.progress_updated.emit(90)
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
self.result_ready.emit(True, f"Django安装成功: {django_version}")
logger.info(f"Django安装成功: {django_version}")
return
self.progress_updated.emit(50)
# 如果pip安装失败尝试使用apt安装使用-S选项从标准输入读取密码
stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S apt update && echo '{self.password}' | sudo -S apt install -y python3-django")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.progress_updated.emit(90)
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
self.result_ready.emit(True, f"Django安装成功: {django_version}")
logger.info(f"Django安装成功: {django_version}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, f"Django安装失败: {error}")
logger.error(f"Django安装失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Django安装异常: {error_msg}")
class DjangoCommandThread(QThread):
"""执行Django相关命令的线程"""
output_signal = Signal(str)
finished_signal = Signal()
password_request_signal = Signal() # 请求密码的信号
def __init__(self, ssh_client, command, log_file=None):
super().__init__()
self.ssh_client = ssh_client
self.command = command
self.password = None
self.waiting_for_password = False
self.log_file = log_file # 日志文件路径,如果提供则将输出重定向到该文件
def set_password(self, password):
self.password = password
self.waiting_for_password = False
def run(self):
try:
logger.info(f"执行Django命令: {self.command}")
# 如果提供了日志文件路径,修改命令以重定向输出
if self.log_file:
self.command = f"{self.command} > {self.log_file} 2>&1"
self.output_signal.emit(f"命令输出将重定向到日志文件: {self.log_file}")
# 如果命令包含sudo修改为使用-S选项从标准输入读取密码
if "sudo" in self.command:
command_with_sudo = self.command.replace("sudo", "sudo -S")
stdin, stdout, stderr = self.ssh_client.exec_command(command_with_sudo)
# 检查是否需要密码
password_prompt = False
for line in iter(stderr.readline, ""):
line_text = line.strip()
self.output_signal.emit(line_text)
if "password for" in line_text.lower() or "密码" in line_text:
password_prompt = True
break
# 如果需要密码,请求用户输入
if password_prompt:
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
self.output_signal.emit("密码已发送")
else:
self.output_signal.emit("未提供密码,命令可能失败")
else:
stdin, stdout, stderr = self.ssh_client.exec_command(self.command)
# 对于apt命令使用特殊处理以显示进度
if "apt install" in self.command:
# 使用非交互模式并显示进度
self.output_signal.emit("正在安装软件包,请稍候...")
# 读取输出和错误,实时显示
import select
import time
# 检查是否需要密码
password_sent = False
while not stdout.channel.exit_status_ready():
# 检查是否有数据可读
r, w, e = select.select([stdout.channel, stderr.channel], [], [], 0.1)
if stdout.channel in r:
output = stdout.channel.recv(1024).decode('utf-8', errors='replace')
if output:
self.output_signal.emit(output.strip())
logger.info(f"命令输出: {output.strip()}")
if stderr.channel in r:
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
# 检查是否需要密码
if ("password for" in error.lower() or "密码" in error) and not password_sent:
self.output_signal.emit("检测到需要输入密码")
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
password_sent = True
self.output_signal.emit("密码已发送")
else:
self.output_signal.emit("未提供密码,命令可能失败")
else:
self.output_signal.emit(error.strip())
logger.error(f"命令错误: {error.strip()}")
# 短暂休眠以避免过度占用CPU
time.sleep(0.01)
# 读取剩余输出
while True:
r, w, e = select.select([stdout.channel, stderr.channel], [], [], 0.1)
if not r:
break
if stdout.channel in r:
output = stdout.channel.recv(1024).decode('utf-8', errors='replace')
if output:
self.output_signal.emit(output.strip())
logger.info(f"命令输出: {output.strip()}")
if stderr.channel in r:
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
self.output_signal.emit(error.strip())
logger.error(f"命令错误: {error.strip()}")
else:
# 对于非apt命令使用原有的行读取方式
# 读取输出
for line in iter(stdout.readline, ""):
self.output_signal.emit(line.strip())
logger.info(f"命令输出: {line.strip()}")
# 读取错误
for line in iter(stderr.readline, ""):
line_text = line.strip()
if "password for" not in line_text.lower() and "密码" not in line_text: # 避免重复显示密码提示
self.output_signal.emit(f"错误: {line_text}")
logger.error(f"命令错误: {line_text}")
# 检查退出状态
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.output_signal.emit("命令执行成功")
logger.info(f"命令执行成功: {self.command}")
else:
self.output_signal.emit(f"命令执行失败,退出状态: {exit_status}")
logger.error(f"命令执行失败,退出状态: {exit_status}")
except Exception as e:
error_msg = f"执行命令时出错: {str(e)}"
self.output_signal.emit(error_msg)
logger.error(error_msg)
finally:
self.finished_signal.emit()
class UploadSettingsThread(QThread):
"""上传settings.py文件的线程"""
result_ready = Signal(bool, str)
password_request_signal = Signal() # 请求密码的信号
def __init__(self, ssh_client, django_path, settings_content):
super().__init__()
self.ssh_client = ssh_client
self.django_path = django_path
self.settings_content = settings_content
self.password = None
self.waiting_for_password = False
def set_password(self, password):
self.password = password
self.waiting_for_password = False
def run(self):
try:
# 查找settings.py文件
stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
self.result_ready.emit(False, "未找到settings.py文件")
logger.error("未找到settings.py文件")
return
settings_path = stdout.read().decode().strip()
# 创建临时文件
temp_file = "/tmp/settings_upload.py"
sftp = self.ssh_client.open_sftp()
with sftp.file(temp_file, 'w') as f:
f.write(self.settings_content)
# 在覆盖前备份原文件(带时间戳)
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = f"{settings_path}.backup_{timestamp}"
# 备份原文件
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S cp {settings_path} {backup_path}")
# 检查是否需要密码
password_sent = False
while True:
if stderr.channel.recv_ready():
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
# 检查是否需要密码
if ("password for" in error.lower() or "密码" in error) and not password_sent:
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
password_sent = True
else:
self.result_ready.emit(False, "未提供密码,备份失败")
logger.error("未提供密码,备份失败")
sftp.close()
return
else:
logger.error(f"备份错误: {error.strip()}")
if stdout.channel.exit_status_ready():
break
# 短暂休眠以避免过度占用CPU
time.sleep(0.01)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"备份原文件失败: {error}")
logger.error(f"备份原文件失败: {error}")
sftp.close()
return
logger.info(f"原文件已备份至: {backup_path}")
# 移动临时文件到目标位置
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S mv {temp_file} {settings_path}")
# 检查是否需要密码
password_sent = False
while True:
if stderr.channel.recv_ready():
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
# 检查是否需要密码
if ("password for" in error.lower() or "密码" in error) and not password_sent:
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
password_sent = True
else:
self.result_ready.emit(False, "未提供密码,移动文件失败")
logger.error("未提供密码,移动文件失败")
sftp.close()
return
else:
logger.error(f"移动文件错误: {error.strip()}")
if stdout.channel.exit_status_ready():
break
# 短暂休眠以避免过度占用CPU
time.sleep(0.01)
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.result_ready.emit(True, f"settings.py上传成功原文件已备份为: {os.path.basename(backup_path)}")
logger.info(f"settings.py上传成功原文件已备份为: {os.path.basename(backup_path)}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, error)
logger.error(f"settings.py上传失败: {error}")
sftp.close()
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"settings.py上传异常: {error_msg}")