from PySide6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QGroupBox, QTextEdit, QMessageBox, QFormLayout, QDialog, QDialogButtonBox) from PySide6.QtCore import Qt, QThread, Signal from loguru import logger class PasswordDialog(QDialog): def __init__(self, parent=None): super().__init__(parent) self.setWindowTitle("输入密码") self.setMinimumWidth(300) layout = QVBoxLayout() # 提示标签 label = QLabel("请输入sudo密码:") layout.addWidget(label) # 密码输入框 self.password_input = QLineEdit() self.password_input.setEchoMode(QLineEdit.Password) layout.addWidget(self.password_input) # 按钮 button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) button_box.accepted.connect(self.accept) button_box.rejected.connect(self.reject) layout.addWidget(button_box) self.setLayout(layout) def get_password(self): return self.password_input.text() class RemoteCommandThread(QThread): output_signal = Signal(str) finished_signal = Signal(bool, str) password_request_signal = Signal() # 请求密码的信号 def __init__(self, ssh_client, command): super().__init__() self.ssh_client = ssh_client self.command = command self.password = None self.waiting_for_password = False def set_password(self, password): self.password = password self.waiting_for_password = False def run(self): try: logger.info(f"执行远程命令: {self.command}") # 如果命令包含sudo,修改为使用-S选项从标准输入读取密码 if "sudo" in self.command: command_with_sudo = self.command.replace("sudo", "sudo -S") stdin, stdout, stderr = self.ssh_client.exec_command(command_with_sudo) # 检查是否需要密码 password_prompt = False for line in stderr: self.output_signal.emit(line) if "password for" in line.lower() or "密码" in line: password_prompt = True break # 如果需要密码,请求用户输入 if password_prompt: self.waiting_for_password = True self.password_request_signal.emit() # 等待密码输入 while self.waiting_for_password: self.msleep(100) # 发送密码 if self.password: stdin.write(self.password + "\n") stdin.flush() else: stdin, stdout, stderr = self.ssh_client.exec_command(self.command) # 读取输出 output = "" for line in stdout: output += line self.output_signal.emit(line) # 读取错误 error = "" for line in stderr: error += line if "password for" not in line.lower() and "密码" not in line: # 避免重复显示密码提示 self.output_signal.emit(f"错误: {line}") # 检查退出状态 exit_status = stdout.channel.recv_exit_status() if exit_status == 0: logger.info(f"命令执行成功: {self.command}") self.finished_signal.emit(True, "命令执行成功") else: logger.error(f"命令执行失败,退出状态: {exit_status}") self.finished_signal.emit(False, f"命令执行失败,退出状态: {exit_status}") except Exception as e: logger.error(f"执行命令时发生错误: {str(e)}") self.output_signal.emit(f"错误: {str(e)}") self.finished_signal.emit(False, f"执行命令时发生错误: {str(e)}") class RemoteCommandsTab(QWidget): def __init__(self): super().__init__() logger.info("初始化远程命令标签页") self.ssh_client = None self.command_thread = None self.init_ui() def init_ui(self): # 主布局 main_layout = QVBoxLayout() # Git操作组 git_group = QGroupBox("Git操作") git_layout = QVBoxLayout() # 安装Git按钮 install_git_layout = QHBoxLayout() self.install_git_button = QPushButton("安装Git") self.install_git_button.clicked.connect(self.install_git) install_git_layout.addWidget(self.install_git_button) install_git_layout.addStretch() git_layout.addLayout(install_git_layout) # 克隆项目 clone_layout = QHBoxLayout() # 左侧:仓库URL和目录信息 left_layout = QFormLayout() self.repo_url_input = QLineEdit() left_layout.addRow("仓库URL:", self.repo_url_input) self.remote_dir_display = QLineEdit() self.remote_dir_display.setReadOnly(True) left_layout.addRow("远程目录:", self.remote_dir_display) clone_layout.addLayout(left_layout) # 右侧:按钮 right_layout = QVBoxLayout() self.clone_button = QPushButton("克隆项目") self.clone_button.clicked.connect(self.clone_repository) right_layout.addWidget(self.clone_button) right_layout.addStretch() clone_layout.addLayout(right_layout) git_layout.addLayout(clone_layout) git_group.setLayout(git_layout) main_layout.addWidget(git_group) # 命令输出区域 output_group = QGroupBox("命令输出") output_layout = QVBoxLayout() self.output_text = QTextEdit() self.output_text.setReadOnly(True) output_layout.addWidget(self.output_text) output_group.setLayout(output_layout) main_layout.addWidget(output_group) # 状态栏 self.status_label = QLabel("就绪") self.status_label.setAlignment(Qt.AlignCenter) main_layout.addWidget(self.status_label) # 添加伸缩空间 main_layout.addStretch() self.setLayout(main_layout) logger.info("远程命令标签页UI初始化完成") def set_ssh_client(self, ssh_client): logger.info("设置SSH客户端") self.ssh_client = ssh_client if self.ssh_client: self.status_label.setText("已连接到服务器") self.status_label.setStyleSheet("color: green;") else: self.status_label.setText("未连接到服务器") self.status_label.setStyleSheet("color: red;") def set_server_config(self, git_url, remote_dir): logger.info(f"设置服务器配置: git_url={git_url}, remote_dir={remote_dir}") self.repo_url_input.setText(git_url) self.remote_dir_display.setText(remote_dir) def install_git(self): logger.info("安装Git") if not self.ssh_client: QMessageBox.warning(self, "警告", "请先连接到服务器") return self.output_text.clear() self.status_label.setText("正在安装Git...") # 创建并启动线程执行命令 self.command_thread = RemoteCommandThread(self.ssh_client, "sudo apt update && sudo apt install -y git") self.command_thread.output_signal.connect(self.append_output) self.command_thread.finished_signal.connect(self.on_command_finished) self.command_thread.password_request_signal.connect(self.request_password) self.command_thread.start() def request_password(self): logger.info("请求输入sudo密码") # 创建密码输入对话框 dialog = PasswordDialog(self) if dialog.exec() == QDialog.Accepted: password = dialog.get_password() if password: self.command_thread.set_password(password) self.output_text.append("密码已发送\n") else: self.output_text.append("已取消输入密码\n") self.command_thread.set_password("") self.command_thread.waiting_for_password = False def clone_repository(self): logger.info("克隆仓库") if not self.ssh_client: QMessageBox.warning(self, "警告", "请先连接到服务器") return repo_url = self.repo_url_input.text().strip() if not repo_url: QMessageBox.warning(self, "警告", "请输入仓库URL") return remote_dir = self.remote_dir_display.text().strip() self.output_text.clear() self.status_label.setText("正在克隆仓库...") # 构建克隆命令 if remote_dir: # 如果指定了远程目录,先创建目录(如果不存在),然后克隆到指定目录 clone_command = f"mkdir -p {remote_dir} && cd {remote_dir} && git clone {repo_url}" else: clone_command = f"git clone {repo_url}" # 创建并启动线程执行命令 self.command_thread = RemoteCommandThread(self.ssh_client, clone_command) self.command_thread.output_signal.connect(self.append_output) self.command_thread.finished_signal.connect(self.on_command_finished) self.command_thread.start() def append_output(self, text): self.output_text.append(text) def on_command_finished(self, success, message): if success: self.status_label.setText(message) self.status_label.setStyleSheet("color: green;") QMessageBox.information(self, "成功", message) else: self.status_label.setText(message) self.status_label.setStyleSheet("color: red;") QMessageBox.warning(self, "错误", message)