(说明 LLM 排版版带了 CSS 容器) - 移动端: meta viewport(首页的) + 详情页路由 /article/{id} 401 视为"端点需 auth"— 提示用户加 --auth-user / --auth-pass,不污染汇总。 """ base = api_base.rstrip("/").removesuffix("/api/v1/healthz") list_url = f"{base}/api/v1/articles?page=1&page_size=1" tok_b64 = base64.b64encode(auth_token.encode("utf-8")).decode("ascii") if auth_token else "" auth_h = f" -H 'Authorization: Bearer $(echo {tok_b64} | base64 -d)'" if tok_b64 else "" rc, list_body, _ = remote.run( "curl -sS -m 8 '" + list_url + "'" + auth_h + " -w '\\n---HTTP=%{http_code}---\\n' 2>&1", timeout=10, ) article_id = None list_code = 0 if rc == 0 and list_body: try: marker = "\n---HTTP=" if marker in list_body: json_part, status_part = list_body.rsplit(marker, 1) m = re.search(r"HTTP=(\d+)", status_part) list_code = int(m.group(1)) if m else 0 else: json_part = list_body data = json.loads(json_part) if data.get("items"): article_id = data["items"][0]["id"] except Exception: pass if list_code == 401 and not auth_token: return Check( "详情页 API + 译文 CSS", "app", True, "需 owner token(用 --auth-user / --auth-pass)", detail=f"# raw list response:\n{list_body[:300]}", command=f"GET {list_url} (no token)", severity="info", ) if not article_id: return Check( "详情页 API + 译文 CSS", "app", False, f"无可用文章样本(列表 http={list_code}, items=0?)", detail=list_body[:500], command=list_url, severity="warn", ) # 拉详情 detail_url = f"{base}/api/v1/articles/{article_id}" rc2, body2, _ = remote.run( "curl -sS -m 8 '" + detail_url + "'" + auth_h + " -w '\\n---HTTP=%{http_code}---\\n' 2>&1", timeout=10, ) api_code = 0 article = {} parse_err = "" try: marker = "\n---HTTP=" if marker in body2: json_part, status_part = body2.rsplit(marker, 1) else: json_part, status_part = body2, "" m = re.search(r"HTTP=(\d+)", status_part) api_code = int(m.group(1)) if m else 0 article = json.loads(json_part) except Exception as e: parse_err = f"{type(e).__name__}: {e}" if api_code != 200 or not article: return Check( f"详情页 API #{article_id} + 译文 CSS", "app", 
False, f"http={api_code} parse_err={parse_err or '-'}", detail=body2[:500], command=detail_url, severity="error", ) # 判据 title = article.get("title") or "" title_zh = article.get("title_zh") or "" body_zh_text = article.get("body_zh_text") or "" body_zh_formatted = article.get("body_zh_formatted") or "" body_zh_html = article.get("body_zh_html") or "" fmt_status = article.get("format_status") or "n/a" tr_status = article.get("translation_status") or "-" tr_engine = article.get("translation_engine") or "-" issues: list[str] = [] if not title_zh: issues.append("缺 title_zh(无译文)") if not (body_zh_text or body_zh_formatted or body_zh_html): issues.append("缺 body_zh_text/formatted/html(译文全空)") if title_zh and title and title_zh.strip() == title.strip(): issues.append("title_zh == title(未翻译)") has_css_container = ( 'class="article-body"' in body_zh_formatted or "class='article-body'" in body_zh_formatted ) css_info = "✓ 排版版带 .article-body 容器" if has_css_container else ( "✗ 排版版缺 .article-body 容器(译文没套 CSS)" if body_zh_formatted else "— 无排版版(用原始译文展示)" ) if not has_css_container and body_zh_formatted: issues.append("排版版 body_zh_formatted 缺 .article-body CSS 容器") summary = ( f"#{article_id} {tr_status}/{tr_engine} fmt={fmt_status} " f"译字 {len(title_zh)}/{len(body_zh_text)}; CSS {css_info}" ) if issues: summary += " · " + "; ".join(issues[:2]) detail_lines = [ f"原标题: {title[:80]!r}", f"译标题: {title_zh[:80]!r}", f"body_zh_text 长度: {len(body_zh_text)}", f"body_zh_formatted 长度: {len(body_zh_formatted)} status={fmt_status}", f"body_zh_html 长度: {len(body_zh_html)}", f"CSS 容器(.article-body): {'有' if has_css_container else '无'}", ] # 抽 body_zh_formatted 前 300 字符(可能 < 字符被转义了) if body_zh_formatted: detail_lines.append(f"body_zh_formatted 前 300: {body_zh_formatted[:300]!r}") ok = not issues and api_code == 200 sev = "error" if (api_code != 200) else ("warn" if issues else "info") return Check( f"详情页 API #{article_id} + 译文 CSS", "app", ok, summary, detail="\n".join(detail_lines), 
def _agnes_env_values(env_out: str) -> dict[str, str]:
    """Collect the AGNES_* assignments from ``grep`` output, with quotes removed.

    A later assignment of the same variable overrides an earlier one; lines
    whose right-hand side is empty are ignored.
    """
    found: dict[str, str] = {}
    for line in env_out.splitlines():
        m = re.match(r"^(AGNES_API_KEY|AGNES_BASE_URL|AGNES_CHAT_MODEL)=(.+)$", line)
        if m:
            found[m.group(1)] = m.group(2).strip().strip('"').strip("'")
    return found


def _split_agnes_output(out: str) -> tuple[int, float, str]:
    """Split curl output into ``(http_code, seconds, body)``.

    curl is asked to append a ``---HTTP=<code> t=<seconds>---`` trailer line
    after the response body. If the trailer is absent or unparsable the code
    is reported as 0, which the caller treats as a failed request.
    """
    body, sep, trailer = out.rpartition("\n---HTTP=")
    if not sep:
        return 0, 0.0, out.strip()
    m = re.match(r"(\d+) t=([\d.]+)", trailer)
    if not m:
        return 0, 0.0, body.strip()
    try:
        seconds = float(m.group(2))
    except ValueError:
        seconds = 0.0
    return int(m.group(1)), seconds, body.strip()


def _agnes_reply_text(body: str) -> str:
    """Return ``choices[0].message.content`` from a chat response, or ``""``.

    Any malformed shape (invalid JSON, non-object payload, missing keys,
    ``content: null``) yields an empty string instead of raising.
    """
    try:
        resp = json.loads(body)
        choices = resp.get("choices") or [{}]
        content = choices[0].get("message", {}).get("content", "")
    except (ValueError, AttributeError, IndexError, KeyError, TypeError):
        return ""
    if isinstance(content, str):
        return content
    return str(content or "")


@timed
def check_agnes_llm(remote: Remote, compose_dir: str) -> Check:
    """1.12 Agnes LLM health: issue one real chat/completions request.

    - Reads AGNES_API_KEY / AGNES_BASE_URL / AGNES_CHAT_MODEL from ``.env``
      inside ``compose_dir``.
    - Key missing or still a ``your_...`` placeholder -> passing ``info``
      check (the LLM enhancement module is optional).
    - Otherwise sends a minimal request (max_tokens=8, short prompt) and
      expects HTTP 200 plus a non-empty ``choices[0].message.content``.

    Secret handling: the key appears in the remote command line only in
    base64 form. The decoded ``Authorization`` header is produced by the
    shell's ``printf`` and piped to curl over stdin (``-H @-``), so the
    plaintext key is not part of curl's argv. Base64 is an encoding, not
    encryption; this merely keeps the plaintext out of argv.

    Args:
        remote: command runner; ``remote.run(cmd, timeout=...)`` must return a
            3-tuple whose second item is the captured standard output.
        compose_dir: directory on the target that contains ``.env``.

    Returns:
        A ``Check`` in group ``"app"``: ``info`` when skipped or healthy,
        ``error`` on a non-200 status, ``warn`` on 200 without reply text.
    """
    import shlex

    # 1) Read the three variables from .env.
    rc, env_out, _ = remote.run(
        f"cd {compose_dir} 2>/dev/null && "
        "grep -E '^(AGNES_API_KEY|AGNES_BASE_URL|AGNES_CHAT_MODEL)=' .env 2>/dev/null"
    )
    env = _agnes_env_values(env_out)
    api_key = env.get("AGNES_API_KEY", "")
    if not api_key or api_key.startswith("your_"):
        return Check(
            "Agnes LLM 联通", "app", True,
            "未配 AGNES_API_KEY(LLM 增强模块关闭),跳过",
            detail=env_out.strip()[:300],
            severity="info",
        )

    base_url = env.get("AGNES_BASE_URL") or "https://apihub.agnes-ai.com/v1"
    model = env.get("AGNES_CHAT_MODEL") or "agnes-2.0-flash"
    chat_url = f"{base_url.rstrip('/')}/chat/completions"

    # 2) Ship key and payload base64-encoded and decode them on the target.
    key_b64 = base64.b64encode(api_key.encode("utf-8")).decode("ascii")
    payload_obj = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a ping bot. Reply with a single word."},
            {"role": "user", "content": "ping"},
        ],
        "max_tokens": 8,
        "temperature": 0,
    }
    payload_b64 = base64.b64encode(
        json.dumps(payload_obj, ensure_ascii=False).encode("utf-8")
    ).decode("ascii")
    cmd = (
        f"KEY_B64={key_b64}; "
        f"PAYLOAD_B64={payload_b64}; "
        "BODY=$(echo \"$PAYLOAD_B64\" | base64 -d); "
        # The header travels over stdin; the full body is printed (no temp
        # file, no truncation) followed by a status trailer we can split on.
        "printf 'Authorization: Bearer %s\\n' \"$(echo \"$KEY_B64\" | base64 -d)\" | "
        "curl -sS -m 25 -H @- -H 'Content-Type: application/json' "
        "-d \"$BODY\" "
        "-w '\\n---HTTP=%{http_code} t=%{time_total}---\\n' "
        + shlex.quote(chat_url)
    )
    rc2, out, _ = remote.run(cmd, timeout=40)

    code, elapsed, body = _split_agnes_output(out)
    # Truncate only what is shown to the operator, never what is parsed.
    raw = f"http={code} t={elapsed}\n--- body (first 400 chars) ---\n{body[:400]}"

    if code != 200:
        return Check(
            "Agnes LLM chat 调用", "app", False,
            f"http={code} t={elapsed:.1f}s",
            detail=raw[:600],
            command=f"POST {chat_url}  (auth via base64-decoded key, not echoed)",
            severity="error",
        )

    text = _agnes_reply_text(body)
    ok = bool(text)
    summary = f"http={code} t={elapsed:.1f}s model={model} reply={text[:30]!r}"
    return Check(
        "Agnes LLM chat 调用", "app", ok, summary,
        detail=f"# model: {model}\n# base_url: {base_url}\n# raw:\n{raw[:800]}",
        command=f"POST {chat_url}",
        severity="info" if ok else "warn",
    )
def _du_size_mb(line: str) -> float:
    """Convert the size column of one ``du -sh`` line to MiB.

    ``du -h`` prints values such as ``1.5G``, ``12M``, ``4.0K`` or a bare
    byte count. Lines that do not start with such a size yield 0.0, which
    can never exceed the warning threshold.
    """
    m = re.match(r"([\d.,]+)([KMGTP]?)\s+", line.strip())
    if not m:
        return 0.0
    try:
        # Some locales print a decimal comma.
        value = float(m.group(1).replace(",", "."))
    except ValueError:
        return 0.0
    factor = {
        "": 1 / (1024 * 1024),  # no suffix: plain bytes
        "K": 1 / 1024,
        "M": 1,
        "G": 1024,
        "T": 1024 ** 2,
        "P": 1024 ** 3,
    }[m.group(2)]
    return value * factor


@timed
def check_docker_logs_size(remote: Remote, compose_dir: str) -> Check:
    """1.13 Container log backlog.

    Lists the five largest ``*-json.log`` files under
    ``/var/lib/docker/containers`` and flags every one larger than 200 MiB.

    Args:
        remote: command runner; ``remote.run(cmd, timeout=...)`` must return a
            3-tuple whose second item is the captured standard output.
        compose_dir: compose project directory the command first ``cd``s into.

    Returns:
        A ``Check`` in group ``"docker"``: passing/``info`` when no listed log
        exceeds the threshold, failing/``warn`` otherwise, with the raw ``du``
        output attached as detail.
    """
    warn_above_mb = 200
    cmd = (
        f"cd {compose_dir} && "
        "docker compose logs --no-color --tail=0 2>&1 >/dev/null; "
        "du -sh /var/lib/docker/containers/*/*-json.log 2>/dev/null | sort -h | tail -5"
    )
    rc, out, _ = remote.run(cmd, timeout=20)
    big = [
        line.strip()
        for line in out.splitlines()
        if _du_size_mb(line) > warn_above_mb
    ]
    return Check(
        "容器日志大小", "docker", not big,
        "ok" if not big else f"大日志: {'; '.join(big)}",
        out,
        severity="warn" if big else "info",
    )
# ============== Main flow ==============

# Group name -> ordered (label, runner) pairs. Each runner takes the Remote
# and returns a Check. The runners are lambdas so that COMPOSE_DIR, API_BASE,
# SAMPLE_N and AUTH_TOKEN are looked up when a check runs, i.e. after main()
# has assigned them.
GROUPS: dict[str, list[tuple[str, Callable]]] = {
    "docker": [
        ("docker compose ps", lambda r: check_compose_ps(r, COMPOSE_DIR)),
        ("近 200 行 worker/api 日志", lambda r: check_container_logs(r, COMPOSE_DIR)),
        ("docker system df", lambda r: check_docker_system(r)),
        ("容器日志大小", lambda r: check_docker_logs_size(r, COMPOSE_DIR)),
    ],
    "host": [
        ("磁盘空间", lambda r: check_disk(r)),
        ("内存使用", lambda r: check_memory(r)),
    ],
    "network": [
        ("关键端口监听", lambda r: check_ports(r)),
    ],
    "app": [
        ("API 健康", lambda r: check_api_health(r, API_BASE)),
        ("Redis ping", lambda r: check_redis(r, COMPOSE_DIR)),
        ("DB 行数", lambda r: check_db_counts(r, COMPOSE_DIR)),
        ("LLM 工作流落实度", lambda r: check_llm_workflow(r, COMPOSE_DIR)),
        ("翻译抽查", lambda r: check_translation_sample(r, COMPOSE_DIR, SAMPLE_N)),
        ("Caddy 反代", lambda r: check_caddy(r)),
        ("Frontend 首页", lambda r: check_frontend(r)),
        ("首页 SPA + Feed API", lambda r: check_homepage(r, API_BASE, AUTH_TOKEN)),
        ("详情页 + 译文 CSS", lambda r: check_article_detail(r, API_BASE, AUTH_TOKEN)),
        ("Agnes LLM 调用", lambda r: check_agnes_llm(r, COMPOSE_DIR)),
        ("HTTPS 证书", lambda r: check_tls_cert(r)),
    ],
}


def _split_group_names(raw) -> set:
    """Turn a comma-separated CLI value into a set of trimmed, non-empty names."""
    return {part.strip() for part in (raw or "").split(",")} - {""}


def _login_owner(remote, api_base: str, user: str, password: str) -> tuple[str, str, str]:
    """Log in through ``/api/v1/auth/login`` on the target.

    The JSON body is serialised here (so quotes, backslashes and ``%`` in the
    credentials are escaped correctly), shipped base64-encoded, decoded on the
    target and piped to curl over stdin. The response is read back in full
    from curl's stdout; nothing is written to a temp file.

    Returns:
        ``(token, http_code, parse_error)``. ``token`` is ``""`` on any
        failure, ``http_code`` is ``"?"`` when curl printed no status, and
        ``parse_error`` is non-empty only when a 200 response was not usable
        JSON.
    """
    import shlex

    payload = json.dumps({"username": user, "password": password})
    payload_b64 = base64.b64encode(payload.encode("utf-8")).decode("ascii")
    login_url = f"{api_base.rstrip('/').removesuffix('/api/v1/healthz')}/api/v1/auth/login"
    login_cmd = (
        f"echo {payload_b64} | base64 -d | "
        "curl -sS -m 8 -H 'Content-Type: application/json' --data-binary @- "
        "-w '\\n---HTTP=%{http_code}---\\n' "
        + shlex.quote(login_url)
    )
    rc, out, _ = remote.run(login_cmd, timeout=15)

    body, sep, trailer = out.rpartition("\n---HTTP=")
    m = re.match(r"(\d+)", trailer) if sep else None
    http_code = m.group(1) if m else "?"
    if http_code != "200":
        return "", http_code, ""
    try:
        resp = json.loads(body)
        token = resp.get("access_token") or resp.get("accessToken") or resp.get("token") or ""
    except (ValueError, AttributeError) as e:
        return "", http_code, str(e)
    return (token if isinstance(token, str) else ""), http_code, ""


def main() -> int:
    """Parse the CLI, run the selected check groups and print a summary.

    Side effects: assigns the module globals COMPOSE_DIR, API_BASE, SAMPLE_N
    and AUTH_TOKEN that the GROUPS runners read; prints progress to stdout;
    optionally writes a JSON report to the path given by ``--json``.

    Returns:
        Process exit code: 2 if any error-level check was reported, 1 if only
        warn-level failures were reported, 0 otherwise.
    """
    global COMPOSE_DIR, API_BASE, SAMPLE_N, AUTH_TOKEN

    ap = argparse.ArgumentParser(
        description="diary-news 服务器健康检查",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="示例:\n"
               " python healthcheck.py # 跑全部\n"
               " python healthcheck.py --only docker,app # 只跑 docker 和 app 组\n"
               " python healthcheck.py --local --compose-dir . # 服务器本地跑\n"
               " python healthcheck.py --json report.json # 导出结构化报告\n",
    )
    ap.add_argument("--local", action="store_true", help="在服务器本地跑,不走 SSH")
    ap.add_argument("--host", default=os.environ.get("REMOTE_HOST", DEFAULT_HOST))
    ap.add_argument("--port", type=int, default=int(os.environ.get("REMOTE_PORT", DEFAULT_PORT)))
    ap.add_argument("--user", default=os.environ.get("REMOTE_USER", DEFAULT_USER))
    ap.add_argument("--password", default=os.environ.get("REMOTE_PASS", ""))
    ap.add_argument("--compose-dir", default=os.environ.get("COMPOSE_DIR", DEFAULT_COMPOSE))
    ap.add_argument("--api-base", default=os.environ.get("API_BASE_URL", DEFAULT_API_BASE))
    ap.add_argument("--only", help="逗号分隔的组名: docker,host,network,app")
    ap.add_argument("--skip", help="逗号分隔的组名,跳过")
    ap.add_argument("--json", dest="json_out", help="把结果写到 JSON 文件")
    ap.add_argument("--quiet", action="store_true", help="只输出汇总")
    ap.add_argument("--verbose", "-v", action="store_true",
                    help="显示失败项的完整原始输出(默认 warn 截断 12 行)")
    ap.add_argument("--sample", type=int, default=3,
                    help="翻译抽查的文章数(默认 3 篇,24h 内已翻译的随机样本)")
    ap.add_argument("--auth-user", default=os.environ.get("OWNER_USER", "owner"),
                    help="owner 用户名(用于获取 JWT token,调 /api/v1/auth/login)")
    ap.add_argument("--auth-pass", default=os.environ.get("OWNER_PASS", ""),
                    help="owner 密码(env: OWNER_PASS)。如不传,API 端点会降级为 info(不污染汇总)")
    ap.add_argument("--skip-auth", action="store_true",
                    help="明确跳过 auth token,等价于不传 --auth-pass")
    args = ap.parse_args()

    COMPOSE_DIR = args.compose_dir
    API_BASE = args.api_base
    SAMPLE_N = max(1, min(args.sample, 20))  # clamp to 1..20 so a typo cannot sample 1000 rows
    # Must exist unconditionally: the GROUPS lambdas look AUTH_TOKEN up as a
    # module global and would raise NameError if login were skipped.
    AUTH_TOKEN = ""

    only = _split_group_names(args.only)
    skip = _split_group_names(args.skip)
    unknown = sorted((only | skip) - set(GROUPS))
    if unknown:
        # Unknown names stay in the sets (they simply match nothing); the
        # warning makes a typo visible instead of silently running nothing.
        print(f" ⚠ 未知组名(已忽略): {', '.join(unknown)}")

    target = "local" if args.local else f"{args.user}@{args.host}:{args.port}"
    print("==== diary-news 健康检查 ====")
    print(f"目标: {target}")
    print(f"目录: {COMPOSE_DIR}")
    print(f"时间: {time.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    print()

    remote = Remote(local=args.local, host=args.host, port=args.port,
                    user=args.user, password=args.password)
    report = Report(target=target, started_at=time.strftime("%Y-%m-%dT%H:%M:%S%z"))

    def record(check) -> None:
        # --quiet stores the result without the per-check console output.
        if args.quiet:
            report.checks.append(asdict(check))
        else:
            report.add(check, verbose=args.verbose)

    try:
        # ===== Optional owner token =====
        # Runs inside the try so the connection is closed even if login raises.
        if not args.skip_auth and args.auth_pass:
            AUTH_TOKEN, code_str, parse_err = _login_owner(
                remote, API_BASE, args.auth_user, args.auth_pass
            )
            if parse_err:
                print(f" ⚠ auth: 解析响应失败 {parse_err}")
            if AUTH_TOKEN:
                print(f" ✓ auth: 已登录 owner='{args.auth_user}', token 长度 {len(AUTH_TOKEN)}")
            else:
                print(f" ⚠ auth: 登录失败 http={code_str}, API 检查项将无 token(降级 info)")
        else:
            print(" · auth: 未传 --auth-pass(API 检查项将降级为 info 提示)")

        for group, fns in GROUPS.items():
            if only and group not in only:
                continue
            if skip and group in skip:
                continue
            print(f"--- [{group}] ---")
            for name, fn in fns:
                try:
                    result = fn(remote)
                except Exception as e:
                    # One crashing check must not abort the run; report it as
                    # an error-level result and carry on.
                    result = Check(
                        name, group, False, f"异常: {e}",
                        detail=f"type={type(e).__name__}\n{type(e).__doc__ or ''}",
                        severity="error",
                    )
                record(result)
            print()
    finally:
        remote.close()

    report.finished_at = time.strftime("%Y-%m-%dT%H:%M:%S%z")
    ok, bad, err = report.summary()
    print("==== 汇总 ====")
    print(f" 合计 {len(report.checks)} 项 · 通过 {ok} · 失败 {bad} · 严重错误 {err}")
    if err > 0:
        print(f" ✗ 存在 {err} 个 error 级问题,建议立即排查")
        code = 2
    elif bad > 0:
        print(f" ⚠ 存在 {bad} 个 warn 级问题,建议看一下")
        code = 1
    else:
        print(" ✓ 全部通过")
        code = 0

    if args.json_out:
        with open(args.json_out, "w", encoding="utf-8") as f:
            json.dump(asdict(report), f, ensure_ascii=False, indent=2)
        print(f" 报告已写入: {args.json_out}")

    return code


if __name__ == "__main__":
    sys.exit(main())