"""回填历史 body 短(<500)的文章 — 重新抓全文 + UPDATE。 用法: python scripts/backfill_body.py # 跑全部 python scripts/backfill_body.py --dry-run # 只统计 python scripts/backfill_body.py --limit 50 # 只跑 50 条 python scripts/backfill_body.py --source nhk-world # 只跑指定源 """ from __future__ import annotations import argparse import asyncio import logging import os import sys from pathlib import Path # 让脚本能 import app.* sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "backend")) from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from app.database import AsyncSessionLocal from app.models.article import Article from app.models.source import Source from app.services.fetchers import get_fetcher from app.services.fetchers.base import FetchedItem, url_hash from sqlalchemy.dialects.postgresql import insert as pg_insert logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") logger = logging.getLogger("news.backfill") async def backfill( body_threshold: int = 500, source_slug: str | None = None, limit: int | None = None, reset_translation: bool = False, dry_run: bool = False, ) -> dict: """回填 body 短的文章。 流程: 1. 查 body_text < threshold 的文章(url, source_id) 2. 拿对应 source 的 url + headers_json 3. 用 RSS fetcher 重新 fetch 4. 用 url_hash 匹配,找到对应 article 5. UPDATE body_text + body_html 6. (可选)reset translation_status='pending' + 清空 title_zh/body_zh_* 触发 worker 重译 """ stats = {"scanned": 0, "matched": 0, "updated": 0, "skipped": 0, "failed": 0} async with AsyncSessionLocal() as session: # 1) 找短 body 文章 stmt = ( select(Article) .where((Article.body_text.is_(None)) | (Article.body_text == "") | (Article.translation_status != "ok")) # 简化:只看 body 短的(无论翻译状态) ) if source_slug: stmt = stmt.join(Source, Source.id == Article.source_id).where(Source.slug == source_slug) # 全部扫一次(在内存里过滤 short body,避免 SQL 长度函数复杂) rows = (await session.execute(stmt.order_by(Article.id.desc()))).scalars() all_articles = list(rows) # 过滤 short body short_articles = [a for a in all_articles if len(a.body_text or "") < body_threshold] if limit: short_articles = short_articles[:limit] stats["scanned"] = len(short_articles) logger.info("found %d short-body articles (threshold=%d)", len(short_articles), body_threshold) if dry_run: return stats # 2) 按 source_id 分组,每个 source 跑一次 fetch(避免重复抓 feed) from collections import defaultdict by_source: dict[int, list[Article]] = defaultdict(list) for a in short_articles: by_source[a.source_id].append(a) for source_id, articles in by_source.items(): async with AsyncSessionLocal() as session2: src = (await session2.execute(select(Source).where(Source.id == source_id))).scalar_one_or_none() if not src: logger.warning("source %s missing, skip", source_id) continue if not src.enabled: logger.info("source %s disabled, skip", src.slug) continue # 3) 抓 feed try: fetcher = get_fetcher(src.kind.value, url=src.url, headers=src.headers_json) items = await fetcher.fetch() except Exception as e: logger.exception("fetch failed for %s: %s", src.slug, e) stats["failed"] += len(articles) continue # 建 url_hash → FetchedItem 索引 item_by_hash = {url_hash(it.url): it for it in items} logger.info("source %s: fetched %d items, %d short articles to match", src.slug, len(items), len(articles)) # 4) UPDATE 匹配的 for art in articles: h = url_hash(art.url) item = item_by_hash.get(h) if not item: stats["skipped"] += 1 continue if not item.body_text or len(item.body_text) < body_threshold: stats["skipped"] += 1 continue stats["matched"] += 1 update_vals = { "body_text": (item.body_text or "")[:65535], "body_html": (item.body_html or "")[:65535] if item.body_html else None, } if reset_translation: update_vals.update({ "translation_status": "pending", "title_zh": None, "body_zh_text": None, "body_zh_html": None, "body_zh_formatted": None, "translated_at": None, "translation_engine": None, "translation_chars": 0, }) async with AsyncSessionLocal() as session3: await session3.execute( update(Article).where(Article.id == art.id).values(**update_vals) ) await session3.commit() stats["updated"] += 1 return stats def main() -> None: p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) p.add_argument("--threshold", type=int, default=500, help="body 短于这个字符数才算 short") p.add_argument("--source", default=None, help="只跑指定源 slug") p.add_argument("--limit", type=int, default=None, help="最多处理 N 条") p.add_argument("--reset-translation", action="store_true", help="回填后重置 translation_status=pending 触发重译") p.add_argument("--dry-run", action="store_true") args = p.parse_args() stats = asyncio.run(backfill( body_threshold=args.threshold, source_slug=args.source, limit=args.limit, reset_translation=args.reset_translation, dry_run=args.dry_run, )) print("\n=== backfill 结果 ===") for k, v in stats.items(): print(f" {k}: {v}") if __name__ == "__main__": main()