Compare commits

..

14 Commits

Author SHA1 Message Date
f82389299a 修改了出错的时候,容易崩溃的问题。 2025-09-12 21:37:21 +08:00
6ccf937729 修改了settings.py丢失的bug 2025-09-07 22:31:16 +08:00
329cc85c87 完善了bug,修改了nginx的静态文件和权限问题。 2025-09-07 14:24:17 +08:00
99fbfdc732 增加了sock的站点配置文件修改按钮
增加了下拉站点配置文件按钮
增加了静态文件的配置内容,写入到站点配置文件的按钮
2025-09-07 12:46:29 +08:00
d3b27dfda2 Merge branch 'main' of http://192.168.3.241:3000/xiaji/django.remote 2025-09-07 12:44:01 +08:00
cc90d6f947 增加修改sock,与Gunicorn配置相对应的按钮 2025-09-07 12:43:09 +08:00
fffed99165 修改了下载站点配置的按钮,修改了增加静态文件配置的按钮 2025-09-07 12:42:12 +08:00
7d39dff0e0 远程删除,防备跟将来的更新冲突
Signed-off-by: xiaji <rembme@163.com>
2025-09-07 10:22:29 +08:00
f98c33d76b 更新ignore文件 2025-09-07 10:20:58 +08:00
14e69c2bfd 完善nginx标签。使用unix套接字。增加远程命令的运行和显示。 2025-08-31 22:16:45 +08:00
47f3669dc4 增加nginx的标签 2025-08-31 20:35:59 +08:00
7bd1764103 Gunicorn测试完成,里程碑是服务注册成功,服务器自启动的时候也能正常运行 2025-08-31 19:55:23 +08:00
d20ddfed59 完成Django的标签的功能 2025-08-31 13:08:06 +08:00
9854a00542 完成git下载目录 2025-08-31 11:45:21 +08:00
18 changed files with 11690 additions and 165 deletions

111
.gitignore vendored Normal file
View File

@@ -0,0 +1,111 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# Custom settings file
settings.py

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

7733
app.log

File diff suppressed because it is too large Load Diff

View File

@@ -7,5 +7,14 @@
"project": "statuspage",
"git_url": "http://192.168.3.241:3000/xiaji/webstatus.git",
"remote_dir": "/home/xiaji"
},
"statuspage": {
"ip": "192.168.3.157",
"username": "xiaji",
"password": "xiaji",
"port": 22,
"project": "statuspage",
"git_url": "http://192.168.3.241:3000/xiaji/webstatus.git",
"remote_dir": "/home/xiaji"
}
}

456
django_tab.py Normal file
View File

@@ -0,0 +1,456 @@
import os
import sys
import datetime
from PySide6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QPushButton,
QLabel, QTextEdit, QFileDialog, QMessageBox,
QLineEdit, QDialog, QDialogButtonBox)
from PySide6.QtCore import QThread, Signal
from loguru import logger
from django_threads import DjangoInstallThread, DjangoCommandThread, UploadSettingsThread
class PasswordDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("输入密码")
self.setMinimumWidth(300)
layout = QVBoxLayout()
# 提示标签
label = QLabel("请输入sudo密码:")
layout.addWidget(label)
# 密码输入框
self.password_input = QLineEdit()
self.password_input.setEchoMode(QLineEdit.Password)
layout.addWidget(self.password_input)
# 按钮
button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
button_box.accepted.connect(self.accept)
button_box.rejected.connect(self.reject)
layout.addWidget(button_box)
self.setLayout(layout)
def get_password(self):
return self.password_input.text()
class DjangoTab(QWidget):
def __init__(self):
super().__init__()
self.ssh_client = None
self.username = ""
self.manage_py_path = ""
self.settings_py_path = ""
self.init_ui()
def init_ui(self):
layout = QVBoxLayout()
# Django管理区域
manage_layout = QHBoxLayout()
# 安装Django按钮
self.install_django_btn = QPushButton("安装Django")
self.install_django_btn.clicked.connect(self.install_django)
manage_layout.addWidget(self.install_django_btn)
# 测试启动按钮
self.test_server_btn = QPushButton("测试启动")
self.test_server_btn.clicked.connect(self.test_server)
manage_layout.addWidget(self.test_server_btn)
# 检查Django状态按钮
self.check_django_btn = QPushButton("检查Django状态")
self.check_django_btn.clicked.connect(self.check_django_status)
manage_layout.addWidget(self.check_django_btn)
# 收集静态文件按钮
self.collect_static_btn = QPushButton("收集静态文件")
self.collect_static_btn.clicked.connect(self.collect_static)
manage_layout.addWidget(self.collect_static_btn)
layout.addLayout(manage_layout)
# 文件管理区域
file_layout = QHBoxLayout()
# 查找文件按钮
self.find_files_btn = QPushButton("查找manage.py和settings.py")
self.find_files_btn.clicked.connect(self.find_django_files)
file_layout.addWidget(self.find_files_btn)
# 下载settings.py按钮
self.download_settings_btn = QPushButton("下载settings.py")
self.download_settings_btn.clicked.connect(self.download_settings)
file_layout.addWidget(self.download_settings_btn)
# 上传settings.py按钮
self.upload_settings_btn = QPushButton("上传settings.py")
self.upload_settings_btn.clicked.connect(self.upload_settings)
file_layout.addWidget(self.upload_settings_btn)
layout.addLayout(file_layout)
# 文件路径显示区域
path_layout = QHBoxLayout()
# manage.py路径
manage_path_layout = QVBoxLayout()
manage_path_layout.addWidget(QLabel("manage.py路径:"))
self.manage_path_label = QLabel("未找到")
manage_path_layout.addWidget(self.manage_path_label)
path_layout.addLayout(manage_path_layout)
# settings.py路径
settings_path_layout = QVBoxLayout()
settings_path_layout.addWidget(QLabel("settings.py路径:"))
self.settings_path_label = QLabel("未找到")
settings_path_layout.addWidget(self.settings_path_label)
path_layout.addLayout(settings_path_layout)
layout.addLayout(path_layout)
# 文件编辑区域
edit_layout = QVBoxLayout()
edit_layout.addWidget(QLabel("settings.py编辑器:"))
# 文件编辑文本框
self.settings_editor = QTextEdit()
self.settings_editor.setReadOnly(True)
edit_layout.addWidget(self.settings_editor)
# 保存按钮
self.save_settings_btn = QPushButton("保存settings.py到服务器")
self.save_settings_btn.clicked.connect(self.save_settings_to_server)
self.save_settings_btn.setEnabled(False) # 初始状态禁用
edit_layout.addWidget(self.save_settings_btn)
layout.addLayout(edit_layout)
# 输出区域
self.output_text = QTextEdit()
self.output_text.setReadOnly(True)
layout.addWidget(self.output_text)
self.setLayout(layout)
def set_ssh_client(self, ssh_client):
"""设置SSH客户端"""
self.ssh_client = ssh_client
logger.info("Django标签页已设置SSH客户端")
def set_username(self, username):
"""设置用户名"""
self.username = username
logger.info(f"Django标签页已设置用户名: {username}")
def append_output(self, text):
"""添加输出到文本框"""
self.output_text.append(text)
def on_command_finished(self):
"""命令执行完成时的处理"""
logger.info("Django命令执行完成")
def install_django(self):
"""安装Django"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
# 请求用户输入sudo密码
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.append_output("正在安装Django...")
# 创建并启动Django安装线程
self.install_thread = DjangoInstallThread(self.ssh_client, password)
self.install_thread.progress_updated.connect(self.on_install_progress)
self.install_thread.result_ready.connect(self.on_install_result)
self.install_thread.start()
else:
self.append_output("用户取消了密码输入")
def on_install_progress(self, progress):
"""处理安装进度更新"""
self.append_output(f"安装进度: {progress}%")
logger.info(f"Django安装进度: {progress}%")
def on_install_result(self, success, message):
"""处理安装结果"""
if success:
self.append_output(f"安装成功: {message}")
logger.info(f"Django安装成功: {message}")
QMessageBox.information(self, "成功", message)
else:
self.append_output(f"安装失败: {message}")
logger.error(f"Django安装失败: {message}")
QMessageBox.warning(self, "错误", f"Django安装失败: {message}")
def request_password(self):
"""请求用户输入密码"""
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.thread.set_password(password)
logger.info("用户已输入密码")
else:
self.thread.set_password("")
logger.info("用户取消了密码输入")
def test_server(self):
"""测试启动Django服务器"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.manage_py_path:
self.append_output("错误: 未找到manage.py文件请先查找文件")
return
# 切换到manage.py所在目录并执行命令
manage_dir = os.path.dirname(self.manage_py_path)
command = f"cd {manage_dir} && python3 manage.py runserver 0.0.0.0:8000"
self.append_output(f"执行命令: {command}")
# 创建并启动线程执行命令
self.thread = DjangoCommandThread(self.ssh_client, command)
self.thread.output_signal.connect(self.append_output)
self.thread.finished_signal.connect(self.on_command_finished)
self.thread.start()
def check_django_status(self):
"""检查Django安装状态"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
command = "pip3 list | grep Django"
self.append_output(f"执行命令: {command}")
# 创建并启动线程执行命令
self.thread = DjangoCommandThread(self.ssh_client, command)
self.thread.output_signal.connect(self.append_output)
self.thread.finished_signal.connect(self.on_command_finished)
self.thread.start()
def collect_static(self):
"""收集静态文件"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.manage_py_path:
self.append_output("错误: 未找到manage.py文件请先查找文件")
return
# 切换到manage.py所在目录并执行命令
manage_dir = os.path.dirname(self.manage_py_path)
command = f"cd {manage_dir} && python3 manage.py collectstatic --noinput"
self.append_output(f"执行命令: {command}")
# 创建并启动线程执行命令
self.thread = DjangoCommandThread(self.ssh_client, command)
self.thread.output_signal.connect(self.append_output)
self.thread.finished_signal.connect(self.on_command_finished)
self.thread.start()
def find_django_files(self):
"""查找manage.py和settings.py文件"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.username:
self.append_output("错误: 未获取到用户名")
return
self.append_output("正在查找Django项目文件...")
# 查找manage.py文件
find_manage_cmd = f"find /home/{self.username} -name \"manage.py\" 2>/dev/null | head -5"
self.append_output(f"执行命令: {find_manage_cmd}")
# 创建并启动线程执行命令
self.thread = DjangoCommandThread(self.ssh_client, find_manage_cmd)
self.thread.output_signal.connect(self.process_manage_py_result)
self.thread.finished_signal.connect(self.find_settings_py)
self.thread.start()
def process_manage_py_result(self, text):
"""处理manage.py查找结果"""
if text.startswith("/") and text.endswith("manage.py"):
self.manage_py_path = text
self.manage_path_label.setText(text)
logger.info(f"找到manage.py文件: {text}")
def find_settings_py(self):
"""查找settings.py文件"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.username:
self.append_output("错误: 未获取到用户名")
return
# 查找settings.py文件
find_settings_cmd = f"find /home/{self.username} -name \"settings.py\" 2>/dev/null | head -5"
self.append_output(f"执行命令: {find_settings_cmd}")
# 创建并启动线程执行命令
self.thread = DjangoCommandThread(self.ssh_client, find_settings_cmd)
self.thread.output_signal.connect(self.process_settings_py_result)
self.thread.finished_signal.connect(self.on_command_finished)
self.thread.start()
def process_settings_py_result(self, text):
"""处理settings.py查找结果"""
if text.startswith("/") and text.endswith("settings.py"):
self.settings_py_path = text
self.settings_path_label.setText(text)
logger.info(f"找到settings.py文件: {text}")
def download_settings(self):
"""下载settings.py文件并在编辑器中显示"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.settings_py_path:
self.append_output("错误: 未找到settings.py文件请先查找文件")
return
try:
# 使用SFTP下载文件内容
sftp = self.ssh_client.open_sftp()
with sftp.file(self.settings_py_path, 'r') as remote_file:
file_content = remote_file.read().decode('utf-8')
sftp.close()
# 在编辑器中显示文件内容
self.settings_editor.setPlainText(file_content)
self.settings_editor.setReadOnly(False) # 允许编辑
self.save_settings_btn.setEnabled(True) # 启用保存按钮
self.append_output(f"文件已加载到编辑器: {self.settings_py_path}")
logger.info(f"settings.py已加载到编辑器: {self.settings_py_path}")
except Exception as e:
error_msg = f"下载文件时出错: {str(e)}"
self.append_output(error_msg)
logger.error(error_msg)
def save_settings_to_server(self):
"""将编辑器中的settings.py内容保存到服务器"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.settings_py_path:
self.append_output("错误: 未找到settings.py文件请先查找文件")
return
try:
# 获取编辑器中的内容
file_content = self.settings_editor.toPlainText()
# 使用SFTP上传文件内容
sftp = self.ssh_client.open_sftp()
with sftp.file(self.settings_py_path, 'w') as remote_file:
remote_file.write(file_content.encode('utf-8'))
sftp.close()
self.append_output(f"文件已保存到服务器: {self.settings_py_path}")
logger.info(f"settings.py已保存到服务器: {self.settings_py_path}")
# 显示成功消息
QMessageBox.information(self, "保存成功", "settings.py文件已成功保存到服务器")
except Exception as e:
error_msg = f"保存文件时出错: {str(e)}"
self.append_output(error_msg)
logger.error(error_msg)
def upload_settings(self):
"""上传settings.py文件"""
if not self.ssh_client:
self.append_output("错误: 未连接到服务器")
return
if not self.settings_py_path:
self.append_output("错误: 未找到settings.py文件请先查找文件")
return
# 直接打开文件选择对话框让用户选择本地settings.py文件
file_path, _ = QFileDialog.getOpenFileName(
self,
"选择要上传的settings.py文件",
"",
"Python文件 (*.py);;所有文件 (*)"
)
if not file_path:
self.append_output("用户取消了文件选择")
return
# 验证选择的文件是否为settings.py
if os.path.basename(file_path) != "settings.py":
reply = QMessageBox.question(
self,
"文件名验证",
"选择的文件不是settings.py是否继续上传",
QMessageBox.Yes | QMessageBox.No
)
if reply == QMessageBox.No:
self.append_output("用户取消了上传")
return
try:
# 读取文件内容
with open(file_path, 'r', encoding='utf-8') as f:
settings_content = f.read()
# 获取Django项目路径settings.py所在目录的父目录
django_path = os.path.dirname(os.path.dirname(self.settings_py_path))
# 创建并启动上传线程
self.upload_thread = UploadSettingsThread(self.ssh_client, django_path, settings_content)
self.upload_thread.result_ready.connect(self.on_upload_result)
self.upload_thread.password_request_signal.connect(self.request_upload_password)
self.upload_thread.start()
self.append_output(f"正在上传文件: {os.path.basename(file_path)}...")
logger.info(f"开始上传settings.py文件: {file_path}")
except Exception as e:
error_msg = f"读取文件时出错: {str(e)}"
self.append_output(error_msg)
logger.error(error_msg)
def request_upload_password(self):
"""请求用户输入密码用于上传settings.py"""
dialog = PasswordDialog(self)
if dialog.exec_() == QDialog.Accepted:
password = dialog.get_password()
self.upload_thread.set_password(password)
self.append_output("密码已发送")
logger.info("用户已输入上传密码")
else:
self.upload_thread.set_password("")
self.append_output("用户取消了密码输入")
logger.info("用户取消了上传密码输入")
def on_upload_result(self, success, message):
"""处理上传结果"""
if success:
self.append_output(f"上传成功: {message}")
logger.info(f"settings.py上传成功: {message}")
QMessageBox.information(self, "上传成功", message)
else:
self.append_output(f"上传失败: {message}")
logger.error(f"settings.py上传失败: {message}")
QMessageBox.warning(self, "上传失败", f"settings.py上传失败: {message}")

364
django_threads.py Normal file
View File

@@ -0,0 +1,364 @@
import os
import sys
import time
from PySide6.QtCore import QThread, Signal
from loguru import logger
class DjangoInstallThread(QThread):
"""安装Django的线程"""
result_ready = Signal(bool, str)
progress_updated = Signal(int)
def __init__(self, ssh_client, password):
super().__init__()
self.ssh_client = ssh_client
self.password = password
def run(self):
try:
self.progress_updated.emit(10)
# 检查Django是否已安装
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
if django_version:
self.result_ready.emit(True, f"Django已安装: {django_version}")
logger.info(f"Django已安装: {django_version}")
return
self.progress_updated.emit(30)
# 尝试使用pip安装
stdin, stdout, stderr = self.ssh_client.exec_command("pip3 install --break-system-packages django")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.progress_updated.emit(90)
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
self.result_ready.emit(True, f"Django安装成功: {django_version}")
logger.info(f"Django安装成功: {django_version}")
return
self.progress_updated.emit(50)
# 如果pip安装失败尝试使用apt安装使用-S选项从标准输入读取密码
stdin, stdout, stderr = self.ssh_client.exec_command(f"echo '{self.password}' | sudo -S apt update && echo '{self.password}' | sudo -S apt install -y python3-django")
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.progress_updated.emit(90)
stdin, stdout, stderr = self.ssh_client.exec_command("python3 -m django --version")
django_version = stdout.read().decode().strip()
self.result_ready.emit(True, f"Django安装成功: {django_version}")
logger.info(f"Django安装成功: {django_version}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, f"Django安装失败: {error}")
logger.error(f"Django安装失败: {error}")
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"Django安装异常: {error_msg}")
class DjangoCommandThread(QThread):
"""执行Django相关命令的线程"""
output_signal = Signal(str)
finished_signal = Signal()
password_request_signal = Signal() # 请求密码的信号
def __init__(self, ssh_client, command, log_file=None):
super().__init__()
self.ssh_client = ssh_client
self.command = command
self.password = None
self.waiting_for_password = False
self.log_file = log_file # 日志文件路径,如果提供则将输出重定向到该文件
def set_password(self, password):
self.password = password
self.waiting_for_password = False
def run(self):
try:
logger.info(f"执行Django命令: {self.command}")
# 如果提供了日志文件路径,修改命令以重定向输出
if self.log_file:
self.command = f"{self.command} > {self.log_file} 2>&1"
self.output_signal.emit(f"命令输出将重定向到日志文件: {self.log_file}")
# 如果命令包含sudo修改为使用-S选项从标准输入读取密码
if "sudo" in self.command:
command_with_sudo = self.command.replace("sudo", "sudo -S")
stdin, stdout, stderr = self.ssh_client.exec_command(command_with_sudo)
# 检查是否需要密码
password_prompt = False
for line in iter(stderr.readline, ""):
line_text = line.strip()
self.output_signal.emit(line_text)
if "password for" in line_text.lower() or "密码" in line_text:
password_prompt = True
break
# 如果需要密码,请求用户输入
if password_prompt:
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
self.output_signal.emit("密码已发送")
else:
self.output_signal.emit("未提供密码,命令可能失败")
else:
stdin, stdout, stderr = self.ssh_client.exec_command(self.command)
# 对于apt命令使用特殊处理以显示进度
if "apt install" in self.command:
# 使用非交互模式并显示进度
self.output_signal.emit("正在安装软件包,请稍候...")
# 读取输出和错误,实时显示
import select
import time
# 检查是否需要密码
password_sent = False
while not stdout.channel.exit_status_ready():
# 检查是否有数据可读
r, w, e = select.select([stdout.channel, stderr.channel], [], [], 0.1)
if stdout.channel in r:
output = stdout.channel.recv(1024).decode('utf-8', errors='replace')
if output:
self.output_signal.emit(output.strip())
logger.info(f"命令输出: {output.strip()}")
if stderr.channel in r:
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
# 检查是否需要密码
if ("password for" in error.lower() or "密码" in error) and not password_sent:
self.output_signal.emit("检测到需要输入密码")
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
password_sent = True
self.output_signal.emit("密码已发送")
else:
self.output_signal.emit("未提供密码,命令可能失败")
else:
self.output_signal.emit(error.strip())
logger.error(f"命令错误: {error.strip()}")
# 短暂休眠以避免过度占用CPU
time.sleep(0.01)
# 读取剩余输出
while True:
r, w, e = select.select([stdout.channel, stderr.channel], [], [], 0.1)
if not r:
break
if stdout.channel in r:
output = stdout.channel.recv(1024).decode('utf-8', errors='replace')
if output:
self.output_signal.emit(output.strip())
logger.info(f"命令输出: {output.strip()}")
if stderr.channel in r:
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
self.output_signal.emit(error.strip())
logger.error(f"命令错误: {error.strip()}")
else:
# 对于非apt命令使用原有的行读取方式
# 读取输出
for line in iter(stdout.readline, ""):
self.output_signal.emit(line.strip())
logger.info(f"命令输出: {line.strip()}")
# 读取错误
for line in iter(stderr.readline, ""):
line_text = line.strip()
if "password for" not in line_text.lower() and "密码" not in line_text: # 避免重复显示密码提示
self.output_signal.emit(f"错误: {line_text}")
logger.error(f"命令错误: {line_text}")
# 检查退出状态
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.output_signal.emit("命令执行成功")
logger.info(f"命令执行成功: {self.command}")
else:
self.output_signal.emit(f"命令执行失败,退出状态: {exit_status}")
logger.error(f"命令执行失败,退出状态: {exit_status}")
except Exception as e:
error_msg = f"执行命令时出错: {str(e)}"
self.output_signal.emit(error_msg)
logger.error(error_msg)
finally:
self.finished_signal.emit()
class UploadSettingsThread(QThread):
"""上传settings.py文件的线程"""
result_ready = Signal(bool, str)
password_request_signal = Signal() # 请求密码的信号
def __init__(self, ssh_client, django_path, settings_content):
super().__init__()
self.ssh_client = ssh_client
self.django_path = django_path
self.settings_content = settings_content
self.password = None
self.waiting_for_password = False
def set_password(self, password):
self.password = password
self.waiting_for_password = False
def run(self):
try:
# 查找settings.py文件
stdin, stdout, stderr = self.ssh_client.exec_command(f"find {self.django_path} -name settings.py")
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
self.result_ready.emit(False, "未找到settings.py文件")
logger.error("未找到settings.py文件")
return
settings_path = stdout.read().decode().strip()
# 创建临时文件
temp_file = "/tmp/settings_upload.py"
sftp = self.ssh_client.open_sftp()
with sftp.file(temp_file, 'w') as f:
f.write(self.settings_content)
# 在覆盖前备份原文件(带时间戳)
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = f"{settings_path}.backup_{timestamp}"
# 备份原文件
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S cp {settings_path} {backup_path}")
# 检查是否需要密码
password_sent = False
while True:
if stderr.channel.recv_ready():
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
# 检查是否需要密码
if ("password for" in error.lower() or "密码" in error) and not password_sent:
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
password_sent = True
else:
self.result_ready.emit(False, "未提供密码,备份失败")
logger.error("未提供密码,备份失败")
sftp.close()
return
else:
logger.error(f"备份错误: {error.strip()}")
if stdout.channel.exit_status_ready():
break
# 短暂休眠以避免过度占用CPU
time.sleep(0.01)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
error = stderr.read().decode()
self.result_ready.emit(False, f"备份原文件失败: {error}")
logger.error(f"备份原文件失败: {error}")
sftp.close()
return
logger.info(f"原文件已备份至: {backup_path}")
# 移动临时文件到目标位置
stdin, stdout, stderr = self.ssh_client.exec_command(f"sudo -S mv {temp_file} {settings_path}")
# 检查是否需要密码
password_sent = False
while True:
if stderr.channel.recv_ready():
error = stderr.channel.recv(1024).decode('utf-8', errors='replace')
if error:
# 检查是否需要密码
if ("password for" in error.lower() or "密码" in error) and not password_sent:
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
password_sent = True
else:
self.result_ready.emit(False, "未提供密码,移动文件失败")
logger.error("未提供密码,移动文件失败")
sftp.close()
return
else:
logger.error(f"移动文件错误: {error.strip()}")
if stdout.channel.exit_status_ready():
break
# 短暂休眠以避免过度占用CPU
time.sleep(0.01)
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
self.result_ready.emit(True, f"settings.py上传成功原文件已备份为: {os.path.basename(backup_path)}")
logger.info(f"settings.py上传成功原文件已备份为: {os.path.basename(backup_path)}")
else:
error = stderr.read().decode()
self.result_ready.emit(False, error)
logger.error(f"settings.py上传失败: {error}")
sftp.close()
except Exception as e:
error_msg = str(e)
self.result_ready.emit(False, error_msg)
logger.error(f"settings.py上传异常: {error_msg}")

1203
gunicorn_tab.py Normal file

File diff suppressed because it is too large Load Diff

137
main.py
View File

@@ -1,10 +1,14 @@
import sys
from PySide6.QtWidgets import QApplication, QMainWindow, QTabWidget
import os
from PySide6.QtWidgets import QApplication, QMainWindow, QTabWidget, QStatusBar
from PySide6.QtCore import QSize
from loguru import logger
from server_connection_tab import ServerConnectionTab
from remote_commands_tab import RemoteCommandsTab
from django_tab import DjangoTab
from gunicorn_tab import GunicornTab
from nginx_tab import NginxTab
class MainWindow(QMainWindow):
def __init__(self):
@@ -18,6 +22,15 @@ class MainWindow(QMainWindow):
self.tabs = QTabWidget()
self.setCentralWidget(self.tabs)
# 创建状态栏
self.status_bar = QStatusBar()
self.setStatusBar(self.status_bar)
# 显示当前工作目录路径
current_dir = os.getcwd()
self.status_bar.showMessage(f"当前目录: {current_dir}")
logger.info(f"设置状态栏显示当前目录: {current_dir}")
# 添加服务器连接标签页
self.server_connection_tab = ServerConnectionTab()
self.tabs.addTab(self.server_connection_tab, "服务器连接")
@@ -26,6 +39,18 @@ class MainWindow(QMainWindow):
self.remote_commands_tab = RemoteCommandsTab()
self.tabs.addTab(self.remote_commands_tab, "远程命令")
# 添加Django管理标签页
self.django_tab = DjangoTab()
self.tabs.addTab(self.django_tab, "Django")
# 添加Gunicorn管理标签页
self.gunicorn_tab = GunicornTab()
self.tabs.addTab(self.gunicorn_tab, "Gunicorn")
# 添加Nginx管理标签页
self.nginx_tab = NginxTab()
self.tabs.addTab(self.nginx_tab, "Nginx")
# 连接标签页切换信号
self.tabs.currentChanged.connect(self.on_tab_changed)
@@ -34,8 +59,14 @@ class MainWindow(QMainWindow):
def on_tab_changed(self, index):
logger.info(f"标签页切换到: {index}")
# 更新状态栏显示当前目录信息
if index == 0: # 服务器连接标签页
current_dir = os.getcwd()
self.status_bar.showMessage(f"当前目录: {current_dir}")
logger.info(f"状态栏更新为本地目录: {current_dir}")
# 当切换到远程命令标签页时传递SSH客户端和服务器配置
if index == 1: # 远程命令标签页
elif index == 1: # 远程命令标签页
ssh_client = self.server_connection_tab.get_ssh_client()
self.remote_commands_tab.set_ssh_client(ssh_client)
@@ -46,10 +77,112 @@ class MainWindow(QMainWindow):
git_url = server_config.get("git_url", "")
remote_dir = server_config.get("remote_dir", "")
self.remote_commands_tab.set_server_config(git_url, remote_dir)
# 调用set_server_info方法更新服务器信息
self.remote_commands_tab.set_server_info(server_config)
# 尝试获取远程服务器当前目录并更新状态栏
try:
if hasattr(self.remote_commands_tab, 'current_dir_display') and self.remote_commands_tab.current_dir_display.text():
current_dir = self.remote_commands_tab.current_dir_display.text()
self.status_bar.showMessage(f"远程服务器 {current_alias}: {current_dir}")
logger.info(f"状态栏更新为远程服务器目录: {current_alias}: {current_dir}")
else:
# 更新状态栏显示远程服务器信息
self.status_bar.showMessage(f"远程服务器: {current_alias} | 远程目录: {remote_dir}")
logger.info(f"状态栏更新为远程服务器: {current_alias}, 目录: {remote_dir}")
except Exception as e:
logger.error(f"获取远程目录信息失败: {str(e)}")
# 更新状态栏显示远程服务器信息
self.status_bar.showMessage(f"远程服务器: {current_alias} | 远程目录: {remote_dir}")
logger.info(f"状态栏更新为远程服务器: {current_alias}, 目录: {remote_dir}")
else:
# 如果没有配置远程目录,初始化为默认目录
self.remote_commands_tab.current_dir_display.setText("~")
self.remote_commands_tab.refresh_directory()
# 更新状态栏显示远程服务器信息
self.status_bar.showMessage(f"远程服务器: {current_alias} | 远程目录: ~")
logger.info(f"状态栏更新为远程服务器: {current_alias}, 目录: ~")
# 当切换到Django标签页时传递SSH客户端和用户名
elif index == 2: # Django标签页
ssh_client = self.server_connection_tab.get_ssh_client()
self.django_tab.set_ssh_client(ssh_client)
# 获取当前选中的服务器配置中的用户名
current_alias = self.server_connection_tab.alias_combo.currentText()
if current_alias and current_alias in self.server_connection_tab.config_data:
server_config = self.server_connection_tab.config_data[current_alias]
username = server_config.get("username", "")
self.django_tab.set_username(username)
# 更新状态栏显示Django项目信息
project_name = server_config.get("project", "")
remote_dir = server_config.get("remote_dir", "")
self.status_bar.showMessage(f"远程服务器: {current_alias} | Django项目: {project_name} | 项目目录: {remote_dir}")
logger.info(f"状态栏更新为Django项目: {project_name}, 目录: {remote_dir}")
# 当切换到Gunicorn标签页时传递SSH客户端、用户名和项目信息
elif index == 3: # Gunicorn标签页
ssh_client = self.server_connection_tab.get_ssh_client()
self.gunicorn_tab.set_ssh_client(ssh_client)
# 获取当前选中的服务器配置中的用户名和项目信息
current_alias = self.server_connection_tab.alias_combo.currentText()
if current_alias and current_alias in self.server_connection_tab.config_data:
server_config = self.server_connection_tab.config_data[current_alias]
username = server_config.get("username", "")
project_name = server_config.get("project", "")
# 获取Django路径根据用户说明路径应该是/home/[user]/[project_path]/[project_name]
# 其中user是config.json中的usernameproject_path是git_url中的webstatusproject_name是project的值
remote_dir = server_config.get("remote_dir", "")
git_url = server_config.get("git_url", "")
project_name = server_config.get("project", "")
# 从git_url中提取project_pathwebstatus
project_path = ""
if git_url:
# git_url格式为http://192.168.3.241:3000/xiaji/webstatus.git
# 提取最后一个/和.git之间的部分作为project_path
import os
git_name = os.path.basename(git_url) # 获取webstatus.git
if git_name.endswith(".git"):
project_path = git_name[:-4] # 去掉.git后缀得到webstatus
# 构建Django路径/home/[user]/[project_path]/ (statuspage的父目录)
if remote_dir and project_path:
django_path = f"{remote_dir}/{project_path}/"
else:
django_path = f"{remote_dir}/" if remote_dir else ""
logger.info(f"构建的Django路径: {django_path}, 项目名: {project_name}")
self.gunicorn_tab.set_username(username)
self.gunicorn_tab.set_project_info(project_name, django_path)
# 更新状态栏显示Gunicorn服务信息
self.status_bar.showMessage(f"远程服务器: {current_alias} | Gunicorn服务: gunicorn_{project_name} | 服务目录: {django_path}")
logger.info(f"状态栏更新为Gunicorn服务: gunicorn_{project_name}, 目录: {django_path}")
# 当切换到Nginx标签页时传递SSH客户端、用户名和项目信息
elif index == 4: # Nginx标签页
ssh_client = self.server_connection_tab.get_ssh_client()
self.nginx_tab.set_ssh_client(ssh_client)
# 获取当前选中的服务器配置中的用户名和项目信息
current_alias = self.server_connection_tab.alias_combo.currentText()
if current_alias and current_alias in self.server_connection_tab.config_data:
server_config = self.server_connection_tab.config_data[current_alias]
username = server_config.get("username", "")
project_name = server_config.get("project", "")
server_ip = server_config.get("ip", "")
self.nginx_tab.set_username(username)
self.nginx_tab.set_project_info(project_name, server_ip)
# 更新状态栏显示Nginx服务信息
self.status_bar.showMessage(f"远程服务器: {current_alias} | Nginx服务: nginx | 项目: {project_name}")
logger.info(f"状态栏更新为Nginx服务: nginx, 项目: {project_name}")
if __name__ == "__main__":
logger.add("app.log", rotation="10 MB")

1125
nginx_tab.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -52,32 +52,60 @@ class RemoteCommandThread(QThread):
try:
logger.info(f"执行远程命令: {self.command}")
# 检查SSH连接是否仍然有效
try:
# 尝试执行一个简单的命令来检查连接
transport = self.ssh_client.get_transport() if self.ssh_client else None
if not transport or not transport.is_active():
raise Exception("SSH连接已断开")
except Exception as e:
logger.error(f"SSH连接检查失败: {str(e)}")
self.output_signal.emit(f"错误: SSH连接已断开请重新连接服务器")
self.finished_signal.emit(False, f"SSH连接已断开请重新连接服务器")
return
# 在执行命令前,先输出当前目录
try:
pwd_stdin, pwd_stdout, pwd_stderr = self.ssh_client.exec_command("pwd")
pwd_output = pwd_stdout.read().decode().strip()
if pwd_output:
self.output_signal.emit(f"当前目录: {pwd_output}")
logger.info(f"当前目录: {pwd_output}")
except Exception as e:
logger.error(f"获取当前目录失败: {str(e)}")
# 如果命令包含sudo修改为使用-S选项从标准输入读取密码
if "sudo" in self.command:
command_with_sudo = self.command.replace("sudo", "sudo -S")
stdin, stdout, stderr = self.ssh_client.exec_command(command_with_sudo)
# 检查是否需要密码
password_prompt = False
for line in stderr:
self.output_signal.emit(line)
if "password for" in line.lower() or "密码" in line:
password_prompt = True
break
# 如果需要密码,请求用户输入
if password_prompt:
self.waiting_for_password = True
self.password_request_signal.emit()
# 如果预先设置了密码,直接发送
if self.password:
logger.info("使用预先设置的密码")
stdin.write(self.password + "\n")
stdin.flush()
else:
# 检查是否需要密码
password_prompt = False
for line in stderr:
self.output_signal.emit(line)
if "password for" in line.lower() or "密码" in line:
password_prompt = True
break
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
# 如果需要密码,请求用户输入
if password_prompt:
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
else:
stdin, stdout, stderr = self.ssh_client.exec_command(self.command)
@@ -106,8 +134,15 @@ class RemoteCommandThread(QThread):
except Exception as e:
logger.error(f"执行命令时发生错误: {str(e)}")
self.output_signal.emit(f"错误: {str(e)}")
self.finished_signal.emit(False, f"执行命令时发生错误: {str(e)}")
error_msg = str(e)
# 检查是否是Socket关闭错误
if "Socket is closed" in error_msg or "SSH session not active" in error_msg:
self.output_signal.emit(f"错误: SSH连接已断开请重新连接服务器")
self.finished_signal.emit(False, "SSH连接已断开请重新连接服务器")
else:
self.output_signal.emit(f"错误: {error_msg}")
self.finished_signal.emit(False, f"执行命令时发生错误: {error_msg}")
class RemoteCommandsTab(QWidget):
def __init__(self):
@@ -153,6 +188,15 @@ class RemoteCommandsTab(QWidget):
self.clone_button = QPushButton("克隆项目")
self.clone_button.clicked.connect(self.clone_repository)
right_layout.addWidget(self.clone_button)
self.pull_button = QPushButton("拉取更新")
self.pull_button.clicked.connect(self.pull_repository)
right_layout.addWidget(self.pull_button)
self.handle_changes_button = QPushButton("处理本地更改")
self.handle_changes_button.clicked.connect(self.handle_local_changes)
right_layout.addWidget(self.handle_changes_button)
right_layout.addStretch()
clone_layout.addLayout(right_layout)
@@ -161,6 +205,32 @@ class RemoteCommandsTab(QWidget):
git_group.setLayout(git_layout)
main_layout.addWidget(git_group)
# 系统管理组
system_group = QGroupBox("系统管理")
system_layout = QVBoxLayout()
# 自定义命令执行区域
custom_command_layout = QVBoxLayout()
custom_command_layout.addWidget(QLabel("自定义命令:"))
# 命令输入框
self.custom_command_input = QTextEdit()
self.custom_command_input.setMaximumHeight(100)
custom_command_layout.addWidget(self.custom_command_input)
# 执行按钮
execute_button_layout = QHBoxLayout()
self.execute_command_button = QPushButton("执行命令")
self.execute_command_button.clicked.connect(self.execute_custom_command)
execute_button_layout.addWidget(self.execute_command_button)
execute_button_layout.addStretch()
custom_command_layout.addLayout(execute_button_layout)
system_layout.addLayout(custom_command_layout)
system_group.setLayout(system_layout)
main_layout.addWidget(system_group)
# 目录管理组
dir_group = QGroupBox("目录管理")
dir_layout = QVBoxLayout()
@@ -172,7 +242,7 @@ class RemoteCommandsTab(QWidget):
# 允许用户输入当前目录路径
# self.current_dir_display.setReadOnly(True)
# 添加回车键刷新目录功能
self.current_dir_display.returnPressed.connect(self.refresh_directory)
self.current_dir_display.returnPressed.connect(self.on_current_dir_entered)
current_dir_layout.addWidget(self.current_dir_display)
# 刷新目录按钮
@@ -228,6 +298,26 @@ class RemoteCommandsTab(QWidget):
self.setLayout(main_layout)
logger.info("远程命令标签页UI初始化完成")
def set_server_info(self, server_info):
"""设置服务器信息"""
logger.info(f"设置服务器信息: {server_info}")
self.server_info = server_info
# 更新主窗口状态栏显示当前目录
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar') and self.ssh_client:
current_dir = self.current_dir_display.text().strip()
server_host = self.server_info.get('host', '未知')
if current_dir:
main_window.status_bar.showMessage(f"远程服务器 {server_host}: {current_dir}")
logger.info(f"主窗口状态栏更新为远程服务器目录: {server_host}: {current_dir}")
else:
main_window.status_bar.showMessage(f"远程服务器: {server_host}")
logger.info(f"主窗口状态栏更新为远程服务器: {server_host}")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
def set_ssh_client(self, ssh_client):
logger.info("设置SSH客户端")
self.ssh_client = ssh_client
@@ -235,9 +325,33 @@ class RemoteCommandsTab(QWidget):
if self.ssh_client:
self.status_label.setText("已连接到服务器")
self.status_label.setStyleSheet("color: green;")
# 更新主窗口状态栏显示当前目录
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar'):
current_dir = self.current_dir_display.text().strip()
server_host = self.server_info.get('host', '未知') if hasattr(self, 'server_info') else '未知'
if current_dir:
main_window.status_bar.showMessage(f"远程服务器 {server_host}: {current_dir}")
logger.info(f"主窗口状态栏更新为远程服务器目录: {server_host}: {current_dir}")
else:
main_window.status_bar.showMessage(f"远程服务器: {server_host}")
logger.info(f"主窗口状态栏更新为远程服务器: {server_host}")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
else:
self.status_label.setText("未连接到服务器")
self.status_label.setStyleSheet("color: red;")
# 更新主窗口状态栏显示未连接状态
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar'):
main_window.status_bar.showMessage("未连接到远程服务器")
logger.info("主窗口状态栏更新为未连接状态")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
def set_server_config(self, git_url, remote_dir):
logger.info(f"设置服务器配置: git_url={git_url}, remote_dir={remote_dir}")
@@ -248,6 +362,16 @@ class RemoteCommandsTab(QWidget):
if remote_dir:
self.current_dir_display.setText(remote_dir)
self.refresh_directory()
# 更新主窗口状态栏显示当前目录
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar'):
server_host = self.server_info['host'] if hasattr(self, 'server_info') and 'host' in self.server_info else '未知'
main_window.status_bar.showMessage(f"远程服务器 {server_host}: {remote_dir}")
logger.info(f"主窗口状态栏更新为远程服务器目录: {server_host}: {remote_dir}")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
def install_git(self):
logger.info("安装Git")
@@ -301,15 +425,149 @@ class RemoteCommandsTab(QWidget):
# 构建克隆命令
if remote_dir:
# 如果指定了远程目录,先创建目录(如果不存在),然后克隆到指定目录
clone_command = f"mkdir -p {remote_dir} && cd {remote_dir} && git clone {repo_url}"
clone_command = f"mkdir -p {remote_dir} && cd {remote_dir} && git clone --verbose {repo_url}"
else:
clone_command = f"git clone {repo_url}"
clone_command = f"git clone --verbose {repo_url}"
# 创建并启动线程执行命令
self.command_thread = RemoteCommandThread(self.ssh_client, clone_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_command_finished)
self.command_thread.start()
def pull_repository(self):
logger.info("拉取仓库更新")
if not self.ssh_client:
QMessageBox.warning(self, "警告", "请先连接到服务器")
return
remote_dir = self.remote_dir_display.text().strip()
if not remote_dir:
QMessageBox.warning(self, "警告", "请先设置远程目录")
return
# 从仓库URL中提取项目名称
repo_url = self.repo_url_input.text().strip()
if not repo_url:
QMessageBox.warning(self, "警告", "请输入仓库URL")
return
# 从URL中提取项目名例如http://example.com/repo.git -> repo
project_name = repo_url.split('/')[-1].replace('.git', '')
project_path = f"{remote_dir}/{project_name}"
self.output_text.clear()
self.status_label.setText("正在拉取仓库更新...")
# 构建拉取命令
pull_command = f"cd {project_path} && git pull --verbose"
# 创建并启动线程执行命令
self.command_thread = RemoteCommandThread(self.ssh_client, pull_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_command_finished)
self.command_thread.start()
def handle_local_changes(self):
logger.info("处理本地更改")
if not self.ssh_client:
QMessageBox.warning(self, "警告", "请先连接到服务器")
return
remote_dir = self.remote_dir_display.text().strip()
if not remote_dir:
QMessageBox.warning(self, "警告", "请先设置远程目录")
return
# 从仓库URL中提取项目名称
repo_url = self.repo_url_input.text().strip()
if not repo_url:
QMessageBox.warning(self, "警告", "请输入仓库URL")
return
# 从URL中提取项目名例如http://example.com/repo.git -> repo
project_name = repo_url.split('/')[-1].replace('.git', '')
project_path = f"{remote_dir}/{project_name}"
self.output_text.clear()
self.status_label.setText("正在检查本地更改...")
# 创建对话框询问用户如何处理本地更改
reply = QMessageBox.question(
self,
"处理本地更改",
"检测到本地有未提交的更改,请选择处理方式:\n\n"
"是(Y) - 暂存更改(stash),拉取更新后再恢复\n"
"否(N) - 放弃本地更改,直接拉取更新\n"
"取消 - 取消操作",
QMessageBox.Yes | QMessageBox.No | QMessageBox.Cancel,
QMessageBox.Cancel
)
if reply == QMessageBox.Yes:
# 暂存更改
self.status_label.setText("正在暂存更改...")
stash_command = f"cd {project_path} && git stash push -m 'Auto-stash before pull'"
self.command_thread = RemoteCommandThread(self.ssh_client, stash_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(lambda success, msg: self.on_stash_finished(success, msg, project_path))
self.command_thread.start()
elif reply == QMessageBox.No:
# 放弃本地更改,强制拉取
self.status_label.setText("正在重置本地更改...")
reset_command = f"cd {project_path} && git reset --hard HEAD && git clean -fd"
self.command_thread = RemoteCommandThread(self.ssh_client, reset_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(lambda success, msg: self.on_reset_finished(success, msg, project_path))
self.command_thread.start()
# 如果选择取消,不做任何操作
def on_stash_finished(self, success, message, project_path):
"""暂存操作完成后的处理"""
if success:
self.append_output("暂存更改成功,正在拉取更新...")
# 拉取更新
pull_command = f"cd {project_path} && git pull --verbose"
self.command_thread = RemoteCommandThread(self.ssh_client, pull_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(lambda success, msg: self.on_pull_after_stash_finished(success, msg, project_path))
self.command_thread.start()
else:
self.status_label.setText("暂存更改失败")
self.status_label.setStyleSheet("color: red;")
self.append_output(f"暂存更改失败: {message}")
def on_pull_after_stash_finished(self, success, message, project_path):
"""暂存后拉取更新完成后的处理"""
if success:
self.append_output("拉取更新成功,正在恢复暂存的更改...")
# 恢复暂存的更改
pop_command = f"cd {project_path} && git stash pop"
self.command_thread = RemoteCommandThread(self.ssh_client, pop_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_command_finished)
self.command_thread.start()
else:
self.status_label.setText("拉取更新失败")
self.status_label.setStyleSheet("color: red;")
self.append_output(f"拉取更新失败: {message}")
def on_reset_finished(self, success, message, project_path):
"""重置操作完成后的处理"""
if success:
self.append_output("重置本地更改成功,正在拉取更新...")
# 拉取更新
pull_command = f"cd {project_path} && git pull --verbose"
self.command_thread = RemoteCommandThread(self.ssh_client, pull_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_command_finished)
self.command_thread.start()
else:
self.status_label.setText("重置本地更改失败")
self.status_label.setStyleSheet("color: red;")
self.append_output(f"重置本地更改失败: {message}")
def append_output(self, text):
self.output_text.append(text)
@@ -326,7 +584,36 @@ class RemoteCommandsTab(QWidget):
else:
self.status_label.setText(message)
self.status_label.setStyleSheet("color: red;")
QMessageBox.warning(self, "错误", message)
# 检查是否是SSH连接断开错误
if "SSH连接已断开" in message:
reply = QMessageBox.question(self, "SSH连接已断开",
f"{message}\n\n是否现在重新连接服务器?",
QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
if reply == QMessageBox.Yes:
self.reconnect_ssh()
else:
QMessageBox.warning(self, "错误", message)
def on_current_dir_entered(self):
"""处理用户输入目录路径并按回车键的情况"""
logger.info("用户输入目录路径并按回车键")
# 更新主窗口状态栏显示当前目录
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar'):
current_dir = self.current_dir_display.text().strip()
server_host = self.server_info['host'] if hasattr(self, 'server_info') and 'host' in self.server_info else '未知'
if current_dir:
main_window.status_bar.showMessage(f"远程服务器 {server_host}: {current_dir}")
logger.info(f"主窗口状态栏更新为远程服务器目录: {server_host}: {current_dir}")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
# 刷新目录列表
self.refresh_directory()
def refresh_directory(self):
logger.info("刷新目录列表")
@@ -354,16 +641,83 @@ class RemoteCommandsTab(QWidget):
def append_dir_output(self, text):
self.dir_list_text.append(text)
# 将目录信息输出到日志文件
logger.info(f"目录列表信息: {text.strip()}")
def on_dir_refresh_finished(self, success, message):
if success:
self.status_label.setText("目录列表已刷新")
self.status_label.setStyleSheet("color: green;")
logger.info("目录列表刷新成功")
# 更新主窗口状态栏显示当前目录
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar'):
current_dir = self.current_dir_display.text()
server_host = self.server_info['host'] if hasattr(self, 'server_info') and 'host' in self.server_info else '未知'
main_window.status_bar.showMessage(f"远程服务器 {server_host}: {current_dir}")
logger.info(f"主窗口状态栏更新为远程服务器目录: {server_host}: {current_dir}")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
else:
self.status_label.setText("刷新目录列表失败")
self.status_label.setStyleSheet("color: red;")
logger.error(f"刷新目录列表失败: {message}")
QMessageBox.warning(self, "错误", f"刷新目录列表失败: {message}")
def execute_custom_command(self):
logger.info("执行自定义命令")
if not self.ssh_client:
QMessageBox.warning(self, "警告", "请先连接到服务器")
return
command = self.custom_command_input.toPlainText().strip()
if not command:
QMessageBox.warning(self, "警告", "请输入要执行的命令")
return
self.output_text.clear()
self.status_label.setText("正在执行命令...")
# 如果命令以sudo开头预先请求密码
if command.startswith("sudo "):
logger.info("检测到sudo命令预先请求密码")
# 创建密码输入对话框
dialog = PasswordDialog(self)
if dialog.exec() == QDialog.Accepted:
password = dialog.get_password()
if not password:
QMessageBox.warning(self, "警告", "未输入密码,取消执行命令")
return
else:
QMessageBox.warning(self, "警告", "已取消输入密码")
return
# 创建并启动线程执行命令
self.command_thread = RemoteCommandThread(self.ssh_client, command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_custom_command_finished)
self.command_thread.password_request_signal.connect(self.request_password)
# 如果是sudo命令且已获取密码预先设置密码
if command.startswith("sudo ") and password:
self.command_thread.set_password(password)
self.output_text.append("密码已设置\n")
self.command_thread.start()
def on_custom_command_finished(self, success, message):
if success:
self.status_label.setText("命令执行成功")
self.status_label.setStyleSheet("color: green;")
self.output_text.append(f"\n=== 命令执行成功 ===\n{message}")
else:
self.status_label.setText("命令执行失败")
self.status_label.setStyleSheet("color: red;")
self.output_text.append(f"\n=== 命令执行失败 ===\n{message}")
def delete_directory(self):
logger.info("删除目录")
@@ -396,4 +750,51 @@ class RemoteCommandsTab(QWidget):
self.command_thread = RemoteCommandThread(self.ssh_client, command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_command_finished)
self.command_thread.start()
self.command_thread.start()
def check_ssh_connection(self):
"""检查SSH连接是否有效"""
if not self.ssh_client:
return False
try:
transport = self.ssh_client.get_transport()
return transport and transport.is_active()
except Exception as e:
logger.error(f"检查SSH连接时发生错误: {str(e)}")
return False
def reconnect_ssh(self):
"""重新连接SSH服务器"""
logger.info("尝试重新连接SSH服务器")
# 关闭现有连接
if self.ssh_client:
try:
self.ssh_client.close()
except:
pass
self.ssh_client = None
# 切换到服务器连接标签页
main_window = self.parent().parent()
if hasattr(main_window, 'tabs'):
main_window.tabs.setCurrentIndex(0) # 切换到服务器连接标签页
# 显示提示信息
self.status_label.setText("请重新连接服务器")
self.status_label.setStyleSheet("color: orange;")
self.output_text.append("\n=== SSH连接已断开 ===\n")
self.output_text.append("请切换到\"服务器连接\"标签页重新连接服务器\n")
# 显示重新连接的对话框
reply = QMessageBox.question(self, "SSH连接已断开",
"SSH连接已断开是否现在重新连接",
QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
if reply == QMessageBox.Yes:
# 模拟点击连接按钮
server_connection_tab = main_window.server_connection_tab
server_connection_tab.connect_to_server()

View File

@@ -210,6 +210,19 @@ class ServerConnectionTab(QWidget):
self.status_label.setStyleSheet("color: green;")
QMessageBox.information(self, "成功", f"成功连接到服务器: {ip}")
# 更新状态栏显示服务器信息
try:
main_window = self.parent().parent()
if hasattr(main_window, 'remote_commands_tab'):
server_config = {
'host': ip,
'remote_dir': self.remote_dir_input.text()
}
main_window.remote_commands_tab.set_server_info(server_config)
logger.info(f"连接服务器后更新状态栏: {ip}")
except Exception as e:
logger.error(f"连接服务器后更新状态栏失败: {str(e)}")
except Exception as e:
logger.error(f"连接服务器失败: {str(e)}")
self.status_label.setText("连接失败")

View File

@@ -0,0 +1,113 @@
from pathlib import Path
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/4.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-$w+8+hw%p$2xi_fi+7avahc&03-y@x05e^r02-x3nt5johmk6l'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'status',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.locale.LocaleMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'statuspage.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'statuspage.wsgi.application'
# Database
# https://docs.djangoproject.com/en/4.2/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}
# Password validation
# https://docs.djangoproject.com/en/4.2/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/4.2/topics/i18n/
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/4.2/howto/static-files/
STATIC_URL = 'static/'
# Default primary key field type
# https://docs.djangoproject.com/en/4.2/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'

136
生产系统settings.py Normal file
View File

@@ -0,0 +1,136 @@
from pathlib import Path
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/4.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-$w+8+hw%p$2xi_fi+7avahc&03-y@x05e^r02-x3nt5johmk6l'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = ['192.168.3.157']
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'status',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.locale.LocaleMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'statuspage.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'statuspage.wsgi.application'
# Database
# https://docs.djangoproject.com/en/4.2/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}
# Password validation
# https://docs.djangoproject.com/en/4.2/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/4.2/topics/i18n/
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/4.2/howto/static-files/
STATIC_URL = 'static/'
# Default primary key field type
# https://docs.djangoproject.com/en/4.2/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# REST Framework settings
REST_FRAMEWORK = {
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
'PAGE_SIZE': 20,
'DEFAULT_AUTHENTICATION_CLASSES': [
# 初期不使用认证后续可以添加Token认证
# 'rest_framework.authentication.TokenAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
# 初期允许所有访问,后续可以添加权限控制
'rest_framework.permissions.AllowAny',
],
'DEFAULT_RENDERER_CLASSES': [
'rest_framework.renderers.JSONRenderer',
'rest_framework.renderers.BrowsableAPIRenderer',
],
}
STATIC_ROOT = BASE_DIR / 'static' # 注意这里是 'static' 而非 '/static'
MEDIA_ROOT = BASE_DIR / 'media'