diff --git a/scripts/push_ssh_key.py b/scripts/push_ssh_key.py index 211f542..31e755d 100644 --- a/scripts/push_ssh_key.py +++ b/scripts/push_ssh_key.py @@ -1,23 +1,54 @@ """推公钥到远程服务器 /root/.ssh/authorized_keys。 -用法:python _push_key.py +用法: + set REMOTE_PASS=xxx (Windows PowerShell: $env:REMOTE_PASS='xxx') + python scripts/push_ssh_key.py + 依赖:paramiko + +密钥优先级:id_ed25519.pub > id_rsa.pub(可由环境变量 SSH_PUB_KEY_PATH 覆盖) + +去重策略:用 SSH 公钥 fingerprint(SHA256)匹配,而不是用 marker。 +这样同一台机器多次推、不同算法/不同机器都能正确去重。 """ import os +import subprocess import sys + import paramiko HOST = "207.57.129.228" PORT = 19717 USER = "root" PASSWORD = os.environ["REMOTE_PASS"] # 由调用方设置 -PUB_KEY_PATH = os.path.expanduser("~/.ssh/id_rsa.pub") +# 优先 ed25519,fallback 到 rsa;亦可通过 SSH_PUB_KEY_PATH 指定 +_PUB_KEY_CANDIDATES = ("~/.ssh/id_ed25519.pub", "~/.ssh/id_rsa.pub") +_candidate = os.environ.get("SSH_PUB_KEY_PATH") +if _candidate: + PUB_KEY_PATH = os.path.expanduser(_candidate) +else: + PUB_KEY_PATH = None + for _p in _PUB_KEY_CANDIDATES: + _expanded = os.path.expanduser(_p) + if os.path.exists(_expanded): + PUB_KEY_PATH = _expanded + break + if PUB_KEY_PATH is None: + PUB_KEY_PATH = os.path.expanduser(_PUB_KEY_CANDIDATES[0]) # 默认仍给一个清晰的报错路径 def main() -> int: pub = open(PUB_KEY_PATH, encoding="utf-8").read().strip() print(f"公钥({PUB_KEY_PATH})前 60 字符: {pub[:60]}...") + # 本地算 fingerprint(SSH 公钥的 SHA256,OpenSSH 标准格式) + fp_proc = subprocess.run( + ["ssh-keygen", "-lf", PUB_KEY_PATH], + capture_output=True, text=True, check=True, + ) + fingerprint = fp_proc.stdout.split()[1] # 形如 "SHA256:xxxx" + print(f"fingerprint: {fingerprint}") + client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: @@ -32,43 +63,43 @@ def main() -> int: # 1) 检查 .ssh 目录 cmds = [ "mkdir -p /root/.ssh && chmod 700 /root/.ssh", - f"touch /root/.ssh/authorized_keys && chmod 600 /root/.ssh/authorized_keys", + "touch /root/.ssh/authorized_keys && chmod 600 /root/.ssh/authorized_keys", ] for c in cmds: _exec(client, c) - # 2) 检查是否已经存在(去重) - stdin, stdout, stderr = client.exec_command("grep -F -c 'news-deploy-key' /root/.ssh/authorized_keys || true") - # 用一个独特注释,后续可识别 - marker = "news-deploy-key" - if marker not in pub: - pub_with_marker = f"{pub} {marker}" - else: - pub_with_marker = pub - stdin, stdout, stderr = client.exec_command("cat /root/.ssh/authorized_keys | grep -F '" + marker + "' || true") - existing = stdout.read().decode().strip() + # 2) 用 fingerprint 去重 — 服务器上是否已有这个 key + _si, _so, _se = client.exec_command( + f"grep -F '{fingerprint}' /root/.ssh/authorized_keys 2>/dev/null || true" + ) + existing = _so.read().decode().strip() if existing: - print(f"已存在,跳过: {existing[:60]}...") + print(f"该 fingerprint 已在 authorized_keys,跳过: {existing.splitlines()[0][:80]}...") else: - # 写入(用 heredoc 避免转义) + # 3) 追加(用 printf %s\\n 避免 echo 的换行问题) + marker = "news-deploy-key" + if marker not in pub: + pub_with_marker = f"{pub} {marker}" + else: + pub_with_marker = pub quoted = pub_with_marker.replace("'", "'\\''") - cmd = f"echo '{quoted}' >> /root/.ssh/authorized_keys" - _exec(client, cmd) - print(f"已追加公钥到 /root/.ssh/authorized_keys") + _exec(client, f"printf '%s\\n' '{quoted}' >> /root/.ssh/authorized_keys") + print(f"已追加公钥到 /root/.ssh/authorized_keys({fingerprint})") - # 3) 验证权限 + # 4) 验证权限 _exec(client, "ls -la /root/.ssh/ /root/.ssh/authorized_keys") + # 5) 列一下当前所有 key + _exec(client, "awk '{print NR, $1, $2, $3}' /root/.ssh/authorized_keys") - # 4) 关闭密码登录(可选项,MVP 保留) client.close() return 0 def _exec(client: paramiko.SSHClient, cmd: str) -> None: print(f"$ {cmd}") - stdin, stdout, stderr = client.exec_command(cmd, timeout=15) - out = stdout.read().decode().strip() - err = stderr.read().decode().strip() + _si, so, se = client.exec_command(cmd, timeout=15) + out = so.read().decode().strip() + err = se.read().decode().strip() if out: print(out) if err: