"""批量清洗历史译文里的 markdown 星号标记(`**` / `*` / `***`)。 背景: - LLM 翻译时偶尔把 markdown 加粗标记原样带进译文,前端展示就成了 `**FBI**局长` 这种脏数据 - 已发现的脏模式(LLM 输出不严谨): **FBI**局长**卡什·帕特尔**表示,该机构挫败了一起针对周日发生在**白宫南草坪**的**UFC**活动的阴谋。 - 期望: FBI局长卡什·帕特尔表示,该机构挫败了一起针对周日发生在白宫南草坪的UFC活动的阴谋。 清洗规则(详见 app.services.translation.text_clean): ***text*** -> text (粗+斜) **text** -> text (粗) — 循环直到稳定,处理连续多对 *text* -> text (斜) 兜底:所有零散 `*` 一律删除(应对 LLM 输出错位) 字段清洗范围: - title_zh 直接清洗 - body_zh_text 直接清洗 - body_zh_html 用清洗后的 body_zh_text 重新 wrap(保证一致) - body_zh_formatted BeautifulSoup 解析,只清洗文本节点(保留 diary-para class + style) - summary_zh 直接清洗 不动: - commentary / commentary_meituan(评论是另外的 LLM 产物,不在"翻译"范畴) - translation_status / translation_engine(状态/引擎标记本身没错) 工作流程: 1. 在 pipeline.py 的 translate_article() 写库前已接入防御性清洗(commit 后的 worker 不会再产生 `**`) 2. 这个脚本专门清理历史脏数据 — 跑一次即可,后续靠 pipeline 端防御 用法(在 worker 容器里): # 1) 先 dry-run 看看会改多少 / 样本对比 docker compose exec worker python -m app.scripts.clean_translations --dry-run --show-samples 5 # 2) 全量清洗 docker compose exec worker python -m app.scripts.clean_translations # 3) 分批:每次 200 篇,避免一次性锁太久 docker compose exec worker python -m app.scripts.clean_translations --batch-size 200 # 4) 只清洗某个源 docker compose exec worker python -m app.scripts.clean_translations \\ --source-slug bbc-world # 5) 只清洗某个字段(排错用) docker compose exec worker python -m app.scripts.clean_translations \\ --field body_zh_text --limit 10 --dry-run """ from __future__ import annotations import argparse import asyncio import logging import sys from typing import Iterable from sqlalchemy import or_, select from app.database import AsyncSessionLocal from app.models.article import Article from app.models.source import Source from app.services.translation.text_clean import ( clean_html_inner_text, clean_markdown_asterisks, wrap_html, ) logger = logging.getLogger("news.scripts.clean_translations") # === SQL 过滤:任一翻译字段含 * 就捞出来 === # LIKE 里的 `*` 是字面字符(只有 `%` 和 `_` 才是 wildcard),无需 escape _ASTERISK_COLUMNS = ( Article.title_zh, Article.body_zh_text, Article.body_zh_html, Article.body_zh_formatted, Article.summary_zh, ) def _scan_sql(*, source_slug: str | None): """构造扫描 SQL。""" stmt = select(Article).where( or_(*(col.like("%*%") for col in _ASTERISK_COLUMNS)) ).order_by(Article.id.asc()) if source_slug: # 注意:Source filter 在 main() 里手动处理(下面 async 块), # 因为这里不能 await source 查询 — 简单做法是先查 source id 再拼 where return stmt, source_slug return stmt, None async def fetch_candidates( *, source_slug: str | None, batch_size: int | None ) -> list[Article]: """从 DB 捞出含 * 的文章列表(轻量 — 不读全文,后面再按需读)。""" async with AsyncSessionLocal() as session: stmt, slug = _scan_sql(source_slug=source_slug) if slug: src = ( await session.execute(select(Source).where(Source.slug == slug)) ).scalar_one_or_none() if not src: print(f"!! source_slug '{slug}' 不存在", file=sys.stderr) return [] stmt = stmt.where(Article.source_id == src.id) if batch_size is not None: stmt = stmt.limit(batch_size) rows = (await session.execute(stmt)).scalars().all() return list(rows) # === 清洗单篇文章 === def clean_one( art: Article, *, fields: set[str] ) -> dict[str, tuple[int, int]]: """清洗一篇文章的指定字段,返回 {field: (before_len, after_len)}。 fields 是允许清洗的字段集合(由 --field 参数控制)。 """ changes: dict[str, tuple[int, int]] = {} # --- title_zh --- if "title_zh" in fields and art.title_zh: cleaned = clean_markdown_asterisks(art.title_zh) if cleaned != art.title_zh: changes["title_zh"] = (len(art.title_zh), len(cleaned or "")) art.title_zh = cleaned or None # --- body_zh_text --- body_text_changed = False if "body_zh_text" in fields and art.body_zh_text: cleaned = clean_markdown_asterisks(art.body_zh_text) if cleaned != art.body_zh_text: changes["body_zh_text"] = (len(art.body_zh_text), len(cleaned or "")) art.body_zh_text = cleaned or None body_text_changed = True # --- body_zh_html --- # 策略:用清洗后的 body_zh_text 重新 wrap(保证两个字段一致) # 触发条件:body_zh_text 被改 / 或原 body_zh_html 本身含 * 但 body_zh_text 不含(老数据不一致) if "body_zh_html" in fields: if body_text_changed: new_html = wrap_html(art.body_zh_text) old_html = art.body_zh_html or "" if (new_html or "") != old_html: changes["body_zh_html"] = (len(old_html), len(new_html or "")) art.body_zh_html = new_html elif art.body_zh_html and "*" in art.body_zh_html: # body_zh_text 没改但 body_zh_html 仍含 *:BeautifulSoup 兜底清洗 cleaned_html = clean_html_inner_text(art.body_zh_html) if cleaned_html and cleaned_html != art.body_zh_html: changes["body_zh_html"] = ( len(art.body_zh_html), len(cleaned_html), ) art.body_zh_html = cleaned_html # --- body_zh_formatted --- # 不能直接重新生成(会丢 diary-para class + 内联 style),只能用 BS 清洗文本节点 if ( "body_zh_formatted" in fields and art.body_zh_formatted and "*" in art.body_zh_formatted ): cleaned = clean_html_inner_text(art.body_zh_formatted) if cleaned and cleaned != art.body_zh_formatted: changes["body_zh_formatted"] = ( len(art.body_zh_formatted), len(cleaned), ) art.body_zh_formatted = cleaned # --- summary_zh --- if "summary_zh" in fields and art.summary_zh: cleaned = clean_markdown_asterisks(art.summary_zh) if cleaned != art.summary_zh: changes["summary_zh"] = (len(art.summary_zh), len(cleaned or "")) art.summary_zh = cleaned or None return changes # === 主流程 === ALL_FIELDS = ("title_zh", "body_zh_text", "body_zh_html", "body_zh_formatted", "summary_zh") async def commit_batch(arts: list[Article]) -> None: """批量提交(避免每条都 round-trip)。""" if not arts: return async with AsyncSessionLocal() as session: for art in arts: await session.merge(art) await session.commit() def _preview_diff(art: Article, fields: set[str]) -> str: """打印一条样本的清洗前后对比(dry-run 用)。""" lines: list[str] = [] art_copy_title = clean_markdown_asterisks(art.title_zh) if ( "title_zh" in fields and art.title_zh ) else art.title_zh art_copy_body = clean_markdown_asterisks(art.body_zh_text) if ( "body_zh_text" in fields and art.body_zh_text ) else art.body_zh_text if art.title_zh and art_copy_title != art.title_zh: lines.append(f" TITLE before: {art.title_zh!r}") lines.append(f" TITLE after : {art_copy_title!r}") if art.body_zh_text and art_copy_body != art.body_zh_text: body_before = art.body_zh_text body_after = art_copy_body # 只截前 200 字符(样本,避免刷屏) if len(body_before) > 200 or len(body_after) > 200: lines.append(f" BODY before: {body_before[:200]!r}...") lines.append(f" BODY after : {body_after[:200]!r}...") else: lines.append(f" BODY before: {body_before!r}") lines.append(f" BODY after : {body_after!r}") return "\n".join(lines) async def main() -> int: p = argparse.ArgumentParser( description="批量清洗历史译文里的 markdown 星号标记(** / * / ***)", ) p.add_argument( "--dry-run", action="store_true", help="只打印待改列表 + 样本对比,不动数据库", ) p.add_argument( "--limit", type=int, default=None, help="最多处理多少条文章(分批用,避免一次性锁太久)", ) p.add_argument( "--batch-size", type=int, default=None, help="从 DB 一次捞多少条(默认 = limit, 即一次全部)", ) p.add_argument( "--source-slug", default=None, help="限定某个采集源(按 slug)", ) p.add_argument( "--field", choices=ALL_FIELDS + ("all",), default="all", help="只清洗指定字段(排错用;默认 all = 清洗全部 5 个字段)", ) p.add_argument( "--show-samples", type=int, default=0, metavar="N", help="dry-run 时额外打印前 N 条样本的清洗前后对比", ) p.add_argument( "--show-stats", action="store_true", help="按字段分组统计改动次数 + 总字符差", ) args = p.parse_args() # --- 字段白名单 --- if args.field == "all": fields = set(ALL_FIELDS) else: fields = {args.field} # --- 扫描 --- fetch_limit = args.batch_size if args.batch_size is not None else args.limit candidates = await fetch_candidates( source_slug=args.source_slug, batch_size=fetch_limit, ) if not candidates: print("✅ 没找到含 `*` 标记的文章,无需清洗") return 0 print( f"找到 {len(candidates)} 条含 `*` 标记的文章" f"(mode={'DRY-RUN' if args.dry_run else 'EXECUTE'}, " f"fields={sorted(fields)}, source={args.source_slug or 'ALL'})" ) # --- 处理 --- field_stats: dict[str, int] = {f: 0 for f in ALL_FIELDS} field_char_delta: dict[str, int] = {f: 0 for f in ALL_FIELDS} dirty: list[Article] = [] # 有改动的文章(dry-run 用) modified_arts: list[Article] = [] # 真正要 commit 的 for art in candidates: changes = clean_one(art, fields=fields) if not changes: # scan SQL 命中 LIKE '%*%' 但清洗函数判定无变化(极少见, # 比如 * 在 escaped 位置) — 跳过 continue for field, (before_len, after_len) in changes.items(): field_stats[field] += 1 field_char_delta[field] += before_len - after_len # 正数 = 字符减少 dirty.append(art) if not args.dry_run: modified_arts.append(art) if args.show_samples and len(dirty) <= args.show_samples: print(f"\n[样本 #{len(dirty)}] id={art.id} source_id={art.source_id}") print(_preview_diff(art, fields)) # --- 提交 --- if not args.dry_run: await commit_batch(modified_arts) # --- 统计输出 --- changed_n = len(dirty) print(f"\n实际改动: {changed_n} 条 (mode={'DRY-RUN' if args.dry_run else 'EXECUTE'})") if args.show_stats: print("\n按字段统计:") for field in ALL_FIELDS: n = field_stats[field] delta = field_char_delta[field] if n == 0: continue print(f" {field:<22} {n:>5} 条 字符差 {delta:+d}") if args.dry_run: print(f"\n[DRY-RUN] 共 {changed_n} 条待改,实际未改动。去掉 --dry-run 真正执行。") else: print(f"\n✅ 已清洗 {changed_n} 条,提交完毕。") return 0 if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") sys.exit(asyncio.run(main()))