feat(scripts): 添加 retranslate_history 脚本,支持软/硬重译 + dry-run + 按源/数量过滤

This commit is contained in:
Mavis
2026-06-11 09:28:26 +08:00
parent d90c5955f5
commit 4b8d776aac

View File

@@ -0,0 +1,255 @@
"""批量重译历史文章。
用途:
- 翻译链切换后(智谱/星火/腾讯 TMT 顺序调整),让所有历史文章走新链重译
- 发现"翻译失败 / 译文明显退化 / 译文缺失"的文章,统一重跑
模式:
- soft(默认):只把 translation_status 改回 pending,worker 会按新链重译;
保留 LLM 排版 / 分类 / 评论 / 插图(避免无谓重跑 LLM)
- hard:显式清空所有译文相关字段 + 标 pending,排版/分类/插图/评论也清掉,
等于把这篇当新文章处理(慎用,会重跑 enrich)
判定"翻译是否失败"(is_bad_translation):
- status 字段层面: pending / failed / partial
- 内容层面启发式:
- title_zh 为空
- body_zh_text 为空
- 译文里有 [本条未翻译 / [翻译失败 等标记
- body_zh_text 跟 body_text 完全一样(疑似未翻)
- 长文译文里中文比例 < 30%(几乎没翻译)
用法(在 worker 容器里):
# 仅预览,不动
docker compose exec worker python -m app.scripts.retranslate_history --dry-run
# 软重译,全量
docker compose exec worker python -m app.scripts.retranslate_history --mode soft
# 软重译,先试 50 篇
docker compose exec worker python -m app.scripts.retranslate_history --limit 50
# 硬重译,限定源
docker compose exec worker python -m app.scripts.retranslate_history \\
--mode hard --source-slug bbc-world --limit 100
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import re
import sys
from typing import Iterable
from sqlalchemy import select
from app.database import AsyncSessionLocal
from app.models.article import Article
from app.models.source import Source
logger = logging.getLogger("news.scripts.retranslate_history")
# 译文"翻车"标记(从 service.py / tencent.py 残留 marker 推断)
_BAD_MARKERS = [
"[本条未翻译",
"[翻译失败",
"未翻译:",
"Translation failed",
"translation failed",
]
def _is_chinese_char(ch: str) -> bool:
"""粗略判断一个字符是否落在 CJK 范围。"""
cp = ord(ch)
return (
0x4E00 <= cp <= 0x9FFF # CJK Unified
or 0x3400 <= cp <= 0x4DBF # CJK Extension A
or 0x20000 <= cp <= 0x2A6DF # CJK Extension B
or 0x3040 <= cp <= 0x309F # Hiragana
or 0x30A0 <= cp <= 0x30FF # Katakana
or 0xFF00 <= cp <= 0xFFEF # Fullwidth
)
def _chinese_ratio(text: str) -> float:
if not text:
return 0.0
cnt = sum(1 for ch in text if _is_chinese_char(ch))
return cnt / max(1, len(text))
def is_bad_translation(art: Article) -> tuple[bool, str]:
"""判断一条文章是否需要重译。返回 (need, reason)。"""
# 1) 状态层面
if art.translation_status in ("pending", "failed", "partial"):
return True, f"status={art.translation_status}"
# 2) status=ok 但内容缺失(可能是写入失败,或者老数据没填)
if not art.title_zh or not art.title_zh.strip():
return True, "title_zh empty"
if not art.body_zh_text or not art.body_zh_text.strip():
return True, "body_zh_text empty"
# 3) 译文里有"翻车"标记
haystack = (art.title_zh or "") + "\n" + (art.body_zh_text or "")
for marker in _BAD_MARKERS:
if marker in haystack:
return True, f"contains marker '{marker}'"
# 4) 译文跟原文完全一样(疑似没翻)
# 仅在原文是英文/日文时判定;原文中文时不需重译
if art.body_text and art.body_zh_text:
same = art.body_zh_text.strip() == art.body_text.strip()
if same:
src_ratio = _chinese_ratio(art.body_text)
# 原文几乎无中文 = 几乎肯定是外文
if src_ratio < 0.05:
return True, "translation identical to source (likely untranslated)"
# 5) 译文"几乎全是英文/日文"(短文翻译失败回退到原文)
zh = art.body_zh_text
if len(zh) > 200 and _chinese_ratio(zh) < 0.30:
return True, f"translation low Chinese ratio ({_chinese_ratio(zh):.0%})"
return False, "ok"
async def scan_bad_articles(
*,
source_slug: str | None,
limit: int | None,
) -> list[tuple[Article, str]]:
"""扫描需要重译的文章,返回 (article, reason) 列表。"""
async with AsyncSessionLocal() as session:
stmt = select(Article).order_by(Article.id.asc())
if source_slug:
src = (
await session.execute(select(Source).where(Source.slug == source_slug))
).scalar_one_or_none()
if not src:
print(f"!! source_slug '{source_slug}' 不存在", file=sys.stderr)
return []
stmt = stmt.where(Article.source_id == src.id)
rows = (await session.execute(stmt)).scalars().all()
bad: list[tuple[Article, str]] = []
for art in rows:
need, reason = is_bad_translation(art)
if need:
bad.append((art, reason))
if limit is not None and len(bad) >= limit:
break
return bad
async def retranslate_articles(
articles: Iterable[Article],
*,
mode: str,
dry_run: bool,
) -> int:
"""把需要重译的文章状态改回 pending(soft/hard),等 worker 接手。
返回实际改动行数。
"""
if mode not in ("soft", "hard"):
print(f"!! 未知 mode '{mode}'(可选: soft / hard)", file=sys.stderr)
return 0
changed = 0
async with AsyncSessionLocal() as session:
for art in articles:
if dry_run:
print(
f"[DRY] id={art.id:>6} status={art.translation_status:<8} "
f"engine={art.translation_engine or '-':<12} source_id={art.source_id}"
)
changed += 1
continue
art.translation_status = "pending"
if mode == "hard":
# 硬重译:清空所有译文相关字段,等 worker 重新跑
art.title_zh = None
art.body_zh_text = None
art.body_zh_html = None
art.summary_zh = None
art.translation_engine = None
art.translation_chars = 0
art.translated_at = None
# 注:enrichment 字段(format/commentary/image_ai)故意保留 —
# 它们跟翻译链无关,清掉会浪费 LLM 调用
# 如果用户想要完全重跑,需要手动调 /admin/translation/rerun
# 提交(批量 commit,避免循环里 round-trip)
await session.flush()
changed += 1
if not dry_run:
await session.commit()
return changed
async def main() -> int:
p = argparse.ArgumentParser(
description="扫描并标记需要重译的历史文章",
)
p.add_argument(
"--mode",
choices=["soft", "hard"],
default="soft",
help="soft=只改 status 回 pending(默认);hard=清空所有译文相关字段",
)
p.add_argument(
"--source-slug",
default=None,
help="限定某个采集源(按 slug)",
)
p.add_argument(
"--limit",
type=int,
default=None,
help="最多处理多少条(用于分批,避免一次性塞爆队列)",
)
p.add_argument(
"--dry-run",
action="store_true",
help="只打印待重译列表,不动数据库",
)
p.add_argument(
"--show-stats",
action="store_true",
help="额外按 reason 分组统计(配合 --dry-run 使用)",
)
args = p.parse_args()
# 1) 扫描
bad = await scan_bad_articles(source_slug=args.source_slug, limit=args.limit)
if not bad:
print("✅ 没有发现需要重译的文章")
return 0
print(f"发现 {len(bad)} 条需要重译的文章(模式={args.mode}, source={args.source_slug or 'ALL'})")
if args.show_stats:
from collections import Counter
stats = Counter(reason for _, reason in bad)
for reason, cnt in stats.most_common():
print(f" - {reason:<48} {cnt:>4}")
# 2) 处理
articles = [art for art, _ in bad]
changed = await retranslate_articles(articles, mode=args.mode, dry_run=args.dry_run)
if args.dry_run:
print(f"\n[DRY-RUN] 共 {changed} 条,实际未改动。去掉 --dry-run 真正执行。")
else:
print(f"\n✅ 已将 {changed} 条标为 pending,等 worker 拉起重译。")
return 0
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
sys.exit(asyncio.run(main()))