diff --git a/backend/app/scripts/clean_translations.py b/backend/app/scripts/clean_translations.py
new file mode 100644
index 0000000..f83e056
--- /dev/null
+++ b/backend/app/scripts/clean_translations.py
@@ -0,0 +1,376 @@
+"""Bulk-clean markdown asterisk markers (`**` / `*` / `***`) from stored translations.
+
+Background:
+- The LLM translator occasionally copies markdown bold markers into its output,
+  so the frontend renders dirty text such as `**FBI**局长`.
+- Observed dirty pattern (sloppy LLM output):
+    **FBI**局长**卡什·帕特尔**表示,该机构挫败了一起针对周日发生在**白宫南草坪**的**UFC**活动的阴谋。
+- Expected result:
+    FBI局长卡什·帕特尔表示,该机构挫败了一起针对周日发生在白宫南草坪的UFC活动的阴谋。
+
+Cleaning rules (see app.services.translation.text_clean):
+    ***text***  ->  text   (bold + italic)
+    **text**    ->  text   (bold); repeated until stable to handle adjacent pairs
+    *text*      ->  text   (italic)
+    Fallback: every stray `*` is removed (covers misplaced LLM output)
+
+Fields cleaned:
+    - title_zh           cleaned directly
+    - body_zh_text       cleaned directly
+    - body_zh_html       re-wrapped from the cleaned body_zh_text (keeps both in sync)
+    - body_zh_formatted  parsed with BeautifulSoup; only text nodes are cleaned
+                         (keeps the diary-para class and inline style)
+    - summary_zh         cleaned directly
+
+Left untouched:
+    - commentary / commentary_meituan (separate LLM output, not "translation")
+    - translation_status / translation_engine (the status / engine markers are fine)
+
+Workflow:
+    1. translate_article() in pipeline.py cleans defensively before writing, so
+       workers running the new code no longer produce `**`.
+    2. This script only cleans historical dirty rows: run it once, after which the
+       pipeline-side defence takes over.
+
+Usage (inside the worker container):
+
+    # 1) Dry-run first to see how many rows would change, with sample diffs
+    docker compose exec worker python -m app.scripts.clean_translations --dry-run --show-samples 5
+
+    # 2) Clean everything
+    docker compose exec worker python -m app.scripts.clean_translations
+
+    # 3) Cap a run at 200 articles to avoid holding locks for too long
+    docker compose exec worker python -m app.scripts.clean_translations --batch-size 200
+
+    # 4) Clean a single source only
+    docker compose exec worker python -m app.scripts.clean_translations \\
+        --source-slug bbc-world
+
+    # 5) Clean a single field only (for debugging)
+    docker compose exec worker python -m app.scripts.clean_translations \\
+        --field body_zh_text --limit 10 --dry-run
+"""
+from __future__ import annotations
+
+import argparse
+import asyncio
+import logging
+import sys
+from typing import Iterable
+
+from sqlalchemy import or_, select
+
+from app.database import AsyncSessionLocal
+from app.models.article import Article
+from app.models.source import Source
+from app.services.translation.text_clean import (
+    clean_html_inner_text,
+    clean_markdown_asterisks,
+    wrap_html,
+)
+
+logger = logging.getLogger("news.scripts.clean_translations")
+
+
+# === SQL filter: select rows where any translated column contains `*` ===
+# `*` is a literal character in LIKE (only `%` and `_` are wildcards), so no escaping.
+_ASTERISK_COLUMNS = (
+    Article.title_zh,
+    Article.body_zh_text,
+    Article.body_zh_html,
+    Article.body_zh_formatted,
+    Article.summary_zh,
+)
+
+
+def _scan_sql(*, source_slug: str | None):
+    """Build the scan statement and pass the requested slug through.
+
+    The source filter is not applied here because resolving a slug to a source
+    id needs an awaited query; the caller resolves the slug and appends the
+    ``where`` clause itself.
+
+    Returns:
+        A ``(stmt, slug)`` pair; ``slug`` is ``None`` when no filter was requested.
+    """
+    stmt = select(Article).where(
+        or_(*(col.like("%*%") for col in _ASTERISK_COLUMNS))
+    ).order_by(Article.id.asc())
+    return stmt, (source_slug or None)
+
+
+async def fetch_candidates(
+    *, source_slug: str | None, batch_size: int | None
+) -> list[Article]:
+    """Load the articles whose translated columns contain `*`.
+
+    Args:
+        source_slug: Restrict the scan to this source. An unknown slug prints a
+            warning to stderr and yields an empty list.
+        batch_size: Maximum number of rows to fetch; ``None`` means no cap.
+
+    Returns:
+        The matching ``Article`` rows ordered by ascending id.
+    """
+    async with AsyncSessionLocal() as session:
+        stmt, slug = _scan_sql(source_slug=source_slug)
+        if slug:
+            src = (
+                await session.execute(select(Source).where(Source.slug == slug))
+            ).scalar_one_or_none()
+            if not src:
+                print(f"!! source_slug '{slug}' 不存在", file=sys.stderr)
+                return []
+            stmt = stmt.where(Article.source_id == src.id)
+        if batch_size is not None:
+            stmt = stmt.limit(batch_size)
+        rows = (await session.execute(stmt)).scalars().all()
+        return list(rows)
+
+
+# === Clean a single article ===
+def clean_one(
+    art: Article, *, fields: set[str]
+) -> dict[str, tuple[int, int]]:
+    """Clean the selected fields of one article in place.
+
+    Args:
+        art: The article to clean; changed fields are overwritten on the object.
+        fields: Names of the fields that may be cleaned (driven by ``--field``).
+
+    Returns:
+        ``{field: (before_len, after_len)}`` for every field that changed.
+    """
+    changes: dict[str, tuple[int, int]] = {}
+
+    # --- title_zh ---
+    if "title_zh" in fields and art.title_zh:
+        cleaned = clean_markdown_asterisks(art.title_zh)
+        if cleaned != art.title_zh:
+            changes["title_zh"] = (len(art.title_zh), len(cleaned or ""))
+            art.title_zh = cleaned or None
+
+    # --- body_zh_text ---
+    body_text_changed = False
+    if "body_zh_text" in fields and art.body_zh_text:
+        cleaned = clean_markdown_asterisks(art.body_zh_text)
+        if cleaned != art.body_zh_text:
+            changes["body_zh_text"] = (len(art.body_zh_text), len(cleaned or ""))
+            art.body_zh_text = cleaned or None
+            body_text_changed = True
+
+    # --- body_zh_html ---
+    # Strategy: re-wrap from the cleaned body_zh_text so both fields stay in sync.
+    # Triggered when body_zh_text changed, or when body_zh_html alone still
+    # contains `*` (inconsistent legacy rows).
+    if "body_zh_html" in fields:
+        if body_text_changed:
+            new_html = wrap_html(art.body_zh_text)
+            old_html = art.body_zh_html or ""
+            if (new_html or "") != old_html:
+                changes["body_zh_html"] = (len(old_html), len(new_html or ""))
+                art.body_zh_html = new_html
+        elif art.body_zh_html and "*" in art.body_zh_html:
+            # body_zh_text unchanged but the HTML still has `*`: clean its text nodes.
+            cleaned_html = clean_html_inner_text(art.body_zh_html)
+            if cleaned_html and cleaned_html != art.body_zh_html:
+                changes["body_zh_html"] = (
+                    len(art.body_zh_html), len(cleaned_html),
+                )
+                art.body_zh_html = cleaned_html
+
+    # --- body_zh_formatted ---
+    # Cannot be regenerated (that would drop the diary-para class and inline
+    # style), so only its text nodes are cleaned.
+    if (
+        "body_zh_formatted" in fields
+        and art.body_zh_formatted
+        and "*" in art.body_zh_formatted
+    ):
+        cleaned = clean_html_inner_text(art.body_zh_formatted)
+        if cleaned and cleaned != art.body_zh_formatted:
+            changes["body_zh_formatted"] = (
+                len(art.body_zh_formatted), len(cleaned),
+            )
+            art.body_zh_formatted = cleaned
+
+    # --- summary_zh ---
+    if "summary_zh" in fields and art.summary_zh:
+        cleaned = clean_markdown_asterisks(art.summary_zh)
+        if cleaned != art.summary_zh:
+            changes["summary_zh"] = (len(art.summary_zh), len(cleaned or ""))
+            art.summary_zh = cleaned or None
+
+    return changes
+
+
+# === Main flow ===
+ALL_FIELDS = ("title_zh", "body_zh_text", "body_zh_html", "body_zh_formatted", "summary_zh")
+
+
+async def commit_batch(arts: list[Article]) -> None:
+    """Merge the modified articles into a fresh session and commit them once."""
+    if not arts:
+        return
+    async with AsyncSessionLocal() as session:
+        for art in arts:
+            await session.merge(art)
+        await session.commit()
+
+
+def _preview_diff(art: Article, fields: set[str]) -> str:
+    """Render a before/after preview of the title and body for one article.
+
+    Must be called while ``art`` still holds the uncleaned values, i.e. before
+    ``clean_one`` mutates it; otherwise nothing differs and the result is empty.
+    Bodies longer than 200 characters are truncated.
+    """
+    lines: list[str] = []
+    art_copy_title = clean_markdown_asterisks(art.title_zh) if (
+        "title_zh" in fields and art.title_zh
+    ) else art.title_zh
+    art_copy_body = clean_markdown_asterisks(art.body_zh_text) if (
+        "body_zh_text" in fields and art.body_zh_text
+    ) else art.body_zh_text
+    if art.title_zh and art_copy_title != art.title_zh:
+        lines.append(f" TITLE before: {art.title_zh!r}")
+        lines.append(f" TITLE after : {art_copy_title!r}")
+    if art.body_zh_text and art_copy_body != art.body_zh_text:
+        body_before = art.body_zh_text
+        body_after = art_copy_body
+        # Show only the first 200 characters so samples do not flood the terminal.
+        if len(body_before) > 200 or len(body_after) > 200:
+            lines.append(f" BODY before: {body_before[:200]!r}...")
+            lines.append(f" BODY after : {body_after[:200]!r}...")
+        else:
+            lines.append(f" BODY before: {body_before!r}")
+            lines.append(f" BODY after : {body_after!r}")
+    return "\n".join(lines)
+
+
+async def main() -> int:
+    """Parse CLI arguments, clean the matching articles and print a summary.
+
+    Returns:
+        The process exit code (always 0).
+    """
+    p = argparse.ArgumentParser(
+        description="批量清洗历史译文里的 markdown 星号标记(** / * / ***)",
+    )
+    p.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="只打印待改列表 + 样本对比,不动数据库",
+    )
+    p.add_argument(
+        "--limit",
+        type=int,
+        default=None,
+        help="最多处理多少条文章(分批用,避免一次性锁太久)",
+    )
+    p.add_argument(
+        "--batch-size",
+        type=int,
+        default=None,
+        help="从 DB 一次捞多少条(默认 = limit, 即一次全部)",
+    )
+    p.add_argument(
+        "--source-slug",
+        default=None,
+        help="限定某个采集源(按 slug)",
+    )
+    p.add_argument(
+        "--field",
+        choices=ALL_FIELDS + ("all",),
+        default="all",
+        help="只清洗指定字段(排错用;默认 all = 清洗全部 5 个字段)",
+    )
+    p.add_argument(
+        "--show-samples",
+        type=int,
+        default=0,
+        metavar="N",
+        help="dry-run 时额外打印前 N 条样本的清洗前后对比",
+    )
+    p.add_argument(
+        "--show-stats",
+        action="store_true",
+        help="按字段分组统计改动次数 + 总字符差",
+    )
+    args = p.parse_args()
+
+    # --- Field whitelist ---
+    fields = set(ALL_FIELDS) if args.field == "all" else {args.field}
+
+    # --- Scan ---
+    # Both flags cap the single fetch; when both are given the smaller one wins so
+    # that --limit is never silently ignored.
+    caps = [n for n in (args.batch_size, args.limit) if n is not None]
+    fetch_limit = min(caps) if caps else None
+    candidates = await fetch_candidates(
+        source_slug=args.source_slug, batch_size=fetch_limit,
+    )
+    if not candidates:
+        print("✅ 没找到含 `*` 标记的文章,无需清洗")
+        return 0
+
+    print(
+        f"找到 {len(candidates)} 条含 `*` 标记的文章"
+        f"(mode={'DRY-RUN' if args.dry_run else 'EXECUTE'}, "
+        f"fields={sorted(fields)}, source={args.source_slug or 'ALL'})"
+    )
+
+    # --- Process ---
+    field_stats: dict[str, int] = {f: 0 for f in ALL_FIELDS}
+    field_char_delta: dict[str, int] = {f: 0 for f in ALL_FIELDS}
+    dirty: list[Article] = []  # every article that changed (also counted in dry-run)
+    modified_arts: list[Article] = []  # articles that will actually be committed
+
+    for art in candidates:
+        # clean_one() mutates `art`, so the preview must be rendered first;
+        # rendering it afterwards would compare cleaned text with itself.
+        want_sample = bool(args.show_samples) and len(dirty) < args.show_samples
+        preview = _preview_diff(art, fields) if want_sample else ""
+        changes = clean_one(art, fields=fields)
+        if not changes:
+            # Matched LIKE '%*%' but nothing was cleaned (e.g. the only `*` sits in
+            # a field excluded by --field): skip.
+            continue
+        for field, (before_len, after_len) in changes.items():
+            field_stats[field] += 1
+            field_char_delta[field] += before_len - after_len  # positive = chars removed
+        dirty.append(art)
+        if not args.dry_run:
+            modified_arts.append(art)
+        if want_sample:
+            print(f"\n[样本 #{len(dirty)}] id={art.id} source_id={art.source_id}")
+            print(preview)
+
+    # --- Commit ---
+    if not args.dry_run:
+        await commit_batch(modified_arts)
+
+    # --- Summary ---
+    changed_n = len(dirty)
+    print(f"\n实际改动: {changed_n} 条 (mode={'DRY-RUN' if args.dry_run else 'EXECUTE'})")
+    if args.show_stats:
+        print("\n按字段统计:")
+        for field in ALL_FIELDS:
+            n = field_stats[field]
+            delta = field_char_delta[field]
+            if n == 0:
+                continue
+            print(f" {field:<22} {n:>5} 条 字符差 {delta:+d}")
+
+    if args.dry_run:
+        print(f"\n[DRY-RUN] 共 {changed_n} 条待改,实际未改动。去掉 --dry-run 真正执行。")
+    else:
+        print(f"\n✅ 已清洗 {changed_n} 条,提交完毕。")
+
+    return 0
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
+    sys.exit(asyncio.run(main()))
diff --git a/backend/app/services/translation/text_clean.py b/backend/app/services/translation/text_clean.py
new file mode 100644
index 0000000..82e040a
--- /dev/null
+++ b/backend/app/services/translation/text_clean.py
@@ -0,0 +1,135 @@
+"""Text-cleaning helpers for translated content.
+
+Use cases:
+- The LLM translator occasionally copies markdown markers (bold `**`, `*`, `***`)
+  into its output, which the frontend then shows as dirty text like `**FBI**局长`.
+- The enrichment stage can also carry `**` into `body_zh_formatted` / `summary_zh`.
+
+Two core functions:
+- `clean_markdown_asterisks(text)`: strip `*` / `**` / `***` markers from a string
+- `clean_html_inner_text(html)`: parse HTML with BeautifulSoup and clean only the
+  text nodes, keeping the tag structure and inline styles
+
+Also provides `wrap_html(text)`, which wraps the cleaned body_zh_text in simple
+`<p>` HTML; it used to be the private `_wrap_html` in pipeline.py and was moved
+here for reuse.
+
+Design principles:
+- No extra dependencies (only `re` plus the project's existing `beautifulsoup4`)
+- None / empty input is returned safely as None / empty string
+- Longer markers are handled first so `**` is not consumed by the `*` rule
+- Substitution repeats until stable to handle adjacent pairs such as `**a****b**`
+- Any leftover `*` is removed as a fallback: conservative, but it matches the
+  user's intent of "get rid of the asterisks"
+"""
+from __future__ import annotations
+
+import re
+
+from bs4 import BeautifulSoup
+from bs4.element import PreformattedString
+
+
+# === Core cleaning function ===
+
+# ***text*** -> text (bold + italic)
+_ASTERISK_TRIPLE_RE = re.compile(r"\*\*\*([^*]+?)\*\*\*")
+# **text** -> text (bold)
+_ASTERISK_DOUBLE_RE = re.compile(r"\*\*([^*]+?)\*\*")
+# *text* -> text (italic). The lookarounds keep each delimiter from touching
+# another `*`, so this rule never bites into a `**` run.
+_ASTERISK_SINGLE_RE = re.compile(r"(?<!\*)\*([^*]+?)\*(?!\*)")
+# Fallback: any remaining run of asterisks
+_ANY_ASTERISK_RE = re.compile(r"\*+")
+# Elements whose string content is code rather than translated text
+_RAW_TEXT_TAGS = frozenset({"script", "style"})
+
+
+def clean_markdown_asterisks(text: str | None) -> str | None:
+    """Strip leftover markdown asterisk markers from a string.
+
+    Order of operations:
+        1. `***text***` -> `text`
+        2. `**text**`   -> `text` (repeated until stable, for `**a****b**`)
+        3. `*text*`     -> `text`
+        4. Fallback: any remaining `*` / `**` / `***` is deleted.
+
+    None / empty input is returned unchanged.
+    """
+    if not text:
+        return text
+
+    # 1+2) Replace *** / ** in rounds until the text stops changing.
+    prev: str | None = None
+    while prev != text:
+        prev = text
+        text = _ASTERISK_TRIPLE_RE.sub(r"\1", text)
+        text = _ASTERISK_DOUBLE_RE.sub(r"\1", text)
+
+    # 3) Single-asterisk italics
+    text = _ASTERISK_SINGLE_RE.sub(r"\1", text)
+
+    # 4) Fallback: drop every stray *
+    text = _ANY_ASTERISK_RE.sub("", text)
+
+    return text
+
+
+def clean_html_inner_text(html: str | None) -> str | None:
+    """Clean the text nodes of an HTML fragment, keeping tags and attributes.
+
+    Intended for LLM-formatted output such as `body_zh_formatted`, which cannot
+    be regenerated without losing the `diary-para` class and inline styles, so
+    each text node is cleaned individually instead.
+
+    Comments, doctypes and the contents of <script> / <style> are left alone:
+    they are not translated text, and a `*` there can be meaningful.
+
+    None / empty input is returned unchanged; when no node changes, the original
+    string is returned as-is rather than the re-serialised document.
+    """
+    if not html:
+        return html
+    soup = BeautifulSoup(html, "html.parser")
+    changed = False
+    for node in list(soup.find_all(string=True)):
+        # Comment / CData / Doctype nodes are NavigableString subclasses too;
+        # replace_with(str) would turn them into visible text, so skip them.
+        if isinstance(node, PreformattedString):
+            continue
+        # A `*` inside <script> / <style> is code, not a markdown marker.
+        if node.parent is not None and node.parent.name in _RAW_TEXT_TAGS:
+            continue
+        # Skip whitespace-only text nodes.
+        original = str(node)
+        if not original or not original.strip():
+            continue
+        cleaned = clean_markdown_asterisks(original)
+        if cleaned != original:
+            node.replace_with(cleaned)
+            changed = True
+    return str(soup) if changed else html
+
+
+# === Public wrap_html (formerly pipeline._wrap_html, moved here for reuse) ===
+
+def wrap_html(text: str | None) -> str | None:
+    """Wrap translated plain text in simple `<p>` paragraph HTML.
+
+    `clean_markdown_asterisks` is applied internally, so callers do not need to
+    clean first; cleaning already-clean input is a no-op.
+
+    Behaviour:
+    - split on blank lines (two consecutive newlines) and drop empty paragraphs
+    - wrap each paragraph as `<p>...</p>`
+    - join the paragraphs with a single newline
+
+    Returns None for None / empty input.
+    """
+    if not text:
+        return None
+    cleaned = clean_markdown_asterisks(text)
+    if not cleaned:
+        return None
+    parts = [f"<p>{p.strip()}</p>" for p in cleaned.split("\n\n") if p.strip()]
+    return "\n".join(parts) if parts else None
diff --git a/backend/app/workers/pipeline.py b/backend/app/workers/pipeline.py
index e470b80..7a9079f 100644
--- a/backend/app/workers/pipeline.py
+++ b/backend/app/workers/pipeline.py
@@ -19,6 +19,7 @@ from app.models.source import Source, SourceKind
 from app.services.fetchers import get_fetcher
 from app.services.fetchers.base import FetchedItem, url_hash
 from app.services.translation.service import service as translation_service
+from app.services.translation.text_clean import clean_markdown_asterisks, wrap_html
 
 logger = logging.getLogger("news.pipeline")
 
@@ -232,15 +233,20 @@ async def translate_article(article_id: int) -> None:
                 await session.commit()
             return
 
+    # Clean before writing: the LLM occasionally leaks markdown ** / * into translations.
+    # Fixing it at the source beats bulk repair; scripts/clean_translations.py handles old rows.
+    tr_title_clean = clean_markdown_asterisks(tr_title.text)
+    tr_body_clean = clean_markdown_asterisks(tr_body)
+
     # 写回
     async with AsyncSessionLocal() as session:
         art = (
             await session.execute(select(Article).where(Article.id == article_id_ref))
         ).scalar_one_or_none()
         if art:
-            art.title_zh = tr_title.text if tr_title.text else None
-            art.body_zh_text = tr_body or None
-            art.body_zh_html = _wrap_html(tr_body) if tr_body else None
+            art.title_zh = tr_title_clean or None
+            art.body_zh_text = tr_body_clean or None
+            art.body_zh_html = wrap_html(tr_body_clean) if tr_body_clean else None
             art.translation_status = status
             art.translation_engine = engine_label
             art.translation_chars = total_chars
@@ -293,14 +299,6 @@ def _split_long_para(para: str, max_chars: int) -> list[str]:
     return parts
 
 
-def _wrap_html(text: str) -> str:
-    """把译文包成 HTML 段落。"""
-    from bs4 import BeautifulSoup
-
-    parts = [f"<p>{p.strip()}</p>" for p in text.split("\n\n") if p.strip()]
-    return "\n".join(parts) if parts else ""
-
-
 # === 全量跑(供测试 / 手动触发) ===
 async def run_once() -> None:
     async with AsyncSessionLocal() as session: