Files
djangohelper/remote_command_tab.py
xiaji d559a85feb 最初一个版本,已经初步实现各种功能
主页连接
远程命令
django
Gunicorn操作
2025-08-28 20:44:35 +08:00

291 lines
12 KiB
Python

import os
from loguru import logger
from PySide6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit,
QPushButton, QComboBox, QMessageBox, QTextEdit,
QGroupBox, QGridLayout)
from PySide6.QtCore import Qt
from threads import (GitInstallThread, GitCloneThread, ListDirectoryThread,
DeleteDirectoryThread, SetTimezoneAndRestartThread,
CheckFirewallThread, OpenPortThread)
class RemoteCommandTab(QWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.parent = parent
self.init_ui()
def init_ui(self):
layout = QVBoxLayout()
# Git管理组
git_group = QGroupBox("Git代码管理")
git_layout = QGridLayout()
git_layout.addWidget(QLabel("Git仓库URL:"), 0, 0)
self.git_url_input = QLineEdit()
self.git_url_input.setPlaceholderText("https://github.com/username/repo.git")
git_layout.addWidget(self.git_url_input, 0, 1)
git_layout.addWidget(QLabel("项目路径:"), 1, 0)
self.project_path_input = QLineEdit()
self.project_path_input.setPlaceholderText("/home/user/project")
git_layout.addWidget(self.project_path_input, 1, 1)
git_layout.addWidget(QLabel("删除目录:"), 2, 0)
self.delete_dir_input = QLineEdit()
self.delete_dir_input.setPlaceholderText("/home/user/old_project")
git_layout.addWidget(self.delete_dir_input, 2, 1)
self.install_git_btn = QPushButton("安装Git")
self.install_git_btn.clicked.connect(self.install_git)
git_layout.addWidget(self.install_git_btn, 0, 2)
self.clone_btn = QPushButton("拉取代码")
self.clone_btn.clicked.connect(self.clone_git)
git_layout.addWidget(self.clone_btn, 1, 2)
self.list_dir_btn = QPushButton("列出目录")
self.list_dir_btn.clicked.connect(self.list_directory)
git_layout.addWidget(self.list_dir_btn, 2, 2)
self.delete_dir_btn = QPushButton("删除目录")
self.delete_dir_btn.clicked.connect(self.delete_directory)
git_layout.addWidget(self.delete_dir_btn, 3, 2)
self.set_timezone_btn = QPushButton("设置时区并重启")
self.set_timezone_btn.clicked.connect(self.set_timezone_and_restart)
git_layout.addWidget(self.set_timezone_btn, 3, 0, 1, 2)
git_group.setLayout(git_layout)
layout.addWidget(git_group)
# 防火墙管理组
firewall_group = QGroupBox("防火墙管理")
firewall_layout = QGridLayout()
self.check_firewall_btn = QPushButton("检查防火墙")
self.check_firewall_btn.clicked.connect(self.check_firewall)
firewall_layout.addWidget(self.check_firewall_btn, 0, 0)
firewall_layout.addWidget(QLabel("端口:"), 0, 1)
self.port_input = QLineEdit("8000")
firewall_layout.addWidget(self.port_input, 0, 2)
self.open_port_btn = QPushButton("开放端口")
self.open_port_btn.clicked.connect(self.open_port)
firewall_layout.addWidget(self.open_port_btn, 0, 3)
firewall_group.setLayout(firewall_layout)
layout.addWidget(firewall_group)
# Settings.py编辑器
settings_group = QGroupBox("Settings.py编辑器")
settings_layout = QVBoxLayout()
self.settings_editor = QTextEdit()
self.settings_editor.setPlaceholderText("settings.py内容将在这里显示...")
settings_layout.addWidget(self.settings_editor)
settings_group.setLayout(settings_layout)
layout.addWidget(settings_group)
# 操作输出
output_group = QGroupBox("操作输出")
output_layout = QVBoxLayout()
self.output_text = QTextEdit()
self.output_text.setReadOnly(True)
self.output_text.setPlaceholderText("操作结果将在这里显示...")
output_layout.addWidget(self.output_text)
output_group.setLayout(output_layout)
layout.addWidget(output_group)
layout.addStretch()
self.setLayout(layout)
# 加载Git配置
self.load_git_config()
def load_git_config(self):
if self.parent and hasattr(self.parent, 'server_connection_tab'):
git_url = self.parent.server_connection_tab.git_url_input.text()
remote_dir = self.parent.server_connection_tab.remote_dir_input.text()
self.git_url_input.setText(git_url)
self.project_path_input.setText(remote_dir)
def install_git(self):
if not self.check_ssh_connection():
return
self.output_text.append("正在安装Git...")
self.install_git_btn.setEnabled(False)
self.git_install_thread = GitInstallThread(self.parent.ssh_client)
self.git_install_thread.result_ready.connect(self.on_git_install_result)
self.git_install_thread.start()
def on_git_install_result(self, success, message):
self.install_git_btn.setEnabled(True)
if success:
self.output_text.append(f"Git安装成功: {message}")
logger.info(f"Git安装成功: {message}")
else:
self.output_text.append(f"Git安装失败: {message}")
logger.error(f"Git安装失败: {message}")
def clone_git(self):
if not self.check_ssh_connection():
return
git_url = self.git_url_input.text().strip()
project_path = self.project_path_input.text().strip()
if not git_url or not project_path:
QMessageBox.warning(self, "警告", "请填写Git仓库URL和项目路径")
return
self.output_text.append(f"正在克隆 {git_url}{project_path}...")
self.clone_btn.setEnabled(False)
self.git_clone_thread = GitCloneThread(self.parent.ssh_client, git_url, project_path)
self.git_clone_thread.result_ready.connect(self.on_git_clone_result)
self.git_clone_thread.start()
def on_git_clone_result(self, success, message):
self.clone_btn.setEnabled(True)
if success:
self.output_text.append(f"Git克隆成功: {message}")
logger.info(f"Git克隆成功: {message}")
else:
self.output_text.append(f"Git克隆失败: {message}")
logger.error(f"Git克隆失败: {message}")
def list_directory(self):
if not self.check_ssh_connection():
return
path = self.project_path_input.text().strip()
if not path:
QMessageBox.warning(self, "警告", "请输入要列出的目录路径")
return
self.output_text.append(f"正在列出目录 {path}...")
self.list_dir_btn.setEnabled(False)
self.list_dir_thread = ListDirectoryThread(self.parent.ssh_client, path)
self.list_dir_thread.result_ready.connect(self.on_list_directory_result)
self.list_dir_thread.start()
def on_list_directory_result(self, success, message):
self.list_dir_btn.setEnabled(True)
if success:
self.output_text.append(f"目录列表:\n{message}")
logger.info(f"目录列表成功")
else:
self.output_text.append(f"列出目录失败: {message}")
logger.error(f"列出目录失败: {message}")
def delete_directory(self):
if not self.check_ssh_connection():
return
path = self.delete_dir_input.text().strip()
if not path:
QMessageBox.warning(self, "警告", "请输入要删除的目录路径")
return
reply = QMessageBox.question(self, "确认删除",
f"确定要删除目录 {path} 吗?此操作不可撤销!",
QMessageBox.Yes | QMessageBox.No)
if reply == QMessageBox.No:
return
self.output_text.append(f"正在删除目录 {path}...")
self.delete_dir_btn.setEnabled(False)
self.delete_dir_thread = DeleteDirectoryThread(self.parent.ssh_client, path)
self.delete_dir_thread.result_ready.connect(self.on_delete_directory_result)
self.delete_dir_thread.start()
def on_delete_directory_result(self, success, message):
self.delete_dir_btn.setEnabled(True)
if success:
self.output_text.append(f"目录删除成功: {message}")
logger.info(f"目录删除成功: {message}")
else:
self.output_text.append(f"目录删除失败: {message}")
logger.error(f"目录删除失败: {message}")
def set_timezone_and_restart(self):
if not self.check_ssh_connection():
return
self.output_text.append("正在设置时区为Asia/Shanghai并重启服务器...")
self.set_timezone_btn.setEnabled(False)
self.timezone_thread = SetTimezoneAndRestartThread(self.parent.ssh_client)
self.timezone_thread.result_ready.connect(self.on_set_timezone_and_restart_result)
self.timezone_thread.start()
def on_set_timezone_and_restart_result(self, success, message):
self.set_timezone_btn.setEnabled(True)
if success:
self.output_text.append(f"时区设置成功: {message}")
logger.info(f"时区设置成功: {message}")
else:
self.output_text.append(f"时区设置失败: {message}")
logger.error(f"时区设置失败: {message}")
def check_firewall(self):
if not self.check_ssh_connection():
return
self.output_text.append("正在检查防火墙状态...")
self.check_firewall_btn.setEnabled(False)
self.firewall_thread = CheckFirewallThread(self.parent.ssh_client)
self.firewall_thread.result_ready.connect(self.on_check_firewall_result)
self.firewall_thread.start()
def on_check_firewall_result(self, success, message):
self.check_firewall_btn.setEnabled(True)
if success:
self.output_text.append(f"防火墙状态:\n{message}")
logger.info(f"防火墙状态检查成功")
else:
self.output_text.append(f"防火墙状态检查失败: {message}")
logger.error(f"防火墙状态检查失败: {message}")
def open_port(self):
if not self.check_ssh_connection():
return
port = self.port_input.text().strip()
if not port:
QMessageBox.warning(self, "警告", "请输入要开放的端口号")
return
self.output_text.append(f"正在开放端口 {port}...")
self.open_port_btn.setEnabled(False)
self.open_port_thread = OpenPortThread(self.parent.ssh_client, port)
self.open_port_thread.result_ready.connect(self.on_open_port_result)
self.open_port_thread.start()
def on_open_port_result(self, success, message):
self.open_port_btn.setEnabled(True)
if success:
self.output_text.append(f"端口开放成功: {message}")
logger.info(f"端口开放成功: {message}")
else:
self.output_text.append(f"端口开放失败: {message}")
logger.error(f"端口开放失败: {message}")
def check_ssh_connection(self):
if not self.parent or not self.parent.ssh_client:
QMessageBox.warning(self, "警告", "请先连接服务器")
return False
return True