"""回灌 search_title_suggestions 表。 - 迁移 0009 给 articles 加了 trigger,新写入的会自动维护 - 但迁移前已有的 articles 没经过 trigger,需要这个脚本一次性回填 - 同时可以手动跑一次 refresh_search_keywords()(可选,worker 也会跑) 用法: cd backend python -m app.scripts.backfill_search_suggestions # 或 docker: docker compose exec api python -m app.scripts.backfill_search_suggestions 设计: - 用 batch INSERT,避免逐行 trigger 重复触发(虽然 trigger 已经在迁移里创建, 重复执行对已存在的条目会先 DELETE 再 INSERT,等价于刷新,无害) - 进度条:每 1000 篇打一行 - 失败:有 article 字段异常不会阻塞其他 """ from __future__ import annotations import asyncio import logging import sys from datetime import datetime, timezone from sqlalchemy import select, text from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from app.database import AsyncSessionLocal from app.models.article import Article from app.models.search_title_suggestion import SearchTitleSuggestion logger = logging.getLogger("news.backfill_search") logging.basicConfig( level="INFO", format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) MAX_TITLE_LEN = 50 # 跟迁移里的 trigger 一致 BATCH_SIZE = 500 def _build_prefix_keys(text_value: str) -> list[str]: """从 '美联储宣布...' 生成 ['美','美联储','美联储宣',...,'美联储宣布...']""" text_value = (text_value or "")[:MAX_TITLE_LEN] if not text_value: return [] return [text_value[:n] for n in range(1, len(text_value) + 1)] async def _process_article_batch( session: AsyncSession, articles: list[Article], ) -> int: """处理一批 articles,UPSERT 到 search_title_suggestions。 返回成功插入/更新的条数。 """ rows = [] for art in articles: if art.title_zh and len(art.title_zh.strip()) > 0: src_text = art.title_zh.strip()[:MAX_TITLE_LEN] lang = "zh" elif art.title and len(art.title.strip()) > 0: src_text = art.title.strip()[:MAX_TITLE_LEN] lang = "src" else: continue rows.append( { "article_id": art.id, "title_lang": lang, "prefix_keys": _build_prefix_keys(src_text), "published_at": art.published_at, } ) if not rows: return 0 # 用 PG 原生 ON CONFLICT 实现 UPSERT(基于 article_id 唯一约束) # 注意:表没建 unique on article_id,所以先 DELETE 再 INSERT # 性能:批量 DELETE 在 article_id 上没索引,可能慢;临时加索引: # CREATE INDEX IF NOT EXISTS tmp_idx ON search_title_suggestions(article_id); # 简化:每个 batch 内逐条 DELETE 再 INSERT(慢但稳) # 替代方案:直接 TRUNCATE + 全量重灌(回填场景下更简单) for r in rows: await session.execute( text("DELETE FROM search_title_suggestions WHERE article_id = :aid"), {"aid": r["article_id"]}, ) # bulk insert await session.execute(pg_insert(SearchTitleSuggestion), rows) await session.commit() return len(rows) async def backfill() -> None: """主流程:分批拉 articles,回灌 search_title_suggestions。""" started = datetime.now(timezone.utc) async with AsyncSessionLocal() as session: # 总数 total = (await session.execute(select(Article.id))).all() total_count = len(total) logger.info("backfill start: %d articles to process", total_count) processed = 0 last_id = 0 while True: rows = ( await session.execute( select(Article) .where(Article.id > last_id) .order_by(Article.id) .limit(BATCH_SIZE) ) ).scalars().all() if not rows: break n = await _process_article_batch(session, list(rows)) processed += n last_id = rows[-1].id logger.info( "progress: %d / %d (%.1f%%)", processed, total_count, processed / total_count * 100 if total_count else 0, ) elapsed = (datetime.now(timezone.utc) - started).total_seconds() logger.info("backfill done: %d rows in %.1fs", processed, elapsed) # 顺便触发一次 search_keywords 刷新(让词频表也有数据) logger.info("triggering refresh_search_keywords()...") async with AsyncSessionLocal() as session: try: await session.execute(text("SELECT refresh_search_keywords()")) await session.commit() logger.info("refresh_search_keywords() done") except Exception as e: logger.exception("refresh_search_keywords failed: %s (worker 03:00 会再跑)", e) def main() -> int: try: asyncio.run(backfill()) except KeyboardInterrupt: logger.warning("interrupted") return 1 return 0 if __name__ == "__main__": sys.exit(main())