"""推公钥到远程服务器 /root/.ssh/authorized_keys。 用法:python _push_key.py 依赖:paramiko """ import os import sys import paramiko HOST = "207.57.129.228" PORT = 19717 USER = "root" PASSWORD = os.environ["REMOTE_PASS"] # 由调用方设置 PUB_KEY_PATH = os.path.expanduser("~/.ssh/id_rsa.pub") def main() -> int: pub = open(PUB_KEY_PATH, encoding="utf-8").read().strip() print(f"公钥({PUB_KEY_PATH})前 60 字符: {pub[:60]}...") client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: client.connect( HOST, port=PORT, username=USER, password=PASSWORD, timeout=15, allow_agent=False, look_for_keys=False, ) except Exception as e: print(f"连接失败: {e}", file=sys.stderr) return 1 # 1) 检查 .ssh 目录 cmds = [ "mkdir -p /root/.ssh && chmod 700 /root/.ssh", f"touch /root/.ssh/authorized_keys && chmod 600 /root/.ssh/authorized_keys", ] for c in cmds: _exec(client, c) # 2) 检查是否已经存在(去重) stdin, stdout, stderr = client.exec_command("grep -F -c 'news-deploy-key' /root/.ssh/authorized_keys || true") # 用一个独特注释,后续可识别 marker = "news-deploy-key" if marker not in pub: pub_with_marker = f"{pub} {marker}" else: pub_with_marker = pub stdin, stdout, stderr = client.exec_command("cat /root/.ssh/authorized_keys | grep -F '" + marker + "' || true") existing = stdout.read().decode().strip() if existing: print(f"已存在,跳过: {existing[:60]}...") else: # 写入(用 heredoc 避免转义) quoted = pub_with_marker.replace("'", "'\\''") cmd = f"echo '{quoted}' >> /root/.ssh/authorized_keys" _exec(client, cmd) print(f"已追加公钥到 /root/.ssh/authorized_keys") # 3) 验证权限 _exec(client, "ls -la /root/.ssh/ /root/.ssh/authorized_keys") # 4) 关闭密码登录(可选项,MVP 保留) client.close() return 0 def _exec(client: paramiko.SSHClient, cmd: str) -> None: print(f"$ {cmd}") stdin, stdout, stderr = client.exec_command(cmd, timeout=15) out = stdout.read().decode().strip() err = stderr.read().decode().strip() if out: print(out) if err: print(f"[stderr] {err}", file=sys.stderr) if __name__ == "__main__": sys.exit(main())