Files
diary-news/backend/app/scripts/clean_translations.py
xiaji 8dccf08126 feat(translate): 增加译文清洗 — pipeline 接入源头防御 + 批量清洗历史脚本
- 新增 app/services/translation/text_clean.py
  clean_markdown_asterisks / clean_html_inner_text / wrap_html 共用工具,
  清洗 LLM 输出残留的 ** / * / *** markdown 标记
- 改 pipeline.translate_article: 写库前清洗 tr_title/tr_body,
  新翻译不再带 **;同时把私有 _wrap_html 替换为公开 wrap_html
- 新增 app/scripts/clean_translations.py
  批量清洗历史脏数据 — 5 字段(title_zh/body_zh_text/body_zh_html/
  body_zh_formatted/summary_zh),支持 dry-run/limit/source-slug/field
2026-06-16 22:12:45 +08:00

338 lines
12 KiB
Python

"""批量清洗历史译文里的 markdown 星号标记(`**` / `*` / `***`)。
背景:
- LLM 翻译时偶尔把 markdown 加粗标记原样带进译文,前端展示就成了 `**FBI**局长` 这种脏数据
- 已发现的脏模式(LLM 输出不严谨):
**FBI**局长**卡什·帕特尔**表示,该机构挫败了一起针对周日发生在**白宫南草坪**的**UFC**活动的阴谋。
- 期望:
FBI局长卡什·帕特尔表示,该机构挫败了一起针对周日发生在白宫南草坪的UFC活动的阴谋。
清洗规则(详见 app.services.translation.text_clean):
***text*** -> text (粗+斜)
**text** -> text (粗) — 循环直到稳定,处理连续多对
*text* -> text (斜)
兜底:所有零散 `*` 一律删除(应对 LLM 输出错位)
字段清洗范围:
- title_zh 直接清洗
- body_zh_text 直接清洗
- body_zh_html 用清洗后的 body_zh_text 重新 wrap(保证一致)
- body_zh_formatted BeautifulSoup 解析,只清洗文本节点(保留 diary-para class + style)
- summary_zh 直接清洗
不动:
- commentary / commentary_meituan(评论是另外的 LLM 产物,不在"翻译"范畴)
- translation_status / translation_engine(状态/引擎标记本身没错)
工作流程:
1. 在 pipeline.py 的 translate_article() 写库前已接入防御性清洗(commit 后的 worker 不会再产生 `**`)
2. 这个脚本专门清理历史脏数据 — 跑一次即可,后续靠 pipeline 端防御
用法(在 worker 容器里):
# 1) 先 dry-run 看看会改多少 / 样本对比
docker compose exec worker python -m app.scripts.clean_translations --dry-run --show-samples 5
# 2) 全量清洗
docker compose exec worker python -m app.scripts.clean_translations
# 3) 分批:每次 200 篇,避免一次性锁太久
docker compose exec worker python -m app.scripts.clean_translations --batch-size 200
# 4) 只清洗某个源
docker compose exec worker python -m app.scripts.clean_translations \\
--source-slug bbc-world
# 5) 只清洗某个字段(排错用)
docker compose exec worker python -m app.scripts.clean_translations \\
--field body_zh_text --limit 10 --dry-run
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from typing import Iterable
from sqlalchemy import or_, select
from app.database import AsyncSessionLocal
from app.models.article import Article
from app.models.source import Source
from app.services.translation.text_clean import (
clean_html_inner_text,
clean_markdown_asterisks,
wrap_html,
)
logger = logging.getLogger("news.scripts.clean_translations")
# === SQL 过滤:任一翻译字段含 * 就捞出来 ===
# LIKE 里的 `*` 是字面字符(只有 `%` 和 `_` 才是 wildcard),无需 escape
_ASTERISK_COLUMNS = (
Article.title_zh,
Article.body_zh_text,
Article.body_zh_html,
Article.body_zh_formatted,
Article.summary_zh,
)
def _scan_sql(*, source_slug: str | None):
"""构造扫描 SQL。"""
stmt = select(Article).where(
or_(*(col.like("%*%") for col in _ASTERISK_COLUMNS))
).order_by(Article.id.asc())
if source_slug:
# 注意:Source filter 在 main() 里手动处理(下面 async 块),
# 因为这里不能 await source 查询 — 简单做法是先查 source id 再拼 where
return stmt, source_slug
return stmt, None
async def fetch_candidates(
*, source_slug: str | None, batch_size: int | None
) -> list[Article]:
"""从 DB 捞出含 * 的文章列表(轻量 — 不读全文,后面再按需读)。"""
async with AsyncSessionLocal() as session:
stmt, slug = _scan_sql(source_slug=source_slug)
if slug:
src = (
await session.execute(select(Source).where(Source.slug == slug))
).scalar_one_or_none()
if not src:
print(f"!! source_slug '{slug}' 不存在", file=sys.stderr)
return []
stmt = stmt.where(Article.source_id == src.id)
if batch_size is not None:
stmt = stmt.limit(batch_size)
rows = (await session.execute(stmt)).scalars().all()
return list(rows)
# === 清洗单篇文章 ===
def clean_one(
art: Article, *, fields: set[str]
) -> dict[str, tuple[int, int]]:
"""清洗一篇文章的指定字段,返回 {field: (before_len, after_len)}。
fields 是允许清洗的字段集合(由 --field 参数控制)。
"""
changes: dict[str, tuple[int, int]] = {}
# --- title_zh ---
if "title_zh" in fields and art.title_zh:
cleaned = clean_markdown_asterisks(art.title_zh)
if cleaned != art.title_zh:
changes["title_zh"] = (len(art.title_zh), len(cleaned or ""))
art.title_zh = cleaned or None
# --- body_zh_text ---
body_text_changed = False
if "body_zh_text" in fields and art.body_zh_text:
cleaned = clean_markdown_asterisks(art.body_zh_text)
if cleaned != art.body_zh_text:
changes["body_zh_text"] = (len(art.body_zh_text), len(cleaned or ""))
art.body_zh_text = cleaned or None
body_text_changed = True
# --- body_zh_html ---
# 策略:用清洗后的 body_zh_text 重新 wrap(保证两个字段一致)
# 触发条件:body_zh_text 被改 / 或原 body_zh_html 本身含 * 但 body_zh_text 不含(老数据不一致)
if "body_zh_html" in fields:
if body_text_changed:
new_html = wrap_html(art.body_zh_text)
old_html = art.body_zh_html or ""
if (new_html or "") != old_html:
changes["body_zh_html"] = (len(old_html), len(new_html or ""))
art.body_zh_html = new_html
elif art.body_zh_html and "*" in art.body_zh_html:
# body_zh_text 没改但 body_zh_html 仍含 *:BeautifulSoup 兜底清洗
cleaned_html = clean_html_inner_text(art.body_zh_html)
if cleaned_html and cleaned_html != art.body_zh_html:
changes["body_zh_html"] = (
len(art.body_zh_html), len(cleaned_html),
)
art.body_zh_html = cleaned_html
# --- body_zh_formatted ---
# 不能直接重新生成(会丢 diary-para class + 内联 style),只能用 BS 清洗文本节点
if (
"body_zh_formatted" in fields
and art.body_zh_formatted
and "*" in art.body_zh_formatted
):
cleaned = clean_html_inner_text(art.body_zh_formatted)
if cleaned and cleaned != art.body_zh_formatted:
changes["body_zh_formatted"] = (
len(art.body_zh_formatted), len(cleaned),
)
art.body_zh_formatted = cleaned
# --- summary_zh ---
if "summary_zh" in fields and art.summary_zh:
cleaned = clean_markdown_asterisks(art.summary_zh)
if cleaned != art.summary_zh:
changes["summary_zh"] = (len(art.summary_zh), len(cleaned or ""))
art.summary_zh = cleaned or None
return changes
# === 主流程 ===
ALL_FIELDS = ("title_zh", "body_zh_text", "body_zh_html", "body_zh_formatted", "summary_zh")
async def commit_batch(arts: list[Article]) -> None:
"""批量提交(避免每条都 round-trip)。"""
if not arts:
return
async with AsyncSessionLocal() as session:
for art in arts:
await session.merge(art)
await session.commit()
def _preview_diff(art: Article, fields: set[str]) -> str:
"""打印一条样本的清洗前后对比(dry-run 用)。"""
lines: list[str] = []
art_copy_title = clean_markdown_asterisks(art.title_zh) if (
"title_zh" in fields and art.title_zh
) else art.title_zh
art_copy_body = clean_markdown_asterisks(art.body_zh_text) if (
"body_zh_text" in fields and art.body_zh_text
) else art.body_zh_text
if art.title_zh and art_copy_title != art.title_zh:
lines.append(f" TITLE before: {art.title_zh!r}")
lines.append(f" TITLE after : {art_copy_title!r}")
if art.body_zh_text and art_copy_body != art.body_zh_text:
body_before = art.body_zh_text
body_after = art_copy_body
# 只截前 200 字符(样本,避免刷屏)
if len(body_before) > 200 or len(body_after) > 200:
lines.append(f" BODY before: {body_before[:200]!r}...")
lines.append(f" BODY after : {body_after[:200]!r}...")
else:
lines.append(f" BODY before: {body_before!r}")
lines.append(f" BODY after : {body_after!r}")
return "\n".join(lines)
async def main() -> int:
p = argparse.ArgumentParser(
description="批量清洗历史译文里的 markdown 星号标记(** / * / ***)",
)
p.add_argument(
"--dry-run",
action="store_true",
help="只打印待改列表 + 样本对比,不动数据库",
)
p.add_argument(
"--limit",
type=int,
default=None,
help="最多处理多少条文章(分批用,避免一次性锁太久)",
)
p.add_argument(
"--batch-size",
type=int,
default=None,
help="从 DB 一次捞多少条(默认 = limit, 即一次全部)",
)
p.add_argument(
"--source-slug",
default=None,
help="限定某个采集源(按 slug)",
)
p.add_argument(
"--field",
choices=ALL_FIELDS + ("all",),
default="all",
help="只清洗指定字段(排错用;默认 all = 清洗全部 5 个字段)",
)
p.add_argument(
"--show-samples",
type=int,
default=0,
metavar="N",
help="dry-run 时额外打印前 N 条样本的清洗前后对比",
)
p.add_argument(
"--show-stats",
action="store_true",
help="按字段分组统计改动次数 + 总字符差",
)
args = p.parse_args()
# --- 字段白名单 ---
if args.field == "all":
fields = set(ALL_FIELDS)
else:
fields = {args.field}
# --- 扫描 ---
fetch_limit = args.batch_size if args.batch_size is not None else args.limit
candidates = await fetch_candidates(
source_slug=args.source_slug, batch_size=fetch_limit,
)
if not candidates:
print("✅ 没找到含 `*` 标记的文章,无需清洗")
return 0
print(
f"找到 {len(candidates)} 条含 `*` 标记的文章"
f"(mode={'DRY-RUN' if args.dry_run else 'EXECUTE'}, "
f"fields={sorted(fields)}, source={args.source_slug or 'ALL'})"
)
# --- 处理 ---
field_stats: dict[str, int] = {f: 0 for f in ALL_FIELDS}
field_char_delta: dict[str, int] = {f: 0 for f in ALL_FIELDS}
dirty: list[Article] = [] # 有改动的文章(dry-run 用)
modified_arts: list[Article] = [] # 真正要 commit 的
for art in candidates:
changes = clean_one(art, fields=fields)
if not changes:
# scan SQL 命中 LIKE '%*%' 但清洗函数判定无变化(极少见,
# 比如 * 在 escaped 位置) — 跳过
continue
for field, (before_len, after_len) in changes.items():
field_stats[field] += 1
field_char_delta[field] += before_len - after_len # 正数 = 字符减少
dirty.append(art)
if not args.dry_run:
modified_arts.append(art)
if args.show_samples and len(dirty) <= args.show_samples:
print(f"\n[样本 #{len(dirty)}] id={art.id} source_id={art.source_id}")
print(_preview_diff(art, fields))
# --- 提交 ---
if not args.dry_run:
await commit_batch(modified_arts)
# --- 统计输出 ---
changed_n = len(dirty)
print(f"\n实际改动: {changed_n} 条 (mode={'DRY-RUN' if args.dry_run else 'EXECUTE'})")
if args.show_stats:
print("\n按字段统计:")
for field in ALL_FIELDS:
n = field_stats[field]
delta = field_char_delta[field]
if n == 0:
continue
print(f" {field:<22} {n:>5} 条 字符差 {delta:+d}")
if args.dry_run:
print(f"\n[DRY-RUN] 共 {changed_n} 条待改,实际未改动。去掉 --dry-run 真正执行。")
else:
print(f"\n✅ 已清洗 {changed_n} 条,提交完毕。")
return 0
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
sys.exit(asyncio.run(main()))