399 lines
15 KiB
Python
399 lines
15 KiB
Python
import re
import shlex

from PySide6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
                               QLineEdit, QPushButton, QGroupBox, QTextEdit,
                               QMessageBox, QFormLayout, QDialog, QDialogButtonBox)
from PySide6.QtCore import Qt, QThread, Signal
from loguru import logger
|
||
|
||
class PasswordDialog(QDialog):
    """Small dialog that asks the user for the sudo password.

    The entry field masks what is typed. OK accepts the dialog and Cancel
    rejects it; read the typed text with :meth:`get_password`.
    """

    def __init__(self, parent=None):
        """Build the dialog: prompt label, masked input, OK/Cancel buttons.

        Args:
            parent: Optional parent widget handed to ``QDialog``.
        """
        super().__init__(parent)
        self.setMinimumWidth(300)
        self.setWindowTitle("输入密码")

        # Masked entry; stored on the instance so get_password() can read it.
        self.password_input = QLineEdit()
        self.password_input.setEchoMode(QLineEdit.Password)

        # Standard OK / Cancel pair wired to the dialog's accept / reject.
        buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
        buttons.accepted.connect(self.accept)
        buttons.rejected.connect(self.reject)

        # Stack prompt, input and buttons top to bottom.
        column = QVBoxLayout()
        for widget in (QLabel("请输入sudo密码:"), self.password_input, buttons):
            column.addWidget(widget)
        self.setLayout(column)

    def get_password(self):
        """Return the text currently held by the password field."""
        return self.password_input.text()
|
||
|
||
class RemoteCommandThread(QThread):
    """Run one shell command through an SSH client on a worker thread.

    ``ssh_client`` must offer ``exec_command(command)`` returning a
    ``(stdin, stdout, stderr)`` triple whose output streams can be iterated
    line by line and expose ``.channel`` (this is the shape of paramiko's
    ``SSHClient`` -- presumably what callers pass; confirm).

    Signals:
        output_signal(str): One line of remote output. Lines read from stderr
            after the prompt check carry the ``错误: `` prefix.
        finished_signal(bool, str): Success flag and a user-facing summary.
        password_request_signal(): sudo asked for a password. The receiver
            must answer with :meth:`set_password` (an empty string cancels).
            NOTE(review): if nothing is connected to this signal the thread
            waits until ``requestInterruption()`` is called.
    """

    output_signal = Signal(str)
    finished_signal = Signal(bool, str)
    password_request_signal = Signal()  # ask the GUI thread for the sudo password

    # ``sudo`` in command position only: at the start of the command or right
    # after a shell separator. A plain substring test would also rewrite
    # arguments such as "/etc/sudoers" or a directory that is named "sudo".
    _SUDO_COMMAND = re.compile(r"(^|[;&|(\n])(\s*)sudo(?=\s|$)")

    # Newline-terminated prompt. sudo's stock prompt has no trailing newline,
    # so a line-based reader would block on it while sudo blocks on stdin.
    _SUDO_PROMPT = "[sudo] password for %p:\n"

    def __init__(self, ssh_client, command):
        """Store the client and command; nothing runs until ``start()``.

        Args:
            ssh_client: Connected SSH client (see the class docstring).
            command: Shell command line to execute on the remote host.
        """
        super().__init__()
        self.ssh_client = ssh_client
        self.command = command
        self.password = None
        self.waiting_for_password = False

    def set_password(self, password):
        """Hand over the sudo password and release the waiting worker.

        Args:
            password: Password text; an empty string means "cancelled".
        """
        self.password = password
        self.waiting_for_password = False

    @staticmethod
    def _is_password_prompt(line):
        """Return True when *line* looks like a sudo password prompt."""
        return "password for" in line.lower() or "密码" in line

    def _with_sudo_stdin(self, command):
        """Rewrite *command* so each sudo reads its password from stdin.

        Adds ``-S`` to every command-position ``sudo`` and exports
        ``SUDO_PROMPT`` so the prompt arrives as a complete line on stderr.
        """
        rewritten = self._SUDO_COMMAND.sub(
            lambda m: f"{m.group(1)}{m.group(2)}sudo -S", command
        )
        # The prompt text contains no single quote, so plain quoting is safe.
        return f"export SUDO_PROMPT='{self._SUDO_PROMPT}'; {rewritten}"

    def _saw_password_prompt(self, stderr):
        """Forward stderr lines until a password prompt shows up.

        Returns True when a prompt line was seen, False when stderr ended
        first.
        NOTE(review): when sudo never prompts, this drains stderr to EOF
        before stdout is read; very large stdout could stall the remote
        side -- confirm against the SSH library's buffering.
        """
        for line in stderr:
            self.output_signal.emit(line)
            if self._is_password_prompt(line):
                return True
        return False

    def _answer_password_prompt(self, stdin):
        """Ask the GUI for the password and feed it (or EOF) to sudo."""
        self.waiting_for_password = True
        self.password_request_signal.emit()

        # Poll until set_password() clears the flag; also leave when an
        # interruption was requested so the thread can always be stopped.
        while self.waiting_for_password and not self.isInterruptionRequested():
            self.msleep(100)
        self.waiting_for_password = False

        # Do not keep the secret on the object longer than needed.
        password, self.password = self.password, None
        if password:
            stdin.write(password + "\n")
            stdin.flush()
        else:
            # Cancelled: close our write side so sudo sees EOF and fails,
            # instead of waiting on stdin while we wait on its output.
            stdin.channel.shutdown_write()

    def run(self):
        """Execute the command, stream its output and emit the final result.

        Any exception is logged, forwarded through ``output_signal`` and
        reported as a failure through ``finished_signal``.
        """
        try:
            logger.info(f"执行远程命令: {self.command}")

            if self._SUDO_COMMAND.search(self.command):
                stdin, stdout, stderr = self.ssh_client.exec_command(
                    self._with_sudo_stdin(self.command)
                )
                if self._saw_password_prompt(stderr):
                    self._answer_password_prompt(stdin)
            else:
                stdin, stdout, stderr = self.ssh_client.exec_command(self.command)

            # Standard output is forwarded as-is.
            for line in stdout:
                self.output_signal.emit(line)

            # Remaining stderr is reported as errors; repeated password
            # prompts are skipped so they are not shown twice.
            for line in stderr:
                if not self._is_password_prompt(line):
                    self.output_signal.emit(f"错误: {line}")

            exit_status = stdout.channel.recv_exit_status()
            if exit_status == 0:
                logger.info(f"命令执行成功: {self.command}")
                self.finished_signal.emit(True, "命令执行成功")
            else:
                logger.error(f"命令执行失败,退出状态: {exit_status}")
                self.finished_signal.emit(False, f"命令执行失败,退出状态: {exit_status}")

        except Exception as e:
            # Worker-thread boundary: report instead of crashing the thread.
            logger.error(f"执行命令时发生错误: {e}")
            self.output_signal.emit(f"错误: {e}")
            self.finished_signal.emit(False, f"执行命令时发生错误: {e}")
|
||
|
||
class RemoteCommandsTab(QWidget):
    """Tab for running git and directory commands on a remote host over SSH.

    Commands run on ``RemoteCommandThread`` workers so the GUI stays
    responsive; their output is streamed into the text areas of this tab.
    """

    def __init__(self):
        """Create the tab without a connection and build its widgets."""
        super().__init__()

        logger.info("初始化远程命令标签页")
        self.ssh_client = None
        self.command_thread = None
        # Every worker that may still be running is kept referenced here:
        # dropping the last reference to a running QThread destroys it.
        self._active_threads = []
        self.init_ui()

    def init_ui(self):
        """Build the git group, directory group, output area and status label."""
        main_layout = QVBoxLayout()

        # --- Git operations -------------------------------------------------
        git_group = QGroupBox("Git操作")
        git_layout = QVBoxLayout()

        # "Install git" button, left-aligned.
        install_git_layout = QHBoxLayout()
        self.install_git_button = QPushButton("安装Git")
        self.install_git_button.clicked.connect(self.install_git)
        install_git_layout.addWidget(self.install_git_button)
        install_git_layout.addStretch()
        git_layout.addLayout(install_git_layout)

        # Clone row: form (URL + target directory) on the left, button right.
        clone_layout = QHBoxLayout()

        left_layout = QFormLayout()
        self.repo_url_input = QLineEdit()
        left_layout.addRow("仓库URL:", self.repo_url_input)

        self.remote_dir_display = QLineEdit()
        self.remote_dir_display.setReadOnly(True)
        left_layout.addRow("远程目录:", self.remote_dir_display)

        clone_layout.addLayout(left_layout)

        right_layout = QVBoxLayout()
        self.clone_button = QPushButton("克隆项目")
        self.clone_button.clicked.connect(self.clone_repository)
        right_layout.addWidget(self.clone_button)
        right_layout.addStretch()

        clone_layout.addLayout(right_layout)

        git_layout.addLayout(clone_layout)
        git_group.setLayout(git_layout)
        main_layout.addWidget(git_group)

        # --- Directory management -------------------------------------------
        dir_group = QGroupBox("目录管理")
        dir_layout = QVBoxLayout()

        # Current directory: deliberately editable; Enter refreshes the list.
        current_dir_layout = QHBoxLayout()
        current_dir_layout.addWidget(QLabel("当前目录:"))
        self.current_dir_display = QLineEdit()
        self.current_dir_display.returnPressed.connect(self.refresh_directory)
        current_dir_layout.addWidget(self.current_dir_display)

        self.refresh_dir_button = QPushButton("刷新目录")
        self.refresh_dir_button.clicked.connect(self.refresh_directory)
        current_dir_layout.addWidget(self.refresh_dir_button)

        dir_layout.addLayout(current_dir_layout)

        # Read-only listing of the current directory.
        dir_list_layout = QHBoxLayout()
        dir_list_layout.addWidget(QLabel("目录内容:"))
        self.dir_list_text = QTextEdit()
        self.dir_list_text.setReadOnly(True)
        self.dir_list_text.setMaximumHeight(150)
        dir_list_layout.addWidget(self.dir_list_text)
        dir_layout.addLayout(dir_list_layout)

        # Delete-directory row.
        delete_dir_layout = QHBoxLayout()
        delete_dir_layout.addWidget(QLabel("删除目录:"))
        self.delete_dir_input = QLineEdit()
        delete_dir_layout.addWidget(self.delete_dir_input)

        self.delete_dir_button = QPushButton("删除目录")
        self.delete_dir_button.clicked.connect(self.delete_directory)
        delete_dir_layout.addWidget(self.delete_dir_button)

        dir_layout.addLayout(delete_dir_layout)

        dir_group.setLayout(dir_layout)
        main_layout.addWidget(dir_group)

        # --- Command output ---------------------------------------------------
        output_group = QGroupBox("命令输出")
        output_layout = QVBoxLayout()

        self.output_text = QTextEdit()
        self.output_text.setReadOnly(True)
        output_layout.addWidget(self.output_text)

        output_group.setLayout(output_layout)
        main_layout.addWidget(output_group)

        # --- Status line ------------------------------------------------------
        self.status_label = QLabel("就绪")
        self.status_label.setAlignment(Qt.AlignCenter)
        main_layout.addWidget(self.status_label)

        main_layout.addStretch()

        self.setLayout(main_layout)
        logger.info("远程命令标签页UI初始化完成")

    def set_ssh_client(self, ssh_client):
        """Store the SSH client and show the connection state.

        Args:
            ssh_client: Connected client, or a falsy value when disconnected.
        """
        logger.info("设置SSH客户端")
        self.ssh_client = ssh_client

        if self.ssh_client:
            self.status_label.setText("已连接到服务器")
            self.status_label.setStyleSheet("color: green;")
        else:
            self.status_label.setText("未连接到服务器")
            self.status_label.setStyleSheet("color: red;")

    def set_server_config(self, git_url, remote_dir):
        """Fill in the repository URL and remote directory from configuration.

        When a remote directory is given it also becomes the current
        directory and the listing is refreshed.

        Args:
            git_url: Repository URL shown in the clone form.
            remote_dir: Remote working directory; may be empty.
        """
        logger.info(f"设置服务器配置: git_url={git_url}, remote_dir={remote_dir}")
        # Missing values are shown as empty fields.
        self.repo_url_input.setText(git_url or "")
        self.remote_dir_display.setText(remote_dir or "")

        if remote_dir:
            self.current_dir_display.setText(remote_dir)
            self.refresh_directory()

    @staticmethod
    def _quote_remote_path(path):
        """Quote *path* for the remote shell while keeping ``~`` expansion.

        A leading ``~`` or ``~user`` component stays unquoted because the
        shell does not expand a quoted tilde; the rest is quoted so spaces
        and shell metacharacters are taken literally.
        """
        head, sep, tail = path.partition("/")
        tilde_prefix = head.startswith("~") and all(
            ch.isalnum() or ch in "_.-" for ch in head[1:]
        )
        if tilde_prefix:
            return head + sep + (shlex.quote(tail) if tail else "")
        return shlex.quote(path)

    @staticmethod
    def _mentions_directory(message):
        """Return True when a result message refers to a directory operation."""
        return "删除" in message or "目录" in message

    def _start_command(self, command, on_output, on_finished, needs_password=False):
        """Create, wire up and start a worker thread for *command*.

        Args:
            command: Shell command line for the remote host.
            on_output: Slot receiving each output line.
            on_finished: Slot receiving ``(success, message)`` at the end.
            needs_password: Also connect the sudo password request.
        """
        # Forget workers that have ended; keep the running ones referenced.
        self._active_threads = [t for t in self._active_threads if t.isRunning()]

        thread = RemoteCommandThread(self.ssh_client, command)
        thread.output_signal.connect(on_output)
        thread.finished_signal.connect(on_finished)
        if needs_password:
            thread.password_request_signal.connect(self.request_password)

        self._active_threads.append(thread)
        self.command_thread = thread
        thread.start()

    def install_git(self):
        """Install git on the remote host with apt (asks for the sudo password)."""
        logger.info("安装Git")

        if not self.ssh_client:
            QMessageBox.warning(self, "警告", "请先连接到服务器")
            return

        self.output_text.clear()
        self.status_label.setText("正在安装Git...")

        self._start_command(
            "sudo apt update && sudo apt install -y git",
            self.append_output,
            self.on_command_finished,
            needs_password=True,
        )

    def request_password(self):
        """Ask for the sudo password and pass the answer to the worker.

        Cancelling the dialog, or accepting it with an empty field, is
        reported to the worker as a cancellation so it does not keep waiting.
        """
        logger.info("请求输入sudo密码")

        # Answer the worker that asked; fall back to the most recent one.
        sender = self.sender()
        thread = sender if isinstance(sender, RemoteCommandThread) else self.command_thread
        if thread is None:
            return

        dialog = PasswordDialog(self)
        accepted = dialog.exec() == QDialog.Accepted
        password = dialog.get_password() if accepted else ""

        if password:
            thread.set_password(password)
            self.output_text.append("密码已发送\n")
        else:
            self.output_text.append("已取消输入密码\n")
            thread.set_password("")
            thread.waiting_for_password = False

    def clone_repository(self):
        """Clone the repository from the URL field into the remote directory.

        Without a remote directory the clone runs in the login directory.
        """
        logger.info("克隆仓库")

        if not self.ssh_client:
            QMessageBox.warning(self, "警告", "请先连接到服务器")
            return

        repo_url = self.repo_url_input.text().strip()
        if not repo_url:
            QMessageBox.warning(self, "警告", "请输入仓库URL")
            return

        remote_dir = self.remote_dir_display.text().strip()

        self.output_text.clear()
        self.status_label.setText("正在克隆仓库...")

        # Quote user-supplied values so spaces or shell metacharacters cannot
        # split or alter the command.
        clone_command = f"git clone --verbose {shlex.quote(repo_url)}"
        if remote_dir:
            target = self._quote_remote_path(remote_dir)
            # Create the directory if needed, then clone inside it.
            clone_command = f"mkdir -p {target} && cd {target} && {clone_command}"

        self._start_command(clone_command, self.append_output, self.on_command_finished)

    def append_output(self, text):
        """Append one line to the command output area."""
        self.output_text.append(text)

    def on_command_finished(self, success, message):
        """Show the result of a command in the status label and a message box.

        Args:
            success: True when the command exited with status 0.
            message: User-facing summary from the worker thread.
        """
        self.status_label.setText(message)
        if success:
            self.status_label.setStyleSheet("color: green;")
            QMessageBox.information(self, "成功", message)

            # Refresh the listing when the message refers to a directory.
            if self._mentions_directory(message):
                self.refresh_directory()
        else:
            self.status_label.setStyleSheet("color: red;")
            QMessageBox.warning(self, "错误", message)

    def _on_delete_finished(self, success, message):
        """Report a delete result and refresh the listing after success."""
        self.on_command_finished(success, message)
        # The worker's success text does not mention directories, so the
        # generic handler would skip the refresh; do it here (once).
        if success and not self._mentions_directory(message):
            self.refresh_directory()

    def refresh_directory(self):
        """List the current directory (``~`` when the field is empty)."""
        logger.info("刷新目录列表")

        if not self.ssh_client:
            QMessageBox.warning(self, "警告", "请先连接到服务器")
            return

        current_dir = self.current_dir_display.text().strip()
        if not current_dir:
            current_dir = "~"  # default to the user's home directory
            self.current_dir_display.setText(current_dir)
            logger.info(f"使用默认目录: {current_dir}")
        else:
            logger.info(f"使用用户输入目录: {current_dir}")

        self.status_label.setText("正在刷新目录列表...")
        # Start from an empty listing; otherwise each refresh piles up below
        # the previous one.
        self.dir_list_text.clear()

        command = f"cd {self._quote_remote_path(current_dir)} && pwd && ls -la"
        self._start_command(command, self.append_dir_output, self.on_dir_refresh_finished)

    def append_dir_output(self, text):
        """Append one line to the directory listing area."""
        self.dir_list_text.append(text)

    def on_dir_refresh_finished(self, success, message):
        """Update the status label after a listing; warn on failure."""
        if success:
            self.status_label.setText("目录列表已刷新")
            self.status_label.setStyleSheet("color: green;")
        else:
            self.status_label.setText("刷新目录列表失败")
            self.status_label.setStyleSheet("color: red;")
            QMessageBox.warning(self, "错误", f"刷新目录列表失败: {message}")

    def delete_directory(self):
        """Delete a directory under the current directory after confirmation."""
        logger.info("删除目录")

        if not self.ssh_client:
            QMessageBox.warning(self, "警告", "请先连接到服务器")
            return

        dir_to_delete = self.delete_dir_input.text().strip()
        if not dir_to_delete:
            QMessageBox.warning(self, "警告", "请输入要删除的目录名")
            return

        current_dir = self.current_dir_display.text().strip()
        if not current_dir:
            current_dir = "~"

        reply = QMessageBox.question(
            self, "确认删除",
            f"确定要删除目录 '{current_dir}/{dir_to_delete}' 及其所有内容吗?\n此操作不可撤销!",
            QMessageBox.Yes | QMessageBox.No, QMessageBox.No)

        # Only an explicit "Yes" may start an irreversible delete.
        if reply != QMessageBox.Yes:
            return

        self.output_text.clear()
        self.status_label.setText("正在删除目录...")

        # "--" stops option parsing so a name starting with "-" is treated as
        # a path; quoting keeps the name a single literal argument.
        command = (
            f"cd {self._quote_remote_path(current_dir)} "
            f"&& rm -rf -- {self._quote_remote_path(dir_to_delete)}"
        )
        self._start_command(command, self.append_output, self._on_delete_finished)