From 35c0da1670697bf05a848ea264f3319f8bb74005 Mon Sep 17 00:00:00 2001 From: Mavis Date: Mon, 8 Jun 2026 15:59:14 +0800 Subject: [PATCH] =?UTF-8?q?feat(scripts):=20=E6=96=B0=E5=A2=9E=20backfill?= =?UTF-8?q?=5Fbody.py=20=E5=9B=9E=E5=A1=AB=20body=20=E7=9F=AD=E7=9A=84?= =?UTF-8?q?=E6=96=87=E7=AB=A0(=E9=87=8D=E6=96=B0=E6=8A=93=E5=85=A8?= =?UTF-8?q?=E6=96=87)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/backfill_body.py | 162 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 scripts/backfill_body.py diff --git a/scripts/backfill_body.py b/scripts/backfill_body.py new file mode 100644 index 0000000..f903939 --- /dev/null +++ b/scripts/backfill_body.py @@ -0,0 +1,162 @@ +"""回填历史 body 短(<500)的文章 — 重新抓全文 + UPDATE。 + +用法: + python scripts/backfill_body.py # 跑全部 + python scripts/backfill_body.py --dry-run # 只统计 + python scripts/backfill_body.py --limit 50 # 只跑 50 条 + python scripts/backfill_body.py --source nhk-world # 只跑指定源 +""" +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +import sys +from pathlib import Path + +# 让脚本能 import app.* +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "backend")) + +from sqlalchemy import select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import AsyncSessionLocal +from app.models.article import Article +from app.models.source import Source +from app.services.fetchers import get_fetcher +from app.services.fetchers.base import FetchedItem, url_hash +from sqlalchemy.dialects.postgresql import insert as pg_insert + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") +logger = logging.getLogger("news.backfill") + + +async def backfill( + body_threshold: int = 500, + source_slug: str | None = None, + limit: int | None = None, + reset_translation: bool = False, + dry_run: bool = False, +) -> dict: + """回填 body 短的文章。 + + 流程: + 1. 查 body_text < threshold 的文章(url, source_id) + 2. 拿对应 source 的 url + headers_json + 3. 用 RSS fetcher 重新 fetch + 4. 用 url_hash 匹配,找到对应 article + 5. UPDATE body_text + body_html + 6. (可选)reset translation_status='pending' + 清空 title_zh/body_zh_* 触发 worker 重译 + """ + stats = {"scanned": 0, "matched": 0, "updated": 0, "skipped": 0, "failed": 0} + + async with AsyncSessionLocal() as session: + # 1) 找短 body 文章 + stmt = ( + select(Article) + .where((Article.body_text.is_(None)) | (Article.body_text == "") | (Article.translation_status != "ok")) + # 简化:只看 body 短的(无论翻译状态) + ) + if source_slug: + stmt = stmt.join(Source, Source.id == Article.source_id).where(Source.slug == source_slug) + # 全部扫一次(在内存里过滤 short body,避免 SQL 长度函数复杂) + rows = (await session.execute(stmt.order_by(Article.id.desc()))).scalars() + all_articles = list(rows) + # 过滤 short body + short_articles = [a for a in all_articles if len(a.body_text or "") < body_threshold] + if limit: + short_articles = short_articles[:limit] + stats["scanned"] = len(short_articles) + logger.info("found %d short-body articles (threshold=%d)", len(short_articles), body_threshold) + + if dry_run: + return stats + + # 2) 按 source_id 分组,每个 source 跑一次 fetch(避免重复抓 feed) + from collections import defaultdict + by_source: dict[int, list[Article]] = defaultdict(list) + for a in short_articles: + by_source[a.source_id].append(a) + + for source_id, articles in by_source.items(): + async with AsyncSessionLocal() as session2: + src = (await session2.execute(select(Source).where(Source.id == source_id))).scalar_one_or_none() + if not src: + logger.warning("source %s missing, skip", source_id) + continue + if not src.enabled: + logger.info("source %s disabled, skip", src.slug) + continue + + # 3) 抓 feed + try: + fetcher = get_fetcher(src.kind.value, url=src.url, headers=src.headers_json) + items = await fetcher.fetch() + except Exception as e: + logger.exception("fetch failed for %s: %s", src.slug, e) + stats["failed"] += len(articles) + continue + + # 建 url_hash → FetchedItem 索引 + item_by_hash = {url_hash(it.url): it for it in items} + logger.info("source %s: fetched %d items, %d short articles to match", src.slug, len(items), len(articles)) + + # 4) UPDATE 匹配的 + for art in articles: + h = url_hash(art.url) + item = item_by_hash.get(h) + if not item: + stats["skipped"] += 1 + continue + if not item.body_text or len(item.body_text) < body_threshold: + stats["skipped"] += 1 + continue + stats["matched"] += 1 + update_vals = { + "body_text": (item.body_text or "")[:65535], + "body_html": (item.body_html or "")[:65535] if item.body_html else None, + } + if reset_translation: + update_vals.update({ + "translation_status": "pending", + "title_zh": None, + "body_zh_text": None, + "body_zh_html": None, + "body_zh_formatted": None, + "translated_at": None, + "translation_engine": None, + "translation_chars": 0, + }) + async with AsyncSessionLocal() as session3: + await session3.execute( + update(Article).where(Article.id == art.id).values(**update_vals) + ) + await session3.commit() + stats["updated"] += 1 + + return stats + + +def main() -> None: + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--threshold", type=int, default=500, help="body 短于这个字符数才算 short") + p.add_argument("--source", default=None, help="只跑指定源 slug") + p.add_argument("--limit", type=int, default=None, help="最多处理 N 条") + p.add_argument("--reset-translation", action="store_true", help="回填后重置 translation_status=pending 触发重译") + p.add_argument("--dry-run", action="store_true") + args = p.parse_args() + stats = asyncio.run(backfill( + body_threshold=args.threshold, + source_slug=args.source, + limit=args.limit, + reset_translation=args.reset_translation, + dry_run=args.dry_run, + )) + print("\n=== backfill 结果 ===") + for k, v in stats.items(): + print(f" {k}: {v}") + + +if __name__ == "__main__": + main()