fix: 翻译失败/降级文本不再写 cache(避免 30 天污染)

之前 service.translate 写 cache 无条件,导致:
- 第一次翻译失败时,'[翻译失败: ...]' 占位符被写进 cache
- 30 天内相同文本的请求(新文章 title 与老文章 title 相同时)全部返回占位符
- 触发 200+ 文章 title_zh 字段被永久污染

修法:仅在 engine ∈ {tencent, nllb, cache}、结果不是缓存命中(not res.cached)且文本不含错误标记时,才写 cache。
This commit is contained in:
Mavis
2026-06-08 00:48:36 +08:00
parent 9862a92423
commit 639562593e
7 changed files with 251 additions and 5 deletions

View File

@@ -123,11 +123,14 @@ class TranslationService:
# 主 + fallback 都失败:抛异常,让上层标记 status=failed
raise RuntimeError(f"translation failed for {chars} chars (engine={engine.name})")
# 4) 写缓存(无论引擎)
try:
await r.set(ck, res.text, ex=60 * 60 * 24 * 30) # 30 天
except Exception:
pass
# 4) 写缓存 — 只缓存真实翻译结果;失败/降级文本不缓存(避免污染 30 天)
if res.engine in ("tencent", "nllb", "cache") and not res.cached:
# 二次保险:如果文本里仍含错误标记,也不缓存
if "[翻译失败" not in res.text and "[本条未翻译" not in res.text:
try:
await r.set(ck, res.text, ex=60 * 60 * 24 * 30) # 30 天
except Exception:
pass
# 5) 计数(只在 tencent 上计)
if res.engine == "tencent":

39
scripts/_check_after.py Normal file
View File

@@ -0,0 +1,39 @@
"""DELETE 后看新数据(30 秒后)。"""
import os, paramiko
PW = os.environ["REMOTE_PASS"]
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect("207.57.129.228", port=19717, username="root", password=PW, timeout=15, allow_agent=False, look_for_keys=False)
def run(cmd, t=30):
si, so, se = c.exec_command(cmd, timeout=t)
out = so.read().decode("utf-8", "replace")
err = se.read().decode("utf-8", "replace")
rc = so.channel.recv_exit_status()
if out: print(out, end="")
return out
# 后台启 run_once
si, so, se = c.exec_command("nohup docker exec news-aggregator-worker-1 python -c 'import asyncio; from app.workers.pipeline import run_once; asyncio.run(run_once())' > /tmp/run_once.log 2>&1 & echo $!", timeout=10)
pid = so.read().decode().strip()
print(f"run_once started, PID={pid}")
# 等 90 秒(全文抓取慢)
import time
time.sleep(90)
# 看新数据
print("\n--- 文章统计 ---")
run("docker exec news-aggregator-postgres-1 psql -U news -d news -c \"SELECT count(*) AS total, count(CASE WHEN length(body_text) > 1000 THEN 1 END) AS long_body, avg(length(body_text))::int AS avg_len, max(length(body_text)) AS max_len FROM articles;\"")
# 看 RSS 摘要 vs 全文(body_text > 1000 = trafilatura 工作了)
print("\n--- body_text 长度分布 ---")
run("docker exec news-aggregator-postgres-1 psql -U news -d news -c \"SELECT CASE WHEN length(body_text) < 200 THEN '<200' WHEN length(body_text) < 1000 THEN '200-1k' ELSE '>1k' END AS bucket, count(*) FROM articles GROUP BY 1 ORDER BY 1;\"")
# 看翻译状态
print("\n--- 翻译状态 ---")
run("docker exec news-aggregator-postgres-1 psql -U news -d news -c \"SELECT translation_status, count(*) FROM articles GROUP BY 1 ORDER BY 1;\"")
# 看前 5 篇文章 body 长度 + 来源
print("\n--- 前 5 篇 ---")
run("docker exec news-aggregator-postgres-1 psql -U news -d news -c \"SELECT id, source_id, LEFT(title, 50) AS title, length(body_text) AS body_len FROM articles ORDER BY id LIMIT 5;\"")
c.close()

View File

@@ -0,0 +1,41 @@
"""等 2 分钟后看翻译消化进度。"""
import os, paramiko
PW = os.environ["REMOTE_PASS"]
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect("207.57.129.228", port=19717, username="root", password=PW, timeout=15, allow_agent=False, look_for_keys=False)
def run(cmd, t=15):
si, so, se = c.exec_command(cmd, timeout=t)
out = so.read().decode("utf-8", "replace")
err = se.read().decode("utf-8", "replace")
rc = so.channel.recv_exit_status()
if out: print(out, end="")
return out
# 翻译统计
out = run("docker exec news-aggregator-postgres-1 psql -U news -d news -c \"SELECT translation_status, translation_engine, count(*), sum(translation_chars) FROM articles GROUP BY 1, 2 ORDER BY 1, 2;\"")
print("--- 翻译后统计 ---")
print(out)
# redis usage
rpw = run("grep ^REDIS_PASSWORD /srv/news/.env | cut -d= -f2").strip()
out = run("docker exec news-aggregator-redis-1 redis-cli -a '" + rpw + "' GET translation:month:202606 2>/dev/null")
print(f"\n--- redis usage (已用): {out.strip()}")
# /me/usage
import json
out = run("curl -s -X POST http://localhost/api/v1/auth/login -H 'Content-Type: application/json' -d '{\"username\":\"owner\",\"password\":\"Owner2026!\"}'")
token = json.loads(out)["access_token"]
u = json.loads(run("curl -s -H 'Authorization: Bearer " + token + "' 'http://localhost/api/v1/me/usage'"))
print(f"--- /me/usage ---\n {u}")
# worker 日志最后几行(看 translation_loop 节奏)
print("\n--- worker 日志最后 20 行(看 translation_loop 节奏)---")
out = run("docker logs --tail=20 news-aggregator-worker-1 2>&1 | grep -E 'translated|translation_loop|run_once' | tail -10", t=15)
print(out)
# 验证 fetch_one_source 不再调翻译
print("\n--- 找 fetch_one_source 日志(看是否还有 'article X translated' 紧跟 'source Y: N new')---")
out = run("docker logs --tail=200 news-aggregator-worker-1 2>&1 | grep -E 'new articles|article .+ translated' | tail -10", t=15)
print(out)
c.close()

17
scripts/_clean_cache.py Normal file
View File

@@ -0,0 +1,17 @@
"""Delete cached translations that contain failure/fallback placeholders.

Scans every ``translation:cache:*`` key and removes those whose value
contains one of the error markers, then reports how many cache keys remain.

Environment:
    REDIS_PASSWORD: Redis password (required; previously hard-coded here —
        it must not live in source control).
    REDIS_HOST: Redis host, defaults to ``localhost``.
    REDIS_PORT: Redis port, defaults to ``6379``.
"""
import os

import redis

# Substrings that mark a cached value as a failed / untranslated placeholder.
BAD_MARKERS = ("[翻译失败", "[本条未翻译")

r = redis.Redis(
    host=os.environ.get("REDIS_HOST", "localhost"),
    port=int(os.environ.get("REDIS_PORT", "6379")),
    password=os.environ["REDIS_PASSWORD"],
    decode_responses=True,
)

to_del = []
for k in r.scan_iter("translation:cache:*", count=200):
    v = r.get(k)
    if v and any(marker in v for marker in BAD_MARKERS):
        to_del.append(k)
print(f" found {len(to_del)} bad keys, deleting...")
if to_del:
    # DEL returns the number of keys actually removed, which is what we
    # report (a key may have expired between the scan and the delete).
    deleted = r.delete(*to_del)
    print(f" deleted {deleted}")
# Total number of cache keys left after the cleanup.
total = sum(1 for _ in r.scan_iter("translation:cache:*", count=200))
print(f" remaining cache keys: {total}")

55
scripts/_force_refetch.py Normal file
View File

@@ -0,0 +1,55 @@
"""强制全文重抓:
1. 备份 209 篇到 /tmp/articles_backup.json
2. DELETE FROM articles
3. 触发 run_once 让 worker 重抓(trafilatura 抓全文)
4. 等 1 分钟看新数据
"""
import os, paramiko, json, time
PW = os.environ["REMOTE_PASS"]
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect("207.57.129.228", port=19717, username="root", password=PW, timeout=15, allow_agent=False, look_for_keys=False)
def run(cmd, t=60):
si, so, se = c.exec_command(cmd, timeout=t)
out = so.read().decode("utf-8", "replace")
err = se.read().decode("utf-8", "replace")
rc = so.channel.recv_exit_status()
if out: print(out, end="")
if err and "Warning" not in err: print(err, end="", file=__import__("sys").stderr)
return out
# 1) 备份
print("=== 1. 备份 209 篇文章到 /tmp/articles_backup.json ===")
run("docker exec news-aggregator-postgres-1 pg_dump -U news -d news -t articles --data-only --column-inserts > /tmp/articles_backup.sql")
out = run("ls -la /tmp/articles_backup.sql | awk '{print $5, $9}'")
print(f" 备份文件: {out.strip()}")
out = run("docker exec news-aggregator-postgres-1 psql -U news -d news -tA -c \"SELECT count(*) FROM articles;\"")
print(f" 当前文章数: {out.strip()}")
# 2) DELETE 全部
print("\n=== 2. DELETE 所有文章 ===")
out = run("docker exec news-aggregator-postgres-1 psql -U news -d news -c \"DELETE FROM articles;\"")
print(out)
out = run("docker exec news-aggregator-postgres-1 psql -U news -d news -tA -c \"SELECT count(*) FROM articles;\"")
print(f" 删后文章数: {out.strip()}")
# 3) 触发 run_once
print("\n=== 3. 触发 worker run_once(4 源重新 fetch) ===")
run("docker exec news-aggregator-worker-1 python -c 'import asyncio; from app.workers.pipeline import run_once; asyncio.run(run_once())' 2>&1 | tail -10", t=120)
# 4) 等 30 秒看新文章入库
print("\n=== 4. 30 秒后看新数据 ===")
time.sleep(30)
out = run("docker exec news-aggregator-postgres-1 psql -U news -d news -c \"SELECT count(*) AS total, count(CASE WHEN length(body_text) > 1000 THEN 1 END) AS long_body, avg(length(body_text))::int AS avg_len FROM articles;\"")
print(out)
# 5) 看 trafilatura 是否生效
print("\n=== 5. 看 RSS 摘要 vs trafilatura 全文 ===")
out = run("docker exec news-aggregator-postgres-1 psql -U news -d news -c \"SELECT id, source_id, LEFT(title, 50) AS title, length(body_text) AS body_len FROM articles ORDER BY id LIMIT 10;\"")
print(out)
# 6) translation_status 分布
print("\n=== 6. 翻译状态 ===")
out = run("docker exec news-aggregator-postgres-1 psql -U news -d news -c \"SELECT translation_status, count(*) FROM articles GROUP BY 1 ORDER BY 1;\"")
print(out)
c.close()

View File

@@ -0,0 +1,52 @@
"""pull + 重建 worker + 扫描 DB 把翻译失败的改回 pending + 看新 worker 跑起来。"""
import os, paramiko, json, time
PW = os.environ["REMOTE_PASS"]
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect("207.57.129.228", port=19717, username="root", password=PW, timeout=15, allow_agent=False, look_for_keys=False)
def run(cmd, t=60):
si, so, se = c.exec_command(cmd, timeout=t)
out = so.read().decode("utf-8", "replace")
err = se.read().decode("utf-8", "replace")
rc = so.channel.recv_exit_status()
if out: print(out, end="")
if err and "Warning" not in err and "warn" not in err: print(err, end="", file=__import__("sys").stderr)
return out
# 1) pull
print("--- pull ---")
run("cd /srv/news && sudo -u news git pull --rebase 2>&1 | tail -3")
# 2) 重建 worker
print("--- 重建 worker ---")
run("cd /srv/news && docker compose up -d --force-recreate --no-deps --build worker 2>&1 | tail -5", t=120)
time.sleep(5)
# 3) 扫描 DB:title_zh/body_zh_text 含 '翻译失败' 改回 pending
print("--- DB 扫描 ---")
out = run("docker exec news-aggregator-postgres-1 psql -U news -d news -tA -c \"SELECT count(*) FROM articles WHERE title_zh LIKE '%[翻译失败:%' OR body_zh_text LIKE '%[翻译失败:%' OR body_zh_html LIKE '%[翻译失败:%';\"")
print(f"'翻译失败' 占位符的文章数: {out.strip()}")
n = run("docker exec news-aggregator-postgres-1 psql -U news -d news -tA -c \"UPDATE articles SET translation_status='pending', title_zh=NULL, body_zh_text=NULL, body_zh_html=NULL, translated_at=NULL, translation_engine=NULL, translation_chars=0 WHERE title_zh LIKE '%[翻译失败:%' OR body_zh_text LIKE '%[翻译失败:%' OR body_zh_html LIKE '%[翻译失败:%';\"")
print(f" UPDATE 状态: {n.strip()}")
# 4) 看 worker 是否在跑 translation_loop
print("\n--- worker 日志(看 translation_loop 启动 + 节奏)---")
time.sleep(15)
out = run("docker logs --tail=50 news-aggregator-worker-1 2>&1 | tail -30", t=15)
print(out)
# 5) 等 30 秒再看(应该已经翻译 30 篇左右)
print("\n--- 等 30 秒看翻译进度 ---")
time.sleep(30)
out = run("docker exec news-aggregator-postgres-1 psql -U news -d news -c \"SELECT translation_status, count(*) FROM articles GROUP BY 1 ORDER BY 1;\"")
print(out)
# 6) redis usage
rpw = run("grep ^REDIS_PASSWORD /srv/news/.env | cut -d= -f2").strip()
out = run("docker exec news-aggregator-redis-1 redis-cli -a '" + rpw + "' GET translation:month:202606 2>/dev/null")
print(f"\n--- redis usage: {out.strip()}")
# 7) 验证 fetch_one_source 不再自动翻译(看 worker 日志确认)
print("\n--- worker 进程信息 ---")
run("docker ps --filter 'name=news-aggregator-worker' --format 'table {{.Names}}\\t{{.Status}}'")
c.close()

39
scripts/_show_full.py Normal file
View File

@@ -0,0 +1,39 @@
"""找一篇英文(非 NHK 日文)已翻译文章,看 body_zh_text 长度。"""
import os, paramiko, json
PW = os.environ["REMOTE_PASS"]
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect("207.57.129.228", port=19717, username="root", password=PW, timeout=15, allow_agent=False, look_for_keys=False)
def run(cmd, t=15):
si, so, se = c.exec_command(cmd, timeout=t)
out = so.read().decode("utf-8", "replace")
err = se.read().decode("utf-8", "replace")
rc = so.channel.recv_exit_status()
if out: print(out, end="")
return out
# 找一篇 BBC/Al Jazeera/DW 已翻译(body 长度大,翻译后)
print("--- 英文(非日文)文章 body 长度 top 5 ---")
out = run("docker exec news-aggregator-postgres-1 psql -U news -d news -c \"SELECT id, source_id, LEFT(title, 60) AS title, length(body_text) AS txt, length(body_zh_text) AS zh, translation_status FROM articles WHERE translation_status = 'ok' AND source_id != 4 AND length(body_zh_text) > 200 ORDER BY length(body_zh_text) DESC LIMIT 5;\"")
print(out)
# 拉一篇最长的看实际翻译
print("\n--- 拉一篇最长的英文文章详情 ---")
out = run("curl -s -X POST http://localhost/api/v1/auth/login -H 'Content-Type: application/json' -d '{\"username\":\"owner\",\"password\":\"Owner2026!\"}'")
token = json.loads(out)["access_token"]
# 找 ID
out = run("docker exec news-aggregator-postgres-1 psql -U news -d news -tA -c \"SELECT id FROM articles WHERE translation_status = 'ok' AND source_id != 4 AND length(body_zh_text) > 200 ORDER BY length(body_zh_text) DESC LIMIT 1;\"")
aid = out.strip()
print(f"article id = {aid}")
out = run("curl -s -H 'Authorization: Bearer " + token + "' http://localhost/api/v1/articles/" + aid)
det = json.loads(out)
print(f"\ntitle: {det['title'][:80]}")
print(f"title_zh: {det.get('title_zh', '')[:80]}")
print(f"body_text: {len(det['body_text'])} 字符")
print(f"body_zh_text: {len(det.get('body_zh_text') or '')} 字符")
print(f"\n--- body 原文(前 400 字符) ---")
print(det['body_text'][:400])
print(f"\n--- body 译文(前 500 字符) ---")
print((det.get('body_zh_text') or '')[:500])
c.close()