Files
diary-news/scripts/healthcheck.py

1289 lines
53 KiB
Python
Raw Normal View History

2026-06-11 17:24:46 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""diary-news 服务器健康检查 checklist。
可在本机跑(SSH 远端)或在服务器上直接跑( --local)
docker compose 6 个服务:postgres / redis / api / worker / caddy / frontend,
外加主机层面的端口/磁盘/内存/日志
依赖:
pip install paramiko
用法(Windows PowerShell):
$env:REMOTE_PASS = '你的root密码'
python scripts/healthcheck.py
python scripts/healthcheck.py --local # 在服务器上直接跑
python scripts/healthcheck.py --host 1.2.3.4 --port 22 --user news
python scripts/healthcheck.py --only docker,disk # 只跑指定组
python scripts/healthcheck.py --json out.json # 导出结构化结果
环境变量(可覆盖默认值):
REMOTE_HOST 207.57.129.228
REMOTE_PORT 19717
REMOTE_USER root
REMOTE_PASS (SSH 必填; --local 不需要)
COMPOSE_DIR /srv/news
API_BASE_URL http://127.0.0.1:8000 # API 健康检查端点
"""
from __future__ import annotations
import argparse
import base64
import json
import os
import re
import sys
import time
from dataclasses import dataclass, field, asdict
from typing import Callable, Optional
# 可选依赖:只在远程模式下需要
try:
import paramiko # type: ignore
except ImportError:
paramiko = None # --local 模式不强制
# ============== Configuration ==============
DEFAULT_HOST = "207.57.129.228"
DEFAULT_PORT = 19717
DEFAULT_USER = "root"
DEFAULT_COMPOSE = "/srv/news"
DEFAULT_API_BASE = "http://127.0.0.1/api/v1/healthz" # goes through Caddy on port 80, reverse-proxied to api:8000
SSH_TIMEOUT = 30
# The 6 services declared in docker-compose.yml
EXPECTED_SERVICES = ["postgres", "redis", "api", "worker", "caddy", "frontend"]
# Key ports (by default only the public-facing port 80 is checked; add others as needed)
KEY_PORTS = {
"http": 80, # public port of Caddy / frontend
}
# ============== 数据结构 ==============
@dataclass
class Check:
    """Result of a single health-check item.

    ``Report.add`` stores instances as plain dicts (via ``asdict``) and prints
    one status line per check.
    """

    name: str  # title shown on the console line
    group: str  # group label, e.g. "docker" / "host" / "network" / "app"
    ok: bool  # True when the check passed
    summary: str  # short one-line outcome
    detail: str = ""  # optional multi-line detail, printed for failed checks
    elapsed_ms: int = 0  # duration in milliseconds (set by the ``timed`` decorator)
    severity: str = "info"  # info / warn / error
    command: str = ""  # command that was executed (to reproduce a failure)
@dataclass
class Report:
    """Accumulates check results for one target and echoes them to the console."""

    target: str
    started_at: str
    finished_at: str = ""
    checks: list = field(default_factory=list)  # ``asdict(Check)`` dicts, in insertion order

    def add(self, c: Check, verbose: bool = False) -> None:
        """Record ``c`` and print its status line.

        For a failed check the command (if any) and the detail are printed
        as well: the full detail for severity ``error`` or when ``verbose``
        is set, otherwise only the first 12 lines.
        """
        self.checks.append(asdict(c))
        # The pass and fail markers were the same empty string, so the status
        # line could not tell a passing check from a failing one. Plain ASCII
        # markers also survive consoles with limited encodings.
        icon = "[OK]" if c.ok else "[FAIL]"
        sev = "" if c.severity == "info" else f" [{c.severity.upper()}]"
        print(f" {icon}{sev} {c.name}: {c.summary} ({c.elapsed_ms}ms)")
        if c.ok:
            return
        if c.command:
            print(f" $ {c.command}")
        if not c.detail:
            return
        lines = c.detail.splitlines()
        show_all = c.severity == "error" or verbose
        for line in (lines if show_all else lines[:12]):
            print(f" {line}")
        if not show_all and len(lines) > 12:
            print(f" ... (共 {len(lines)} 行,用 --verbose 看完整)")

    def summary(self) -> tuple[int, int, int]:
        """Return ``(passed, failed, failed_with_severity_error)`` counts."""
        ok = sum(1 for c in self.checks if c["ok"])
        bad = len(self.checks) - ok
        err = sum(1 for c in self.checks if not c["ok"] and c["severity"] == "error")
        return ok, bad, err
# ============== 远程执行抽象 ==============
class Remote:
    """Uniform command runner: paramiko SSH for a remote host, or the local shell.

    With ``local=True`` no connection is made and ``run`` executes commands
    through ``subprocess`` on this machine. Otherwise an SSH connection is
    opened with password authentication; the password comes from the
    ``password`` argument or the ``REMOTE_PASS`` environment variable.
    The process exits with status 2 when paramiko or the password is missing.
    """

    def __init__(self, local: bool, host: str = "", port: int = 22,
                 user: str = "root", password: str = ""):
        self.local = local
        self.client: Optional[paramiko.SSHClient] = None
        if local:
            return
        if paramiko is None:
            print("ERROR: paramiko 未安装,远程模式需要 `pip install paramiko`", file=sys.stderr)
            sys.exit(2)
        pw = password or os.environ.get("REMOTE_PASS", "")
        if not pw:
            print("ERROR: 请先设置环境变量 REMOTE_PASS,或加 --password xxx", file=sys.stderr)
            sys.exit(2)
        c = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts any unknown host key, which
        # leaves the connection open to man-in-the-middle attacks. Kept for
        # backward compatibility; consider loading known_hosts instead.
        c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            c.connect(host, port=port, username=user, password=pw,
                      timeout=SSH_TIMEOUT, banner_timeout=SSH_TIMEOUT,
                      auth_timeout=SSH_TIMEOUT,
                      allow_agent=False, look_for_keys=False)
        except Exception:
            # Do not leak the half-open client when the connection fails.
            c.close()
            raise
        self.client = c

    @staticmethod
    def _as_text(data) -> str:
        """Coerce captured output (``None`` / ``bytes`` / ``str``) to ``str``."""
        if data is None:
            return ""
        if isinstance(data, bytes):
            return data.decode(errors="replace")
        return data

    def run(self, cmd: str, timeout: int = 60) -> tuple[int, str, str]:
        """Run ``cmd`` through a shell and return ``(rc, stdout, stderr)``.

        A timeout is reported as rc ``124`` with a ``timeout after Ns``
        stderr message in both local and SSH mode.
        """
        if self.local:
            import subprocess
            try:
                p = subprocess.run(cmd, shell=True, capture_output=True,
                                   text=True, timeout=timeout)
            except subprocess.TimeoutExpired as e:
                # The partial output attached to TimeoutExpired may be bytes
                # even with text=True, so normalise it to str.
                return 124, self._as_text(e.stdout), f"timeout after {timeout}s"
            return p.returncode, p.stdout, p.stderr

        import socket
        assert self.client is not None
        _si, so, se = self.client.exec_command(cmd, timeout=timeout, get_pty=True)
        try:
            out = so.read().decode(errors="replace")
            err = se.read().decode(errors="replace")
        except socket.timeout:
            # The channel timeout used to propagate and abort the whole run;
            # mirror the local-mode convention instead.
            so.channel.close()
            return 124, "", f"timeout after {timeout}s"
        rc = so.channel.recv_exit_status()
        # A PTY delivers CRLF line endings; normalise them so that
        # line-anchored regexes in the checks behave as in local mode.
        return rc, out.replace("\r\n", "\n"), err.replace("\r\n", "\n")

    def close(self) -> None:
        """Close the SSH connection, if one was opened."""
        if self.client is not None:
            self.client.close()
# ============== 检查项 ==============
def timed(fn: Callable) -> Callable:
    """Decorator that stores the call duration on the returned object.

    The wrapped function must return an object with a writable
    ``elapsed_ms`` attribute; it is set to the elapsed time in whole
    milliseconds and the object is returned.
    """
    import functools

    @functools.wraps(fn)  # keep the check's __name__ / __doc__
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the duration cannot go negative or
        # jump when the system clock is adjusted.
        t0 = time.perf_counter()
        c = fn(*args, **kwargs)
        c.elapsed_ms = int((time.perf_counter() - t0) * 1000)
        return c
    return wrapper
@timed
def check_compose_ps(remote: Remote, compose_dir: str) -> Check:
    """1.1 docker compose ps: every expected service should be running and not unhealthy.

    Runs ``docker compose ps`` in ``compose_dir`` and parses
    ``service|state|status`` lines. A service counts as running when its
    state is running/healthy, its status does not mention an exit, and its
    status does not report ``unhealthy``. Severity is ``error`` when an
    expected service is absent and ``warn`` when one is present but not
    running.
    """
    cmd = f"cd {compose_dir} && docker compose ps --format '{{{{.Service}}}}|{{{{.State}}}}|{{{{.Status}}}}'"
    rc, out, err = remote.run(cmd, timeout=30)
    expected = set(EXPECTED_SERVICES)
    running, unhealthy = set(), set()
    missing = set(expected)
    detail_lines = []
    for raw in out.splitlines():
        parts = raw.strip().split("|")
        if len(parts) < 3:
            continue
        svc, state, status = parts[0], parts[1], parts[2]
        missing.discard(svc)
        detail_lines.append(f" {svc:10s} {state:12s} {status}")
        state_l, status_l = state.lower(), status.lower()
        is_up = state_l in ("running", "healthy") and "exit" not in status_l
        # A container can be in state "running" while its health check is
        # failing ("Up 5 minutes (unhealthy)"); that must not count as OK.
        if is_up and "unhealthy" not in status_l:
            running.add(svc)
        else:
            unhealthy.add(svc)
    # Only the expected services decide the verdict, so an additional
    # service in the compose file does not turn the check red.
    ok = not missing and not unhealthy
    summary = (
        f"{len(running & expected)}/{len(EXPECTED_SERVICES)} running"
        if ok
        else f"missing={sorted(missing) or '-'} unhealthy={sorted(unhealthy) or '-'}"
    )
    sev = "error" if missing else ("warn" if unhealthy else "info")
    if not detail_lines:
        # Nothing parsable (e.g. docker failed): show the raw output instead.
        detail_lines = [out.strip() or err.strip()]
    return Check("docker compose ps", "docker", ok, summary, "\n".join(detail_lines),
                 severity=sev, command=cmd)
@timed
def check_container_logs(remote: Remote, compose_dir: str) -> Check:
    """1.2 Scan the latest worker / api logs for ERROR / Traceback lines.

    Greps the last 200 log lines (case-insensitively) for
    traceback/error/exception/critical and keeps at most 20 hits. Any hit
    produces a ``warn`` result whose detail is the matching lines.
    """
    title = "近 200 行 worker/api 日志无 ERROR"
    grep_pipeline = (
        "docker compose logs --tail=200 --no-color worker api 2>&1 | "
        "grep -E -i 'traceback|error|exception|critical' | head -20"
    )
    cmd = f"cd {compose_dir} && " + grep_pipeline
    rc, out, err = remote.run(cmd, timeout=30)
    hits = out.strip()
    if hits:
        suspicious = sum(1 for row in hits.splitlines() if row.strip())
        return Check(title, "docker", False, f"{suspicious} 行可疑", hits, severity="warn")
    return Check(title, "docker", True, "clean", severity="info")
@timed
def check_disk(remote: Remote) -> Check:
    """1.3 Disk space: usage of the key mount points.

    Looks at the ``df -h`` rows matching ``/``, ``/srv`` or ``/var`` and
    flags any row whose use percentage is 85 or higher.
    """
    df_cmd = "df -h --output=target,size,used,avail,pcent 2>/dev/null | grep -E '/$|/srv|/var$'"
    rc, out, err = remote.run(df_cmd)
    out = out.strip()
    high = [
        row.strip()
        for row in out.splitlines()
        if (m := re.search(r"(\d+)%", row)) and int(m.group(1)) >= 85
    ]
    if high:
        return Check("磁盘空间", "docker", False, f"高占用: {'; '.join(high)}", out,
                     severity="warn")
    return Check("磁盘空间", "docker", True, "ok", out, severity="info")
def _parse_size_to_mb(token: str) -> float:
"""'1.9Gi' / '806Mi' / '512Ki' / '1024' 转成 MB。"""
m = re.match(r"^\s*(\d+(?:\.\d+)?)\s*([KMG]?i?B?)?\s*$", token)
if not m:
return 0.0
val = float(m.group(1))
unit = (m.group(2) or "").upper()
if unit.startswith("GI") or unit == "G":
return val * 1024
if unit.startswith("MI") or unit == "M":
return val
if unit.startswith("KI") or unit == "K":
return val / 1024
# 无单位,默认 KiB (free -h 罕见)
return val / 1024
@timed
def check_memory(remote: Remote) -> Check:
    """1.4 Memory + swap.

    Reads the ``Mem`` row of ``free -h`` and warns when used/total exceeds
    90 percent. The raw ``free`` output is attached as detail.
    """
    rc, out, _ = remote.run("free -h | head -3")
    out = out.strip()
    pct = 0.0
    high = False
    for row in out.splitlines():
        if not row.startswith("Mem"):
            continue
        # ['Mem:', 'total', 'used', 'free', 'shared', 'buff/cache', 'available']
        cols = row.split()
        if len(cols) < 7:
            continue
        total_mb = _parse_size_to_mb(cols[1])
        used_mb = _parse_size_to_mb(cols[2])
        if total_mb > 0:
            pct = used_mb / total_mb * 100
            if pct > 90:
                high = True
    if high:
        summary, sev = f">90% used ({pct:.1f}%)", "warn"
    else:
        summary, sev = "ok", "info"
    return Check("内存使用", "host", not high, summary, out, severity=sev)
@timed
def check_ports(remote: Remote) -> Check:
    """1.5 Key ports are listening (by default only port 80).

    Takes the local-address column (field 4) of the LISTEN rows printed by
    ``ss -tln``, e.g. ``0.0.0.0:80``, ``*:443`` or ``[::]:80``, and compares
    the port numbers with ``KEY_PORTS``. ``-H`` is avoided because header
    handling differs between distributions, and plain ``ss -l`` is avoided
    because it adds unix sockets.
    """
    cmd = (
        "ss -tln 2>/dev/null | "
        "awk 'tolower($1) ~ /listen/ {print $4}' | sort -u"
    )
    rc, out, _ = remote.run(cmd)
    # `\s*` before the end anchor tolerates the "\r" that precedes each
    # newline when the command ran on a PTY; without it no line matches and
    # every port is reported missing.
    listening = {int(m.group(1)) for m in re.finditer(r":(\d+)\s*$", out, re.MULTILINE)}
    need = set(KEY_PORTS.values())
    missing = sorted(need - listening)
    ok = not missing
    label = "/".join(str(p) for p in sorted(need))
    return Check(f"关键端口 {label} 监听", "network", ok,
                 "ok" if ok else f"缺失 {missing}",
                 f"监听中: {sorted(listening)}\n# raw ss output:\n{out.strip()}",
                 command=cmd, severity="warn" if not ok else "info")
@timed
def check_docker_system(remote: Remote) -> Check:
    """1.6 docker system df: space used by volumes / images / build cache.

    Warns when the first GB/TB figure on any output row is above 5 GB.
    """
    rc, out, _ = remote.run("docker system df 2>&1")
    out = out.strip()
    # Accept integer sizes ("12GB") as well as decimals, and TB figures,
    # which the previous decimal-only GB pattern skipped.
    size_re = re.compile(r"(\d+(?:\.\d+)?)\s*(GB|TB)")
    bloated = False
    for line in out.splitlines():
        m = size_re.search(line)
        if not m:
            continue
        size_gb = float(m.group(1)) * (1000 if m.group(2) == "TB" else 1)
        if size_gb > 5:
            bloated = True
            break
    return Check("docker system df", "docker", not bloated,
                 "ok" if not bloated else "有 >5GB 的大件",
                 out, severity="warn" if bloated else "info")
@timed
def check_api_health(remote: Remote, api_base: str) -> Check:
    """1.7 API health endpoint.

    ``api_base`` may be given in two forms:
    - a full URL that already ends in ``/healthz`` or ``/health``, e.g.
      'http://127.0.0.1/api/v1/healthz', which is used as-is;
    - a base URL, e.g. 'http://127.0.0.1:8000', to which ``/api/v1/healthz``
      is appended.

    Any HTTP status in the 2xx/3xx range passes; anything else is an error.
    """
    base = api_base.rstrip("/")
    url = base if base.endswith(("/healthz", "/health")) else f"{base}/api/v1/healthz"
    cmd = (
        # Remove the previous body first: when curl cannot connect it does
        # not touch the output file, and a stale body would be displayed.
        "rm -f /tmp/hc_body; "
        f"curl -sS -m 5 -o /tmp/hc_body -w 'http=%{{http_code}} t=%{{time_total}}\\n' '{url}'; "
        f"echo '--- body ---'; head -c 400 /tmp/hc_body 2>/dev/null; echo"
    )
    rc, out, _ = remote.run(cmd)
    m = re.search(r"http=(\d+)", out)
    code = int(m.group(1)) if m else 0
    ok = 200 <= code < 400
    summary = f"http={code}" + (" (✓ ok)" if ok else " (✗ failed)")
    return Check(f"API {url}", "app", ok, summary, out.strip(),
                 command=cmd, severity="error" if not ok else "info")
@timed
def check_db_counts(remote: Remote, compose_dir: str) -> Check:
    """1.8 Row counts of the articles / sources tables (credentials read from .env).

    The check fails when the query itself fails, or when more than 50
    articles published in the last 24 hours are still untranslated
    (``warn`` up to 200, ``error`` above that).
    """
    cmd = (
        f"cd {compose_dir} && "
        "set -a; . ./.env; set +a; "
        "docker compose exec -T postgres psql -U \"$POSTGRES_USER\" -d \"$POSTGRES_DB\" -t -A -c "
        "\"SELECT 'articles='||count(*) FROM articles;"
        "SELECT 'sources='||count(*) FROM sources;"
        "SELECT 'translated='||count(*) FROM articles WHERE title_zh IS NOT NULL;"
        "SELECT 'untranslated_24h='||count(*) FROM articles "
        " WHERE published_at > now() - interval '24 hour' AND title_zh IS NULL;\" 2>&1"
    )
    rc, out, _ = remote.run(cmd, timeout=30)
    out = out.strip()
    untrans_m = re.search(r"untranslated_24h=(\d+)", out)
    untrans_24h = int(untrans_m.group(1)) if untrans_m else -1
    # A failed or unparsable query used to come back as severity "info"
    # (and could even pass when rc was 0); treat it as an error instead.
    query_failed = rc != 0 or untrans_m is None
    ok = not query_failed and untrans_24h <= 50
    if query_failed or untrans_24h > 200:
        sev = "error"
    elif untrans_24h > 50:
        sev = "warn"
    else:
        sev = "info"
    return Check("DB 行数 articles/sources", "app", ok,
                 out.replace("\n", " | "),
                 detail=out if query_failed else "",
                 command=cmd if query_failed else "",
                 severity=sev)
@timed
def check_llm_workflow(remote: Remote, compose_dir: str) -> Check:
    """1.13 LLM workflow coverage: status distribution of the 5 pipeline steps.

    Steps (status columns on ``articles``):
      1. translation  translation_status
      2. classify     classify_status
      3. format       format_status
      4. image        image_ai_status
      5. commentary   commentary_status

    One query returns, as tab-separated ``(key, status, count)`` rows, the
    global distribution of all five columns plus the distribution of the four
    LLM columns among articles translated successfully in the last 24 hours.

    Verdict:
      - global translation failure rate (failed + partial) >= 5% is a warning,
        >= 20% an error;
      - any LLM step whose 24h ``failed`` share is >= 20% is an error;
      - no 24h sample and no issue: passes as ``info`` (nothing to evaluate);
      - no parsable rows at all: error.
    """
    check_name = "LLM 工作流落实度(翻译/分类/排版/插图/评论)"
    sql = r"""
SELECT 'tr_glob' AS k, translation_status AS st, count(*)::int AS n
FROM articles GROUP BY translation_status
UNION ALL
SELECT 'cl_glob', classify_status, count(*)::int FROM articles GROUP BY classify_status
UNION ALL
SELECT 'fm_glob', format_status, count(*)::int FROM articles GROUP BY format_status
UNION ALL
SELECT 'im_glob', image_ai_status, count(*)::int FROM articles GROUP BY image_ai_status
UNION ALL
SELECT 'co_glob', commentary_status, count(*)::int FROM articles GROUP BY commentary_status
UNION ALL
-- 24h 内翻译成功(translation_status=ok)的文章里,4 LLM 状态分布
SELECT 'cl_24h', classify_status, count(*)::int FROM articles
WHERE translation_status='ok' AND translated_at > now()-interval '24 hour'
GROUP BY classify_status
UNION ALL
SELECT 'fm_24h', format_status, count(*)::int FROM articles
WHERE translation_status='ok' AND translated_at > now()-interval '24 hour'
GROUP BY format_status
UNION ALL
SELECT 'im_24h', image_ai_status, count(*)::int FROM articles
WHERE translation_status='ok' AND translated_at > now()-interval '24 hour'
GROUP BY image_ai_status
UNION ALL
SELECT 'co_24h', commentary_status, count(*)::int FROM articles
WHERE translation_status='ok' AND translated_at > now()-interval '24 hour'
GROUP BY commentary_status;
"""
    cmd = (
        f"cd {compose_dir} && "
        "set -a; . ./.env; set +a; "
        "docker compose exec -T postgres psql -U \"$POSTGRES_USER\" -d \"$POSTGRES_DB\" -t -A -F $'\\t' -c \""
        + sql.replace(chr(34), chr(92) + chr(34))
        + "\" 2>&1"
    )
    rc, out, _ = remote.run(cmd, timeout=30)

    # Parse tab-separated (k, st, n) rows, e.g. glob['tr_glob'] = {'ok': 100, 'failed': 5}
    glob: dict[str, dict[str, int]] = {}
    for line in out.splitlines():
        cells = line.strip().split("\t", 2)
        if len(cells) < 3:
            continue
        k, st, n_s = cells
        try:
            glob.setdefault(k, {})[st] = int(n_s)
        except ValueError:
            continue
    if not glob:
        return Check(
            check_name, "app", False,
            "查询无结果(SQL 失败?)",
            detail=out[:600],
            command=cmd,
            severity="error",
        )

    # === 1) global translation health ===
    tr = glob.get("tr_glob", {})
    tr_total = sum(tr.values())
    tr_failed = tr.get("failed", 0) + tr.get("partial", 0)
    tr_failed_pct = (tr_failed / tr_total * 100) if tr_total else 0.0
    tr_ok = tr.get("ok", 0)

    # === 2) the 4 LLM statuses among articles translated OK in the last 24h ===
    # Sample size = sum of all cl_24h counts (the other three share it).
    llm_24h_total = sum(glob.get("cl_24h", {}).values())
    llm_summary: list[str] = []
    llm_issues: list[str] = []
    for prefix, name in [("cl_24h", "分类"), ("fm_24h", "排版"),
                         ("im_24h", "插图"), ("co_24h", "评论")]:
        if llm_24h_total == 0:
            llm_summary.append(f"{name}: 无 24h 翻译样本")
            continue
        d = glob.get(prefix, {})
        n_ok = d.get("ok", 0)
        failed = d.get("failed", 0)
        pending = d.get("pending", 0)
        na = d.get("n/a", 0)
        ok_pct = n_ok / llm_24h_total * 100
        fail_pct = failed / llm_24h_total * 100
        llm_summary.append(
            f"{name}: ok={n_ok} failed={failed} pending={pending} n/a={na} ({ok_pct:.0f}% ok)"
        )
        if fail_pct >= 20:
            llm_issues.append(f"{name} 24h 失败率 {fail_pct:.0f}% (≥20%)")

    # === 3) global LLM status distribution (overall picture, top 3 per step) ===
    glob_parts: list[str] = []
    for prefix, name in [("cl_glob", "分类"), ("fm_glob", "排版"),
                         ("im_glob", "插图"), ("co_glob", "评论")]:
        d = glob.get(prefix, {})
        if d:
            top = ",".join(f"{k}={v}" for k, v in sorted(d.items(), key=lambda x: -x[1])[:3])
            glob_parts.append(f"{name} {top}")

    # === 4) verdict ===
    # Track severity with an explicit flag rather than by searching the
    # human-readable issue text for "≥20%".
    issues: list[str] = []
    severe = False
    if tr_failed_pct >= 20:
        issues.append(f"翻译失败率 {tr_failed_pct:.0f}% ≥20%")
        severe = True
    elif tr_failed_pct >= 5:
        issues.append(f"翻译失败率 {tr_failed_pct:.0f}% ≥5%")
    if llm_issues:
        issues.extend(llm_issues)
        severe = True

    if llm_24h_total == 0:
        # Nothing was translated in the last 24h, so the LLM steps cannot be
        # judged. Any translation-failure issue is still reported.
        summary = f"24h 内无翻译成功样本(无法评估 LLM 工作流)"
    else:
        summary = (f"翻译 ok={tr_ok}/{tr_total} ({100 - tr_failed_pct:.0f}%) | "
                   + " · ".join(llm_summary))
    if issues:
        summary += " · " + "; ".join(issues[:2])
    sev = "error" if severe else ("warn" if issues else "info")

    detail_lines = [
        f"翻译全局(全量): " + ", ".join(f"{k}={v}" for k, v in sorted(tr.items(), key=lambda x: -x[1])),
        f"翻译失败率: {tr_failed_pct:.1f}%",
        f"24h 已翻译文章样本: {llm_24h_total}",
    ] + llm_summary + [
        "",
        "全局 LLM 状态(全量,取 top3):",
    ] + [f" {p}" for p in glob_parts]
    if issues:
        detail_lines.append("")
        detail_lines.append("⚠ 问题: " + "; ".join(issues))

    # Pass/fail now agrees with the severity: the "no 24h sample" case is an
    # informational skip (as in the other optional checks), not a failure.
    ok = not issues
    return Check(
        check_name, "app", ok, summary,
        detail="\n".join(detail_lines),
        command="psql: 5 个 status 字段 × 全局/24h 分布",
        severity=sev,
    )
@timed
def check_translation_sample(remote: Remote, compose_dir: str, sample_n: int = 3) -> Check:
    """1.9 Spot-check N random articles translated in the last 24h (default 3).

    Sampling condition: published_at > now()-24h AND title_zh IS NOT NULL
    AND translation_status IN ('ok','partial').

    Per-article criteria:
      - title_zh is not empty
      - body_zh_text is not empty
      - title_zh != title (the typical sign of an untranslated fallback)
      - length of title_zh >= 2

    Overall verdict:
      - query failed: error
      - no candidates: info (no sample; the worker has produced nothing yet)
      - all pass: ok
      - none pass: error (the translation pipeline is probably broken)
      - some pass: warn
    """
    sample_n = int(sample_n)  # interpolated into the SQL text below
    check_name = f"翻译抽查({sample_n}篇/24h)"
    # psql -A prints one row per line with tab-separated columns, so
    # whitespace inside free-text columns (newlines in the body preview,
    # tabs in titles) must be collapsed or the row cannot be parsed.
    flat = "'[[:space:]]+', ' ', 'g'"
    sql = (
        "SELECT id, "
        " coalesce(source_id::text,'?') AS src, "
        f" regexp_replace(coalesce(title,''), {flat}) AS title_flat, "
        f" regexp_replace(coalesce(title_zh,''), {flat}) AS title_zh_flat, "
        f" regexp_replace(coalesce(substring(body_zh_text, 1, 200), ''), {flat}) AS body_zh_preview, "
        " translation_status, "
        " translation_engine, "
        " coalesce(to_char(translated_at, 'YYYY-MM-DD HH24:MI'), '-') AS tat, "
        " coalesce(lang_src,'-') AS lang, "
        " coalesce(char_length(title),0) AS tlen, "
        " coalesce(char_length(title_zh),0) AS zlen, "
        " coalesce(char_length(body_zh_text),0) AS blen "
        "FROM articles "
        "WHERE published_at > now() - interval '24 hour' "
        " AND title_zh IS NOT NULL "
        " AND translation_status IN ('ok','partial') "
        "ORDER BY random() "
        f"LIMIT {sample_n};"
    )
    # Header line, to make the raw output readable by column.
    header = "id\tsrc\ttitle\ttitle_zh\tbody_zh_preview\tstatus\tengine\ttranslated_at\tlang\ttlen\tzlen\tblen"
    cmd = (
        f"cd {compose_dir} && "
        "set -a; . ./.env; set +a; "
        f"echo '{header}'; "
        f"docker compose exec -T postgres psql -U \"$POSTGRES_USER\" -d \"$POSTGRES_DB\" -t -A -F $'\\t' -c \"{sql.replace(chr(34), chr(92)+chr(34))}\" 2>&1"
    )
    rc, out, err = remote.run(cmd, timeout=30)

    # Keep data rows only: skip the echoed header and anything (NOTICE
    # lines etc.) that does not have all 12 columns.
    rows = []
    for raw in out.splitlines():
        if not raw.strip() or raw.startswith("id\t"):
            continue
        cols = raw.split("\t")
        if len(cols) >= 12:
            rows.append(cols[:12])

    if not rows:
        if rc != 0:
            # A failed query must not be mistaken for "nothing to sample".
            return Check(
                check_name, "app", False,
                f"查询失败 (rc={rc})",
                detail=f"# raw output:\n{out.strip()[:500]}",
                severity="error",
                command=cmd,
            )
        # Zero candidates: nothing translated in the last 24h (fresh start / little data).
        return Check(
            check_name, "app", True,
            f"无样本(24h 内暂无已翻译文章)",
            detail=f"# raw output:\n{out.strip()[:500]}",
            severity="info",
            command=cmd,
        )

    def to_int(text: str) -> int:
        """Parse a numeric column, falling back to 0 on junk."""
        try:
            return int(text or 0)
        except ValueError:
            return 0

    verdicts: list[tuple[bool, list[str]]] = []  # (passed, reasons)
    detail_rows: list[str] = []
    for cols in rows:
        (aid, src, title, title_zh, body_zh_pv, status,
         engine, tat, lang, tlen, zlen, blen) = cols
        tlen_i, zlen_i, blen_i = to_int(tlen), to_int(zlen), to_int(blen)

        reasons: list[str] = []
        if not title_zh.strip():
            reasons.append("title_zh 空")
        if not body_zh_pv.strip():
            reasons.append("body_zh_text 空")
        if title_zh.strip() and title.strip() and title_zh.strip() == title.strip():
            reasons.append("title_zh == title(未翻译)")
        if zlen_i < 2:
            reasons.append(f"title_zh 长度={zlen_i}")
        is_ok = not reasons
        verdicts.append((is_ok, reasons))

        # Readable detail: original title / translated title / lengths / status.
        t_disp = (title[:50] + "…") if len(title) > 50 else title
        z_disp = (title_zh[:50] + "…") if len(title_zh) > 50 else title_zh
        line = (f"#{aid} src={src} lang={lang} status={status} "
                f"len: 原 {tlen_i} → 译 {zlen_i} (body_zh {blen_i}) "
                f"engine={engine} at={tat}")
        # The pass/fail prefixes were empty strings; mark them explicitly.
        if is_ok:
            line = "✓ " + line + f"\n 原: {t_disp}\n 译: {z_disp}"
        else:
            line = "✗ " + line + f"\n 原因: {'; '.join(reasons)}\n 原: {t_disp}\n 译: {z_disp}"
        detail_rows.append(line)

    passed = sum(1 for good, _ in verdicts if good)
    total = len(verdicts)
    if passed == total:
        sev, summary = "info", f"{passed}/{total} 通过"
    elif passed == 0:
        sev, summary = "error", f"0/{total} 通过 ⚠ 翻译管线可能挂了"
    else:
        sev = "warn"
        summary = f"{passed}/{total} 通过(部分文章翻译异常)"
    return Check(
        check_name, "app", passed == total, summary,
        detail="\n".join(detail_rows),
        command=cmd, severity=sev,
    )
@timed
def check_redis(remote: Remote, compose_dir: str) -> Check:
    """1.9 Redis ping + memory figures.

    Sends ``ping`` and ``info memory`` through ``redis-cli`` inside the
    redis container, authenticating with ``REDIS_PASSWORD`` from ``.env``.
    The check passes when the output contains ``PONG``.
    """
    redis_cli = "docker compose exec -T redis redis-cli -a \"$REDIS_PASSWORD\" --no-auth-warning "
    cmd = "".join([
        f"cd {compose_dir} && ",
        "set -a; . ./.env; set +a; ",
        redis_cli,
        "ping 2>&1; ",
        redis_cli,
        "info memory 2>&1 | grep -E 'used_memory_human|used_memory_peak_human|maxmemory_human'",
    ])
    rc, out, _ = remote.run(cmd, timeout=20)
    one_line = out.strip().replace("\n", " | ")
    if "PONG" in out:
        return Check("Redis", "app", True, one_line, severity="info")
    return Check("Redis", "app", False, one_line, severity="error")
@timed
def check_homepage(remote: Remote, api_base: str, auth_token: str = "") -> Check:
    """1.10 Home page SPA + feed API + mobile support.

    The frontend is a Vue SPA whose index.html is only a shell, so this
    checks:
      1) ``/`` contains a viewport meta tag, the ``#app`` mount point, a JS
         bundle reference and ``lang="zh-CN"``;
      2) ``/api/v1/articles?page=1&page_size=10`` returns JSON with
         ``items`` / ``total`` (this endpoint requires auth);
      3) the linked stylesheet contains ``max-width: 768px`` / ``480px``
         breakpoints (occurrences are counted, for information only).

    A 401 without ``auth_token`` means "endpoint needs a token, service is
    fine" and is reported as ``info``.
    """
    # 1) fetch the home page HTML
    rc1, html, _ = remote.run("curl -sS -m 5 http://127.0.0.1/", timeout=10)
    has_viewport = "name=\"viewport\"" in html or "name='viewport'" in html
    has_app_div = 'id="app"' in html
    has_js = "main.ts" in html or "/src/main.ts" in html or "/assets/index-" in html
    has_lang_zh = 'lang="zh-CN"' in html or "lang='zh-CN'" in html

    # 2) fetch the article list (auth required)
    api_url = f"{api_base.rstrip('/').removesuffix('/api/v1/healthz')}/api/v1/articles?page=1&page_size=10"
    auth_header = ""
    if auth_token:
        # The token travels base64-encoded so it is not readable at a glance
        # in the command line (this is obfuscation, not protection). The
        # header must be double-quoted: inside single quotes the shell does
        # not expand $(...), so the literal text would be sent as the token.
        tok_b64 = base64.b64encode(auth_token.encode("utf-8")).decode("ascii")
        auth_header = f' -H "Authorization: Bearer $(echo {tok_b64} | base64 -d)"'
    rc2, body, _ = remote.run(
        "curl -sS -m 8 '" + api_url + "'" + auth_header +
        " -w '\\n---HTTP=%{http_code} TIME=%{time_total}---\\n' 2>&1",
        timeout=15,
    )
    items: list = []
    api_code = 0
    total = 0
    api_err = ""
    data = None
    # Split "<json>\n---HTTP=<code> TIME=...---". The status code is the
    # first thing after the marker; searching for "HTTP=" in the tail (as
    # before) never matched because the marker itself is consumed.
    json_part, sep, status_part = body.rpartition("\n---HTTP=")
    if sep:
        m = re.match(r"\s*(\d+)", status_part)
        api_code = int(m.group(1)) if m else 0
    else:
        json_part = body
    try:
        data = json.loads(json_part)
        items = data.get("items") or []
        total = int(data.get("total") or 0)
    except Exception as e:  # bad JSON or unexpected shape: report, don't crash
        api_err = f"{type(e).__name__}: {e}"
        data = None
        items = []

    # 3) mobile breakpoints: count on the server side to avoid truncation
    css_href = ""
    m = re.search(r'<link[^>]+rel="stylesheet"[^>]+href="([^"]+)"', html)
    if m:
        css_href = m.group(1)
    mobile_768 = mobile_480 = 0
    if css_href:
        css_url = css_href if css_href.startswith(("http://", "https://")) \
            else "http://127.0.0.1" + css_href
        # `grep -o | wc -l` counts occurrences; `grep -c` counts matching
        # lines, which is at most 1 for a minified single-line stylesheet.
        cmd_css = (
            "curl -sS -m 8 '" + css_url + "' | "
            "grep -o -E 'max-width:[[:space:]]*768px' | wc -l; "
            "echo ---480---; "
            "curl -sS -m 8 '" + css_url + "' | "
            "grep -o -E 'max-width:[[:space:]]*480px' | wc -l"
        )
        rc3, css_out, _ = remote.run(cmd_css, timeout=15)
        # Output shape: "<count>\n---480---\n<count>"
        parts = re.split(r"---480---", css_out)
        try:
            mobile_768 = int((parts[0].strip().splitlines() or ["0"])[-1])
        except ValueError:
            pass
        try:
            mobile_480 = int((parts[1].strip().splitlines() or ["0"])[-1]) if len(parts) > 1 else 0
        except ValueError:
            pass

    # === verdict ===
    issues: list[str] = []
    if not has_viewport:
        issues.append("首页 HTML 缺 viewport meta(移动端不友好)")
    if not has_app_div:
        issues.append("首页 HTML 缺 #app 挂载点")
    if not has_js:
        issues.append("首页 HTML 没引 JS bundle")
    if not has_lang_zh:
        issues.append("首页 HTML lang 不是 zh-CN")
    # Feed API: a 401 without a token is not an error; a 401 with one is.
    need_auth_msg = ""
    if api_code == 401 and not auth_token:
        need_auth_msg = "Feed API 401(端点需登录)— 用 --auth-user / --auth-pass 传 owner 凭据"
    elif api_code != 200:
        issues.append(f"Feed API 返回 {api_code} (非 200)")
        if api_err:
            issues.append(f"Feed API 解析失败: {api_err}")
    if data is not None and not items and api_code == 200:
        issues.append(f"Feed API 返回 items 为空 (total={total})")

    # sample of translated titles from the first screen
    sample = [
        {
            "id": it.get("id"),
            "title": (it.get("title") or "")[:60],
            "title_zh": (it.get("title_zh") or "")[:60],
            "status": it.get("translation_status"),
            "engine": it.get("translation_engine"),
        }
        for it in items[:3]
    ]
    has_zh = sum(1 for it in items if it.get("title_zh"))
    summary_parts = [
        f"html: {'✓' if has_viewport and has_app_div and has_js else '✗'}",
        f"feed: {len(items)}/{total} (有译文 {has_zh})" if api_code == 200
        else f"feed: http={api_code}",
        f"mobile-css: {mobile_768}×768 + {mobile_480}×480" if css_href
        else "mobile-css: (无 CSS 链接)",
    ]
    summary = " · ".join(summary_parts)
    if need_auth_msg:
        summary += " · " + need_auth_msg
    elif issues:
        summary += " · " + "; ".join(issues[:2])

    html_ok = has_viewport and has_app_div and has_js and has_lang_zh
    if need_auth_msg:
        # no token -> 401 -> the service itself is fine; downgrade to info
        ok = html_ok
        sev = "info"
    else:
        ok = html_ok and not issues
        sev = "error" if api_code not in (0, 200) else ("warn" if issues else "info")

    detail_lines = [
        f"首页 HTML: viewport={has_viewport} #app={has_app_div} js={has_js} lang-zh={has_lang_zh}",
        f"Feed API: http={api_code} items={len(items)} total={total} 译过={has_zh}",
    ]
    if css_href:
        detail_lines.append(f"CSS: {css_href} mobile: 768px={mobile_768} 处, 480px={mobile_480}")
    if sample:
        detail_lines.append("首屏抽样:")
        for s in sample:
            detail_lines.append(
                f" #{s['id']} {s['title']!r} → {s['title_zh']!r} "
                f"[{s['status']}/{s['engine']}]"
            )
    if need_auth_msg:
        detail_lines.append("提示: " + need_auth_msg)
    if issues:
        detail_lines.append("问题: " + "; ".join(issues))
    return Check(
        "首页 SPA + Feed API + 移动端", "app", ok, summary,
        detail="\n".join(detail_lines),
        command=f"GET /; GET {api_url}; GET {css_href or '(no css)'}",
        severity=sev,
    )
@timed
def check_article_detail(remote: Remote, api_base: str, auth_token: str = "") -> Check:
    """1.11 Detail page: take the first listed article and GET /api/v1/articles/{id}.

    Checks that:
      - the detail request returns HTTP 200 with a JSON object;
      - title_zh and at least one of body_zh_text / body_zh_formatted /
        body_zh_html are present, and title_zh differs from title;
      - body_zh_formatted, when present, contains an ``article-body``
        container (the LLM-formatted version carries its CSS wrapper).

    A 401 on the list request without ``auth_token`` means the endpoint
    needs auth; the user is told to pass --auth-user / --auth-pass and the
    result is ``info``.
    """
    marker = "\n---HTTP="

    def split_status(raw: str) -> tuple[str, int]:
        """Split curl output into (body, http_code); code is 0 if absent."""
        payload, sep, tail = raw.rpartition(marker)
        if not sep:
            return raw, 0
        # The code directly follows the marker. Searching the tail for
        # "HTTP=" (as before) never matched, so the code was always 0.
        m = re.match(r"\s*(\d+)", tail)
        return payload, (int(m.group(1)) if m else 0)

    base = api_base.rstrip("/").removesuffix("/api/v1/healthz")
    list_url = f"{base}/api/v1/articles?page=1&page_size=1"
    tok_b64 = base64.b64encode(auth_token.encode("utf-8")).decode("ascii") if auth_token else ""
    # Double quotes are required so that the shell expands $(...); inside
    # single quotes the literal text would be sent as the bearer token.
    auth_h = f' -H "Authorization: Bearer $(echo {tok_b64} | base64 -d)"' if tok_b64 else ""
    curl_tail = " -w '\\n---HTTP=%{http_code}---\\n' 2>&1"

    rc, list_body, _ = remote.run(
        "curl -sS -m 8 '" + list_url + "'" + auth_h + curl_tail,
        timeout=10,
    )
    article_id = None
    list_code = 0
    if rc == 0 and list_body:
        json_part, list_code = split_status(list_body)
        try:
            data = json.loads(json_part)
            if data.get("items"):
                article_id = data["items"][0]["id"]
        except (ValueError, AttributeError, LookupError, TypeError):
            # Unparsable or unexpectedly shaped list: handled below as "no sample".
            pass
    if list_code == 401 and not auth_token:
        return Check(
            "详情页 API + 译文 CSS", "app", True,
            "需 owner token(用 --auth-user / --auth-pass)",
            detail=f"# raw list response:\n{list_body[:300]}",
            command=f"GET {list_url} (no token)",
            severity="info",
        )
    if not article_id:
        return Check(
            "详情页 API + 译文 CSS", "app", False,
            f"无可用文章样本(列表 http={list_code}, items=0?)",
            detail=list_body[:500],
            command=list_url,
            severity="warn",
        )

    # fetch the article itself
    detail_url = f"{base}/api/v1/articles/{article_id}"
    rc2, body2, _ = remote.run(
        "curl -sS -m 8 '" + detail_url + "'" + auth_h + curl_tail,
        timeout=10,
    )
    article = {}
    parse_err = ""
    json_part, api_code = split_status(body2)
    try:
        article = json.loads(json_part)
        if not isinstance(article, dict):
            parse_err = f"TypeError: expected object, got {type(article).__name__}"
            article = {}
    except ValueError as e:
        parse_err = f"{type(e).__name__}: {e}"
    if api_code != 200 or not article:
        return Check(
            f"详情页 API #{article_id} + 译文 CSS", "app", False,
            f"http={api_code} parse_err={parse_err or '-'}",
            detail=body2[:500],
            command=detail_url,
            severity="error",
        )

    # criteria
    title = article.get("title") or ""
    title_zh = article.get("title_zh") or ""
    body_zh_text = article.get("body_zh_text") or ""
    body_zh_formatted = article.get("body_zh_formatted") or ""
    body_zh_html = article.get("body_zh_html") or ""
    fmt_status = article.get("format_status") or "n/a"
    tr_status = article.get("translation_status") or "-"
    tr_engine = article.get("translation_engine") or "-"

    issues: list[str] = []
    if not title_zh:
        issues.append("缺 title_zh(无译文)")
    if not (body_zh_text or body_zh_formatted or body_zh_html):
        issues.append("缺 body_zh_text/formatted/html(译文全空)")
    if title_zh and title and title_zh.strip() == title.strip():
        issues.append("title_zh == title(未翻译)")
    has_css_container = (
        'class="article-body"' in body_zh_formatted
        or "class='article-body'" in body_zh_formatted
    )
    if has_css_container:
        css_info = "✓ 排版版带 .article-body 容器"
    elif body_zh_formatted:
        css_info = "✗ 排版版缺 .article-body 容器(译文没套 CSS)"
        issues.append("排版版 body_zh_formatted 缺 .article-body CSS 容器")
    else:
        css_info = "— 无排版版(用原始译文展示)"

    summary = (
        f"#{article_id} {tr_status}/{tr_engine} fmt={fmt_status} "
        f"译字 {len(title_zh)}/{len(body_zh_text)}; CSS {css_info}"
    )
    if issues:
        summary += " · " + "; ".join(issues[:2])
    detail_lines = [
        f"原标题: {title[:80]!r}",
        f"译标题: {title_zh[:80]!r}",
        f"body_zh_text 长度: {len(body_zh_text)}",
        f"body_zh_formatted 长度: {len(body_zh_formatted)} status={fmt_status}",
        f"body_zh_html 长度: {len(body_zh_html)}",
        f"CSS 容器(.article-body): {'✓' if has_css_container else '✗'}",
    ]
    # first 300 characters of body_zh_formatted ("<" may have been escaped)
    if body_zh_formatted:
        detail_lines.append(f"body_zh_formatted 前 300: {body_zh_formatted[:300]!r}")
    # api_code is known to be 200 here, so only the issues decide.
    return Check(
        f"详情页 API #{article_id} + 译文 CSS", "app", not issues, summary,
        detail="\n".join(detail_lines),
        command=detail_url,
        severity="warn" if issues else "info",
    )
@timed
def check_agnes_llm(remote: Remote, compose_dir: str) -> Check:
    """1.12 Agnes LLM health: send one real chat/completions request.

    - Reads AGNES_API_KEY / AGNES_BASE_URL / AGNES_CHAT_MODEL from ``.env``.
    - No key (or a ``your_...`` placeholder): skipped as ``info``, since
      LLM enrichment is an optional module.
    - Otherwise sends a minimal request (max_tokens=8, "ping") and expects
      HTTP 200 plus a non-empty ``choices[0].message.content``.

    The key and payload are base64-encoded and decoded in the remote shell
    before being handed to curl. This keeps the plain key out of the command
    text, but base64 is obfuscation, not protection.
    """
    # 1) read the three variables from .env (a later line overrides an earlier one)
    rc, env_out, _ = remote.run(
        f"cd {compose_dir} 2>/dev/null && "
        "grep -E '^(AGNES_API_KEY|AGNES_BASE_URL|AGNES_CHAT_MODEL)=' .env 2>/dev/null"
    )
    env = {"AGNES_API_KEY": "", "AGNES_BASE_URL": "", "AGNES_CHAT_MODEL": ""}
    for line in env_out.splitlines():
        m = re.match(r"^(AGNES_API_KEY|AGNES_BASE_URL|AGNES_CHAT_MODEL)=(.+)$", line)
        if m:
            env[m.group(1)] = m.group(2).strip().strip('"').strip("'")
    api_key = env["AGNES_API_KEY"]
    if not api_key or api_key.startswith("your_"):
        return Check(
            "Agnes LLM 联通", "app", True,
            "未配 AGNES_API_KEY(LLM 增强模块关闭),跳过",
            detail=env_out.strip()[:300],
            severity="info",
        )
    base_url = env["AGNES_BASE_URL"] or "https://apihub.agnes-ai.com/v1"
    model = env["AGNES_CHAT_MODEL"] or "agnes-2.0-flash"
    chat_url = f"{base_url.rstrip('/')}/chat/completions"

    # 2) base64-encode key + payload; the remote shell decodes them
    key_b64 = base64.b64encode(api_key.encode("utf-8")).decode("ascii")
    payload_obj = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a ping bot. Reply with a single word."},
            {"role": "user", "content": "ping"},
        ],
        "max_tokens": 8,
        "temperature": 0,
    }
    payload_b64 = base64.b64encode(
        json.dumps(payload_obj, ensure_ascii=False).encode("utf-8")
    ).decode("ascii")
    # The body is parsed as JSON below, so it must not be cut short: 400
    # bytes routinely truncated a valid completion response and made a
    # healthy endpoint look like it returned an empty reply.
    body_marker = "--- body (first 4000 chars) ---"
    cmd = (
        f"KEY_B64={key_b64}; "
        f"PAYLOAD_B64={payload_b64}; "
        "BODY=$(echo \"$PAYLOAD_B64\" | base64 -d); "
        # drop any response left by a previous run (curl leaves the file
        # untouched when it cannot connect)
        "rm -f /tmp/agnes_resp; "
        f"curl -sS -m 25 -o /tmp/agnes_resp -w 'http=%{{http_code}} t=%{{time_total}}\\n' "
        "-H \"Authorization: Bearer $(echo $KEY_B64 | base64 -d)\" "
        "-H 'Content-Type: application/json' "
        f"-d \"$BODY\" '{chat_url}'; "
        f"echo '{body_marker}'; head -c 4000 /tmp/agnes_resp 2>/dev/null; echo"
    )
    rc2, out, _ = remote.run(cmd, timeout=40)

    # parse "http=<code> t=<seconds>" and the body that follows the marker
    m = re.search(r"http=(\d+)\s+t=([\d.]+)", out)
    code = int(m.group(1)) if m else 0
    elapsed = float(m.group(2)) if m else 0
    body_str = out.split(body_marker, 1)[1].strip() if body_marker in out else ""
    if code != 200:
        return Check(
            f"Agnes LLM chat 调用", "app", False,
            f"http={code} t={elapsed:.1f}s",
            detail=out[:600],
            command=f"POST {chat_url} (auth via base64-decoded key, not echoed)",
            severity="error",
        )
    # look for reply text in the response
    try:
        resp = json.loads(body_str)
        # `content` may be null in the response; normalise it to "".
        text = (resp.get("choices") or [{}])[0].get("message", {}).get("content") or ""
    except (ValueError, AttributeError, LookupError, TypeError):
        text = ""
    text = str(text)
    ok = bool(text)
    summary = f"http={code} t={elapsed:.1f}s model={model} reply={text[:30]!r}"
    return Check(
        "Agnes LLM chat 调用", "app", ok, summary,
        detail=f"# model: {model}\n# base_url: {base_url}\n# raw:\n{out[:800]}",
        command=f"POST {chat_url}",
        severity="info" if ok else "warn",
    )
@timed
def check_caddy(remote: Remote) -> Check:
    """Check 1.10: probe the Caddy reverse proxy on port 80.

    Runs ``curl`` against ``http://127.0.0.1/`` through ``remote`` and passes
    when the reported HTTP status is 2xx or 3xx.

    Returns:
        A ``Check`` in the "app" group whose summary is the stripped command
        output; severity is "error" on failure and "info" otherwise.
    """
    probe = (
        "curl -sS -m 5 -o /dev/null "
        "-w 'http=%{http_code} t=%{time_total}\\n' http://127.0.0.1/"
    )
    _, output, _ = remote.run(probe)

    status_match = re.search(r"http=(\d+)", output)
    # A missing status line is treated as status 0, i.e. a failure.
    status = 0 if status_match is None else int(status_match.group(1))
    healthy = status in range(200, 400)

    return Check(
        "Caddy http://127.0.0.1/",
        "app",
        healthy,
        output.strip(),
        severity="info" if healthy else "error",
    )
@timed
def check_frontend(remote: Remote) -> Check:
    """Check 1.11: confirm port 80 serves the frontend's HTML page.

    Issues two ``curl`` requests to ``http://127.0.0.1/`` in a single command:
    one that prints the status, timing and content type, and one that prints
    the first three lines of the body. The check passes when the status is
    2xx/3xx and the combined output mentions ``html`` or ``<!doctype``.

    Returns:
        A ``Check`` in the "app" group; the summary is the first output line,
        the detail is the full output, severity is "warn" on failure.
    """
    status_probe = (
        "curl -sS -m 5 -o /dev/null "
        "-w 'http=%{http_code} t=%{time_total} ct=%{content_type}\\n' "
        "http://127.0.0.1/"
    )
    body_probe = "curl -sS -m 5 http://127.0.0.1/ | head -3"
    _, output, _ = remote.run("; ".join([status_probe, body_probe]))

    status_match = re.search(r"http=(\d+)", output)
    status = 0 if status_match is None else int(status_match.group(1))

    lowered = output.lower()
    looks_like_html = any(marker in lowered for marker in ("html", "<!doctype"))
    healthy = status in range(200, 400) and looks_like_html

    first_line = output.splitlines()[0] if output else ""
    return Check(
        "Frontend 首页",
        "app",
        healthy,
        first_line,
        output,
        severity="info" if healthy else "warn",
    )
@timed
def check_tls_cert(remote: Remote) -> Check:
    """Check 1.12: inspect the HTTPS certificate, only when DOMAIN is set in .env.

    Reads the ``DOMAIN=`` line from ``.env`` under ``COMPOSE_DIR``, fetches the
    certificate served on ``<domain>:443`` with ``openssl`` and evaluates its
    ``notAfter`` date, so that an expired or nearly expired certificate is no
    longer reported as healthy.

    Returns:
        A ``Check`` in the "app" group:

        * passing / info when no DOMAIN is configured (check skipped);
        * failing / warn when no certificate could be fetched;
        * failing / error when the certificate has already expired;
        * failing / warn when it expires within 14 days;
        * passing / info otherwise, or when ``notAfter`` cannot be parsed
          (in which case the raw value is reported unchanged).
    """
    # Function-scope imports: only this check needs these names.
    import shlex
    from datetime import datetime, timezone

    expiry_warn_days = 14  # warn when the certificate is this close to expiry

    # Read DOMAIN from .env first; when it is absent the check is skipped.
    _, env_out, _ = remote.run(
        f"cd {COMPOSE_DIR} 2>/dev/null && "
        "grep -E '^DOMAIN=' .env 2>/dev/null | head -1"
    )
    domain = ""
    for line in env_out.splitlines():
        m = re.match(r"^DOMAIN=(.+)$", line.strip())
        if m:
            domain = m.group(1).strip().strip('"').strip("'")
            break
    if not domain:
        return Check("HTTPS 证书(域名)", "app", True,
                     "未配 DOMAIN,跳过(走 IP 模式)", severity="info")

    # Quote the value taken from .env before splicing it into a shell command.
    # shlex.quote leaves ordinary host names untouched.
    host = shlex.quote(domain)
    cmd2 = (
        f"echo | openssl s_client -servername {host} -connect {host}:443 2>/dev/null "
        "| openssl x509 -noout -dates 2>&1"
    )
    _, out2, _ = remote.run(cmd2, timeout=15)

    title = f"HTTPS 证书 {domain}"
    m = re.search(r"notAfter=(.+)", out2)
    if not m:
        return Check(title, "app", False,
                     "无法获取证书(可能 443 未开)", out2, severity="warn")

    not_after = m.group(1).strip()
    summary = f"notAfter={not_after}"
    try:
        # openssl prints e.g. "Jun  1 12:00:00 2026 GMT"; collapse the padding
        # that single-digit days get before parsing.
        expires = datetime.strptime(
            " ".join(not_after.split()), "%b %d %H:%M:%S %Y %Z"
        ).replace(tzinfo=timezone.utc)
    except ValueError:
        # Unknown date format: fall back to reporting the raw value.
        return Check(title, "app", True, summary, severity="info")

    days_left = (expires - datetime.now(timezone.utc)).total_seconds() / 86400
    if days_left < 0:
        return Check(title, "app", False, f"{summary} (已过期)", out2,
                     severity="error")
    remaining = f"{summary} (剩余 {int(days_left)} 天)"
    if days_left < expiry_warn_days:
        return Check(title, "app", False, remaining, out2, severity="warn")
    return Check(title, "app", True, remaining, severity="info")
@timed
def check_docker_logs_size(remote: Remote, compose_dir: str) -> Check:
    """Check 1.13: flag container JSON logs that have grown too large.

    Lists the five largest ``*-json.log`` files under
    ``/var/lib/docker/containers`` with ``du -sh`` and reports every entry
    larger than 200 MiB.

    ``du -h`` prints a fractional value for sizes below 10 units (``1.5G``,
    ``9.9M``), so the size pattern accepts an optional decimal part; matching
    integers only would silently skip exactly those multi-gigabyte logs.

    Args:
        remote: Command runner used to execute the shell command.
        compose_dir: Directory the command changes into before running
            ``docker compose``.

    Returns:
        A ``Check`` in the "docker" group; it fails with severity "warn" when
        at least one oversized log is found, and carries the raw ``du`` output
        as detail.
    """
    limit_mb = 200
    # Multipliers converting a du -h unit suffix to MiB ("" means bytes).
    unit_to_mb = {
        "": 1 / (1024 * 1024),
        "K": 1 / 1024,
        "M": 1,
        "G": 1024,
        "T": 1024 * 1024,
    }
    # Accept "," as the decimal separator too, for non-C locales.
    size_re = re.compile(r"(\d+(?:[.,]\d+)?)([KMGT]?)\s+")

    cmd = (
        f"cd {compose_dir} && "
        "docker compose logs --no-color --tail=0 2>&1 >/dev/null; "
        "du -sh /var/lib/docker/containers/*/*-json.log 2>/dev/null | sort -h | tail -5"
    )
    _, out, _ = remote.run(cmd, timeout=20)

    big = []
    for line in out.splitlines():
        entry = line.strip()
        m = size_re.match(entry)
        if not m:
            continue
        size = float(m.group(1).replace(",", "."))
        if size * unit_to_mb[m.group(2)] > limit_mb:
            big.append(entry)

    return Check("容器日志大小", "docker", not big,
                 "ok" if not big else f"大日志: {'; '.join(big)}",
                 out, severity="warn" if big else "info")
# ============== 主流程 ==============
# Check registry: group name -> ordered list of (label, runner) pairs.
# Each runner takes the command runner `r` and returns a Check. The lambdas
# read COMPOSE_DIR / API_BASE / SAMPLE_N / AUTH_TOKEN only when they are
# called, so main() can assign those module globals after argument parsing.
GROUPS: dict[str, list[tuple[str, Callable]]] = {
    "docker": [
        ("docker compose ps", lambda r: check_compose_ps(r, COMPOSE_DIR)),
        ("近 200 行 worker/api 日志", lambda r: check_container_logs(r, COMPOSE_DIR)),
        ("docker system df", lambda r: check_docker_system(r)),
        ("容器日志大小", lambda r: check_docker_logs_size(r, COMPOSE_DIR)),
    ],
    "host": [
        ("磁盘空间", lambda r: check_disk(r)),
        ("内存使用", lambda r: check_memory(r)),
    ],
    "network": [
        ("关键端口监听", lambda r: check_ports(r)),
    ],
    "app": [
        ("API 健康", lambda r: check_api_health(r, API_BASE)),
        ("Redis ping", lambda r: check_redis(r, COMPOSE_DIR)),
        ("DB 行数", lambda r: check_db_counts(r, COMPOSE_DIR)),
        ("LLM 工作流落实度", lambda r: check_llm_workflow(r, COMPOSE_DIR)),
        ("翻译抽查", lambda r: check_translation_sample(r, COMPOSE_DIR, SAMPLE_N)),
        ("Caddy 反代", lambda r: check_caddy(r)),
        ("Frontend 首页", lambda r: check_frontend(r)),
        ("首页 SPA + Feed API", lambda r: check_homepage(r, API_BASE, AUTH_TOKEN)),
        ("详情页 + 译文 CSS", lambda r: check_article_detail(r, API_BASE, AUTH_TOKEN)),
        ("Agnes LLM 调用", lambda r: check_agnes_llm(r, COMPOSE_DIR)),
        ("HTTPS 证书", lambda r: check_tls_cert(r)),
    ],
}
def _login_owner(remote: Remote, api_base: str, user: str, password: str) -> str:
    """Log in through the API and return the owner's token ("" on failure).

    The JSON body is built with ``json.dumps`` and shipped base64-encoded, then
    piped to ``curl`` on stdin. This keeps quotes, ``%`` and backslashes in the
    credentials intact and keeps the password out of curl's argument list.
    The whole response is read back (not a fixed-size prefix) so that long
    tokens still parse, and the temporary response file is removed afterwards.

    Prints a one-line status message describing the outcome.
    """
    body = json.dumps({"username": user, "password": password})
    body_b64 = base64.b64encode(body.encode("utf-8")).decode("ascii")
    login_url = f"{api_base.rstrip('/').removesuffix('/api/v1/healthz')}/api/v1/auth/login"
    marker = "--- login response ---"
    login_cmd = (
        f"BODY_B64={body_b64}; "
        "echo \"$BODY_B64\" | base64 -d | "
        "curl -sS -m 8 -o /tmp/login_resp -w 'http=%{http_code}\\n' "
        f"-H 'Content-Type: application/json' --data-binary @- '{login_url}'; "
        f"echo '{marker}'; "
        "cat /tmp/login_resp 2>/dev/null; echo; "
        "rm -f /tmp/login_resp"
    )
    _, out, _ = remote.run(login_cmd, timeout=15)

    token = ""
    m = re.search(r"http=(\d+)", out)
    if m and m.group(1) == "200":
        try:
            resp = json.loads(out.partition(marker)[2].strip())
            token = resp.get("access_token") or resp.get("accessToken") or resp.get("token") or ""
        except (ValueError, AttributeError) as e:
            # Not JSON, or JSON that is not an object.
            print(f" ⚠ auth: 解析响应失败 {e}")
    if token:
        print(f" ✓ auth: 已登录 owner='{user}', token 长度 {len(token)}")
    else:
        code_str = m.group(1) if m else "?"
        print(f" ⚠ auth: 登录失败 http={code_str}, API 检查项将无 token(降级 info)")
    return token


def main() -> int:
    """CLI entry point: parse options, run the selected check groups, summarise.

    Publishes the parsed settings through the module globals read by the
    ``GROUPS`` lambdas, optionally logs in to obtain an owner token, runs every
    selected check, prints a summary and optionally writes a JSON report.

    Returns:
        The process exit code: 2 when ``report.summary()`` reports any
        error-level problem, 1 when it reports only other failures, else 0.
    """
    ap = argparse.ArgumentParser(
        description="diary-news 服务器健康检查",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="示例:\n"
               " python healthcheck.py # 跑全部\n"
               " python healthcheck.py --only docker,app # 只跑 docker 和 app 组\n"
               " python healthcheck.py --local --compose-dir . # 服务器本地跑\n"
               " python healthcheck.py --json report.json # 导出结构化报告\n",
    )
    ap.add_argument("--local", action="store_true", help="在服务器本地跑,不走 SSH")
    ap.add_argument("--host", default=os.environ.get("REMOTE_HOST", DEFAULT_HOST))
    ap.add_argument("--port", type=int, default=int(os.environ.get("REMOTE_PORT", DEFAULT_PORT)))
    ap.add_argument("--user", default=os.environ.get("REMOTE_USER", DEFAULT_USER))
    ap.add_argument("--password", default=os.environ.get("REMOTE_PASS", ""))
    ap.add_argument("--compose-dir", default=os.environ.get("COMPOSE_DIR", DEFAULT_COMPOSE))
    ap.add_argument("--api-base", default=os.environ.get("API_BASE_URL", DEFAULT_API_BASE))
    ap.add_argument("--only", help="逗号分隔的组名: docker,host,network,app")
    ap.add_argument("--skip", help="逗号分隔的组名,跳过")
    ap.add_argument("--json", dest="json_out", help="把结果写到 JSON 文件")
    ap.add_argument("--quiet", action="store_true", help="只输出汇总")
    ap.add_argument("--verbose", "-v", action="store_true",
                    help="显示失败项的完整原始输出(默认 warn 截断 12 行)")
    ap.add_argument("--sample", type=int, default=3,
                    help="翻译抽查的文章数(默认 3 篇,24h 内已翻译的随机样本)")
    ap.add_argument("--auth-user", default=os.environ.get("OWNER_USER", "owner"),
                    help="owner 用户名(用于获取 JWT token,调 /api/v1/auth/login)")
    ap.add_argument("--auth-pass", default=os.environ.get("OWNER_PASS", ""),
                    help="owner 密码(env: OWNER_PASS)。如不传,API 端点会降级为 info(不污染汇总)")
    ap.add_argument("--skip-auth", action="store_true",
                    help="明确跳过 auth token,等价于不传 --auth-pass")
    args = ap.parse_args()

    # The GROUPS lambdas look these names up in module scope when called, so
    # every one of them must be assigned before the check loop starts.
    global COMPOSE_DIR, API_BASE, SAMPLE_N, AUTH_TOKEN
    COMPOSE_DIR = args.compose_dir
    API_BASE = args.api_base
    SAMPLE_N = max(1, min(args.sample, 20))  # clamp to 1..20 to guard against typos
    AUTH_TOKEN = ""  # set unconditionally; stays empty when login is skipped or fails

    # Tolerate spaces around the commas ("docker, app").
    only = {g.strip() for g in (args.only or "").split(",") if g.strip()}
    skip = {g.strip() for g in (args.skip or "").split(",") if g.strip()}

    target = "local" if args.local else f"{args.user}@{args.host}:{args.port}"
    print("==== diary-news 健康检查 ====")
    print(f"目标: {target}")
    print(f"目录: {COMPOSE_DIR}")
    print(f"时间: {time.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    unknown = (only | skip) - set(GROUPS)
    if unknown:
        # A misspelt group would otherwise select nothing and still report success.
        print(f" ⚠ 未知的组名(已忽略): {', '.join(sorted(unknown))}")
    print()

    remote = Remote(local=args.local, host=args.host, port=args.port,
                    user=args.user, password=args.password)
    report = Report(target=target, started_at=time.strftime("%Y-%m-%dT%H:%M:%S%z"))

    def record(check: Check) -> None:
        # In --quiet mode store the result directly instead of going through report.add().
        if not args.quiet:
            report.add(check, verbose=args.verbose)
        else:
            report.checks.append(asdict(check))

    # Everything that uses `remote` runs inside try/finally so the connection
    # is closed even if the login step raises.
    try:
        # ===== optional owner token =====
        if not args.skip_auth and args.auth_pass:
            AUTH_TOKEN = _login_owner(remote, API_BASE, args.auth_user, args.auth_pass)
        else:
            print(" · auth: 未传 --auth-pass(API 检查项将降级为 info 提示)")

        for group, fns in GROUPS.items():
            if only and group not in only:
                continue
            if skip and group in skip:
                continue
            print(f"--- [{group}] ---")
            for name, fn in fns:
                try:
                    record(fn(remote))
                except Exception as e:
                    # Boundary handler: one crashing check must not abort the run.
                    record(Check(name, group, False, f"异常: {e}",
                                 detail=f"type={type(e).__name__}\n{type(e).__doc__ or ''}",
                                 severity="error"))
            print()
    finally:
        remote.close()

    report.finished_at = time.strftime("%Y-%m-%dT%H:%M:%S%z")
    ok, bad, err = report.summary()
    print("==== 汇总 ====")
    print(f" 合计 {len(report.checks)} 项 · 通过 {ok} · 失败 {bad} · 严重错误 {err}")
    if err > 0:
        print(f" ✗ 存在 {err} 个 error 级问题,建议立即排查")
        code = 2
    elif bad > 0:
        print(f" ⚠ 存在 {bad} 个 warn 级问题,建议看一下")
        code = 1
    else:
        print(" ✓ 全部通过")
        code = 0

    if args.json_out:
        with open(args.json_out, "w", encoding="utf-8") as f:
            json.dump(asdict(report), f, ensure_ascii=False, indent=2)
        print(f" 报告已写入: {args.json_out}")
    return code
if __name__ == "__main__":
    # Run the CLI and use main()'s return value as the process exit status.
    raise SystemExit(main())