Files
diary-news/scripts/server_init.py

379 lines
13 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""远程服务器初始化(系统级),幂等可重跑。
执行顺序(每步独立可跳过):
1. 推送公钥到 /root/.ssh/authorized_keys
2. 修改主机名(默认 HK-News)
3. 统一时区到 Asia/Shanghai + 启用 NTP 同步
4. 启用 BBR 拥塞控制(需内核 >= 4.9)
5. 创建 1G Swap
6. 调整 swappiness=10
7. 系统更新 + 安全补丁
8. key 复登,验证一切正常
依赖:
pip install paramiko
用法:
# Windows PowerShell
$env:REMOTE_PASS = '密码'
python scripts/server_init.py
python scripts/server_init.py --dry-run
python scripts/server_init.py --only push,bbr,swap
python scripts/server_init.py --skip update
# Linux / macOS
REMOTE_PASS=密码 python scripts/server_init.py
环境变量(可覆盖默认值):
REMOTE_HOST 207.57.129.228
REMOTE_PORT 19717
REMOTE_USER root
REMOTE_PASS (必填)
NEW_HOSTNAME HK-News
TIMEZONE Asia/Shanghai
SWAP_SIZE_MB 1024
SWAPPINESS 10
SSH_PUB_KEY_PATH ~/.ssh/id_ed25519.pub
"""
from __future__ import annotations
import argparse
import os
import sys
import time
from typing import Callable
import paramiko
# ---------- 配置 ----------
def _env(name: str, default: str, cast: Callable = str):
return cast(os.environ.get(name, default))
HOST = _env("REMOTE_HOST", "207.57.129.228")
PORT = _env("REMOTE_PORT", "19717", int)
USER = _env("REMOTE_USER", "root")
NEW_HOST = _env("NEW_HOSTNAME", "HK-News")
TIMEZONE = _env("TIMEZONE", "Asia/Shanghai")
SWAP_MB = _env("SWAP_SIZE_MB", "1024", int)
SWAPPINESS = _env("SWAPPINESS", "10", int)
PUB_KEY = os.path.expanduser(_env("SSH_PUB_KEY_PATH", "~/.ssh/id_ed25519.pub"))
# ---------- SSH 工具 ----------
def connect() -> paramiko.SSHClient:
pw = os.environ.get("REMOTE_PASS")
if not pw:
print("ERROR: 请先设置环境变量 REMOTE_PASS", file=sys.stderr)
sys.exit(2)
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect(
HOST, port=PORT, username=USER, password=pw,
timeout=30, banner_timeout=30, auth_timeout=30,
allow_agent=False, look_for_keys=False,
)
return c
def run(c: paramiko.SSHClient, cmd: str, check: bool = True,
timeout: int = 300, label: str = ""):
"""执行远程命令,打印输出,出错按 check 决定是否抛异常。"""
if label:
print(f"# {label}")
print(f"$ {cmd}")
try:
_si, so, se = c.exec_command(cmd, timeout=timeout, get_pty=True)
out = so.read().decode(errors="replace")
err = se.read().decode(errors="replace")
rc = so.channel.recv_exit_status()
except Exception as e:
print(f"[超时/异常] {e}")
if check:
raise
return -1, "", str(e)
if out.strip():
print(out.rstrip())
if err.strip():
print(f"[stderr] {err.rstrip()}")
print(f"-> exit={rc}\n")
if check and rc != 0:
raise RuntimeError(f"cmd failed (rc={rc}): {cmd}")
return rc, out, err
# ---------- 步骤实现 ----------
def step_push(c: paramiko.SSHClient) -> None:
"""1) 推公钥(用 fingerprint 去重,marker 保留以兼容历史)。"""
import subprocess
with open(PUB_KEY, encoding="utf-8") as f:
pub = f.read().strip()
# 本地算 fingerprint(SSH 公钥的 SHA256)
fp_proc = subprocess.run(
["ssh-keygen", "-lf", PUB_KEY],
capture_output=True, text=True, check=True,
)
fingerprint = fp_proc.stdout.split()[1] # 形如 "SHA256:xxxx"
print(f" 本次公钥 fingerprint: {fingerprint}")
# 服务器上是否已经有这个 fingerprint
_, existing, _ = run(
c, f"grep -F '{fingerprint}' /root/.ssh/authorized_keys 2>/dev/null || true",
check=False,
)
if existing.strip():
print(f" 公钥已在 authorized_keys 中,跳过")
return
marker = "news-deploy-key"
if marker not in pub:
pub = f"{pub} {marker}"
run(c, "mkdir -p /root/.ssh && chmod 700 /root/.ssh")
run(c, "touch /root/.ssh/authorized_keys && chmod 600 /root/.ssh/authorized_keys")
quoted = pub.replace("'", "'\\''")
run(c, f"printf '%s\\n' '{quoted}' >> /root/.ssh/authorized_keys")
print(f" ✓ 公钥已追加({fingerprint})")
def step_hostname(c: paramiko.SSHClient) -> None:
"""2) 改主机名 + 同步 /etc/hosts。"""
_, current, _ = run(c, "hostname", check=False)
print(f" 当前: {current.strip()}")
if current.strip() == NEW_HOST:
print(" 已是目标主机名,跳过")
return
run(c, f"hostnamectl set-hostname {NEW_HOST}")
# /etc/hosts:有就改,没有就加
run(c,
f"if grep -E '^127\\.0\\.1\\.1\\b' /etc/hosts >/dev/null; then "
f" sed -i 's/^127\\.0\\.1\\.1.*/127.0.1.1 {NEW_HOST}/' /etc/hosts; "
f"else "
f" echo '127.0.1.1 {NEW_HOST}' >> /etc/hosts; "
f"fi")
print(f" ✓ 主机名已设为 {NEW_HOST}")
def step_tz_ntp(c: paramiko.SSHClient) -> None:
"""3) 时区 + NTP 同步。"""
run(c, f"timedatectl set-timezone {TIMEZONE}")
_, tz, _ = run(c, "timedatectl show -p Timezone --value", check=False)
print(f" Timezone: {tz.strip()}")
run(c, "timedatectl set-ntp true", check=False)
# 启用常见的 NTP 客户端之一
for svc in ("systemd-timesyncd", "chrony", "ntp", "ntpd"):
_, has, _ = run(
c,
f"systemctl list-unit-files {svc}.service 2>/dev/null | grep -q '^{svc}\\.service' && echo y || true",
check=False,
)
if has.strip():
run(c, f"systemctl enable --now {svc} 2>/dev/null || true", check=False)
break
# 强制同步一次
run(c, "chronyc -a makestep 2>/dev/null || true", check=False)
run(c, "timedatectl wait-for-synced --timeout=30 2>/dev/null || true", check=False)
_, sync, _ = run(c, "timedatectl show -p NTPSynchronized --value", check=False)
print(f" NTPSynchronized: {sync.strip() or '(unknown)'}")
def step_bbr(c: paramiko.SSHClient) -> None:
"""4) 启用 BBR。"""
_, ver, _ = run(c, "uname -r", check=False)
print(f" 内核: {ver.strip()}")
parts = ver.strip().split('-')[0].split('.')
try:
major, minor = int(parts[0]), int(parts[1])
except (ValueError, IndexError):
major, minor = 0, 0
if (major, minor) < (4, 9):
print(f" ✗ 内核 {ver.strip()} 低于 4.9,BBR 不可用,跳过")
return
run(c, "modprobe tcp_bbr 2>/dev/null || true", check=False)
run(c, "sysctl -w net.core.default_qdisc=fq")
run(c, "sysctl -w net.ipv4.tcp_congestion_control=bbr")
# 持久化到 /etc/sysctl.conf
run(c, f"""bash -c '
CONF=/etc/sysctl.conf
if grep -q "^net\\.core\\.default_qdisc" "$CONF"; then
sed -i "s/^net\\.core\\.default_qdisc.*/net.core.default_qdisc = fq/" "$CONF"
else
echo "net.core.default_qdisc = fq" >> "$CONF"
fi
if grep -q "^net\\.ipv4\\.tcp_congestion_control" "$CONF"; then
sed -i "s/^net\\.ipv4\\.tcp_congestion_control.*/net.ipv4.tcp_congestion_control = bbr/" "$CONF"
else
echo "net.ipv4.tcp_congestion_control = bbr" >> "$CONF"
fi
'""")
_, ctrl, _ = run(c, "sysctl net.ipv4.tcp_congestion_control", check=False)
print(f" 当前算法: {ctrl.strip()}")
def step_swap(c: paramiko.SSHClient) -> None:
"""5) 创建 1G Swap(幂等)。"""
_, existing, _ = run(c, "swapon --show --noheadings", check=False)
if existing.strip():
print(f" 已有 swap:\n{existing.strip()}")
return
# 优先 fallocate,失败回退 dd
run(c,
f"fallocate -l {SWAP_MB}M /swapfile 2>/dev/null || "
f"dd if=/dev/zero of=/swapfile bs=1M count={SWAP_MB} status=none",
timeout=120)
run(c, "chmod 600 /swapfile")
run(c, "mkswap /swapfile")
run(c, "swapon /swapfile")
# fstab
_, has, _ = run(c, "grep -E '^/swapfile\\b' /etc/fstab || true", check=False)
if not has.strip():
run(c, "printf '%s\\n' '/swapfile none swap sw 0 0' >> /etc/fstab")
_, info, _ = run(c, "free -h | awk '/^Swap:/'", check=False)
print(f" {info.strip() or '(查看 free -h)'}")
def step_swappiness(c: paramiko.SSHClient) -> None:
"""6) 调整 swappiness(默认 10)。"""
run(c, f"sysctl -w vm.swappiness={SWAPPINESS}")
run(c, f"""bash -c '
C=/etc/sysctl.conf
if grep -q "^vm\\.swappiness" "$C"; then
sed -i "s/^vm\\.swappiness.*/vm.swappiness = {SWAPPINESS}/" "$C"
else
echo "vm.swappiness = {SWAPPINESS}" >> "$C"
fi
'""")
_, v, _ = run(c, "cat /proc/sys/vm/swappiness", check=False)
print(f" 当前: {v.strip()}")
def step_update(c: paramiko.SSHClient) -> None:
"""7) 系统更新(检测 apt/yum/dnf/apk)。"""
_, pm_line, _ = run(
c, "command -v apt-get yum dnf apk 2>/dev/null | head -1", check=False,
)
pm = pm_line.strip()
print(f" 包管理器: {pm or '(none)'}")
if "apt" in pm:
run(c, "export DEBIAN_FRONTEND=noninteractive; apt-get update -y", timeout=300)
run(c, "export DEBIAN_FRONTEND=noninteractive; apt-get upgrade -y", timeout=1500)
run(c, "export DEBIAN_FRONTEND=noninteractive; apt-get autoremove -y",
check=False, timeout=300)
elif "yum" in pm or "dnf" in pm:
run(c, f"{pm} -y update", timeout=1500)
elif "apk" in pm:
run(c, "apk update", timeout=120)
run(c, "apk upgrade", timeout=1500)
else:
print(" ✗ 未识别包管理器,跳过")
def verify_key_login() -> None:
"""8) 用 key 复登,确认免密 OK。"""
priv = PUB_KEY.replace(".pub", "")
if not os.path.exists(priv):
print(f" 私钥不存在: {priv},跳过验证")
return
print(f" 用私钥 {priv} 登录...")
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
pkey = paramiko.Ed25519Key.from_private_key_file(priv)
except Exception:
pkey = paramiko.RSAKey.from_private_key_file(priv)
c.connect(HOST, port=PORT, username=USER, pkey=pkey,
timeout=15, banner_timeout=15, auth_timeout=15,
allow_agent=False, look_for_keys=False)
_si, so, _se = c.exec_command(
"hostname; uptime -p; date; "
"free -h | head -2; "
"swapon --show --noheadings; "
"sysctl -n net.ipv4.tcp_congestion_control; "
"timedatectl show -p Timezone --value"
)
print(so.read().decode().rstrip())
c.close()
print(" ✓ key 登录成功")
# ---------- 步骤注册表 ----------
STEPS: list[tuple[str, str, Callable]] = [
("push", "推送公钥", step_push),
("hostname", f"改主机名为 {NEW_HOST}", step_hostname),
("tz", f"时区 {TIMEZONE} + NTP", step_tz_ntp),
("bbr", "启用 BBR", step_bbr),
("swap", f"创建 {SWAP_MB}M Swap", step_swap),
("swappiness", f"Swappiness={SWAPPINESS}", step_swappiness),
("update", "系统更新", step_update),
]
# ---------- 入口 ----------
def main() -> None:
p = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
p.add_argument("--dry-run", action="store_true",
help="只展示计划,不执行")
p.add_argument("--only", default="",
help="只跑指定步骤(逗号分隔,如 push,bbr,swap)")
p.add_argument("--skip", default="",
help="跳过指定步骤(逗号分隔,如 update)")
p.add_argument("--no-verify", action="store_true",
help="不验证 key 登录")
args = p.parse_args()
if args.dry_run:
print(f"目标: {USER}@{HOST}:{PORT}")
print(f"计划执行:")
for k, n, _ in STEPS:
print(f" [{k:10}] {n}")
if not args.no_verify:
print(f" [verify] key 登录验证")
return
only = {s.strip() for s in args.only.split(",") if s.strip()}
skip = {s.strip() for s in args.skip.split(",") if s.strip()}
print(f"=== 连接 {USER}@{HOST}:{PORT} ===")
c = connect()
results: list[tuple[str, str, float]] = []
try:
for k, name, fn in STEPS:
if only and k not in only:
continue
if k in skip:
continue
print(f"\n=== [{k}] {name} ===")
t0 = time.time()
try:
fn(c)
results.append((k, "OK", time.time() - t0))
except Exception as e:
print(f" ✗ 失败: {e}")
results.append((k, f"FAIL: {e}", time.time() - t0))
if not args.no_verify and (not only or "verify" in only):
print("\n=== [verify] key 登录验证 ===")
t0 = time.time()
try:
verify_key_login()
results.append(("verify", "OK", time.time() - t0))
except Exception as e:
print(f" ✗ 失败: {e}")
results.append(("verify", f"FAIL: {e}", time.time() - t0))
finally:
c.close()
print("\n=== 执行结果 ===")
for k, status, dur in results:
print(f" [{k:10}] {status:30} ({dur:.1f}s)")
failed = [r for r in results if not r[1].startswith("OK")]
if failed:
print(f"\n{len(failed)} 项失败,需手动处理")
sys.exit(1)
print("\n全部完成 ✓")
if __name__ == "__main__":
main()