#!/usr/bin/env python3 """远程服务器初始化(系统级),幂等可重跑。 执行顺序(每步独立、可跳过): 1. 推送公钥到 /root/.ssh/authorized_keys 2. 修改主机名(默认 HK-News) 3. 统一时区到 Asia/Shanghai + 启用 NTP 同步 4. 启用 BBR 拥塞控制(需内核 >= 4.9) 5. 创建 1G Swap 6. 调整 swappiness=10 7. 系统更新 + 安全补丁 8. 用 key 复登,验证一切正常 依赖: pip install paramiko 用法: # Windows PowerShell $env:REMOTE_PASS = '密码' python scripts/server_init.py python scripts/server_init.py --dry-run python scripts/server_init.py --only push,bbr,swap python scripts/server_init.py --skip update # Linux / macOS REMOTE_PASS=密码 python scripts/server_init.py 环境变量(可覆盖默认值): REMOTE_HOST 207.57.129.228 REMOTE_PORT 19717 REMOTE_USER root REMOTE_PASS (必填) NEW_HOSTNAME HK-News TIMEZONE Asia/Shanghai SWAP_SIZE_MB 1024 SWAPPINESS 10 SSH_PUB_KEY_PATH ~/.ssh/id_ed25519.pub """ from __future__ import annotations import argparse import os import sys import time from typing import Callable import paramiko # ---------- 配置 ---------- def _env(name: str, default: str, cast: Callable = str): return cast(os.environ.get(name, default)) HOST = _env("REMOTE_HOST", "207.57.129.228") PORT = _env("REMOTE_PORT", "19717", int) USER = _env("REMOTE_USER", "root") NEW_HOST = _env("NEW_HOSTNAME", "HK-News") TIMEZONE = _env("TIMEZONE", "Asia/Shanghai") SWAP_MB = _env("SWAP_SIZE_MB", "1024", int) SWAPPINESS = _env("SWAPPINESS", "10", int) PUB_KEY = os.path.expanduser(_env("SSH_PUB_KEY_PATH", "~/.ssh/id_ed25519.pub")) # ---------- SSH 工具 ---------- def connect() -> paramiko.SSHClient: pw = os.environ.get("REMOTE_PASS") if not pw: print("ERROR: 请先设置环境变量 REMOTE_PASS", file=sys.stderr) sys.exit(2) c = paramiko.SSHClient() c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) c.connect( HOST, port=PORT, username=USER, password=pw, timeout=30, banner_timeout=30, auth_timeout=30, allow_agent=False, look_for_keys=False, ) return c def run(c: paramiko.SSHClient, cmd: str, check: bool = True, timeout: int = 300, label: str = ""): """执行远程命令,打印输出,出错按 check 决定是否抛异常。""" if label: print(f"# {label}") print(f"$ {cmd}") try: _si, so, se = c.exec_command(cmd, timeout=timeout, get_pty=True) out = so.read().decode(errors="replace") err = se.read().decode(errors="replace") rc = so.channel.recv_exit_status() except Exception as e: print(f"[超时/异常] {e}") if check: raise return -1, "", str(e) if out.strip(): print(out.rstrip()) if err.strip(): print(f"[stderr] {err.rstrip()}") print(f"-> exit={rc}\n") if check and rc != 0: raise RuntimeError(f"cmd failed (rc={rc}): {cmd}") return rc, out, err # ---------- 步骤实现 ---------- def step_push(c: paramiko.SSHClient) -> None: """1) 推公钥(用 fingerprint 去重,marker 保留以兼容历史)。""" import subprocess with open(PUB_KEY, encoding="utf-8") as f: pub = f.read().strip() # 本地算 fingerprint(SSH 公钥的 SHA256) fp_proc = subprocess.run( ["ssh-keygen", "-lf", PUB_KEY], capture_output=True, text=True, check=True, ) fingerprint = fp_proc.stdout.split()[1] # 形如 "SHA256:xxxx" print(f" 本次公钥 fingerprint: {fingerprint}") # 服务器上是否已经有这个 fingerprint _, existing, _ = run( c, f"grep -F '{fingerprint}' /root/.ssh/authorized_keys 2>/dev/null || true", check=False, ) if existing.strip(): print(f" 公钥已在 authorized_keys 中,跳过") return marker = "news-deploy-key" if marker not in pub: pub = f"{pub} {marker}" run(c, "mkdir -p /root/.ssh && chmod 700 /root/.ssh") run(c, "touch /root/.ssh/authorized_keys && chmod 600 /root/.ssh/authorized_keys") quoted = pub.replace("'", "'\\''") run(c, f"printf '%s\\n' '{quoted}' >> /root/.ssh/authorized_keys") print(f" ✓ 公钥已追加({fingerprint})") def step_hostname(c: paramiko.SSHClient) -> None: """2) 改主机名 + 同步 /etc/hosts。""" _, current, _ = run(c, "hostname", check=False) print(f" 当前: {current.strip()}") if current.strip() == NEW_HOST: print(" 已是目标主机名,跳过") return run(c, f"hostnamectl set-hostname {NEW_HOST}") # /etc/hosts:有就改,没有就加 run(c, f"if grep -E '^127\\.0\\.1\\.1\\b' /etc/hosts >/dev/null; then " f" sed -i 's/^127\\.0\\.1\\.1.*/127.0.1.1 {NEW_HOST}/' /etc/hosts; " f"else " f" echo '127.0.1.1 {NEW_HOST}' >> /etc/hosts; " f"fi") print(f" ✓ 主机名已设为 {NEW_HOST}") def step_tz_ntp(c: paramiko.SSHClient) -> None: """3) 时区 + NTP 同步。""" run(c, f"timedatectl set-timezone {TIMEZONE}") _, tz, _ = run(c, "timedatectl show -p Timezone --value", check=False) print(f" Timezone: {tz.strip()}") run(c, "timedatectl set-ntp true", check=False) # 启用常见的 NTP 客户端之一 for svc in ("systemd-timesyncd", "chrony", "ntp", "ntpd"): _, has, _ = run( c, f"systemctl list-unit-files {svc}.service 2>/dev/null | grep -q '^{svc}\\.service' && echo y || true", check=False, ) if has.strip(): run(c, f"systemctl enable --now {svc} 2>/dev/null || true", check=False) break # 强制同步一次 run(c, "chronyc -a makestep 2>/dev/null || true", check=False) run(c, "timedatectl wait-for-synced --timeout=30 2>/dev/null || true", check=False) _, sync, _ = run(c, "timedatectl show -p NTPSynchronized --value", check=False) print(f" NTPSynchronized: {sync.strip() or '(unknown)'}") def step_bbr(c: paramiko.SSHClient) -> None: """4) 启用 BBR。""" _, ver, _ = run(c, "uname -r", check=False) print(f" 内核: {ver.strip()}") parts = ver.strip().split('-')[0].split('.') try: major, minor = int(parts[0]), int(parts[1]) except (ValueError, IndexError): major, minor = 0, 0 if (major, minor) < (4, 9): print(f" ✗ 内核 {ver.strip()} 低于 4.9,BBR 不可用,跳过") return run(c, "modprobe tcp_bbr 2>/dev/null || true", check=False) run(c, "sysctl -w net.core.default_qdisc=fq") run(c, "sysctl -w net.ipv4.tcp_congestion_control=bbr") # 持久化到 /etc/sysctl.conf run(c, f"""bash -c ' CONF=/etc/sysctl.conf if grep -q "^net\\.core\\.default_qdisc" "$CONF"; then sed -i "s/^net\\.core\\.default_qdisc.*/net.core.default_qdisc = fq/" "$CONF" else echo "net.core.default_qdisc = fq" >> "$CONF" fi if grep -q "^net\\.ipv4\\.tcp_congestion_control" "$CONF"; then sed -i "s/^net\\.ipv4\\.tcp_congestion_control.*/net.ipv4.tcp_congestion_control = bbr/" "$CONF" else echo "net.ipv4.tcp_congestion_control = bbr" >> "$CONF" fi '""") _, ctrl, _ = run(c, "sysctl net.ipv4.tcp_congestion_control", check=False) print(f" 当前算法: {ctrl.strip()}") def step_swap(c: paramiko.SSHClient) -> None: """5) 创建 1G Swap(幂等)。""" _, existing, _ = run(c, "swapon --show --noheadings", check=False) if existing.strip(): print(f" 已有 swap:\n{existing.strip()}") return # 优先 fallocate,失败回退 dd run(c, f"fallocate -l {SWAP_MB}M /swapfile 2>/dev/null || " f"dd if=/dev/zero of=/swapfile bs=1M count={SWAP_MB} status=none", timeout=120) run(c, "chmod 600 /swapfile") run(c, "mkswap /swapfile") run(c, "swapon /swapfile") # fstab _, has, _ = run(c, "grep -E '^/swapfile\\b' /etc/fstab || true", check=False) if not has.strip(): run(c, "printf '%s\\n' '/swapfile none swap sw 0 0' >> /etc/fstab") _, info, _ = run(c, "free -h | awk '/^Swap:/'", check=False) print(f" {info.strip() or '(查看 free -h)'}") def step_swappiness(c: paramiko.SSHClient) -> None: """6) 调整 swappiness(默认 10)。""" run(c, f"sysctl -w vm.swappiness={SWAPPINESS}") run(c, f"""bash -c ' C=/etc/sysctl.conf if grep -q "^vm\\.swappiness" "$C"; then sed -i "s/^vm\\.swappiness.*/vm.swappiness = {SWAPPINESS}/" "$C" else echo "vm.swappiness = {SWAPPINESS}" >> "$C" fi '""") _, v, _ = run(c, "cat /proc/sys/vm/swappiness", check=False) print(f" 当前: {v.strip()}") def step_update(c: paramiko.SSHClient) -> None: """7) 系统更新(检测 apt/yum/dnf/apk)。""" _, pm_line, _ = run( c, "command -v apt-get yum dnf apk 2>/dev/null | head -1", check=False, ) pm = pm_line.strip() print(f" 包管理器: {pm or '(none)'}") if "apt" in pm: run(c, "export DEBIAN_FRONTEND=noninteractive; apt-get update -y", timeout=300) run(c, "export DEBIAN_FRONTEND=noninteractive; apt-get upgrade -y", timeout=1500) run(c, "export DEBIAN_FRONTEND=noninteractive; apt-get autoremove -y", check=False, timeout=300) elif "yum" in pm or "dnf" in pm: run(c, f"{pm} -y update", timeout=1500) elif "apk" in pm: run(c, "apk update", timeout=120) run(c, "apk upgrade", timeout=1500) else: print(" ✗ 未识别包管理器,跳过") def verify_key_login() -> None: """8) 用 key 复登,确认免密 OK。""" priv = PUB_KEY.replace(".pub", "") if not os.path.exists(priv): print(f" 私钥不存在: {priv},跳过验证") return print(f" 用私钥 {priv} 登录...") c = paramiko.SSHClient() c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: pkey = paramiko.Ed25519Key.from_private_key_file(priv) except Exception: pkey = paramiko.RSAKey.from_private_key_file(priv) c.connect(HOST, port=PORT, username=USER, pkey=pkey, timeout=15, banner_timeout=15, auth_timeout=15, allow_agent=False, look_for_keys=False) _si, so, _se = c.exec_command( "hostname; uptime -p; date; " "free -h | head -2; " "swapon --show --noheadings; " "sysctl -n net.ipv4.tcp_congestion_control; " "timedatectl show -p Timezone --value" ) print(so.read().decode().rstrip()) c.close() print(" ✓ key 登录成功") # ---------- 步骤注册表 ---------- STEPS: list[tuple[str, str, Callable]] = [ ("push", "推送公钥", step_push), ("hostname", f"改主机名为 {NEW_HOST}", step_hostname), ("tz", f"时区 {TIMEZONE} + NTP", step_tz_ntp), ("bbr", "启用 BBR", step_bbr), ("swap", f"创建 {SWAP_MB}M Swap", step_swap), ("swappiness", f"Swappiness={SWAPPINESS}", step_swappiness), ("update", "系统更新", step_update), ] # ---------- 入口 ---------- def main() -> None: p = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) p.add_argument("--dry-run", action="store_true", help="只展示计划,不执行") p.add_argument("--only", default="", help="只跑指定步骤(逗号分隔,如 push,bbr,swap)") p.add_argument("--skip", default="", help="跳过指定步骤(逗号分隔,如 update)") p.add_argument("--no-verify", action="store_true", help="不验证 key 登录") args = p.parse_args() if args.dry_run: print(f"目标: {USER}@{HOST}:{PORT}") print(f"计划执行:") for k, n, _ in STEPS: print(f" [{k:10}] {n}") if not args.no_verify: print(f" [verify] key 登录验证") return only = {s.strip() for s in args.only.split(",") if s.strip()} skip = {s.strip() for s in args.skip.split(",") if s.strip()} print(f"=== 连接 {USER}@{HOST}:{PORT} ===") c = connect() results: list[tuple[str, str, float]] = [] try: for k, name, fn in STEPS: if only and k not in only: continue if k in skip: continue print(f"\n=== [{k}] {name} ===") t0 = time.time() try: fn(c) results.append((k, "OK", time.time() - t0)) except Exception as e: print(f" ✗ 失败: {e}") results.append((k, f"FAIL: {e}", time.time() - t0)) if not args.no_verify and (not only or "verify" in only): print("\n=== [verify] key 登录验证 ===") t0 = time.time() try: verify_key_login() results.append(("verify", "OK", time.time() - t0)) except Exception as e: print(f" ✗ 失败: {e}") results.append(("verify", f"FAIL: {e}", time.time() - t0)) finally: c.close() print("\n=== 执行结果 ===") for k, status, dur in results: print(f" [{k:10}] {status:30} ({dur:.1f}s)") failed = [r for r in results if not r[1].startswith("OK")] if failed: print(f"\n{len(failed)} 项失败,需手动处理") sys.exit(1) print("\n全部完成 ✓") if __name__ == "__main__": main()