Files
django.remote/remote_commands_tab.py

800 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from PySide6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QLineEdit, QPushButton, QGroupBox, QTextEdit,
QMessageBox, QFormLayout, QDialog, QDialogButtonBox)
from PySide6.QtCore import Qt, QThread, Signal
from loguru import logger
class PasswordDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("输入密码")
self.setMinimumWidth(300)
layout = QVBoxLayout()
# 提示标签
label = QLabel("请输入sudo密码:")
layout.addWidget(label)
# 密码输入框
self.password_input = QLineEdit()
self.password_input.setEchoMode(QLineEdit.Password)
layout.addWidget(self.password_input)
# 按钮
button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
button_box.accepted.connect(self.accept)
button_box.rejected.connect(self.reject)
layout.addWidget(button_box)
self.setLayout(layout)
def get_password(self):
return self.password_input.text()
class RemoteCommandThread(QThread):
output_signal = Signal(str)
finished_signal = Signal(bool, str)
password_request_signal = Signal() # 请求密码的信号
def __init__(self, ssh_client, command):
super().__init__()
self.ssh_client = ssh_client
self.command = command
self.password = None
self.waiting_for_password = False
def set_password(self, password):
self.password = password
self.waiting_for_password = False
def run(self):
try:
logger.info(f"执行远程命令: {self.command}")
# 检查SSH连接是否仍然有效
try:
# 尝试执行一个简单的命令来检查连接
transport = self.ssh_client.get_transport() if self.ssh_client else None
if not transport or not transport.is_active():
raise Exception("SSH连接已断开")
except Exception as e:
logger.error(f"SSH连接检查失败: {str(e)}")
self.output_signal.emit(f"错误: SSH连接已断开请重新连接服务器")
self.finished_signal.emit(False, f"SSH连接已断开请重新连接服务器")
return
# 在执行命令前,先输出当前目录
try:
pwd_stdin, pwd_stdout, pwd_stderr = self.ssh_client.exec_command("pwd")
pwd_output = pwd_stdout.read().decode().strip()
if pwd_output:
self.output_signal.emit(f"当前目录: {pwd_output}")
logger.info(f"当前目录: {pwd_output}")
except Exception as e:
logger.error(f"获取当前目录失败: {str(e)}")
# 如果命令包含sudo修改为使用-S选项从标准输入读取密码
if "sudo" in self.command:
command_with_sudo = self.command.replace("sudo", "sudo -S")
stdin, stdout, stderr = self.ssh_client.exec_command(command_with_sudo)
# 如果预先设置了密码,直接发送
if self.password:
logger.info("使用预先设置的密码")
stdin.write(self.password + "\n")
stdin.flush()
else:
# 检查是否需要密码
password_prompt = False
for line in stderr:
self.output_signal.emit(line)
if "password for" in line.lower() or "密码" in line:
password_prompt = True
break
# 如果需要密码,请求用户输入
if password_prompt:
self.waiting_for_password = True
self.password_request_signal.emit()
# 等待密码输入
while self.waiting_for_password:
self.msleep(100)
# 发送密码
if self.password:
stdin.write(self.password + "\n")
stdin.flush()
else:
stdin, stdout, stderr = self.ssh_client.exec_command(self.command)
# 读取输出
output = ""
for line in stdout:
output += line
self.output_signal.emit(line)
# 读取错误
error = ""
for line in stderr:
error += line
if "password for" not in line.lower() and "密码" not in line: # 避免重复显示密码提示
self.output_signal.emit(f"错误: {line}")
# 检查退出状态
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
logger.info(f"命令执行成功: {self.command}")
self.finished_signal.emit(True, "命令执行成功")
else:
logger.error(f"命令执行失败,退出状态: {exit_status}")
self.finished_signal.emit(False, f"命令执行失败,退出状态: {exit_status}")
except Exception as e:
logger.error(f"执行命令时发生错误: {str(e)}")
error_msg = str(e)
# 检查是否是Socket关闭错误
if "Socket is closed" in error_msg or "SSH session not active" in error_msg:
self.output_signal.emit(f"错误: SSH连接已断开请重新连接服务器")
self.finished_signal.emit(False, "SSH连接已断开请重新连接服务器")
else:
self.output_signal.emit(f"错误: {error_msg}")
self.finished_signal.emit(False, f"执行命令时发生错误: {error_msg}")
class RemoteCommandsTab(QWidget):
def __init__(self):
super().__init__()
logger.info("初始化远程命令标签页")
self.ssh_client = None
self.command_thread = None
self.init_ui()
def init_ui(self):
# 主布局
main_layout = QVBoxLayout()
# Git操作组
git_group = QGroupBox("Git操作")
git_layout = QVBoxLayout()
# 安装Git按钮
install_git_layout = QHBoxLayout()
self.install_git_button = QPushButton("安装Git")
self.install_git_button.clicked.connect(self.install_git)
install_git_layout.addWidget(self.install_git_button)
install_git_layout.addStretch()
git_layout.addLayout(install_git_layout)
# 克隆项目
clone_layout = QHBoxLayout()
# 左侧仓库URL和目录信息
left_layout = QFormLayout()
self.repo_url_input = QLineEdit()
left_layout.addRow("仓库URL:", self.repo_url_input)
self.remote_dir_display = QLineEdit()
self.remote_dir_display.setReadOnly(True)
left_layout.addRow("远程目录:", self.remote_dir_display)
clone_layout.addLayout(left_layout)
# 右侧:按钮
right_layout = QVBoxLayout()
self.clone_button = QPushButton("克隆项目")
self.clone_button.clicked.connect(self.clone_repository)
right_layout.addWidget(self.clone_button)
self.pull_button = QPushButton("拉取更新")
self.pull_button.clicked.connect(self.pull_repository)
right_layout.addWidget(self.pull_button)
self.handle_changes_button = QPushButton("处理本地更改")
self.handle_changes_button.clicked.connect(self.handle_local_changes)
right_layout.addWidget(self.handle_changes_button)
right_layout.addStretch()
clone_layout.addLayout(right_layout)
git_layout.addLayout(clone_layout)
git_group.setLayout(git_layout)
main_layout.addWidget(git_group)
# 系统管理组
system_group = QGroupBox("系统管理")
system_layout = QVBoxLayout()
# 自定义命令执行区域
custom_command_layout = QVBoxLayout()
custom_command_layout.addWidget(QLabel("自定义命令:"))
# 命令输入框
self.custom_command_input = QTextEdit()
self.custom_command_input.setMaximumHeight(100)
custom_command_layout.addWidget(self.custom_command_input)
# 执行按钮
execute_button_layout = QHBoxLayout()
self.execute_command_button = QPushButton("执行命令")
self.execute_command_button.clicked.connect(self.execute_custom_command)
execute_button_layout.addWidget(self.execute_command_button)
execute_button_layout.addStretch()
custom_command_layout.addLayout(execute_button_layout)
system_layout.addLayout(custom_command_layout)
system_group.setLayout(system_layout)
main_layout.addWidget(system_group)
# 目录管理组
dir_group = QGroupBox("目录管理")
dir_layout = QVBoxLayout()
# 当前目录显示
current_dir_layout = QHBoxLayout()
current_dir_layout.addWidget(QLabel("当前目录:"))
self.current_dir_display = QLineEdit()
# 允许用户输入当前目录路径
# self.current_dir_display.setReadOnly(True)
# 添加回车键刷新目录功能
self.current_dir_display.returnPressed.connect(self.on_current_dir_entered)
current_dir_layout.addWidget(self.current_dir_display)
# 刷新目录按钮
self.refresh_dir_button = QPushButton("刷新目录")
self.refresh_dir_button.clicked.connect(self.refresh_directory)
current_dir_layout.addWidget(self.refresh_dir_button)
dir_layout.addLayout(current_dir_layout)
# 目录列表
dir_list_layout = QHBoxLayout()
dir_list_layout.addWidget(QLabel("目录内容:"))
self.dir_list_text = QTextEdit()
self.dir_list_text.setReadOnly(True)
self.dir_list_text.setMaximumHeight(150)
dir_list_layout.addWidget(self.dir_list_text)
dir_layout.addLayout(dir_list_layout)
# 删除目录
delete_dir_layout = QHBoxLayout()
delete_dir_layout.addWidget(QLabel("删除目录:"))
self.delete_dir_input = QLineEdit()
delete_dir_layout.addWidget(self.delete_dir_input)
self.delete_dir_button = QPushButton("删除目录")
self.delete_dir_button.clicked.connect(self.delete_directory)
delete_dir_layout.addWidget(self.delete_dir_button)
dir_layout.addLayout(delete_dir_layout)
dir_group.setLayout(dir_layout)
main_layout.addWidget(dir_group)
# 命令输出区域
output_group = QGroupBox("命令输出")
output_layout = QVBoxLayout()
self.output_text = QTextEdit()
self.output_text.setReadOnly(True)
output_layout.addWidget(self.output_text)
output_group.setLayout(output_layout)
main_layout.addWidget(output_group)
# 状态栏
self.status_label = QLabel("就绪")
self.status_label.setAlignment(Qt.AlignCenter)
main_layout.addWidget(self.status_label)
# 添加伸缩空间
main_layout.addStretch()
self.setLayout(main_layout)
logger.info("远程命令标签页UI初始化完成")
def set_server_info(self, server_info):
"""设置服务器信息"""
logger.info(f"设置服务器信息: {server_info}")
self.server_info = server_info
# 更新主窗口状态栏显示当前目录
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar') and self.ssh_client:
current_dir = self.current_dir_display.text().strip()
server_host = self.server_info.get('host', '未知')
if current_dir:
main_window.status_bar.showMessage(f"远程服务器 {server_host}: {current_dir}")
logger.info(f"主窗口状态栏更新为远程服务器目录: {server_host}: {current_dir}")
else:
main_window.status_bar.showMessage(f"远程服务器: {server_host}")
logger.info(f"主窗口状态栏更新为远程服务器: {server_host}")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
def set_ssh_client(self, ssh_client):
logger.info("设置SSH客户端")
self.ssh_client = ssh_client
if self.ssh_client:
self.status_label.setText("已连接到服务器")
self.status_label.setStyleSheet("color: green;")
# 更新主窗口状态栏显示当前目录
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar'):
current_dir = self.current_dir_display.text().strip()
server_host = self.server_info.get('host', '未知') if hasattr(self, 'server_info') else '未知'
if current_dir:
main_window.status_bar.showMessage(f"远程服务器 {server_host}: {current_dir}")
logger.info(f"主窗口状态栏更新为远程服务器目录: {server_host}: {current_dir}")
else:
main_window.status_bar.showMessage(f"远程服务器: {server_host}")
logger.info(f"主窗口状态栏更新为远程服务器: {server_host}")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
else:
self.status_label.setText("未连接到服务器")
self.status_label.setStyleSheet("color: red;")
# 更新主窗口状态栏显示未连接状态
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar'):
main_window.status_bar.showMessage("未连接到远程服务器")
logger.info("主窗口状态栏更新为未连接状态")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
def set_server_config(self, git_url, remote_dir):
logger.info(f"设置服务器配置: git_url={git_url}, remote_dir={remote_dir}")
self.repo_url_input.setText(git_url)
self.remote_dir_display.setText(remote_dir)
# 如果设置了远程目录,也设置为当前目录
if remote_dir:
self.current_dir_display.setText(remote_dir)
self.refresh_directory()
# 更新主窗口状态栏显示当前目录
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar'):
server_host = self.server_info['host'] if hasattr(self, 'server_info') and 'host' in self.server_info else '未知'
main_window.status_bar.showMessage(f"远程服务器 {server_host}: {remote_dir}")
logger.info(f"主窗口状态栏更新为远程服务器目录: {server_host}: {remote_dir}")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
def install_git(self):
logger.info("安装Git")
if not self.ssh_client:
QMessageBox.warning(self, "警告", "请先连接到服务器")
return
self.output_text.clear()
self.status_label.setText("正在安装Git...")
# 创建并启动线程执行命令
self.command_thread = RemoteCommandThread(self.ssh_client, "sudo apt update && sudo apt install -y git")
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_command_finished)
self.command_thread.password_request_signal.connect(self.request_password)
self.command_thread.start()
def request_password(self):
logger.info("请求输入sudo密码")
# 创建密码输入对话框
dialog = PasswordDialog(self)
if dialog.exec() == QDialog.Accepted:
password = dialog.get_password()
if password:
self.command_thread.set_password(password)
self.output_text.append("密码已发送\n")
else:
self.output_text.append("已取消输入密码\n")
self.command_thread.set_password("")
self.command_thread.waiting_for_password = False
def clone_repository(self):
logger.info("克隆仓库")
if not self.ssh_client:
QMessageBox.warning(self, "警告", "请先连接到服务器")
return
repo_url = self.repo_url_input.text().strip()
if not repo_url:
QMessageBox.warning(self, "警告", "请输入仓库URL")
return
remote_dir = self.remote_dir_display.text().strip()
self.output_text.clear()
self.status_label.setText("正在克隆仓库...")
# 构建克隆命令
if remote_dir:
# 如果指定了远程目录,先创建目录(如果不存在),然后克隆到指定目录
clone_command = f"mkdir -p {remote_dir} && cd {remote_dir} && git clone --verbose {repo_url}"
else:
clone_command = f"git clone --verbose {repo_url}"
# 创建并启动线程执行命令
self.command_thread = RemoteCommandThread(self.ssh_client, clone_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_command_finished)
self.command_thread.start()
def pull_repository(self):
logger.info("拉取仓库更新")
if not self.ssh_client:
QMessageBox.warning(self, "警告", "请先连接到服务器")
return
remote_dir = self.remote_dir_display.text().strip()
if not remote_dir:
QMessageBox.warning(self, "警告", "请先设置远程目录")
return
# 从仓库URL中提取项目名称
repo_url = self.repo_url_input.text().strip()
if not repo_url:
QMessageBox.warning(self, "警告", "请输入仓库URL")
return
# 从URL中提取项目名例如http://example.com/repo.git -> repo
project_name = repo_url.split('/')[-1].replace('.git', '')
project_path = f"{remote_dir}/{project_name}"
self.output_text.clear()
self.status_label.setText("正在拉取仓库更新...")
# 构建拉取命令
pull_command = f"cd {project_path} && git pull --verbose"
# 创建并启动线程执行命令
self.command_thread = RemoteCommandThread(self.ssh_client, pull_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_command_finished)
self.command_thread.start()
def handle_local_changes(self):
logger.info("处理本地更改")
if not self.ssh_client:
QMessageBox.warning(self, "警告", "请先连接到服务器")
return
remote_dir = self.remote_dir_display.text().strip()
if not remote_dir:
QMessageBox.warning(self, "警告", "请先设置远程目录")
return
# 从仓库URL中提取项目名称
repo_url = self.repo_url_input.text().strip()
if not repo_url:
QMessageBox.warning(self, "警告", "请输入仓库URL")
return
# 从URL中提取项目名例如http://example.com/repo.git -> repo
project_name = repo_url.split('/')[-1].replace('.git', '')
project_path = f"{remote_dir}/{project_name}"
self.output_text.clear()
self.status_label.setText("正在检查本地更改...")
# 创建对话框询问用户如何处理本地更改
reply = QMessageBox.question(
self,
"处理本地更改",
"检测到本地有未提交的更改,请选择处理方式:\n\n"
"是(Y) - 暂存更改(stash),拉取更新后再恢复\n"
"否(N) - 放弃本地更改,直接拉取更新\n"
"取消 - 取消操作",
QMessageBox.Yes | QMessageBox.No | QMessageBox.Cancel,
QMessageBox.Cancel
)
if reply == QMessageBox.Yes:
# 暂存更改
self.status_label.setText("正在暂存更改...")
stash_command = f"cd {project_path} && git stash push -m 'Auto-stash before pull'"
self.command_thread = RemoteCommandThread(self.ssh_client, stash_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(lambda success, msg: self.on_stash_finished(success, msg, project_path))
self.command_thread.start()
elif reply == QMessageBox.No:
# 放弃本地更改,强制拉取
self.status_label.setText("正在重置本地更改...")
reset_command = f"cd {project_path} && git reset --hard HEAD && git clean -fd"
self.command_thread = RemoteCommandThread(self.ssh_client, reset_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(lambda success, msg: self.on_reset_finished(success, msg, project_path))
self.command_thread.start()
# 如果选择取消,不做任何操作
def on_stash_finished(self, success, message, project_path):
"""暂存操作完成后的处理"""
if success:
self.append_output("暂存更改成功,正在拉取更新...")
# 拉取更新
pull_command = f"cd {project_path} && git pull --verbose"
self.command_thread = RemoteCommandThread(self.ssh_client, pull_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(lambda success, msg: self.on_pull_after_stash_finished(success, msg, project_path))
self.command_thread.start()
else:
self.status_label.setText("暂存更改失败")
self.status_label.setStyleSheet("color: red;")
self.append_output(f"暂存更改失败: {message}")
def on_pull_after_stash_finished(self, success, message, project_path):
"""暂存后拉取更新完成后的处理"""
if success:
self.append_output("拉取更新成功,正在恢复暂存的更改...")
# 恢复暂存的更改
pop_command = f"cd {project_path} && git stash pop"
self.command_thread = RemoteCommandThread(self.ssh_client, pop_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_command_finished)
self.command_thread.start()
else:
self.status_label.setText("拉取更新失败")
self.status_label.setStyleSheet("color: red;")
self.append_output(f"拉取更新失败: {message}")
def on_reset_finished(self, success, message, project_path):
"""重置操作完成后的处理"""
if success:
self.append_output("重置本地更改成功,正在拉取更新...")
# 拉取更新
pull_command = f"cd {project_path} && git pull --verbose"
self.command_thread = RemoteCommandThread(self.ssh_client, pull_command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_command_finished)
self.command_thread.start()
else:
self.status_label.setText("重置本地更改失败")
self.status_label.setStyleSheet("color: red;")
self.append_output(f"重置本地更改失败: {message}")
def append_output(self, text):
self.output_text.append(text)
def on_command_finished(self, success, message):
if success:
self.status_label.setText(message)
self.status_label.setStyleSheet("color: green;")
QMessageBox.information(self, "成功", message)
# 如果是目录相关操作,刷新目录列表
if "删除" in message or "目录" in message:
self.refresh_directory()
else:
self.status_label.setText(message)
self.status_label.setStyleSheet("color: red;")
# 检查是否是SSH连接断开错误
if "SSH连接已断开" in message:
reply = QMessageBox.question(self, "SSH连接已断开",
f"{message}\n\n是否现在重新连接服务器?",
QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
if reply == QMessageBox.Yes:
self.reconnect_ssh()
else:
QMessageBox.warning(self, "错误", message)
def on_current_dir_entered(self):
"""处理用户输入目录路径并按回车键的情况"""
logger.info("用户输入目录路径并按回车键")
# 更新主窗口状态栏显示当前目录
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar'):
current_dir = self.current_dir_display.text().strip()
server_host = self.server_info['host'] if hasattr(self, 'server_info') and 'host' in self.server_info else '未知'
if current_dir:
main_window.status_bar.showMessage(f"远程服务器 {server_host}: {current_dir}")
logger.info(f"主窗口状态栏更新为远程服务器目录: {server_host}: {current_dir}")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
# 刷新目录列表
self.refresh_directory()
def refresh_directory(self):
logger.info("刷新目录列表")
if not self.ssh_client:
QMessageBox.warning(self, "警告", "请先连接到服务器")
return
current_dir = self.current_dir_display.text().strip()
if not current_dir:
current_dir = "~" # 默认用户主目录
self.current_dir_display.setText(current_dir)
logger.info(f"使用默认目录: {current_dir}")
else:
logger.info(f"使用用户输入目录: {current_dir}")
self.status_label.setText("正在刷新目录列表...")
# 创建并启动线程执行命令
command = f"cd {current_dir} && pwd && ls -la"
self.command_thread = RemoteCommandThread(self.ssh_client, command)
self.command_thread.output_signal.connect(self.append_dir_output)
self.command_thread.finished_signal.connect(self.on_dir_refresh_finished)
self.command_thread.start()
def append_dir_output(self, text):
self.dir_list_text.append(text)
# 将目录信息输出到日志文件
logger.info(f"目录列表信息: {text.strip()}")
def on_dir_refresh_finished(self, success, message):
if success:
self.status_label.setText("目录列表已刷新")
self.status_label.setStyleSheet("color: green;")
logger.info("目录列表刷新成功")
# 更新主窗口状态栏显示当前目录
try:
main_window = self.parent().parent()
if hasattr(main_window, 'status_bar'):
current_dir = self.current_dir_display.text()
server_host = self.server_info['host'] if hasattr(self, 'server_info') and 'host' in self.server_info else '未知'
main_window.status_bar.showMessage(f"远程服务器 {server_host}: {current_dir}")
logger.info(f"主窗口状态栏更新为远程服务器目录: {server_host}: {current_dir}")
except Exception as e:
logger.error(f"更新主窗口状态栏失败: {str(e)}")
else:
self.status_label.setText("刷新目录列表失败")
self.status_label.setStyleSheet("color: red;")
logger.error(f"刷新目录列表失败: {message}")
QMessageBox.warning(self, "错误", f"刷新目录列表失败: {message}")
def execute_custom_command(self):
logger.info("执行自定义命令")
if not self.ssh_client:
QMessageBox.warning(self, "警告", "请先连接到服务器")
return
command = self.custom_command_input.toPlainText().strip()
if not command:
QMessageBox.warning(self, "警告", "请输入要执行的命令")
return
self.output_text.clear()
self.status_label.setText("正在执行命令...")
# 如果命令以sudo开头预先请求密码
if command.startswith("sudo "):
logger.info("检测到sudo命令预先请求密码")
# 创建密码输入对话框
dialog = PasswordDialog(self)
if dialog.exec() == QDialog.Accepted:
password = dialog.get_password()
if not password:
QMessageBox.warning(self, "警告", "未输入密码,取消执行命令")
return
else:
QMessageBox.warning(self, "警告", "已取消输入密码")
return
# 创建并启动线程执行命令
self.command_thread = RemoteCommandThread(self.ssh_client, command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_custom_command_finished)
self.command_thread.password_request_signal.connect(self.request_password)
# 如果是sudo命令且已获取密码预先设置密码
if command.startswith("sudo ") and password:
self.command_thread.set_password(password)
self.output_text.append("密码已设置\n")
self.command_thread.start()
def on_custom_command_finished(self, success, message):
if success:
self.status_label.setText("命令执行成功")
self.status_label.setStyleSheet("color: green;")
self.output_text.append(f"\n=== 命令执行成功 ===\n{message}")
else:
self.status_label.setText("命令执行失败")
self.status_label.setStyleSheet("color: red;")
self.output_text.append(f"\n=== 命令执行失败 ===\n{message}")
def delete_directory(self):
logger.info("删除目录")
if not self.ssh_client:
QMessageBox.warning(self, "警告", "请先连接到服务器")
return
dir_to_delete = self.delete_dir_input.text().strip()
if not dir_to_delete:
QMessageBox.warning(self, "警告", "请输入要删除的目录名")
return
current_dir = self.current_dir_display.text().strip()
if not current_dir:
current_dir = "~"
# 确认删除
reply = QMessageBox.question(self, "确认删除",
f"确定要删除目录 '{current_dir}/{dir_to_delete}' 及其所有内容吗?\n此操作不可撤销!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
return
self.output_text.clear()
self.status_label.setText("正在删除目录...")
# 创建并启动线程执行命令
command = f"cd {current_dir} && rm -rf {dir_to_delete}"
self.command_thread = RemoteCommandThread(self.ssh_client, command)
self.command_thread.output_signal.connect(self.append_output)
self.command_thread.finished_signal.connect(self.on_command_finished)
self.command_thread.start()
def check_ssh_connection(self):
"""检查SSH连接是否有效"""
if not self.ssh_client:
return False
try:
transport = self.ssh_client.get_transport()
return transport and transport.is_active()
except Exception as e:
logger.error(f"检查SSH连接时发生错误: {str(e)}")
return False
def reconnect_ssh(self):
"""重新连接SSH服务器"""
logger.info("尝试重新连接SSH服务器")
# 关闭现有连接
if self.ssh_client:
try:
self.ssh_client.close()
except:
pass
self.ssh_client = None
# 切换到服务器连接标签页
main_window = self.parent().parent()
if hasattr(main_window, 'tabs'):
main_window.tabs.setCurrentIndex(0) # 切换到服务器连接标签页
# 显示提示信息
self.status_label.setText("请重新连接服务器")
self.status_label.setStyleSheet("color: orange;")
self.output_text.append("\n=== SSH连接已断开 ===\n")
self.output_text.append("请切换到\"服务器连接\"标签页重新连接服务器\n")
# 显示重新连接的对话框
reply = QMessageBox.question(self, "SSH连接已断开",
"SSH连接已断开是否现在重新连接",
QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
if reply == QMessageBox.Yes:
# 模拟点击连接按钮
server_connection_tab = main_window.server_connection_tab
server_connection_tab.connect_to_server()