Files
diary-news/scripts/fix_enrich_loop.py

267 lines
11 KiB
Python
Raw Normal View History

2026-06-11 17:24:46 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""一键修复 diary-news 的 enrichment_loop bug。
真因:`services/llm/enrichment.py:405-419` 的查询用
`order by id ASC + limit 160 + 内存过滤 status`,
- 162 篇最早的文章已经被 enrich (4 个 status 都是 ok)
- 662 篇 n/a 的文章 id > 388354,落在 limit 160 之外
- 每次 while True 循环都只看到这批已 ok 的文章,filter 命中 0 → todo=0 → continue → 死循环
修法: where 条件改成"任一 LLM 状态 != 'ok'",精准定位待 enrich 的文章
用法(PowerShell):
$env:REMOTE_PASS = '<root 密码>'
python scripts/fix_enrich_loop.py [--host ...] [--port ...] [--user ...] [--compose-dir ...] [--wait 120]
退出码:
0 = 修复 + enrich 跑起来(看到 n/a → ok 的变化)
1 = 修复但 enrich 未观察到跑(可能是 0 候选 / LLM 调不通)
2 = 部署失败
"""
from __future__ import annotations

import argparse
import base64
import os
import re
import shlex
import sys
import time

import paramiko
# ============== Local source of the fixed enrichment.py ==============
# The complete, already-fixed file (only its WHERE/LIMIT query was changed) is
# read from disk and uploaded verbatim. The default is a Windows path on the
# author's machine; set the ENRICHMENT_PY_LOCAL environment variable to point
# at the file when running from anywhere else.
ENRICHMENT_PY_LOCAL = os.environ.get(
    "ENRICHMENT_PY_LOCAL",
    r"D:\selftools\diary-news\backend\app\services\llm\enrichment.py",
)
# ============== Connection / deployment defaults ==============
DEFAULT_HOST = "207.57.129.228"
DEFAULT_PORT = 19717
DEFAULT_USER = "root"
DEFAULT_COMPOSE = "/srv/news"
DEFAULT_WAIT_SEC = 120  # seconds to wait after deploying before checking whether enrichment runs
def ssh_exec(
    c: paramiko.SSHClient,
    cmd: str,
    timeout: int = 300,
    *,
    pty: bool = False,
) -> tuple[int, str, str]:
    """Run *cmd* on the remote host and return ``(rc, stdout, stderr)``.

    No pseudo-terminal is allocated by default. Under a PTY the remote side
    merges stderr into stdout (so the returned stderr is always empty) and
    rewrites every LF as CRLF. That made text read back from the server
    never compare equal to a locally read file. Pass ``pty=True`` only for a
    command that genuinely needs a terminal.

    Output is decoded as UTF-8 with undecodable bytes replaced.

    Raises:
        paramiko.SSHException: if the server refuses to run the command.
        socket.timeout: if the channel stays silent for *timeout* seconds.
    """
    _stdin, stdout, stderr = c.exec_command(cmd, timeout=timeout, get_pty=pty)
    out = stdout.read().decode(errors="replace")
    # NOTE(review): stdout is drained before stderr. A command that writes
    # megabytes to stderr alone could in principle fill the SSH window first.
    err = stderr.read().decode(errors="replace")
    rc = stdout.channel.recv_exit_status()
    return rc, out, err
def put_file(c: paramiko.SSHClient, remote_path: str, content_bytes: bytes) -> None:
    """Upload *content_bytes* verbatim to *remote_path* on the remote host.

    The payload is base64-encoded and embedded in the command line, so
    arbitrary bytes survive shell quoting. The parent directory is created
    first. *remote_path* is shell-quoted, so spaces or quote characters in it
    are safe. Because the whole payload rides in one command line, this is
    meant for modest files such as source code, not large blobs.

    Raises:
        RuntimeError: if the remote command exits non-zero.
    """
    b64 = base64.b64encode(content_bytes).decode("ascii")
    qpath = shlex.quote(remote_path)
    # base64 output only uses [A-Za-z0-9+/=], so it needs no quoting itself.
    script = (
        f'mkdir -p "$(dirname {qpath})" && '
        f"echo {b64} | base64 -d > {qpath} && "
        f"wc -l {qpath}"
    )
    rc, out, err = ssh_exec(c, f"bash -lc {shlex.quote(script)}", timeout=60)
    if rc != 0:
        # ssh_exec may run under a PTY, which merges stderr into stdout,
        # so show whichever stream actually carries the error text.
        detail = "\n".join(s.rstrip() for s in (err, out) if s.strip())
        print(f"[stderr] {detail}")
        raise RuntimeError(f"put_file 失败 rc={rc}")
    print(f" ✓ 写入 {remote_path} {out.strip()}")
def put_text_via_file(c: paramiko.SSHClient, remote_path: str, text: str) -> None:
    """Write *text* to *remote_path* on the remote host, encoded as UTF-8.

    This is a thin wrapper over ``put_file``, which ships the bytes
    base64-encoded so the text needs no shell escaping. It raises whatever
    ``put_file`` raises (``RuntimeError`` on a non-zero remote exit code).
    """
    put_file(c, remote_path, text.encode("utf-8"))
def get_text(c: paramiko.SSHClient, remote_path: str) -> str | None:
    """Read a remote text file.

    Returns:
        - the file content;
        - "" for an existing empty file (it is no longer mistaken for a
          missing one);
        - None when the file does not exist or cannot be read (the remote
          command exits non-zero).

    Line endings are normalised the way Python's text-mode ``open()``
    normalises the local copy: CRLF and a lone CR both become LF. The result
    can therefore be compared directly with a locally read file. This also
    undoes the LF -> CRLF rewrite a PTY applies to command output.
    """
    qpath = shlex.quote(remote_path)
    script = f"test -f {qpath} && cat {qpath}"
    rc, out, _err = ssh_exec(c, f"bash -lc {shlex.quote(script)}", timeout=30)
    if rc != 0:
        return None
    return out.replace("\r\n", "\n").replace("\r", "\n")
# ============== 主流程 ==============
# Pre-fix n/a count that this script originally hard-coded. It is used as the
# "before" value only when the live count cannot be read before deploying.
_FALLBACK_NA_BASELINE = 663

# classify_status distribution query, run inside the postgres container.
_CLASSIFY_SQL = (
    "SELECT classify_status, count(*) FROM articles "
    "GROUP BY classify_status ORDER BY count(*) DESC;"
)


def _parse_args() -> argparse.Namespace:
    """Parse the command line. Connection defaults come from REMOTE_* / COMPOSE_DIR env vars."""
    ap = argparse.ArgumentParser(description="一键修复 enrichment_loop bug")
    ap.add_argument("--host", default=os.environ.get("REMOTE_HOST", DEFAULT_HOST))
    ap.add_argument("--port", type=int, default=int(os.environ.get("REMOTE_PORT", DEFAULT_PORT)))
    ap.add_argument("--user", default=os.environ.get("REMOTE_USER", DEFAULT_USER))
    ap.add_argument("--password", default=os.environ.get("REMOTE_PASS", ""))
    ap.add_argument("--compose-dir", default=os.environ.get("COMPOSE_DIR", DEFAULT_COMPOSE))
    ap.add_argument("--wait", type=int, default=DEFAULT_WAIT_SEC,
                    help="部署后等多少秒再检查 enrich 跑了多少(默认 120)")
    ap.add_argument("--no-build", action="store_true",
                    help="跳过 docker build(只重启容器,代码改动不会被采纳)")
    ap.add_argument("--no-restart", action="store_true",
                    help="跳过 docker up -d(只复制代码 + build)")
    ap.add_argument("--dry-run", action="store_true",
                    help="只比对文件,不改不重启")
    ap.add_argument("--force-recreate", action="store_true",
                    help="服务器文件不存在时,直接创建新文件(不要求存在)")
    return ap.parse_args()


def _compose_cmd(compose_dir: str, sub: str) -> str:
    """Return a shell command that runs ``docker compose <sub>`` from *compose_dir*."""
    return f"cd {shlex.quote(compose_dir)} && docker compose {sub}"


def _classify_counts(c: paramiko.SSHClient, compose_dir: str) -> tuple[str, int | None]:
    """Query the classify_status distribution; return (printable output, n/a row count or None)."""
    cmd = _compose_cmd(
        compose_dir,
        f"exec -T postgres psql -U news -d news -c {shlex.quote(_CLASSIFY_SQL)}",
    )
    _rc, out, err = ssh_exec(c, cmd, timeout=30)
    m = re.search(r"n/a\s*\|\s*(\d+)", out)
    return out + err, (int(m.group(1)) if m else None)


def _explain_missing_file(c: paramiko.SSHClient, args: argparse.Namespace, remote_py: str) -> None:
    """Print the llm/ directory listing plus troubleshooting hints when the server copy is missing."""
    print(f" ! 服务器文件不存在: {remote_py}")
    llm_dir = f"{args.compose_dir}/backend/app/services/llm/"
    script = f"ls -la {shlex.quote(llm_dir)} 2>&1"
    _rc, ls_out, _err = ssh_exec(c, f"bash -lc {shlex.quote(script)}", timeout=15)
    print(f"\n --- {llm_dir} 目录内容 ---")
    print(ls_out.rstrip())
    print("\n 解读:")
    print(" - 如果有 enrichment.py.bak.* 备份在 → 上次部署成功过,文件被外部操作清掉")
    print(" - 如果连 .bak 都没有 → llm/ 目录可能被整体删了,需要重新创建")
    print(f" - 如果是 ENOENT 整个目录 → {args.compose_dir}/backend/app 目录有问题")
    print("\n 建议 SSH 上去确认:")
    print(f" ssh {args.user}@{args.host} -p {args.port}")
    print(f" ls -la {llm_dir}")
    print(f" ls -la {args.compose_dir}/ # 看 backend/ 是否在")
    print("\n 想要我重写脚本继续往下走(把本地版创建为新文件),加 --force-recreate 即可:")
    print(f" python {sys.argv[0]} --force-recreate")


def _sync_enrichment_file(
    c: paramiko.SSHClient, args: argparse.Namespace, remote_py: str, local_content: str
) -> int | None:
    """Make the server's enrichment.py match *local_content*, honouring --dry-run.

    Returns an exit code when the run should stop here (failure, or a
    --dry-run comparison that is finished), or None to continue deploying.
    """
    print("\n[3/6] 比对服务器版 enrichment.py ...")
    remote_content = get_text(c, remote_py)

    if remote_content is None:
        _explain_missing_file(c, args, remote_py)
        if not args.force_recreate:
            return 2
        if args.dry_run:
            print(f"\n --dry-run,跳过创建 {remote_py}")
            return 0
        print("\n --force-recreate 启用,直接创建新文件(不备份)")
        put_text_via_file(c, remote_py, local_content)
        print(f" ✓ 已在服务器创建 {remote_py}")
        return None

    if remote_content == local_content:
        print(" ✓ 服务器已是最新版本(无需重传)")
        if args.dry_run:
            # --dry-run promises "no changes, no restart": stop before build/up.
            print(" --dry-run,不 build / 不重启")
            return 0
        return None

    if args.dry_run:
        print(f" ! 服务器与本地不一致(差异 {len(remote_content)-len(local_content)} 字符),--dry-run 跳过覆盖")
        return 0

    # Keep a timestamped backup of the server copy before overwriting it.
    bak = f"{remote_py}.bak.{time.strftime('%Y%m%d_%H%M%S')}"
    script = f"cp -f {shlex.quote(remote_py)} {shlex.quote(bak)}"
    rc, out, err = ssh_exec(c, f"bash -lc {shlex.quote(script)}", timeout=15)
    if rc != 0:
        print(f" ✗ 备份失败 {(err or out).strip()}")
        return 2
    print(f" ✓ 备份到 {bak}")
    put_text_via_file(c, remote_py, local_content)
    print(" ✓ 覆盖服务器 enrichment.py")
    return None


def _run_compose(
    c: paramiko.SSHClient, compose_dir: str, sub: str, timeout: int, what: str
) -> str | None:
    """Run ``docker compose <sub>``. Return its combined output, or None on failure (the tail is printed)."""
    rc, out, err = ssh_exec(c, _compose_cmd(compose_dir, sub), timeout=timeout)
    combined = out + err
    if rc != 0:
        print(f" ✗ {what}失败 rc={rc}")
        print(combined[-2000:])
        return None
    return combined


def _check_enrichment(c: paramiko.SSHClient, args: argparse.Namespace, baseline: int) -> int:
    """Wait for the worker, then judge from its logs and the n/a count whether enrichment runs.

    Returns 0 when enrich_article log lines exist and the n/a count dropped
    below *baseline* (or no n/a row could be read), otherwise 1.
    """
    print(f"\n[6/6] 等 {args.wait}s 让 worker 起来 + enrichment_loop 跑几批 ...")
    time.sleep(10)  # give the restarted container a moment before reading its logs
    _rc, out, _err = ssh_exec(
        c,
        _compose_cmd(args.compose_dir,
                     "logs --tail=30 worker 2>&1 | grep -iE 'enrich|started' | head -10"),
        timeout=20,
    )
    if "enrichment_loop started" in out:
        print(" ✓ enrichment_loop 已启动")
    else:
        print(" ! enrichment_loop 启动标志未找到,日志:")
        for line in out.splitlines()[-5:]:
            print(f" {line}")

    print(f"\n 等待 {args.wait}s 让 enrich 真跑起来 ...")
    time.sleep(max(0, args.wait))  # a negative --wait would make time.sleep raise

    # The remote `head -10` caps the output, so this is a 0..10 presence signal, not a total.
    _rc, log_out, _err = ssh_exec(
        c,
        _compose_cmd(args.compose_dir,
                     "logs --tail=500 worker 2>&1 | grep -E 'enrich_article' | head -10"),
        timeout=20,
    )
    log_lines = log_out.splitlines()
    enrich_count = sum("enrich_article" in line for line in log_lines)
    print(f"\n === enrich_article 日志: {enrich_count} 条 ===")
    for line in log_lines[:5]:
        print(f" {line}")

    sql_out, n_a = _classify_counts(c, args.compose_dir)
    print("\n === 当前 classify_status 分布 ===")
    print(sql_out.rstrip())

    if enrich_count == 0:
        print("\n ⚠ enrich_article 日志 0 条 — enrich 任务可能没在跑")
        print(" 排查:")
        print(" docker compose logs worker 2>&1 | grep -E 'enrich|ERROR' | tail -20")
        return 1
    if n_a is None:
        # Either no article is at n/a any more, or the psql output was not parseable.
        print("\n ? 未能从 classify_status 分布中读到 n/a 行")
        return 0
    if n_a < baseline:
        print(f"\n ✓ n/a 数从 {baseline} 降到 {n_a} — 修复成功,enrich 在跑")
        return 0
    print(f"\n ⚠ n/a 数 {n_a} 没变(还在 {baseline}+),但有 enrich 日志 — 看具体错")
    return 1


def _deploy_and_verify(c: paramiko.SSHClient, args: argparse.Namespace, local_content: str) -> int:
    """Run steps 3-6 on an open SSH connection and return the process exit code."""
    remote_py = f"{args.compose_dir}/backend/app/services/llm/enrichment.py"
    stop = _sync_enrichment_file(c, args, remote_py, local_content)
    if stop is not None:
        return stop

    # Measure n/a before touching the worker, so success is judged against the
    # real starting point rather than a number fixed when the script was written.
    _, baseline = _classify_counts(c, args.compose_dir)
    if baseline is None:
        baseline = _FALLBACK_NA_BASELINE
    print(f"\n 部署前 n/a 基线: {baseline}")

    if args.no_build:
        print("\n[4/6] --no-build,跳过 docker build")
    else:
        print("\n[4/6] docker compose build worker ...")
        out = _run_compose(c, args.compose_dir, "build worker", 600, "build ")
        if out is None:
            return 2
        # Only the last few lines matter: they carry the success marker.
        tail = "\n".join(out.strip().splitlines()[-3:])
        print(f" ✓ build 成功(尾行): {tail[:300]}")

    if args.no_restart:
        print("\n[5/6] --no-restart,跳过 up -d")
    else:
        print("\n[5/6] docker compose up -d worker ...")
        out = _run_compose(c, args.compose_dir, "up -d worker", 120, "重启")
        if out is None:
            return 2
        started = [line.strip() for line in out.splitlines()
                   if "Started" in line and "worker" in line]
        print(f" ✓ {started[-1] if started else 'restarted'}")

    rc_ok = _check_enrichment(c, args, baseline)
    print(f"\n==== 结束 (rc={rc_ok}) ====")
    return rc_ok


def main() -> int:
    """CLI entry point: upload the fixed enrichment.py, rebuild/restart the
    worker, then check whether enrichment is making progress.

    Returns the process exit code:
        0 = deployed and enrichment observed running, or a --dry-run
            comparison finished;
        1 = deployed but enrichment progress not observed;
        2 = bad arguments, SSH/remote failure, or deployment failure.
    """
    args = _parse_args()
    if not args.password:
        print("ERROR: 需要 REMOTE_PASS 环境变量或 --password", file=sys.stderr)
        return 2
    print(f"==== 目标: {args.user}@{args.host}:{args.port} ====")
    print(f"==== compose: {args.compose_dir} ====")
    print(f"==== 部署后等待: {args.wait}s ====\n")

    # Read the locally fixed file. Text mode normalises newlines to LF.
    if not os.path.exists(ENRICHMENT_PY_LOCAL):
        print(f"ERROR: 本地文件不存在 {ENRICHMENT_PY_LOCAL}", file=sys.stderr)
        return 2
    with open(ENRICHMENT_PY_LOCAL, encoding="utf-8") as f:
        local_content = f.read()
    n_lines = local_content.count("\n")
    print(f"[1/6] 本地 enrichment.py {len(local_content)} 字符,{n_lines} 行")

    print("\n[2/6] 连接 SSH ...")
    c = paramiko.SSHClient()
    c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        c.connect(args.host, port=args.port, username=args.user, password=args.password,
                  timeout=30, banner_timeout=30, auth_timeout=30,
                  allow_agent=False, look_for_keys=False)
    except Exception as e:  # any connection failure counts as a deployment failure
        print(f" ✗ SSH 连接失败: {e}")
        c.close()
        return 2
    print(" ✓ SSH 连接成功")

    try:
        return _deploy_and_verify(c, args, local_content)
    except (RuntimeError, OSError, paramiko.SSHException) as e:
        # put_file raises RuntimeError; channel timeouts are OSError subclasses.
        # Report them as exit code 2 instead of a traceback (whose exit code 1
        # would be indistinguishable from "deployed but not observed").
        print(f" ✗ 远程操作失败: {e}", file=sys.stderr)
        return 2
    finally:
        c.close()
if __name__ == "__main__":
    # `re` is imported at module level now, so main() also works when this
    # module is imported and main() is called directly.
    sys.exit(main())