Files
diary-news/scripts/healthcheck.py
2026-06-11 17:24:46 +08:00

1289 lines
53 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""diary-news 服务器健康检查 checklist。
可在本机跑(SSH 远端)或在服务器上直接跑(用 --local)。
走 docker compose 的 6 个服务:postgres / redis / api / worker / caddy / frontend,
外加主机层面的端口/磁盘/内存/日志。
依赖:
pip install paramiko
用法(Windows PowerShell):
$env:REMOTE_PASS = '你的root密码'
python scripts/healthcheck.py
python scripts/healthcheck.py --local # 在服务器上直接跑
python scripts/healthcheck.py --host 1.2.3.4 --port 22 --user news
python scripts/healthcheck.py --only docker,disk # 只跑指定组
python scripts/healthcheck.py --json out.json # 导出结构化结果
环境变量(可覆盖默认值):
REMOTE_HOST 207.57.129.228
REMOTE_PORT 19717
REMOTE_USER root
REMOTE_PASS (SSH 必填; --local 不需要)
COMPOSE_DIR /srv/news
API_BASE_URL http://127.0.0.1/api/v1/healthz # API health-check endpoint (a bare base URL such as http://127.0.0.1:8000 is also accepted)
"""
from __future__ import annotations
import argparse
import base64
import json
import os
import re
import sys
import time
from dataclasses import dataclass, field, asdict
from typing import Callable, Optional
# 可选依赖:只在远程模式下需要
try:
import paramiko # type: ignore
except ImportError:
paramiko = None # --local 模式不强制
# ============== Configuration ==============
DEFAULT_HOST = "207.57.129.228"
DEFAULT_PORT = 19717
DEFAULT_USER = "root"
DEFAULT_COMPOSE = "/srv/news"
DEFAULT_API_BASE = "http://127.0.0.1/api/v1/healthz"  # goes through Caddy on port 80, reverse-proxied to api:8000
SSH_TIMEOUT = 30
# The 6 services declared in docker-compose.yml
EXPECTED_SERVICES = ["postgres", "redis", "api", "worker", "caddy", "frontend"]
# Key ports (by default only the public-facing port 80 is checked; add others as needed)
KEY_PORTS = {
    "http": 80,  # public port of Caddy / Frontend
}
# ============== Data structures ==============
@dataclass
class Check:
    """Result of a single health-check item."""

    name: str
    group: str
    ok: bool
    summary: str
    detail: str = ""
    elapsed_ms: int = 0
    severity: str = "info"  # info / warn / error
    command: str = ""  # the command that was executed (makes failures easy to reproduce)
@dataclass
class Report:
    """Accumulates check results and echoes each one to the console.

    ``checks`` stores every result as a plain dict (``dataclasses.asdict``)
    so the report can be serialised as-is.
    """

    target: str
    started_at: str
    finished_at: str = ""
    checks: list = field(default_factory=list)

    def add(self, c: Check, verbose: bool = False) -> None:
        """Record ``c`` and print a one-line status for it.

        For a failed check the command (if any) and the detail are printed
        too: ``error`` severity or ``verbose=True`` prints the full detail,
        anything else prints only the first 12 lines.
        """
        self.checks.append(asdict(c))
        # The pass/fail markers must differ, otherwise a failing line looks
        # exactly like a passing one; use the same glyphs as the API check.
        icon = "✓" if c.ok else "✗"
        sev = "" if c.severity == "info" else f" [{c.severity.upper()}]"
        print(f" {icon}{sev} {c.name}: {c.summary} ({c.elapsed_ms}ms)")
        if c.ok:
            return
        if c.command:
            print(f" $ {c.command}")
        if not c.detail:
            return
        lines = c.detail.splitlines() or ["(no detail)"]
        if c.severity == "error" or verbose:
            for line in lines:
                print(f" {line}")
            return
        for line in lines[:12]:
            print(f" {line}")
        if len(lines) > 12:
            print(f" ... (共 {len(lines)} 行,用 --verbose 看完整)")

    def summary(self) -> tuple[int, int, int]:
        """Return ``(passed, failed, failed_with_error_severity)``."""
        ok = sum(1 for c in self.checks if c["ok"])
        bad = len(self.checks) - ok
        err = sum(1 for c in self.checks if not c["ok"] and c["severity"] == "error")
        return ok, bad, err
# ============== Remote execution abstraction ==============
class Remote:
    """Uniform command runner: paramiko SSH for a remote host, or the local shell.

    With ``local=True`` no connection is made and commands run through
    ``subprocess`` on this machine. Otherwise a password-authenticated SSH
    session is opened in the constructor; a missing paramiko install or a
    missing password terminates the process with exit status 2.

    Instances can be used as context managers; leaving the block closes the
    SSH session.
    """

    def __init__(self, local: bool, host: str = "", port: int = 22,
                 user: str = "root", password: str = ""):
        self.local = local
        self.client: Optional[paramiko.SSHClient] = None
        if local:
            return
        if paramiko is None:
            print("ERROR: paramiko 未安装,远程模式需要 `pip install paramiko`", file=sys.stderr)
            sys.exit(2)
        pw = password or os.environ.get("REMOTE_PASS", "")
        if not pw:
            print("ERROR: 请先设置环境变量 REMOTE_PASS,或加 --password xxx", file=sys.stderr)
            sys.exit(2)
        c = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts any unknown host key, so the
        # first connection is not protected against a man-in-the-middle.
        c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        c.connect(host, port=port, username=user, password=pw,
                  timeout=SSH_TIMEOUT, banner_timeout=SSH_TIMEOUT, auth_timeout=SSH_TIMEOUT,
                  allow_agent=False, look_for_keys=False)
        self.client = c

    def __enter__(self) -> "Remote":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    @staticmethod
    def _as_text(data) -> str:
        """Coerce ``None`` / bytes / str to str."""
        if data is None:
            return ""
        if isinstance(data, bytes):
            return data.decode(errors="replace")
        return data

    def run(self, cmd: str, timeout: int = 60) -> tuple[int, str, str]:
        """Run ``cmd`` through a shell and return ``(rc, stdout, stderr)``.

        A timeout is reported as rc 124 with ``"timeout after Ns"`` in
        stderr, in both local and SSH mode. All three elements are always
        of the documented types; line endings are normalised to ``\\n``.
        """
        if self.local:
            import subprocess
            try:
                p = subprocess.run(cmd, shell=True, capture_output=True,
                                   text=True, timeout=timeout)
            except subprocess.TimeoutExpired as e:
                # On POSIX the partial output attached to TimeoutExpired is
                # bytes even with text=True, so it must be decoded here.
                return 124, self._as_text(e.stdout), f"timeout after {timeout}s"
            return p.returncode, p.stdout, p.stderr
        assert self.client is not None
        import socket
        try:
            _si, so, se = self.client.exec_command(cmd, timeout=timeout, get_pty=True)
            out = so.read().decode(errors="replace")
            err = se.read().decode(errors="replace")
            rc = so.channel.recv_exit_status()
        except (socket.timeout, TimeoutError):
            # Mirror the local-mode contract instead of crashing the caller.
            return 124, "", f"timeout after {timeout}s"
        # A PTY turns every "\n" into "\r\n"; undo that so callers can use
        # line-anchored regexes on the output.
        return rc, out.replace("\r\n", "\n"), err.replace("\r\n", "\n")

    def close(self) -> None:
        """Close the SSH session, if one was opened."""
        if self.client is not None:
            self.client.close()
# ============== Check items ==============
def timed(fn: Callable) -> Callable:
    """Decorator: store the wrapped call's duration in ``result.elapsed_ms``.

    The wrapped function must return an object with a writable
    ``elapsed_ms`` attribute. The wrapper keeps the original function's
    name and docstring.
    """
    import functools  # local import: functools is not imported at module level

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so a wall-clock adjustment during the
        # call cannot produce a negative or inflated duration.
        t0 = time.perf_counter()
        c = fn(*args, **kwargs)
        c.elapsed_ms = int((time.perf_counter() - t0) * 1000)
        return c

    return wrapper
@timed
def check_compose_ps(remote: Remote, compose_dir: str) -> Check:
    """1.1 ``docker compose ps`` — every expected service must be up and not unhealthy.

    A service counts as running when its State is ``running``/``healthy``
    and its Status text mentions neither ``exit`` nor ``unhealthy``.
    Services listed in ``EXPECTED_SERVICES`` that do not appear at all are
    "missing" (severity error); services that appear but are not running
    are "unhealthy" (severity warn). Services outside the expected list do
    not make the check fail on their own merely by existing.
    """
    cmd = f"cd {compose_dir} && docker compose ps --format '{{{{.Service}}}}|{{{{.State}}}}|{{{{.Status}}}}'"
    rc, out, err = remote.run(cmd, timeout=30)

    expected = set(EXPECTED_SERVICES)
    running: set = set()
    unhealthy: set = set()
    missing = set(expected)
    detail_lines = []
    for raw in out.splitlines():
        parts = raw.strip().split("|")
        if len(parts) < 3:
            continue
        svc, state, status = parts[0], parts[1], parts[2]
        missing.discard(svc)
        detail_lines.append(f" {svc:10s} {state:12s} {status}")
        state_l, status_l = state.lower(), status.lower()
        # "Up 5 minutes (unhealthy)" has State=running, so the Status text
        # must be inspected as well or a failing healthcheck goes unnoticed.
        is_up = (
            state_l in ("running", "healthy")
            and "exit" not in status_l
            and "unhealthy" not in status_l
        )
        (running if is_up else unhealthy).add(svc)

    ok = not missing and not unhealthy
    summary = (
        f"{len(running & expected)}/{len(EXPECTED_SERVICES)} running"
        if ok
        else f"missing={sorted(missing) or '-'} unhealthy={sorted(unhealthy) or '-'}"
    )
    sev = "error" if missing else ("warn" if unhealthy else "info")
    detail = "\n".join(detail_lines)
    if not detail_lines:
        # Nothing parseable came back: show whatever the command printed.
        detail = (out.strip() or err.strip() or f"(no output, rc={rc})")
    return Check("docker compose ps", "docker", ok, summary, detail,
                 severity=sev, command=cmd)
@timed
def check_container_logs(remote: Remote, compose_dir: str) -> Check:
    """1.2 Scan the last 200 worker/api log lines for ERROR / Traceback.

    Up to 20 lines matching traceback|error|exception|critical
    (case-insensitive) are reported as a warning. If the command itself
    failed (non-zero rc with an error message and no output) the check is
    reported as failed instead of "clean".
    """
    name = "近 200 行 worker/api 日志无 ERROR"
    cmd = (
        f"cd {compose_dir} && "
        "docker compose logs --tail=200 --no-color worker api 2>&1 | "
        "grep -E -i 'traceback|error|exception|critical' | head -20"
    )
    rc, out, err = remote.run(cmd, timeout=30)
    out = out.strip()
    if not out:
        # The pipeline's status is head's, so a non-zero rc together with
        # stderr text means `cd` failed or the command timed out — not that
        # the logs are clean.
        if rc != 0 and err.strip():
            return Check(name, "docker", False, "无法读取日志", err.strip(),
                         severity="warn", command=cmd)
        return Check(name, "docker", True, "clean", severity="info")
    count = sum(1 for line in out.splitlines() if line.strip())
    return Check(name, "docker", False, f"{count} 行可疑", out,
                 severity="warn", command=cmd)
@timed
def check_disk(remote: Remote, threshold_pct: int = 85) -> Check:
    """1.3 Disk space — usage of the key mount points (/, /var, /srv*).

    Args:
        remote: command runner.
        threshold_pct: usage percentage at or above which a mount point is
            reported as high (default 85).

    Returns a warn-level failure when any selected mount point is at or
    above the threshold, or when no usage data could be obtained.
    """
    # The mount point must be the LAST column: the filter anchors on the end
    # of the line, and with the target first every line ends in "NN%", so
    # "/" and "/var" could never match.
    cmd = (
        "df -h --output=size,used,avail,pcent,target 2>/dev/null | "
        "grep -E ' (/|/var|/srv[^ ]*) *$'"
    )
    rc, out, err = remote.run(cmd)
    out = out.strip()
    if not out:
        # "/" always exists, so empty output means df/grep did not work.
        return Check("磁盘空间", "docker", False, "无法获取磁盘使用率(df 无输出)",
                     err.strip(), severity="warn", command=cmd)
    high = []
    for line in out.splitlines():
        m = re.search(r"(\d+)%", line)
        if m and int(m.group(1)) >= threshold_pct:
            high.append(line.strip())
    ok = not high
    summary = "ok" if ok else f"高占用: {'; '.join(high)}"
    return Check("磁盘空间", "docker", ok, summary, out,
                 severity="warn" if not ok else "info", command=cmd)
def _parse_size_to_mb(token: str) -> float:
"""'1.9Gi' / '806Mi' / '512Ki' / '1024' 转成 MB。"""
m = re.match(r"^\s*(\d+(?:\.\d+)?)\s*([KMG]?i?B?)?\s*$", token)
if not m:
return 0.0
val = float(m.group(1))
unit = (m.group(2) or "").upper()
if unit.startswith("GI") or unit == "G":
return val * 1024
if unit.startswith("MI") or unit == "M":
return val
if unit.startswith("KI") or unit == "K":
return val / 1024
# 无单位,默认 KiB (free -h 罕见)
return val / 1024
@timed
def check_memory(remote: Remote) -> Check:
    """1.4 Memory + swap: warn when used/total of the ``Mem`` row exceeds 90%.

    ``free`` is run under ``LC_ALL=C`` so the row label is always ``Mem``
    regardless of the host locale. If the row cannot be found or parsed the
    check fails with a warning rather than silently reporting "ok".
    """
    cmd = "LC_ALL=C free -h | head -3"
    rc, out, _ = remote.run(cmd)
    out = out.strip()
    pct = None
    for line in out.splitlines():
        if not line.startswith("Mem"):
            continue
        parts = line.split()
        # ['Mem:', 'total', 'used', 'free', 'shared', 'buff/cache', 'available']
        if len(parts) >= 7:
            total_mb = _parse_size_to_mb(parts[1])
            used_mb = _parse_size_to_mb(parts[2])
            if total_mb > 0:
                pct = used_mb / total_mb * 100
        break
    if pct is None:
        return Check("内存使用", "host", False, "无法解析 free 输出", out,
                     severity="warn", command=cmd)
    high = pct > 90
    summary = "ok" if not high else f">90% used ({pct:.1f}%)"
    return Check("内存使用", "host", not high, summary, out,
                 severity="warn" if high else "info", command=cmd)
@timed
def check_ports(remote: Remote) -> Check:
    """1.5 Key listening ports (by default only 80, see ``KEY_PORTS``).

    Uses ``ss -tln`` and takes the Local Address field (4th column, e.g.
    0.0.0.0:80, *:443, [::]:80) of every LISTEN row. ``-H`` is not used
    (header handling differs between distributions) and neither is
    ``ss -l`` (it would add unix sockets).
    """
    cmd = (
        "ss -tln 2>/dev/null | "
        "awk 'tolower($1) ~ /listen/ {print $4}' | sort -u"
    )
    rc, out, _ = remote.run(cmd)
    # Output captured through an SSH PTY ends lines with "\r\n"; tolerate
    # trailing blanks/CR so the end-of-line anchor still matches the port.
    listening = {int(m.group(1)) for m in re.finditer(r":(\d+)[ \t\r]*$", out, re.MULTILINE)}
    need = set(KEY_PORTS.values())
    missing = sorted(need - listening)
    ok = not missing
    label = "/".join(str(p) for p in sorted(need))  # sorted: stable check name
    return Check(f"关键端口 {label} 监听", "network", ok,
                 "ok" if ok else f"缺失 {missing}",
                 f"监听中: {sorted(listening)}\n# raw ss output:\n{out.strip()}",
                 command=cmd, severity="warn" if not ok else "info")
@timed
def check_docker_system(remote: Remote, threshold_gb: float = 5) -> Check:
    """1.6 ``docker system df`` — space used by images / volumes / build cache.

    Args:
        remote: command runner.
        threshold_gb: any single size figure above this many GB marks the
            check as bloated (default 5).

    Sizes printed in GB or TB are considered, with or without a decimal
    part. A failing ``docker system df`` is reported as a warning.
    """
    cmd = "docker system df 2>&1"
    rc, out, _ = remote.run(cmd)
    out = out.strip()
    if rc != 0:
        return Check("docker system df", "docker", False, "docker system df 执行失败",
                     out, severity="warn", command=cmd)
    bloated = False
    for m in re.finditer(r"(\d+(?:\.\d+)?)\s*([GT])B", out):
        size_gb = float(m.group(1))
        if m.group(2) == "T":
            size_gb *= 1000  # docker prints decimal (SI) units
        if size_gb > threshold_gb:
            bloated = True
            break
    return Check("docker system df", "docker", not bloated,
                 "ok" if not bloated else f"有 >{threshold_gb:g}GB 的大件",
                 out, severity="warn" if bloated else "info", command=cmd)
@timed
def check_api_health(remote: Remote, api_base: str) -> Check:
    """1.7 API health endpoint.

    ``api_base`` may be given in two forms:
    - a full URL that already ends in /healthz or /health, e.g.
      'http://127.0.0.1/api/v1/healthz' — used as is;
    - a base URL, e.g. 'http://127.0.0.1:8000' — '/api/v1/healthz' is appended.

    The check passes for any HTTP status in [200, 400). The first 400 bytes
    of the response body are included in the detail.
    """
    base = api_base.rstrip("/")
    if base.endswith(("/healthz", "/health")):
        url = base
    else:
        url = f"{base}/api/v1/healthz"
    # A fresh mktemp file instead of a fixed /tmp path: a fixed name would
    # show the previous run's body when curl fails, and is predictable.
    cmd = (
        "f=$(mktemp) || exit 1; "
        f"curl -sS -m 5 -o \"$f\" -w 'http=%{{http_code}} t=%{{time_total}}\\n' '{url}'; "
        "echo '--- body ---'; head -c 400 \"$f\" 2>/dev/null; echo; rm -f \"$f\""
    )
    rc, out, _ = remote.run(cmd)
    m = re.search(r"http=(\d+)", out)
    code = int(m.group(1)) if m else 0
    ok = 200 <= code < 400
    summary = f"http={code}" + (" (✓ ok)" if ok else " (✗ failed)")
    return Check(f"API {url}", "app", ok, summary, out.strip(),
                 command=cmd, severity="error" if not ok else "info")
@timed
def check_db_counts(remote: Remote, compose_dir: str) -> Check:
    """1.8 Row counts of articles / sources (credentials are read from .env).

    Reports articles, sources, translated articles and articles published
    in the last 24h that are still untranslated. More than 50 untranslated
    is a warning, more than 200 an error. A failed or unparseable query is
    an error.
    """
    name = "DB 行数 articles/sources"
    # One UNION ALL statement rather than four: psql older than 15 prints
    # only the LAST result of a multi-statement -c string.
    cmd = (
        f"cd {compose_dir} && "
        "set -a; . ./.env; set +a; "
        "docker compose exec -T postgres psql -U \"$POSTGRES_USER\" -d \"$POSTGRES_DB\" -t -A -c "
        "\"SELECT 'articles='||count(*) FROM articles "
        "UNION ALL SELECT 'sources='||count(*) FROM sources "
        "UNION ALL SELECT 'translated='||count(*) FROM articles WHERE title_zh IS NOT NULL "
        "UNION ALL SELECT 'untranslated_24h='||count(*) FROM articles "
        " WHERE published_at > now() - interval '24 hour' AND title_zh IS NULL;\" 2>&1"
    )
    rc, out, _ = remote.run(cmd, timeout=30)
    out = out.strip()
    flat = out.replace("\n", " | ")
    m = re.search(r"untranslated_24h=(\d+)", out)
    if rc != 0 or m is None:
        # The query itself did not work; never report that as info/ok.
        return Check(name, "app", False, "查询失败: " + (flat[:200] or f"rc={rc}"),
                     out, severity="error", command=cmd)
    untrans_24h = int(m.group(1))
    if untrans_24h > 200:
        sev = "error"
    elif untrans_24h > 50:
        sev = "warn"
    else:
        sev = "info"
    return Check(name, "app", untrans_24h <= 50, flat, severity=sev, command=cmd)
@timed
def check_llm_workflow(remote: Remote, compose_dir: str) -> Check:
    """1.13 LLM workflow coverage: status distribution of the 5 steps, global and last 24h.

    Steps and their status columns:
      1. translation  translation_status
      2. classify     classify_status
      3. format       format_status
      4. image        image_ai_status
      5. commentary   commentary_status

    Verdict:
      - translation failed+partial share >= 5% -> warn, >= 20% -> error;
      - among articles translated successfully in the last 24h, any LLM step
        whose ``failed`` share is >= 20% -> error;
      - no successfully translated article in the last 24h -> the LLM steps
        cannot be assessed; the check then passes unless the translation
        failure rate itself is an issue.
    NOTE(review): the earlier description of this check said an LLM step
    failure share >= 20% should be a warning; the implemented severity has
    always been error and is kept here — confirm which one is intended.

    Statuses ``n/a`` (LLM disabled) and ``pending`` (queued) are reported
    but never count as failures.
    """
    check_name = "LLM 工作流落实度(翻译/分类/排版/插图/评论)"
    # One query: global distribution of the 5 statuses + distribution of the
    # 4 LLM statuses among articles translated OK in the last 24h.
    sql = r"""
SELECT 'tr_glob' AS k, translation_status AS st, count(*)::int AS n
FROM articles GROUP BY translation_status
UNION ALL
SELECT 'cl_glob', classify_status, count(*)::int FROM articles GROUP BY classify_status
UNION ALL
SELECT 'fm_glob', format_status, count(*)::int FROM articles GROUP BY format_status
UNION ALL
SELECT 'im_glob', image_ai_status, count(*)::int FROM articles GROUP BY image_ai_status
UNION ALL
SELECT 'co_glob', commentary_status, count(*)::int FROM articles GROUP BY commentary_status
UNION ALL
-- 24h 内翻译成功(translation_status=ok)的文章里,4 个 LLM 状态分布
SELECT 'cl_24h', classify_status, count(*)::int FROM articles
WHERE translation_status='ok' AND translated_at > now()-interval '24 hour'
GROUP BY classify_status
UNION ALL
SELECT 'fm_24h', format_status, count(*)::int FROM articles
WHERE translation_status='ok' AND translated_at > now()-interval '24 hour'
GROUP BY format_status
UNION ALL
SELECT 'im_24h', image_ai_status, count(*)::int FROM articles
WHERE translation_status='ok' AND translated_at > now()-interval '24 hour'
GROUP BY image_ai_status
UNION ALL
SELECT 'co_24h', commentary_status, count(*)::int FROM articles
WHERE translation_status='ok' AND translated_at > now()-interval '24 hour'
GROUP BY commentary_status;
"""
    # "$(printf '\t')" instead of $'\t': the latter is a bash extension and
    # is not understood by /bin/sh (dash), which --local mode uses.
    cmd = (
        f"cd {compose_dir} && "
        "set -a; . ./.env; set +a; "
        "docker compose exec -T postgres psql -U \"$POSTGRES_USER\" -d \"$POSTGRES_DB\" "
        "-t -A -F \"$(printf '\\t')\" -c \""
        + sql.replace(chr(34), chr(92) + chr(34))
        + "\" 2>&1"
    )
    rc, out, _ = remote.run(cmd, timeout=30)

    # Parse tab-separated rows of 3 columns (k, st, n);
    # e.g. dist['tr_glob'] = {'ok': 100, 'failed': 5, ...}
    dist: dict[str, dict[str, int]] = {}
    for line in out.splitlines():
        line = line.strip()
        if line.count("\t") < 2:
            continue
        k, st, n_s = line.split("\t", 2)
        try:
            n = int(n_s)
        except ValueError:
            continue
        dist.setdefault(k, {})[st] = n
    if not dist:
        return Check(
            check_name, "app", False,
            "查询无结果(SQL 失败?)",
            detail=out[:600],
            command=cmd,
            severity="error",
        )

    issues: list[str] = []
    worst = "info"  # escalated explicitly instead of re-parsing message text

    # === 1) Global translation health ===
    tr = dist.get("tr_glob", {})
    tr_total = sum(tr.values())
    tr_failed = tr.get("failed", 0) + tr.get("partial", 0)
    tr_failed_pct = (tr_failed / tr_total * 100) if tr_total else 0.0
    tr_ok = tr.get("ok", 0)
    if tr_failed_pct >= 20:
        issues.append(f"翻译失败率 {tr_failed_pct:.0f}% ≥20%")
        worst = "error"
    elif tr_failed_pct >= 5:
        issues.append(f"翻译失败率 {tr_failed_pct:.0f}% ≥5%")
        worst = "warn"

    # === 2) The 4 LLM statuses among articles translated OK in the last 24h ===
    # Sample size = sum of the cl_24h counts (the other three share it).
    llm_24h_total = sum(dist.get("cl_24h", {}).values())
    llm_summary: list[str] = []
    for prefix, label in [("cl_24h", "分类"), ("fm_24h", "排版"),
                          ("im_24h", "插图"), ("co_24h", "评论")]:
        d = dist.get(prefix, {})
        n_ok = d.get("ok", 0)
        n_failed = d.get("failed", 0)
        n_pending = d.get("pending", 0)
        n_na = d.get("n/a", 0)
        if llm_24h_total == 0:
            llm_summary.append(f"{label}: 无 24h 翻译样本")
            continue
        ok_pct = n_ok / llm_24h_total * 100
        fail_pct = n_failed / llm_24h_total * 100
        llm_summary.append(
            f"{label}: ok={n_ok} failed={n_failed} pending={n_pending} n/a={n_na} ({ok_pct:.0f}% ok)"
        )
        if fail_pct >= 20:
            issues.append(f"{label} 24h 失败率 {fail_pct:.0f}% (≥20%)")
            worst = "error"

    # === 3) Global LLM status distribution (top 3 per step) ===
    glob_parts: list[str] = []
    for prefix, label in [("cl_glob", "分类"), ("fm_glob", "排版"),
                          ("im_glob", "插图"), ("co_glob", "评论")]:
        d = dist.get(prefix, {})
        if d:
            top = ",".join(f"{k}={v}" for k, v in sorted(d.items(), key=lambda x: -x[1])[:3])
            glob_parts.append(f"{label} {top}")

    # === 4) Verdict ===
    if llm_24h_total == 0:
        # Nothing to assess for the LLM steps; translation issues, if any,
        # are still reported rather than being masked as "info".
        summary = "24h 内无翻译成功样本(无法评估 LLM 工作流)"
    else:
        summary = (
            f"翻译 ok={tr_ok}/{tr_total} ({100 - tr_failed_pct:.0f}%) | "
            + " · ".join(llm_summary)
        )
    if issues:
        summary += " · " + "; ".join(issues[:2])

    detail_lines = [
        "翻译全局(全量): " + ", ".join(f"{k}={v}" for k, v in sorted(tr.items(), key=lambda x: -x[1])),
        f"翻译失败率: {tr_failed_pct:.1f}%",
        f"24h 已翻译文章样本: {llm_24h_total}",
    ] + llm_summary + [
        "",
        "全局 LLM 状态(全量,取 top3):",
    ] + [f" {p}" for p in glob_parts]
    if issues:
        detail_lines.append("")
        detail_lines.append("⚠ 问题: " + "; ".join(issues))

    return Check(
        check_name, "app", not issues, summary,
        detail="\n".join(detail_lines),
        command="psql: 5 个 status 字段 × 全局/24h 分布",
        severity=worst,
    )
@timed
def check_translation_sample(remote: Remote, compose_dir: str, sample_n: int = 3) -> Check:
    """1.9 Spot-check N random articles translated in the last 24h (default 3).

    Sampling condition: published_at > now()-24h AND title_zh IS NOT NULL
    AND translation_status IN ('ok','partial').

    Per-article criteria:
      - title_zh is not blank
      - body_zh_text is not blank
      - title_zh != title (identical text is the typical untranslated fallback)
      - title_zh has at least 2 characters

    Overall verdict:
      - query failed: error
      - no candidate rows: info, passes (the worker has produced nothing yet)
      - all pass: ok
      - none pass: error (the translation pipeline is probably down)
      - some pass: warn
    """
    sample_n = int(sample_n)
    check_name = f"翻译抽查({sample_n}篇/24h)"
    # psql's unaligned output does not escape tabs/newlines inside values,
    # so a multi-line title or body would break a row apart and the sample
    # would be dropped. Replace TAB/LF/CR with spaces inside the query.
    flat = "translate({}, chr(9)||chr(10)||chr(13), '   ')"
    sql = (
        "SELECT id, "
        " coalesce(source_id::text,'?') AS src, "
        f" {flat.format('title')} AS title, "
        f" {flat.format('title_zh')} AS title_zh, "
        f" coalesce({flat.format('substring(body_zh_text, 1, 200)')}, '') AS body_zh_preview, "
        " translation_status, "
        " translation_engine, "
        " coalesce(to_char(translated_at, 'YYYY-MM-DD HH24:MI'), '-') AS tat, "
        " coalesce(lang_src,'-') AS lang, "
        " coalesce(char_length(title),0) AS tlen, "
        " coalesce(char_length(title_zh),0) AS zlen, "
        " coalesce(char_length(body_zh_text),0) AS blen "
        "FROM articles "
        "WHERE published_at > now() - interval '24 hour' "
        " AND title_zh IS NOT NULL "
        " AND translation_status IN ('ok','partial') "
        "ORDER BY random() "
        f"LIMIT {sample_n};"
    )
    # A header row, so the raw output is readable column by column.
    header = "id\tsrc\ttitle\ttitle_zh\tbody_zh_preview\tstatus\tengine\ttranslated_at\tlang\ttlen\tzlen\tblen"
    n_cols = header.count("\t") + 1
    # "$(printf '\t')" instead of $'\t' (a bash extension that /bin/sh does
    # not understand in --local mode).
    cmd = (
        f"cd {compose_dir} && "
        "set -a; . ./.env; set +a; "
        f"echo '{header}'; "
        "docker compose exec -T postgres psql -U \"$POSTGRES_USER\" -d \"$POSTGRES_DB\" "
        f"-t -A -F \"$(printf '\\t')\" -c \"{sql.replace(chr(34), chr(92) + chr(34))}\" 2>&1"
    )
    rc, out, err = remote.run(cmd, timeout=30)
    if rc != 0:
        # A broken query must not be mistaken for "no sample yet".
        return Check(
            check_name, "app", False, f"查询失败 (rc={rc})",
            detail=f"# raw output:\n{(out.strip() or err.strip())[:500]}",
            severity="error",
            command=cmd,
        )

    # Keep only complete data rows: skip the echoed header and anything
    # (NOTICE lines etc.) that does not have the expected column count.
    rows = []
    for line in out.splitlines():
        if not line.strip() or line.startswith("id\t"):
            continue
        cols = line.split("\t")
        if len(cols) == n_cols:
            rows.append(cols)
    if not rows:
        # No candidates: nothing translated in the last 24h (fresh start / little data).
        return Check(
            check_name, "app", True,
            "无样本(24h 内暂无已翻译文章)",
            detail=f"# raw output:\n{out.strip()[:500]}",
            severity="info",
            command=cmd,
        )

    def _to_int(text: str) -> int:
        try:
            return int(text or 0)
        except ValueError:
            return 0

    verdicts: list[bool] = []
    detail_rows: list[str] = []
    for cols in rows:
        (aid, src, title, title_zh, body_zh_pv, status,
         engine, tat, lang, tlen, zlen, blen) = cols
        tlen_i, zlen_i, blen_i = _to_int(tlen), _to_int(zlen), _to_int(blen)
        reasons: list[str] = []
        if not title_zh.strip():
            reasons.append("title_zh 空")
        if not body_zh_pv.strip():
            reasons.append("body_zh_text 空")
        if title_zh.strip() and title.strip() and title_zh.strip() == title.strip():
            reasons.append("title_zh == title(未翻译)")
        if zlen_i < 2:
            reasons.append(f"title_zh 长度={zlen_i}")
        is_ok = not reasons
        verdicts.append(is_ok)
        # Readable row: original title / translated title / lengths / status.
        t_disp = (title[:50] + "…") if len(title) > 50 else title
        z_disp = (title_zh[:50] + "…") if len(title_zh) > 50 else title_zh
        line = (f"#{aid} src={src} lang={lang} status={status} "
                f"len: 原 {tlen_i} → 译 {zlen_i} (body_zh {blen_i}) "
                f"engine={engine} at={tat}")
        if is_ok:
            line = "✓ " + line + f"\n 原: {t_disp}\n 译: {z_disp}"
        else:
            line = "✗ " + line + f"\n 原因: {'; '.join(reasons)}\n 原: {t_disp}\n 译: {z_disp}"
        detail_rows.append(line)

    passed = sum(verdicts)
    total = len(verdicts)
    if passed == total:
        sev, summary = "info", f"{passed}/{total} 通过"
    elif passed == 0:
        sev, summary = "error", f"0/{total} 通过 ⚠ 翻译管线可能挂了"
    else:
        sev, summary = "warn", f"{passed}/{total} 通过(部分文章翻译异常)"
    return Check(
        check_name, "app", passed == total, summary,
        detail="\n".join(detail_rows),
        command=cmd, severity=sev,
    )
@timed
def check_redis(remote: Remote, compose_dir: str) -> Check:
    """1.9 Redis: PING plus the main memory figures.

    The check passes when the combined output contains ``PONG``; otherwise
    it is an error. The summary is the command output joined with `` | ``.
    """
    # Both invocations share the same redis-cli prefix; the password comes
    # from the REDIS_PASSWORD variable loaded out of .env on the target.
    redis_cli = 'docker compose exec -T redis redis-cli -a "$REDIS_PASSWORD" --no-auth-warning '
    script = "".join([
        f"cd {compose_dir} && ",
        "set -a; . ./.env; set +a; ",
        redis_cli,
        "ping 2>&1; ",
        redis_cli,
        "info memory 2>&1 | grep -E 'used_memory_human|used_memory_peak_human|maxmemory_human'",
    ])
    _rc, text, _err = remote.run(script, timeout=20)
    alive = "PONG" in text
    one_line = " | ".join(text.strip().split("\n"))
    level = "info" if alive else "error"
    return Check("Redis", "app", alive, one_line, severity=level)
@timed
def check_homepage(remote: Remote, api_base: str, auth_token: str = "") -> Check:
    """1.10 Home page SPA + feed API + mobile readiness.

    The home page is an SPA shell, so three things are checked:
      1) GET / contains a viewport meta, the #app mount point, a JS bundle
         reference and lang="zh-CN";
      2) GET /api/v1/articles?page=1&page_size=10 returns JSON with
         ``items`` / ``total`` (this endpoint requires auth);
      3) the linked stylesheet contains max-width 768px / 480px media
         breakpoints (occurrences are counted on the target host).

    A 401 from the feed API when no token was supplied means "endpoint needs
    a token, service is fine": the result is downgraded to info.
    """
    # 1) Home page HTML
    rc1, html, _ = remote.run("curl -sS -m 5 http://127.0.0.1/", timeout=10)
    has_viewport = 'name="viewport"' in html or "name='viewport'" in html
    has_app_div = 'id="app"' in html
    has_js = "main.ts" in html or "/assets/index-" in html
    has_lang_zh = 'lang="zh-CN"' in html or "lang='zh-CN'" in html

    # 2) Article list API (needs auth)
    api_url = f"{api_base.rstrip('/').removesuffix('/api/v1/healthz')}/api/v1/articles?page=1&page_size=10"
    auth_header = ""
    if auth_token:
        tok_b64 = base64.b64encode(auth_token.encode("utf-8")).decode("ascii")
        # The header MUST be double-quoted: inside single quotes the shell
        # does not expand $(...), and curl would send the literal text.
        # NOTE: base64 only keeps the raw token out of this command string;
        # it is not a secrecy measure.
        auth_header = f' -H "Authorization: Bearer $(echo {tok_b64} | base64 -d)"'
    rc2, body, _ = remote.run(
        "curl -sS -m 8 '" + api_url + "'" + auth_header
        + " -w '\\n---HTTP=%{http_code} TIME=%{time_total}---\\n' 2>&1",
        timeout=15,
    )
    items: list = []
    api_code = 0
    total = 0
    api_err = ""
    data = None
    marker = "\n---HTTP="
    if marker in body:
        json_part, status_part = body.rsplit(marker, 1)
        m = re.search(r"HTTP=(\d+)", marker + status_part)
        api_code = int(m.group(1)) if m else 0
    else:
        json_part = body
    try:
        data = json.loads(json_part)
        items = data.get("items") or []
        total = int(data.get("total") or 0)
    except (ValueError, TypeError, AttributeError) as e:
        api_err = f"{type(e).__name__}: {e}"
        data = None
        items = []

    # 3) Mobile breakpoints — counted on the target host
    css_href = ""
    m = re.search(r'<link[^>]+rel="stylesheet"[^>]+href="([^"]+)"', html)
    if m:
        css_href = m.group(1)
    mobile_768 = mobile_480 = 0
    if css_href:
        if css_href.startswith(("http://", "https://")):
            css_url = css_href
        else:
            css_url = "http://127.0.0.1/" + css_href.lstrip("/")
        # Fetch once; `grep -o | wc -l` counts occurrences, whereas
        # `grep -c` counts LINES and tops out at 1 for minified CSS.
        cmd_css = (
            f"css=$(curl -sS -m 8 '{css_url}'); "
            "printf '%s' \"$css\" | grep -o -E 'max-width:[[:space:]]*768px' | wc -l; "
            "echo ---480---; "
            "printf '%s' \"$css\" | grep -o -E 'max-width:[[:space:]]*480px' | wc -l"
        )
        rc3, css_out, _ = remote.run(cmd_css, timeout=15)
        # Expected shape: "<count>\n---480---\n<count>"
        parts = re.split(r"---480---", css_out)

        def _last_int(chunk: str) -> int:
            lines = chunk.strip().splitlines()
            try:
                return int(lines[-1].strip()) if lines else 0
            except ValueError:
                return 0

        mobile_768 = _last_int(parts[0])
        mobile_480 = _last_int(parts[1]) if len(parts) > 1 else 0

    # === Verdict ===
    issues: list[str] = []
    if not has_viewport:
        issues.append("首页 HTML 缺 viewport meta(移动端不友好)")
    if not has_app_div:
        issues.append("首页 HTML 缺 #app 挂载点")
    if not has_js:
        issues.append("首页 HTML 没引 JS bundle")
    if not has_lang_zh:
        issues.append("首页 HTML lang 不是 zh-CN")
    # Feed API: 401 without a token is not an error; 401 WITH a token is.
    need_auth_msg = ""
    if api_code == 401 and not auth_token:
        need_auth_msg = "Feed API 401(端点需登录)— 用 --auth-user / --auth-pass 传 owner 凭据"
    elif api_code != 200:
        issues.append(f"Feed API 返回 {api_code} (非 200)")
    if api_err:
        issues.append(f"Feed API 解析失败: {api_err}")
    if data is not None and not items and api_code == 200:
        issues.append(f"Feed API 返回 items 为空 (total={total})")

    # Sample of translated titles from the first page
    sample = [
        {
            "id": it.get("id"),
            "title": (it.get("title") or "")[:60],
            "title_zh": (it.get("title_zh") or "")[:60],
            "status": it.get("translation_status"),
            "engine": it.get("translation_engine"),
        }
        for it in items[:3]
    ]
    has_zh = sum(1 for it in items if it.get("title_zh"))

    summary_parts = [
        f"html: {'✓' if has_viewport and has_app_div and has_js else '✗'}",
        f"feed: {len(items)}/{total} (有译文 {has_zh})" if api_code == 200
        else f"feed: http={api_code}",
        f"mobile-css: {mobile_768}×768 + {mobile_480}×480" if css_href
        else "mobile-css: (无 CSS 链接)",
    ]
    summary = " · ".join(summary_parts)
    if need_auth_msg:
        summary += " · " + need_auth_msg
    elif issues:
        summary += " · " + "; ".join(issues[:2])

    html_ok = has_viewport and has_app_div and has_js and has_lang_zh
    if need_auth_msg:
        # No token -> 401 -> the service is up; downgrade to info.
        ok = html_ok
        sev = "info"
    else:
        ok = html_ok and not issues
        if api_code not in (0, 200):
            sev = "error"
        else:
            sev = "warn" if issues else "info"

    detail_lines = [
        f"首页 HTML: viewport={has_viewport} #app={has_app_div} js={has_js} lang-zh={has_lang_zh}",
        f"Feed API: http={api_code} items={len(items)} total={total} 译过={has_zh}",
    ]
    if css_href:
        detail_lines.append(f"CSS: {css_href} mobile: 768px={mobile_768} 处, 480px={mobile_480}")
    if sample:
        detail_lines.append("首屏抽样:")
        for s in sample:
            detail_lines.append(
                f" #{s['id']} {s['title']!r} → {s['title_zh']!r} "
                f"[{s['status']}/{s['engine']}]"
            )
    if need_auth_msg:
        detail_lines.append("提示: " + need_auth_msg)
    if issues:
        detail_lines.append("问题: " + "; ".join(issues))
    return Check(
        "首页 SPA + Feed API + 移动端", "app", ok, summary,
        detail="\n".join(detail_lines),
        command=f"GET /; GET {api_url}; GET {css_href or '(no css)'}",
        severity=sev,
    )
@timed
def check_article_detail(remote: Remote, api_base: str, auth_token: str = "") -> Check:
    """1.11 Detail page: take the first article of the list and GET /api/v1/articles/{id}.

    Checks:
      - HTTP status is 200;
      - title_zh and at least one of body_zh_text / body_zh_formatted /
        body_zh_html are present, and title_zh differs from title;
      - body_zh_formatted, when present, contains the
        ``class="article-body"`` container (the formatted version carries
        its CSS wrapper).

    A 401 on the list endpoint when no token was supplied is reported as
    info with a hint to pass --auth-user / --auth-pass.
    """
    base = api_base.rstrip("/").removesuffix("/api/v1/healthz")
    list_url = f"{base}/api/v1/articles?page=1&page_size=1"
    auth_h = ""
    if auth_token:
        tok_b64 = base64.b64encode(auth_token.encode("utf-8")).decode("ascii")
        # Double quotes are required: $(...) is not expanded inside single
        # quotes, so the token would never actually be sent.
        auth_h = f' -H "Authorization: Bearer $(echo {tok_b64} | base64 -d)"'
    marker = "\n---HTTP="

    rc, list_body, _ = remote.run(
        "curl -sS -m 8 '" + list_url + "'" + auth_h + " -w '\\n---HTTP=%{http_code}---\\n' 2>&1",
        timeout=10,
    )
    article_id = None
    list_code = 0
    if rc == 0 and list_body:
        if marker in list_body:
            json_part, status_part = list_body.rsplit(marker, 1)
            m = re.search(r"HTTP=(\d+)", marker + status_part)
            list_code = int(m.group(1)) if m else 0
        else:
            json_part = list_body
        try:
            data = json.loads(json_part)
            if data.get("items"):
                article_id = data["items"][0]["id"]
        except (ValueError, TypeError, AttributeError, KeyError, IndexError):
            # Unparseable list response: handled below as "no sample",
            # with the raw body shown in the detail.
            article_id = None
    if list_code == 401 and not auth_token:
        return Check(
            "详情页 API + 译文 CSS", "app", True,
            "需 owner token(用 --auth-user / --auth-pass)",
            detail=f"# raw list response:\n{list_body[:300]}",
            command=f"GET {list_url} (no token)",
            severity="info",
        )
    if article_id in (None, ""):
        return Check(
            "详情页 API + 译文 CSS", "app", False,
            f"无可用文章样本(列表 http={list_code}, items=0?)",
            detail=list_body[:500],
            command=list_url,
            severity="warn",
        )

    # Fetch the detail
    detail_url = f"{base}/api/v1/articles/{article_id}"
    rc2, body2, _ = remote.run(
        "curl -sS -m 8 '" + detail_url + "'" + auth_h + " -w '\\n---HTTP=%{http_code}---\\n' 2>&1",
        timeout=10,
    )
    api_code = 0
    article = {}
    parse_err = ""
    if marker in body2:
        json_part, status_part = body2.rsplit(marker, 1)
        m = re.search(r"HTTP=(\d+)", marker + status_part)
        api_code = int(m.group(1)) if m else 0
    else:
        json_part = body2
    try:
        article = json.loads(json_part)
    except ValueError as e:
        parse_err = f"{type(e).__name__}: {e}"
    if not isinstance(article, dict):
        parse_err = parse_err or "response is not a JSON object"
        article = {}
    if api_code != 200 or not article:
        return Check(
            f"详情页 API #{article_id} + 译文 CSS", "app", False,
            f"http={api_code} parse_err={parse_err or '-'}",
            detail=body2[:500],
            command=detail_url,
            severity="error",
        )

    # Criteria
    title = article.get("title") or ""
    title_zh = article.get("title_zh") or ""
    body_zh_text = article.get("body_zh_text") or ""
    body_zh_formatted = article.get("body_zh_formatted") or ""
    body_zh_html = article.get("body_zh_html") or ""
    fmt_status = article.get("format_status") or "n/a"
    tr_status = article.get("translation_status") or "-"
    tr_engine = article.get("translation_engine") or "-"

    issues: list[str] = []
    if not title_zh:
        issues.append("缺 title_zh(无译文)")
    if not (body_zh_text or body_zh_formatted or body_zh_html):
        issues.append("缺 body_zh_text/formatted/html(译文全空)")
    if title_zh and title and title_zh.strip() == title.strip():
        issues.append("title_zh == title(未翻译)")
    has_css_container = (
        'class="article-body"' in body_zh_formatted
        or "class='article-body'" in body_zh_formatted
    )
    if has_css_container:
        css_info = "✓ 排版版带 .article-body 容器"
    elif body_zh_formatted:
        css_info = "✗ 排版版缺 .article-body 容器(译文没套 CSS)"
        issues.append("排版版 body_zh_formatted 缺 .article-body CSS 容器")
    else:
        css_info = "— 无排版版(用原始译文展示)"

    summary = (
        f"#{article_id} {tr_status}/{tr_engine} fmt={fmt_status} "
        f"译字 {len(title_zh)}/{len(body_zh_text)}; CSS {css_info}"
    )
    if issues:
        summary += " · " + "; ".join(issues[:2])
    detail_lines = [
        f"原标题: {title[:80]!r}",
        f"译标题: {title_zh[:80]!r}",
        f"body_zh_text 长度: {len(body_zh_text)}",
        f"body_zh_formatted 长度: {len(body_zh_formatted)} status={fmt_status}",
        f"body_zh_html 长度: {len(body_zh_html)}",
        f"CSS 容器(.article-body): {'✓' if has_css_container else '✗'}",
    ]
    # First 300 characters of body_zh_formatted (repr, so escapes stay visible)
    if body_zh_formatted:
        detail_lines.append(f"body_zh_formatted 前 300: {body_zh_formatted[:300]!r}")
    return Check(
        f"详情页 API #{article_id} + 译文 CSS", "app", not issues, summary,
        detail="\n".join(detail_lines),
        command=detail_url,
        severity="warn" if issues else "info",
    )
@timed
def check_agnes_llm(remote: Remote, compose_dir: str) -> Check:
    """1.12 Agnes LLM health: issue one real chat/completions call.

    - Reads AGNES_API_KEY / AGNES_BASE_URL / AGNES_CHAT_MODEL from .env.
    - No key configured (or a ``your_...`` placeholder) -> info, skipped
      (the LLM enhancement module is optional).
    - Otherwise sends a minimal request (max_tokens=8, short prompt) and
      expects HTTP 200 with a non-empty choices[0].message.content.

    The key and the payload are base64-encoded into the command and decoded
    by the remote shell, which keeps the raw key out of this command string
    and avoids shell-quoting problems.
    NOTE(review): the decoded key still ends up in curl's argument list on
    the target host, so this is not a complete protection against leakage.
    """
    # 1) Read the 3 variables from .env
    rc, env_out, _ = remote.run(
        f"cd {compose_dir} 2>/dev/null && "
        "grep -E '^(AGNES_API_KEY|AGNES_BASE_URL|AGNES_CHAT_MODEL)=' .env 2>/dev/null"
    )
    settings = {"AGNES_API_KEY": "", "AGNES_BASE_URL": "", "AGNES_CHAT_MODEL": ""}
    for line in env_out.splitlines():
        m = re.match(r"^(AGNES_API_KEY|AGNES_BASE_URL|AGNES_CHAT_MODEL)=(.+)$", line)
        if m:
            settings[m.group(1)] = m.group(2).strip().strip('"').strip("'")
    api_key = settings["AGNES_API_KEY"]
    if not api_key or api_key.startswith("your_"):
        return Check(
            "Agnes LLM 联通", "app", True,
            "未配 AGNES_API_KEY(LLM 增强模块关闭),跳过",
            # Only variable names are shown: the raw grep output may contain a key.
            detail="\n".join(f"{k}={'(set)' if v else '(empty)'}" for k, v in settings.items()),
            severity="info",
        )
    base_url = settings["AGNES_BASE_URL"] or "https://apihub.agnes-ai.com/v1"
    model = settings["AGNES_CHAT_MODEL"] or "agnes-2.0-flash"
    chat_url = f"{base_url.rstrip('/')}/chat/completions"

    # 2) base64-encode key + payload; the remote shell decodes them
    key_b64 = base64.b64encode(api_key.encode("utf-8")).decode("ascii")
    payload_obj = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a ping bot. Reply with a single word."},
            {"role": "user", "content": "ping"},
        ],
        "max_tokens": 8,
        "temperature": 0,
    }
    payload_b64 = base64.b64encode(
        json.dumps(payload_obj, ensure_ascii=False).encode("utf-8")
    ).decode("ascii")
    body_marker = "--- body ---"
    # The body is echoed up to 4000 bytes: a chat/completions response is
    # usually longer than 400 bytes, and a truncated body cannot be parsed
    # as JSON. A mktemp file avoids reading a stale response from a
    # previous run.
    cmd = (
        f"KEY_B64={key_b64}; "
        f"PAYLOAD_B64={payload_b64}; "
        "BODY=$(echo \"$PAYLOAD_B64\" | base64 -d); "
        "RESP=$(mktemp) || exit 1; "
        f"curl -sS -m 25 -o \"$RESP\" -w 'http=%{{http_code}} t=%{{time_total}}\\n' "
        "-H \"Authorization: Bearer $(echo $KEY_B64 | base64 -d)\" "
        "-H 'Content-Type: application/json' "
        f"-d \"$BODY\" '{chat_url}'; "
        f"echo '{body_marker}'; head -c 4000 \"$RESP\" 2>/dev/null; echo; rm -f \"$RESP\""
    )
    rc2, out, _ = remote.run(cmd, timeout=40)

    # Parse
    m = re.search(r"http=(\d+)\s+t=([\d.]+)", out)
    code = int(m.group(1)) if m else 0
    elapsed = float(m.group(2)) if m else 0
    _, _, body_str = out.partition(body_marker)
    body_str = body_str.strip()
    if code != 200:
        return Check(
            "Agnes LLM chat 调用", "app", False,
            f"http={code} t={elapsed:.1f}s",
            detail=out[:600],
            command=f"POST {chat_url} (auth via base64-decoded key, not echoed)",
            severity="error",
        )
    # Extract the reply text; "content" may legitimately be null.
    text = ""
    try:
        resp = json.loads(body_str)
        first = (resp.get("choices") or [{}])[0]
        text = (first.get("message") or {}).get("content") or ""
    except (ValueError, TypeError, AttributeError, IndexError, KeyError):
        text = ""
    if not isinstance(text, str):
        text = str(text)
    ok = bool(text)
    summary = f"http={code} t={elapsed:.1f}s model={model} reply={text[:30]!r}"
    return Check(
        "Agnes LLM chat 调用", "app", ok, summary,
        detail=f"# model: {model}\n# base_url: {base_url}\n# raw:\n{out[:800]}",
        command=f"POST {chat_url}",
        severity="info" if ok else "warn",
    )
@timed
def check_caddy(remote: Remote) -> Check:
    """Check 1.10: probe the Caddy reverse proxy on local port 80.

    Requests ``http://127.0.0.1/`` with curl (5 s limit) and passes when the
    reported HTTP status is in the 2xx/3xx range. A missing status line is
    treated as status 0, which fails the check.

    Args:
        remote: Command runner; its ``run`` result is unpacked as a 3-tuple
            whose second item is the captured output.

    Returns:
        A ``Check`` in group ``app``; severity is ``error`` on failure and
        ``info`` on success. The summary is the stripped curl output.
    """
    probe = (
        "curl -sS -m 5 -o /dev/null "
        "-w 'http=%{http_code} t=%{time_total}\\n' http://127.0.0.1/"
    )
    _rc, output, _unused = remote.run(probe)

    # curl's -w line carries the status code; no match means curl printed nothing usable.
    found = re.search(r"http=(\d+)", output)
    status = 0 if found is None else int(found.group(1))

    if 200 <= status < 400:
        healthy, level = True, "info"
    else:
        healthy, level = False, "error"
    return Check("Caddy http://127.0.0.1/", "app", healthy, output.strip(), severity=level)
@timed
def check_frontend(remote: Remote) -> Check:
    """Check 1.11: verify that port 80 serves an HTML front page.

    Runs two curl requests against ``http://127.0.0.1/``: one that prints the
    status code, total time and content type, and one that prints the first
    three lines of the body. The check passes when the status is 2xx/3xx and
    the combined output mentions ``html`` or ``<!doctype`` (case-insensitive).

    Args:
        remote: Command runner; its ``run`` result is unpacked as a 3-tuple
            whose second item is the captured output.

    Returns:
        A ``Check`` in group ``app``. The summary is the first output line
        (empty when there is no output), the detail is the full output, and
        severity is ``warn`` on failure, ``info`` otherwise.
    """
    status_probe = (
        "curl -sS -m 5 -o /dev/null "
        "-w 'http=%{http_code} t=%{time_total} ct=%{content_type}\\n' http://127.0.0.1/"
    )
    body_probe = "curl -sS -m 5 http://127.0.0.1/ | head -3"
    _rc, output, _unused = remote.run("; ".join([status_probe, body_probe]))

    found = re.search(r"http=(\d+)", output)
    status = 0 if found is None else int(found.group(1))

    # NOTE(review): the search runs over the whole output, so "html" can also
    # be satisfied by the ct=... value on the status line, not only the body.
    lowered = output.lower()
    looks_like_html = "html" in lowered or "<!doctype" in lowered
    passed = 200 <= status < 400 and looks_like_html

    headline = output.splitlines()[0] if output else ""
    level = "info" if passed else "warn"
    return Check("Frontend 首页", "app", passed, headline, output, severity=level)
@timed
def check_tls_cert(remote: Remote) -> Check:
    """Check 1.12: read the HTTPS certificate dates for the configured DOMAIN.

    The domain is taken from the first ``DOMAIN=`` line of ``.env`` inside the
    module-level ``COMPOSE_DIR``. When no domain is configured the check is
    reported as passed and skipped. Otherwise ``openssl s_client`` connects to
    ``<domain>:443`` and ``openssl x509 -noout -dates`` prints the validity
    window; the check passes when a ``notAfter=`` line is present.

    Args:
        remote: Command runner; its ``run`` result is unpacked as a 3-tuple
            whose second item is the captured output.

    Returns:
        A ``Check`` in group ``app``: ``info`` when skipped or when the
        certificate dates were read, ``warn`` when no certificate could be
        fetched.
    """
    import shlex  # function-scope import: only this check needs it

    # Read DOMAIN from .env; a missing file or empty value means "skip".
    _rc, env_out, _unused = remote.run(
        f"cd {COMPOSE_DIR} 2>/dev/null && "
        "grep -E '^DOMAIN=' .env 2>/dev/null | head -1"
    )
    domain = ""
    for line in env_out.splitlines():
        m = re.match(r"^DOMAIN=(.+)$", line.strip())
        if m:
            # Drop surrounding whitespace and optional quotes around the value.
            domain = m.group(1).strip().strip('"').strip("'")
            break
    if not domain:
        return Check("HTTPS 证书(域名)", "app", True,
                     "未配 DOMAIN,跳过(走 IP 模式)", severity="info")

    # The value comes from a file, so quote it before splicing it into a shell
    # command; shlex.quote leaves ordinary hostnames unchanged.
    server_name = shlex.quote(domain)
    endpoint = shlex.quote(f"{domain}:443")
    cmd2 = (
        f"echo | openssl s_client -servername {server_name} -connect {endpoint} 2>/dev/null "
        "| openssl x509 -noout -dates 2>&1"
    )
    _rc2, out2, _unused2 = remote.run(cmd2, timeout=15)

    # NOTE(review): only the presence of notAfter is checked; the date itself
    # is reported but not compared with the current time.
    m = re.search(r"notAfter=(.+)", out2)
    if not m:
        return Check(f"HTTPS 证书 {domain}", "app", False,
                     "无法获取证书(可能 443 未开)", out2, severity="warn")
    return Check(f"HTTPS 证书 {domain}", "app", True, f"notAfter={m.group(1).strip()}",
                 severity="info")
@timed
def check_docker_logs_size(remote: Remote, compose_dir: str) -> Check:
    """Check 1.13: flag container JSON log files that have grown too large.

    Lists the five largest ``/var/lib/docker/containers/*/*-json.log`` files
    with ``du -sh | sort -h | tail -5`` and reports every file larger than
    200 MB. Human-readable sizes with a fractional part (e.g. ``1.5G``) and
    the ``T`` unit are understood.

    Args:
        remote: Command runner; its ``run`` result is unpacked as a 3-tuple
            whose second item is the captured output.
        compose_dir: Directory the command ``cd``s into before running
            ``docker compose logs``.

    Returns:
        A ``Check`` in group ``docker``; it fails with severity ``warn`` when
        at least one oversized log is found, otherwise ``info``. The detail is
        the raw command output.
    """
    cmd = (
        f"cd {compose_dir} && "
        "docker compose logs --no-color --tail=0 2>&1 >/dev/null; "
        "du -sh /var/lib/docker/containers/*/*-json.log 2>/dev/null | sort -h | tail -5"
    )
    _rc, out, _unused = remote.run(cmd, timeout=20)

    # Multipliers that convert a `du -h` size token to megabytes; a token
    # without a unit letter is a plain byte count.
    unit_to_mb = {
        "": 1 / (1024 * 1024),
        "K": 1 / 1024,
        "M": 1,
        "G": 1024,
        "T": 1024 * 1024,
    }
    threshold_mb = 200

    big = []
    for line in out.splitlines():
        entry = line.strip()
        # Accept an optional fractional part ("1.5G"); some locales print a
        # decimal comma, so allow that as well.
        m = re.match(r"(\d+(?:[.,]\d+)?)([KMGT]?)\s+", entry)
        if not m:
            continue
        size_mb = float(m.group(1).replace(",", ".")) * unit_to_mb[m.group(2)]
        if size_mb > threshold_mb:
            big.append(entry)

    return Check("容器日志大小", "docker", not big,
                 "ok" if not big else f"大日志: {'; '.join(big)}",
                 out, severity="warn" if big else "info")
# ============== 主流程 ==============
# Registry of check groups, in execution order. Each entry is a
# (display label, callable) pair; the callable receives the Remote runner and
# returns a Check. The lambdas read COMPOSE_DIR / API_BASE / SAMPLE_N /
# AUTH_TOKEN as module globals at call time, so main() must assign them before
# any entry is invoked.
GROUPS: dict[str, list[tuple[str, Callable]]] = {
    # Container-level checks.
    "docker": [
        ("docker compose ps", lambda r: check_compose_ps(r, COMPOSE_DIR)),
        ("近 200 行 worker/api 日志", lambda r: check_container_logs(r, COMPOSE_DIR)),
        ("docker system df", lambda r: check_docker_system(r)),
        ("容器日志大小", lambda r: check_docker_logs_size(r, COMPOSE_DIR)),
    ],
    # Host resources.
    "host": [
        ("磁盘空间", lambda r: check_disk(r)),
        ("内存使用", lambda r: check_memory(r)),
    ],
    # Listening ports.
    "network": [
        ("关键端口监听", lambda r: check_ports(r)),
    ],
    # Application-level checks.
    "app": [
        ("API 健康", lambda r: check_api_health(r, API_BASE)),
        ("Redis ping", lambda r: check_redis(r, COMPOSE_DIR)),
        ("DB 行数", lambda r: check_db_counts(r, COMPOSE_DIR)),
        ("LLM 工作流落实度", lambda r: check_llm_workflow(r, COMPOSE_DIR)),
        ("翻译抽查", lambda r: check_translation_sample(r, COMPOSE_DIR, SAMPLE_N)),
        ("Caddy 反代", lambda r: check_caddy(r)),
        ("Frontend 首页", lambda r: check_frontend(r)),
        ("首页 SPA + Feed API", lambda r: check_homepage(r, API_BASE, AUTH_TOKEN)),
        ("详情页 + 译文 CSS", lambda r: check_article_detail(r, API_BASE, AUTH_TOKEN)),
        ("Agnes LLM 调用", lambda r: check_agnes_llm(r, COMPOSE_DIR)),
        ("HTTPS 证书", lambda r: check_tls_cert(r)),
    ],
}
def _split_group_names(raw: Optional[str]) -> set[str]:
    """Turn a comma-separated CLI value into a set of trimmed, non-empty names."""
    return {part.strip() for part in (raw or "").split(",")} - {""}


def _login_owner(remote: Remote, api_base: str, username: str, password: str) -> str:
    """Log in through ``/api/v1/auth/login`` and return the token, or "" on failure.

    The JSON body is built in Python and shipped base64-encoded, then piped to
    curl on stdin, so quotes, ``%``, ``\\`` or ``$`` in the credentials can
    neither corrupt the JSON nor be interpreted by the shell.
    """
    body_b64 = base64.b64encode(
        json.dumps({"username": username, "password": password},
                   ensure_ascii=False).encode("utf-8")
    ).decode("ascii")
    login_url = f"{api_base.rstrip('/').removesuffix('/api/v1/healthz')}/api/v1/auth/login"
    marker = "--- login body ---"
    login_cmd = (
        f"printf %s '{body_b64}' | base64 -d | "
        "curl -sS -m 8 -o /tmp/login_resp -w 'http=%{http_code}\\n' "
        f"-H 'Content-Type: application/json' --data-binary @- '{login_url}'; "
        f"echo '{marker}'; "
        # Read the whole response: a truncated body is not valid JSON.
        "cat /tmp/login_resp 2>/dev/null; echo"
    )
    _rc, out, _unused = remote.run(login_cmd, timeout=15)

    m = re.search(r"http=(\d+)", out)
    if not (m and m.group(1) == "200"):
        code_str = m.group(1) if m else "?"
        print(f" ⚠ auth: 登录失败 http={code_str}, API 检查项将无 token(降级 info)")
        return ""

    try:
        resp = json.loads(out.rsplit(marker, 1)[-1].strip())
        token = resp.get("access_token") or resp.get("accessToken") or resp.get("token") or ""
    except (ValueError, AttributeError) as e:
        print(f" ⚠ auth: 解析响应失败 {e}")
        return ""
    if not isinstance(token, str) or not token:
        print(" ⚠ auth: 响应里没有 token 字段")
        return ""
    print(f" ✓ auth: 已登录 owner='{username}', token 长度 {len(token)}")
    return token


def main() -> int:
    """Parse the CLI, run the selected check groups and print a summary.

    Side effects: assigns the module globals ``COMPOSE_DIR``, ``API_BASE``,
    ``SAMPLE_N`` and ``AUTH_TOKEN`` (read by the lambdas in ``GROUPS``),
    prints progress to stdout and optionally writes a JSON report.

    Returns:
        Process exit code: 2 when any error-level check failed, 1 when only
        warn-level checks failed, 0 when everything passed.
    """
    global COMPOSE_DIR, API_BASE, SAMPLE_N, AUTH_TOKEN

    ap = argparse.ArgumentParser(
        description="diary-news 服务器健康检查",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="示例:\n"
               " python healthcheck.py # 跑全部\n"
               " python healthcheck.py --only docker,app # 只跑 docker 和 app 组\n"
               " python healthcheck.py --local --compose-dir . # 服务器本地跑\n"
               " python healthcheck.py --json report.json # 导出结构化报告\n",
    )
    ap.add_argument("--local", action="store_true", help="在服务器本地跑,不走 SSH")
    ap.add_argument("--host", default=os.environ.get("REMOTE_HOST", DEFAULT_HOST))
    ap.add_argument("--port", type=int, default=int(os.environ.get("REMOTE_PORT", DEFAULT_PORT)))
    ap.add_argument("--user", default=os.environ.get("REMOTE_USER", DEFAULT_USER))
    ap.add_argument("--password", default=os.environ.get("REMOTE_PASS", ""))
    ap.add_argument("--compose-dir", default=os.environ.get("COMPOSE_DIR", DEFAULT_COMPOSE))
    ap.add_argument("--api-base", default=os.environ.get("API_BASE_URL", DEFAULT_API_BASE))
    ap.add_argument("--only", help="逗号分隔的组名: docker,host,network,app")
    ap.add_argument("--skip", help="逗号分隔的组名,跳过")
    ap.add_argument("--json", dest="json_out", help="把结果写到 JSON 文件")
    ap.add_argument("--quiet", action="store_true", help="只输出汇总")
    ap.add_argument("--verbose", "-v", action="store_true",
                    help="显示失败项的完整原始输出(默认 warn 截断 12 行)")
    ap.add_argument("--sample", type=int, default=3,
                    help="翻译抽查的文章数(默认 3 篇,24h 内已翻译的随机样本)")
    ap.add_argument("--auth-user", default=os.environ.get("OWNER_USER", "owner"),
                    help="owner 用户名(用于获取 JWT token,调 /api/v1/auth/login)")
    ap.add_argument("--auth-pass", default=os.environ.get("OWNER_PASS", ""),
                    help="owner 密码(env: OWNER_PASS)。如不传,API 端点会降级为 info(不污染汇总)")
    ap.add_argument("--skip-auth", action="store_true",
                    help="明确跳过 auth token,等价于不传 --auth-pass")
    args = ap.parse_args()

    COMPOSE_DIR = args.compose_dir
    API_BASE = args.api_base
    SAMPLE_N = max(1, min(args.sample, 20))  # clamp to 1..20 so a typo cannot sample 1000 rows
    # Must be assigned unconditionally: the GROUPS lambdas look AUTH_TOKEN up
    # as a module global when called and would raise NameError otherwise.
    AUTH_TOKEN = ""

    only = _split_group_names(args.only)
    skip = _split_group_names(args.skip)
    # A mistyped group would otherwise select nothing and still exit 0.
    unknown = sorted((only | skip) - set(GROUPS))
    if unknown:
        ap.error(f"未知组名: {', '.join(unknown)}(可选: {', '.join(GROUPS)})")

    target = "local" if args.local else f"{args.user}@{args.host}:{args.port}"
    print("==== diary-news 健康检查 ====")
    print(f"目标: {target}")
    print(f"目录: {COMPOSE_DIR}")
    print(f"时间: {time.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    print()

    remote = Remote(local=args.local, host=args.host, port=args.port,
                    user=args.user, password=args.password)
    report = Report(target=target, started_at=time.strftime("%Y-%m-%dT%H:%M:%S%z"))

    def record(check: Check) -> None:
        # --quiet stores the result without printing it.
        if args.quiet:
            report.checks.append(asdict(check))
        else:
            report.add(check, verbose=args.verbose)

    try:
        # Optional owner login; runs inside the try so the connection is
        # closed even when the login command itself raises.
        if not args.skip_auth and args.auth_pass:
            AUTH_TOKEN = _login_owner(remote, API_BASE, args.auth_user, args.auth_pass)
        else:
            print(" · auth: 未传 --auth-pass(API 检查项将降级为 info 提示)")

        for group, fns in GROUPS.items():
            if only and group not in only:
                continue
            if skip and group in skip:
                continue
            print(f"--- [{group}] ---")
            for name, fn in fns:
                try:
                    check = fn(remote)
                except Exception as e:  # one crashing check must not abort the run
                    check = Check(name, group, False, f"异常: {e}",
                                  detail=f"type={type(e).__name__}\n{type(e).__doc__ or ''}",
                                  severity="error")
                record(check)
            print()
    finally:
        remote.close()

    report.finished_at = time.strftime("%Y-%m-%dT%H:%M:%S%z")
    ok, bad, err = report.summary()
    print("==== 汇总 ====")
    print(f" 合计 {len(report.checks)} 项 · 通过 {ok} · 失败 {bad} · 严重错误 {err}")
    if err > 0:
        print(f" ✗ 存在 {err} 个 error 级问题,建议立即排查")
        code = 2
    elif bad > 0:
        print(f" ⚠ 存在 {bad} 个 warn 级问题,建议看一下")
        code = 1
    else:
        print(" ✓ 全部通过")
        code = 0

    if args.json_out:
        with open(args.json_out, "w", encoding="utf-8") as f:
            json.dump(asdict(report), f, ensure_ascii=False, indent=2)
        print(f" 报告已写入: {args.json_out}")
    return code
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())