Files
django.remote/gunicorn_tab.py

1197 lines
54 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import sys
import datetime
from PySide6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QPushButton,
QLabel, QTextEdit, QFileDialog, QMessageBox,
QLineEdit, QDialog, QDialogButtonBox)
from PySide6.QtCore import QThread, Signal
from loguru import logger
class PasswordDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("输入密码")
self.setMinimumWidth(300)
layout = QVBoxLayout()
# 提示标签
label = QLabel("请输入sudo密码:")
layout.addWidget(label)
# 密码输入框
self.password_input = QLineEdit()
self.password_input.setEchoMode(QLineEdit.Password)
layout.addWidget(self.password_input)
# 按钮
button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
button_box.accepted.connect(self.accept)
button_box.rejected.connect(self.reject)
layout.addWidget(button_box)
self.setLayout(layout)
def get_password(self):
return self.password_input.text()
class GunicornInstallThread(QThread):
"""安装Gunicorn的线程"""
result_ready = Signal(bool, str)
progress_updated = Signal(int)
def __init__(self, ssh_client, password=None):
super().__init__()
self.ssh_client = ssh_client
self.password = password
def run(self):
try:
self.progress_updated.emit(10)
# 检查Gunicorn是否已安装
logger.info("检查Gunicorn是否已安装")
stdin, stdout, stderr = self.ssh_client.exec_command("gunicorn --version")
exit_status = stdout.channel.recv_exit_status()
gunicorn_version = stdout.read().decode().strip()
version_error = stderr.read().decode()
logger.info(f"Gunicorn版本检查状态: {exit_status}")
logger.info(f"Gunicorn版本信息: {gunicorn_version}")
if version_error:
logger.error(f"Gunicorn版本检查错误: {version_error}")
if gunicorn_version:
self.result_ready.emit(True, f"Gunicorn已安装: {gunicorn_version}")
logger.info(f"Gunicorn已安装: {gunicorn_version}")
return
self.progress_updated.emit(30)
# 尝试使用pip安装
logger.info("开始使用pip安装Gunicorn")
stdin, stdout, stderr = self.ssh_client.exec_command("pip3 install gunicorn")
exit_status = stdout.channel.recv_exit_status()
install_output = stdout.read().decode()
install_error = stderr.read().decode()
logger.info(f"Gunicorn pip安装命令执行状态: {exit_status}")
logger.info(f"Gunicorn pip安装输出: {install_output}")
if install_error:
logger.error(f"Gunicorn pip安装错误: {install_error}")
if exit_status == 0:
self.progress_updated.emit(90)
# 验证安装
logger.info("验证Gunicorn安装")
stdin, stdout, stderr = self.ssh_client.exec_command("gunicorn --version")
version_exit_status = stdout.channel.recv_exit_status()
gunicorn_version = stdout.read().decode().strip()
version_error = stderr.read().decode()
logger.info(f"Gunicorn版本检查状态: {version_exit_status}")
logger.info(f"Gunicorn版本信息: {gunicorn_version}")
if version_error:
logger.error(f"Gunicorn版本检查错误: {version_error}")
if gunicorn_version:
self.result_ready.emit(True, f"Gunicorn安装成功: {gunicorn_version}")
logger.info(f"Gunicorn安装成功: {gunicorn_version}")
else:
self.result_ready.emit(False, "Gunicorn安装后无法获取版本信息")
logger.error("Gunicorn安装后无法获取版本信息")
return
self.progress_updated.emit(50)
# 如果pip安装失败尝试使用apt安装
logger.info("pip安装失败尝试使用apt安装Gunicorn")
if self.password:
logger.info("使用密码进行sudo apt安装")
# 使用bash -c将命令组合在一起确保密码对所有sudo命令有效
stdin, stdout, stderr = self.ssh_client.exec_command(f"bash -c 'echo \"{self.password}\" | sudo -S apt update && echo \"{self.password}\" | sudo -S apt install -y gunicorn'")
else:
logger.info("无密码进行sudo apt安装")
stdin, stdout, stderr = self.ssh_client.exec_command("sudo apt update && sudo apt install -y gunicorn")
exit_status = stdout.channel.recv_exit_status()
apt_output = stdout.read().decode()
apt_error = stderr.read().decode()
logger.info(f"Gunicorn apt安装命令执行状态: {exit_status}")
logger.info(f"Gunicorn apt安装输出: {apt_output}")
if apt_error:
logger.error(f"Gunicorn apt安装错误: {apt_error}")
if exit_status == 0:
self.progress_updated.emit(90)
# 验证安装
logger.info("验证apt安装的Gunicorn")
stdin, stdout, stderr = self.ssh_client.exec_command("gunicorn --version")
version_exit_status = stdout.channel.recv_exit_status()
gunicorn_version = stdout.read().decode().strip()
version_error = stderr.read().decode()
logger.info(f"Gunicorn版本检查状态: {version_exit_status}")
logger.info(f"Gunicorn版本信息: {gunicorn_version}")
if version_error:
logger.error(f"Gunicorn版本检查错误: {version_error}")
if gunicorn_version:
self.result_ready.emit(True, f"Gunicorn安装成功: {gunicorn_version}")
logger.info(f"Gunicorn安装成功: {gunicorn_version}")
else:
self.result_ready.emit(False, "Gunicorn安装后无法获取版本信息")
logger.error("Gunicorn安装后无法获取版本信息")
else:
error = stderr.read().decode()
self.result_ready.emit(False, f"Gunicorn安装失败: {error}")
logger.error(f"Gunicorn安装失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Gunicorn安装异常: {error_msg}")
class GunicornTestThread(QThread):
"""测试Gunicorn的线程"""
result_ready = Signal(bool, str)
def __init__(self, ssh_client, project_name, django_path, password):
super().__init__()
self.ssh_client = ssh_client
self.project_name = project_name
self.django_path = django_path
self.password = password
def run(self):
try:
logger.info(f"开始测试Gunicorn使用的Django路径: {self.django_path}, 项目名: {self.project_name}")
# 首先检查项目目录结构
logger.info("检查项目目录结构...")
list_command = f"ls -la {self.django_path}{self.project_name}"
stdin, stdout, stderr = self.ssh_client.exec_command(list_command)
dir_content = stdout.read().decode()
dir_error = stderr.read().decode()
logger.info(f"项目目录内容: {dir_content}")
if dir_error:
logger.error(f"列出目录内容错误: {dir_error}")
# 检查是否存在manage.py文件这通常位于Django项目的根目录
manage_check = f"find {self.django_path} -name manage.py"
stdin, stdout, stderr = self.ssh_client.exec_command(manage_check)
manage_files = stdout.read().decode()
manage_error = stderr.read().decode()
logger.info(f"找到的manage.py文件: {manage_files}")
if manage_error:
logger.error(f"查找manage.py文件错误: {manage_error}")
# 如果找到manage.py文件检查其所在目录
if manage_files.strip():
manage_path = manage_files.strip().split('\n')[0]
manage_dir = manage_path.rsplit('/', 1)[0] # 获取manage.py所在的目录
logger.info(f"manage.py所在目录: {manage_dir}")
# 检查该目录下的内容
list_manage_dir = f"ls -la {manage_dir}"
stdin, stdout, stderr = self.ssh_client.exec_command(list_manage_dir)
manage_dir_content = stdout.read().decode()
manage_dir_error = stderr.read().decode()
logger.info(f"manage.py所在目录内容: {manage_dir_content}")
if manage_dir_error:
logger.error(f"列出manage.py所在目录内容错误: {manage_dir_error}")
# 检查是否存在wsgi.py文件
wsgi_check = f"find {self.django_path} -name wsgi.py"
stdin, stdout, stderr = self.ssh_client.exec_command(wsgi_check)
wsgi_files = stdout.read().decode()
wsgi_error = stderr.read().decode()
logger.info(f"找到的wsgi.py文件: {wsgi_files}")
if wsgi_error:
logger.error(f"查找wsgi.py文件错误: {wsgi_error}")
# 检查settings.py文件特别是INSTALLED_APPS配置
settings_check = f"find {self.django_path} -name settings.py"
stdin, stdout, stderr = self.ssh_client.exec_command(settings_check)
settings_files = stdout.read().decode()
settings_error = stderr.read().decode()
logger.info(f"找到的settings.py文件: {settings_files}")
if settings_error:
logger.error(f"查找settings.py文件错误: {settings_error}")
# 如果找到settings.py文件检查其内容
if settings_files.strip():
settings_path = settings_files.strip().split('\n')[0]
cat_settings = f"cat {settings_path}"
stdin, stdout, stderr = self.ssh_client.exec_command(cat_settings)
settings_content = stdout.read().decode()
settings_content_error = stderr.read().decode()
logger.info(f"settings.py文件内容: {settings_content[:500]}...") # 只显示前500个字符避免日志过长
if settings_content_error:
logger.error(f"读取settings.py文件错误: {settings_content_error}")
# 测试运行Gunicorn
# 尝试使用不同的路径来运行Gunicorn
# 首先尝试使用manage.py所在的目录作为工作目录
if manage_files.strip():
manage_path = manage_files.strip().split('\n')[0]
manage_dir = manage_path.rsplit('/', 1)[0] # 获取manage.py所在的目录
logger.info(f"尝试使用manage.py所在目录作为工作目录: {manage_dir}")
# 检查manage_dir中是否有与项目同名的子目录
project_subdir = f"{manage_dir}/{self.project_name}"
check_subdir = f"ls -la {project_subdir}"
stdin, stdout, stderr = self.ssh_client.exec_command(check_subdir)
subdir_content = stdout.read().decode()
subdir_error = stderr.read().decode()
if not subdir_error and "wsgi.py" in subdir_content:
logger.info(f"找到项目子目录尝试使用该目录运行Gunicorn: {project_subdir}")
command = f"cd {manage_dir} && bash -c 'echo \"{self.password}\" | sudo -S gunicorn --pythonpath {manage_dir} {self.project_name}.wsgi:application --bind 0.0.0.0:8000' &"
else:
logger.info(f"未找到项目子目录尝试使用Django路径运行Gunicorn")
# 使用Django路径作为工作目录但项目模块在子目录中
command = f"cd {self.django_path} && bash -c 'echo \"{self.password}\" | sudo -S gunicorn --pythonpath {self.django_path} {self.project_name}.wsgi:application --bind 0.0.0.0:8000' &"
else:
logger.info(f"未找到manage.py文件尝试使用原始路径运行Gunicorn")
# 使用Django路径作为工作目录但项目模块在子目录中
command = f"cd {self.django_path} && bash -c 'echo \"{self.password}\" | sudo -S gunicorn --pythonpath {self.django_path} {self.project_name}.wsgi:application --bind 0.0.0.0:8000' &"
logger.info(f"执行Gunicorn测试命令: {command}")
stdin, stdout, stderr = self.ssh_client.exec_command(command)
# 记录Gunicorn启动命令的输出和错误
start_output = stdout.read().decode()
start_error = stderr.read().decode()
logger.info(f"Gunicorn启动输出: {start_output}")
if start_error:
logger.error(f"Gunicorn启动错误: {start_error}")
# 等待一段时间让Gunicorn启动
logger.info("等待Gunicorn启动...")
self.msleep(3000)
# 检查Gunicorn进程是否在运行
logger.info("检查Gunicorn进程状态...")
stdin, stdout, stderr = self.ssh_client.exec_command("ps aux | grep gunicorn")
processes = stdout.read().decode()
logger.info(f"Gunicorn进程检查结果: {processes}")
if self.project_name in processes:
self.result_ready.emit(True, "Gunicorn测试运行成功")
logger.info("Gunicorn测试运行成功")
# 停止Gunicorn进程
logger.info("停止Gunicorn进程...")
stdin, stdout, stderr = self.ssh_client.exec_command("pkill -f gunicorn")
stop_result = stdout.read().decode()
stop_error = stderr.read().decode()
logger.info(f"停止Gunicorn进程结果: {stop_result}")
if stop_error:
logger.error(f"停止Gunicorn进程错误: {stop_error}")
else:
self.result_ready.emit(False, "Gunicorn测试运行失败")
logger.error("Gunicorn测试运行失败")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Gunicorn测试运行异常: {error_msg}")
class GunicornServiceUploadThread(QThread):
"""上传Gunicorn服务文件的线程"""
result_ready = Signal(bool, str)
def __init__(self, ssh_client, service_content, service_name, password):
super().__init__()
self.ssh_client = ssh_client
self.service_content = service_content
self.service_name = service_name
self.password = password
def run(self):
try:
# 创建临时文件
temp_file = f"/tmp/{self.service_name}"
sftp = self.ssh_client.open_sftp()
with sftp.file(temp_file, 'w') as f:
f.write(self.service_content)
# 移动到目标位置
stdin, stdout, stderr = self.ssh_client.exec_command(f"bash -c 'echo \"{self.password}\" | sudo -S mv {temp_file} /etc/systemd/system/{self.service_name}'")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.result_ready.emit(True, f"Gunicorn服务文件上传成功: {self.service_name}")
logger.info(f"Gunicorn服务文件上传成功: {self.service_name}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, f"Gunicorn服务文件上传失败: {error}")
logger.error(f"Gunicorn服务文件上传失败: {error}")
sftp.close()
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Gunicorn服务文件上传异常: {error_msg}")
class GunicornServiceControlThread(QThread):
"""控制Gunicorn服务的线程"""
result_ready = Signal(bool, str)
def __init__(self, ssh_client, service_name, password, action):
super().__init__()
self.ssh_client = ssh_client
self.service_name = service_name
self.password = password
self.action = action # "daemon-reload", "start", "enable"
def run(self):
try:
if self.action == "daemon-reload":
command = f"bash -c 'echo \"{self.password}\" | sudo -S systemctl daemon-reload'"
success_msg = "系统服务配置重新加载成功"
error_msg = "系统服务配置重新加载失败"
elif self.action == "start":
command = f"bash -c 'echo \"{self.password}\" | sudo -S systemctl start {self.service_name}'"
success_msg = f"Gunicorn服务启动成功: {self.service_name}"
error_msg = f"Gunicorn服务启动失败: {self.service_name}"
elif self.action == "enable":
command = f"bash -c 'echo \"{self.password}\" | sudo -S systemctl enable {self.service_name}'"
success_msg = f"Gunicorn服务设置开机自启动成功: {self.service_name}"
error_msg = f"Gunicorn服务设置开机自启动失败: {self.service_name}"
elif self.action == "restart":
command = f"bash -c 'echo \"{self.password}\" | sudo -S systemctl restart {self.service_name}'"
success_msg = f"Gunicorn服务重启成功: {self.service_name}"
error_msg = f"Gunicorn服务重启失败: {self.service_name}"
elif self.action == "status":
command = f"bash -c 'echo \"{self.password}\" | sudo -S systemctl status {self.service_name}'"
success_msg = f"Gunicorn服务状态查询成功: {self.service_name}"
error_msg = f"Gunicorn服务状态查询失败: {self.service_name}"
else:
self.result_ready.emit(False, "未知的操作")
logger.error("未知的Gunicorn服务操作")
return
stdin, stdout, stderr = self.ssh_client.exec_command(command)
exit_status = stdout.channel.recv_exit_status()
# 对于status操作即使exit_status不为0我们也需要获取输出
output = stdout.read().decode()
error = stderr.read().decode()
if self.action == "status":
# 对于status操作我们将输出作为结果返回无论exit_status如何
if output:
self.result_ready.emit(True, f"{success_msg}\n{output}")
logger.info(f"{success_msg}\n{output}")
else:
self.result_ready.emit(False, f"{error_msg}: {error}")
logger.error(f"{error_msg}: {error}")
elif exit_status == 0:
self.result_ready.emit(True, success_msg)
logger.info(success_msg)
else:
self.result_ready.emit(False, f"{error_msg}: {error}")
logger.error(f"{error_msg}: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Gunicorn服务控制异常: {error_msg}")
class GunicornLogThread(QThread):
"""查看Gunicorn服务日志的线程"""
result_ready = Signal(bool, str)
def __init__(self, ssh_client, service_name, password, lines=100):
super().__init__()
self.ssh_client = ssh_client
self.service_name = service_name
self.password = password
self.lines = lines # 要查看的日志行数
def run(self):
try:
# 使用journalctl查看Gunicorn服务的日志
command = f"bash -c 'echo \"{self.password}\" | sudo -S journalctl -u {self.service_name} -n {self.lines}'"
logger.info(f"查看Gunicorn服务日志: {command}")
stdin, stdout, stderr = self.ssh_client.exec_command(command)
exit_status = stdout.channel.recv_exit_status()
output = stdout.read().decode()
error = stderr.read().decode()
if exit_status == 0:
self.result_ready.emit(True, output)
logger.info(f"Gunicorn服务日志查看成功")
else:
self.result_ready.emit(False, f"查看日志失败: {error}")
logger.error(f"Gunicorn服务日志查看失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Gunicorn服务日志查看异常: {error_msg}")
class GunicornLogCheckThread(QThread):
"""检查并创建Gunicorn日志文件目录和文件的线程"""
result_ready = Signal(bool, str)
def __init__(self, ssh_client, username, git_url, password):
super().__init__()
self.ssh_client = ssh_client
self.username = username
self.git_url = git_url
self.password = password
def run(self):
try:
# 从git_url中提取项目名去掉.git后缀
project_name = self.git_url.split('/')[-1].replace('.git', '')
logger.info(f"从git_url提取的项目名: {project_name}")
# 构建日志目录路径
log_dir = f"/home/{self.username}/{project_name}/logs"
access_log_path = f"{log_dir}/gunicorn_access.log"
error_log_path = f"{log_dir}/gunicorn_error.log"
logger.info(f"检查日志目录: {log_dir}")
# 检查并创建日志目录
check_dir_cmd = f"bash -c 'echo \"{self.password}\" | sudo -S mkdir -p {log_dir}'"
stdin, stdout, stderr = self.ssh_client.exec_command(check_dir_cmd)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"创建日志目录失败: {error}")
logger.error(f"创建日志目录失败: {error}")
return
# 设置日志目录权限
chmod_cmd = f"bash -c 'echo \"{self.password}\" | sudo -S chmod 755 {log_dir}'"
stdin, stdout, stderr = self.ssh_client.exec_command(chmod_cmd)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"设置日志目录权限失败: {error}")
logger.error(f"设置日志目录权限失败: {error}")
return
# 检查并创建访问日志文件
check_access_log_cmd = f"bash -c 'echo \"{self.password}\" | sudo -S touch {access_log_path}'"
stdin, stdout, stderr = self.ssh_client.exec_command(check_access_log_cmd)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"创建访问日志文件失败: {error}")
logger.error(f"创建访问日志文件失败: {error}")
return
# 设置访问日志文件权限
chmod_access_cmd = f"bash -c 'echo \"{self.password}\" | sudo -S chmod 644 {access_log_path}'"
stdin, stdout, stderr = self.ssh_client.exec_command(chmod_access_cmd)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"设置访问日志文件权限失败: {error}")
logger.error(f"设置访问日志文件权限失败: {error}")
return
# 检查并创建错误日志文件
check_error_log_cmd = f"bash -c 'echo \"{self.password}\" | sudo -S touch {error_log_path}'"
stdin, stdout, stderr = self.ssh_client.exec_command(check_error_log_cmd)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"创建错误日志文件失败: {error}")
logger.error(f"创建错误日志文件失败: {error}")
return
# 设置错误日志文件权限
chmod_error_cmd = f"bash -c 'echo \"{self.password}\" | sudo -S chmod 644 {error_log_path}'"
stdin, stdout, stderr = self.ssh_client.exec_command(chmod_error_cmd)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"设置错误日志文件权限失败: {error}")
logger.error(f"设置错误日志文件权限失败: {error}")
return
# 设置日志文件所有者为用户
chown_cmd = f"bash -c 'echo \"{self.password}\" | sudo -S chown -R {self.username}:{self.username} {log_dir}'"
stdin, stdout, stderr = self.ssh_client.exec_command(chown_cmd)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"设置日志文件所有者失败: {error}")
logger.error(f"设置日志文件所有者失败: {error}")
return
result_msg = f"日志目录和文件创建成功:\n目录: {log_dir}\n访问日志: {access_log_path}\n错误日志: {error_log_path}"
self.result_ready.emit(True, result_msg)
logger.info("日志目录和文件创建成功")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"检查并创建日志文件异常: {error_msg}")
class ServerControlThread(QThread):
"""控制服务器设置的线程"""
result_ready = Signal(bool, str)
def __init__(self, ssh_client, password):
super().__init__()
self.ssh_client = ssh_client
self.password = password
def run(self):
try:
# 设置时区,使用-S选项从标准输入读取密码
logger.info("开始设置服务器时区为Asia/Shanghai")
stdin, stdout, stderr = self.ssh_client.exec_command(f"bash -c 'echo \"{self.password}\" | sudo -S timedatectl set-timezone Asia/Shanghai'")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"设置时区失败: {error}")
logger.error(f"设置时区失败: {error}")
return
# 重启服务器,使用-S选项从标准输入读取密码
logger.info("开始重启服务器")
stdin, stdout, stderr = self.ssh_client.exec_command(f"bash -c 'echo \"{self.password}\" | sudo -S reboot'")
self.result_ready.emit(True, "时区设置成功,服务器正在重启")
logger.info("时区设置成功,服务器正在重启")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"服务器控制异常: {error_msg}")
class GunicornCommandThread(QThread):
"""执行Gunicorn命令的线程"""
result_ready = Signal(bool, str)
def __init__(self, ssh_client, command, password, django_path="", project_name=""):
super().__init__()
self.ssh_client = ssh_client
self.command = command
self.password = password
self.django_path = django_path
self.project_name = project_name
def run(self):
try:
logger.info(f"开始执行Gunicorn命令: {self.command}")
# 如果有django_path则切换到该目录执行命令
if self.django_path:
# 检查命令中是否已经包含--pythonpath参数
if "--pythonpath" not in self.command:
# 如果没有--pythonpath参数则添加
if "gunicorn" in self.command:
# 在gunicorn命令后添加--pythonpath参数
modified_command = self.command.replace("gunicorn", f"gunicorn --pythonpath {self.django_path}", 1)
logger.info(f"添加--pythonpath参数后的命令: {modified_command}")
self.command = modified_command
# 切换到django_path目录执行命令
full_command = f"cd {self.django_path} && bash -c 'echo \"{self.password}\" | sudo -S {self.command}'"
logger.info(f"在目录 {self.django_path} 中执行命令: {full_command}")
else:
# 如果没有django_path则直接执行命令
full_command = f"bash -c 'echo \"{self.password}\" | sudo -S {self.command}'"
logger.info(f"直接执行命令: {full_command}")
# 执行命令
stdin, stdout, stderr = self.ssh_client.exec_command(full_command)
exit_status = stdout.channel.recv_exit_status()
output = stdout.read().decode()
error = stderr.read().decode()
if exit_status == 0:
self.result_ready.emit(True, f"命令执行成功\n{output}")
logger.info(f"Gunicorn命令执行成功: {output}")
else:
self.result_ready.emit(False, f"命令执行失败\n{error}")
logger.error(f"Gunicorn命令执行失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Gunicorn命令执行异常: {error_msg}")
class GunicornTab(QWidget):
def __init__(self):
super().__init__()
self.ssh_client = None
self.username = ""
self.project_name = ""
self.django_path = ""
self.init_ui()
def init_ui(self):
layout = QVBoxLayout()
# Gunicorn管理区域
manage_layout = QHBoxLayout()
# 安装Gunicorn按钮
self.install_gunicorn_btn = QPushButton("安装Gunicorn")
self.install_gunicorn_btn.clicked.connect(self.install_gunicorn)
manage_layout.addWidget(self.install_gunicorn_btn)
# 测试运行按钮
self.test_gunicorn_btn = QPushButton("测试运行")
self.test_gunicorn_btn.clicked.connect(self.test_gunicorn)
manage_layout.addWidget(self.test_gunicorn_btn)
layout.addLayout(manage_layout)
# 服务文件编辑区域
service_layout = QVBoxLayout()
service_layout.addWidget(QLabel("Gunicorn服务文件编辑器:"))
# 服务文件编辑文本框
self.service_editor = QTextEdit()
self.service_editor.setReadOnly(False)
service_layout.addWidget(self.service_editor)
# 服务文件操作按钮
service_btn_layout = QHBoxLayout()
# 上传服务文件按钮
self.upload_service_btn = QPushButton("上传服务文件")
self.upload_service_btn.clicked.connect(self.upload_service)
service_btn_layout.addWidget(self.upload_service_btn)
# 查看服务状态按钮
self.check_status_btn = QPushButton("查看服务状态")
self.check_status_btn.clicked.connect(self.check_service_status)
service_btn_layout.addWidget(self.check_status_btn)
# 查看服务日志按钮
self.view_logs_btn = QPushButton("查看服务日志")
self.view_logs_btn.clicked.connect(self.view_service_logs)
service_btn_layout.addWidget(self.view_logs_btn)
# 检查日志文件按钮
self.check_log_files_btn = QPushButton("检查日志文件")
self.check_log_files_btn.clicked.connect(self.check_log_files)
service_btn_layout.addWidget(self.check_log_files_btn)
service_layout.addLayout(service_btn_layout)
layout.addLayout(service_layout)
# Gunicorn命令编辑区域
command_layout = QVBoxLayout()
command_layout.addWidget(QLabel("Gunicorn命令编辑器:"))
# 命令编辑文本框
self.command_editor = QTextEdit()
self.command_editor.setReadOnly(False)
self.command_editor.setPlainText("gunicorn --workers 3 --bind 0.0.0.0:8000 myproject.wsgi:application")
command_layout.addWidget(self.command_editor)
# 命令执行按钮
command_btn_layout = QHBoxLayout()
# 运行命令按钮
self.run_command_btn = QPushButton("运行以上命令")
self.run_command_btn.clicked.connect(self.run_gunicorn_command)
command_btn_layout.addWidget(self.run_command_btn)
command_layout.addLayout(command_btn_layout)
layout.addLayout(command_layout)
# 服务器控制区域
server_control_layout = QHBoxLayout()
# 设置时区并重启按钮
self.set_timezone_btn = QPushButton("设置时区并重启")
self.set_timezone_btn.clicked.connect(self.set_timezone_and_reboot)
server_control_layout.addWidget(self.set_timezone_btn)
layout.addLayout(server_control_layout)
# 输出区域
self.output_text = QTextEdit()
self.output_text.setReadOnly(True)
layout.addWidget(self.output_text)
self.setLayout(layout)
# 初始化服务文件内容
self.init_service_content()
def init_service_content(self):
"""初始化服务文件内容"""
default_content = "[Unit]\n"
default_content += "Description=Gunicorn Daemon for statuspage Project\n"
default_content += "After=network.target\n\n"
default_content += "[Service]\n"
default_content += "# 以xiaji用户运行确保对/home/xiaji有完全权限\n"
default_content += "User=xiaji\n"
default_content += "Group=xiaji\n"
default_content += "# 项目工作目录\n"
default_content += "WorkingDirectory=/home/xiaji/webstatus\n\n"
default_content += "# 启动前预处理先删除旧socket避免残留再创建目录如果不存在\n"
default_content += "ExecStartPre=/bin/rm -f /home/xiaji/webstatus/sock/gunicorn.sock\n"
default_content += "ExecStartPre=/bin/mkdir -p /home/xiaji/webstatus/sock\n\n"
default_content += "# 单行ExecStart无反斜杠避免格式错误\n"
default_content += "ExecStart=/usr/bin/gunicorn --pythonpath /home/xiaji/webstatus --workers 3 --bind unix:/home/xiaji/webstatus/sock/gunicorn.sock --access-logfile /home/xiaji/webstatus/logs/gunicorn_access.log --error-logfile /home/xiaji/webstatus/logs/gunicorn_error.log statuspage.wsgi:application\n\n"
default_content += "[Install]\n"
default_content += "WantedBy=multi-user.target"
self.service_editor.setPlainText(default_content)
def set_ssh_client(self, ssh_client):
"""设置SSH客户端"""
self.ssh_client = ssh_client
logger.info("Gunicorn标签页已设置SSH客户端")
def set_username(self, username):
"""设置用户名"""
self.username = username
logger.info(f"Gunicorn标签页已设置用户名: {username}")
def set_project_info(self, project_name, django_path):
"""设置项目信息"""
self.project_name = project_name
self.django_path = django_path
logger.info(f"Gunicorn标签页已设置项目信息: {project_name}, {django_path}")
# 更新服务文件内容
self.update_service_content()
# 更新命令编辑器中的项目名称
self.update_command_editor()
def update_service_content(self):
"""更新服务文件内容"""
content = self.service_editor.toPlainText()
# 替换占位符
content = content.replace("【用户名】", self.username)
content = content.replace("【项目名】", self.project_name)
content = content.replace("【Django路径】", self.django_path.rstrip('/'))
self.service_editor.setPlainText(content)
def update_command_editor(self):
"""更新命令编辑器中的项目名称"""
command_text = self.command_editor.toPlainText()
# 替换项目名称
command_text = command_text.replace("myproject.wsgi:application", f"{self.project_name}.wsgi:application")
self.command_editor.setPlainText(command_text)
logger.info(f"Gunicorn命令编辑器已更新项目名称: {self.project_name}")
def append_output(self, text):
"""添加输出到文本框"""
self.output_text.append(text)
def install_gunicorn(self):
"""安装Gunicorn"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output("正在安装Gunicorn...")
# 创建并启动Gunicorn安装线程
self.install_thread = GunicornInstallThread(self.ssh_client, password)
self.install_thread.result_ready.connect(self.on_install_result)
self.install_thread.progress_updated.connect(self.on_install_progress)
self.install_thread.start()
else:
self.append_output("用户取消了密码输入")
def on_install_progress(self, progress):
"""处理安装进度更新"""
self.append_output(f"安装进度: {progress}%")
logger.info(f"Gunicorn安装进度: {progress}%")
def on_install_result(self, success, message):
"""处理安装结果"""
if success:
self.append_output(f"安装成功: {message}")
logger.info(f"Gunicorn安装成功: {message}")
QMessageBox.information(self, "成功", message)
else:
self.append_output(f"安装失败: {message}")
logger.error(f"Gunicorn安装失败: {message}")
QMessageBox.warning(self, "错误", f"Gunicorn安装失败: {message}")
def test_gunicorn(self):
"""测试运行Gunicorn"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.project_name or not self.django_path:
self.append_output("错误: 未设置项目信息")
return
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output("正在测试运行Gunicorn...")
# 创建并启动Gunicorn测试线程
self.test_thread = GunicornTestThread(self.ssh_client, self.project_name, self.django_path, password)
self.test_thread.result_ready.connect(self.on_test_result)
self.test_thread.start()
else:
self.append_output("用户取消了密码输入")
def on_test_result(self, success, message):
"""处理测试结果"""
if success:
self.append_output(f"测试成功: {message}")
logger.info(f"Gunicorn测试成功: {message}")
QMessageBox.information(self, "成功", message)
else:
self.append_output(f"测试失败: {message}")
logger.error(f"Gunicorn测试失败: {message}")
QMessageBox.warning(self, "错误", f"Gunicorn测试失败: {message}")
def upload_service(self):
"""上传服务文件"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.project_name:
self.append_output("错误: 未设置项目名")
return
service_content = self.service_editor.toPlainText()
service_name = f"gunicorn_{self.project_name}.service"
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output(f"正在上传服务文件: {service_name}...")
# 创建并启动服务文件上传线程
self.upload_thread = GunicornServiceUploadThread(self.ssh_client, service_content, service_name, password)
self.upload_thread.result_ready.connect(self.on_upload_result)
self.upload_thread.start()
else:
self.append_output("用户取消了密码输入")
def on_upload_result(self, success, message):
"""处理上传结果"""
if success:
self.append_output(f"上传成功: {message}")
logger.info(f"Gunicorn服务文件上传成功: {message}")
QMessageBox.information(self, "成功", message)
else:
self.append_output(f"上传失败: {message}")
logger.error(f"Gunicorn服务文件上传失败: {message}")
QMessageBox.warning(self, "错误", f"Gunicorn服务文件上传失败: {message}")
def reload_daemon(self):
"""重新加载系统服务配置"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output("正在重新加载系统服务配置...")
# 创建并启动服务控制线程
self.control_thread = GunicornServiceControlThread(self.ssh_client, "", password, "daemon-reload")
self.control_thread.result_ready.connect(self.on_control_result)
self.control_thread.start()
else:
self.append_output("用户取消了密码输入")
def start_service(self):
"""启动Gunicorn服务"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.project_name:
self.append_output("错误: 未设置项目名")
return
service_name = f"gunicorn_{self.project_name}"
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output(f"正在启动Gunicorn服务: {service_name}...")
# 创建并启动服务控制线程
self.control_thread = GunicornServiceControlThread(self.ssh_client, service_name, password, "start")
self.control_thread.result_ready.connect(self.on_control_result)
self.control_thread.start()
else:
self.append_output("用户取消了密码输入")
def enable_service(self):
"""设置开机自启动"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.project_name:
self.append_output("错误: 未设置项目名")
return
service_name = f"gunicorn_{self.project_name}"
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output(f"正在设置Gunicorn服务开机自启动: {service_name}...")
# 创建并启动服务控制线程
self.control_thread = GunicornServiceControlThread(self.ssh_client, service_name, password, "enable")
self.control_thread.result_ready.connect(self.on_control_result)
self.control_thread.start()
else:
self.append_output("用户取消了密码输入")
def restart_service(self):
"""重启Gunicorn服务"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.project_name:
self.append_output("错误: 未设置项目名")
return
service_name = f"gunicorn_{self.project_name}"
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output(f"正在重启Gunicorn服务: {service_name}...")
# 创建并启动服务控制线程
self.control_thread = GunicornServiceControlThread(self.ssh_client, service_name, password, "restart")
self.control_thread.result_ready.connect(self.on_control_result)
self.control_thread.start()
else:
self.append_output("用户取消了密码输入")
def check_service_status(self):
"""查看Gunicorn服务状态"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.project_name:
self.append_output("错误: 未设置项目名")
return
service_name = f"gunicorn_{self.project_name}"
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output(f"正在查看Gunicorn服务状态: {service_name}...")
# 创建并启动服务控制线程
self.control_thread = GunicornServiceControlThread(self.ssh_client, service_name, password, "status")
self.control_thread.result_ready.connect(self.on_control_result)
self.control_thread.start()
else:
self.append_output("用户取消了密码输入")
def view_service_logs(self):
"""查看Gunicorn服务日志"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.project_name:
self.append_output("错误: 未设置项目名")
return
service_name = f"gunicorn_{self.project_name}"
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output(f"正在查看Gunicorn服务日志: {service_name}...")
# 创建并启动日志查看线程
self.log_thread = GunicornLogThread(self.ssh_client, service_name, password)
self.log_thread.result_ready.connect(self.on_log_result)
self.log_thread.start()
else:
self.append_output("用户取消了密码输入")
def on_log_result(self, success, message):
"""处理日志查看结果"""
if success:
self.append_output("--- Gunicorn服务日志 ---")
self.append_output(message)
self.append_output("--- 日志结束 ---")
logger.info("Gunicorn服务日志查看成功")
else:
self.append_output(f"查看日志失败: {message}")
logger.error(f"Gunicorn服务日志查看失败: {message}")
QMessageBox.warning(self, "错误", f"Gunicorn服务日志查看失败: {message}")
def check_log_files(self):
"""检查并创建Gunicorn日志文件目录和文件"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.username:
self.append_output("错误: 未设置用户名")
return
# 读取config.json获取git_url
try:
import json
with open('config.json', 'r', encoding='utf-8') as f:
config = json.load(f)
# 获取第一个服务器的配置
server_config = next(iter(config.values()))
git_url = server_config.get('git_url', '')
if not git_url:
self.append_output("错误: config.json中未找到git_url")
return
except Exception as e:
self.append_output(f"错误: 读取config.json失败: {str(e)}")
return
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output("正在检查并创建Gunicorn日志文件目录和文件...")
# 创建并启动日志检查线程
self.log_check_thread = GunicornLogCheckThread(self.ssh_client, self.username, git_url, password)
self.log_check_thread.result_ready.connect(self.on_log_check_result)
self.log_check_thread.start()
else:
self.append_output("用户取消了密码输入")
def on_log_check_result(self, success, message):
"""处理日志检查结果"""
if success:
self.append_output(message)
logger.info("Gunicorn日志文件检查和创建成功")
QMessageBox.information(self, "成功", "Gunicorn日志文件检查和创建成功")
else:
self.append_output(f"检查并创建日志文件失败: {message}")
logger.error(f"Gunicorn日志文件检查和创建失败: {message}")
QMessageBox.warning(self, "错误", f"Gunicorn日志文件检查和创建失败: {message}")
def on_control_result(self, success, message):
"""处理控制结果"""
if success:
self.append_output(f"操作成功: {message}")
logger.info(f"Gunicorn服务控制成功: {message}")
QMessageBox.information(self, "成功", message)
else:
self.append_output(f"操作失败: {message}")
logger.error(f"Gunicorn服务控制失败: {message}")
QMessageBox.warning(self, "错误", f"Gunicorn服务控制失败: {message}")
def set_timezone_and_reboot(self):
"""设置时区并重启服务器"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output("正在设置时区并重启服务器...")
# 创建并启动服务器控制线程
self.server_control_thread = ServerControlThread(self.ssh_client, password)
self.server_control_thread.result_ready.connect(self.on_server_control_result)
self.server_control_thread.start()
else:
self.append_output("用户取消了密码输入")
def on_server_control_result(self, success, message):
"""处理服务器控制结果"""
if success:
self.append_output(f"操作成功: {message}")
logger.info(f"服务器控制成功: {message}")
QMessageBox.information(self, "成功", message)
else:
self.append_output(f"操作失败: {message}")
logger.error(f"服务器控制失败: {message}")
QMessageBox.warning(self, "错误", f"服务器控制失败: {message}")
def run_gunicorn_command(self):
"""运行Gunicorn命令"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
command = self.command_editor.toPlainText().strip()
if not command:
self.append_output("错误: 命令不能为空")
return
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output(f"正在执行命令: {command}...")
# 创建并启动命令执行线程传递django_path和project_name参数
self.command_thread = GunicornCommandThread(self.ssh_client, command, password, self.django_path, self.project_name)
self.command_thread.result_ready.connect(self.on_command_result)
self.command_thread.start()
else:
self.append_output("用户取消了密码输入")
def on_command_result(self, success, message):
"""处理命令执行结果"""
if success:
self.append_output(f"命令执行成功: {message}")
logger.info(f"Gunicorn命令执行成功: {message}")
QMessageBox.information(self, "成功", message)
else:
self.append_output(f"命令执行失败: {message}")
logger.error(f"Gunicorn命令执行失败: {message}")
QMessageBox.warning(self, "错误", f"Gunicorn命令执行失败: {message}")