Files
diary-news/backend/app/scripts/backfill_search_suggestions.py

157 lines
5.1 KiB
Python
Raw Normal View History

"""回灌 search_title_suggestions 表。
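- Migration 0009 added a trigger on articles, so newly written rows are
  maintained automatically.
- Articles that existed before the migration never went through the trigger
  and need this one-off backfill.
- The script also runs refresh_search_keywords() once at the end (optional;
  the worker runs it as well).

Usage:
    cd backend
    python -m app.scripts.backfill_search_suggestions
    # or with docker:
    docker compose exec api python -m app.scripts.backfill_search_suggestions

Design:
- Batched INSERT, to avoid firing the trigger repeatedly row by row (the
  trigger already exists from the migration; re-running the script DELETEs and
  then re-INSERTs entries that already exist, which amounts to a harmless
  refresh).
- Progress: one log line per batch of BATCH_SIZE (500) articles.
- Failures: an article with no usable title field is skipped and does not
  block the others.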
"""
from __future__ import annotations
import asyncio
import logging
import sys
from datetime import datetime, timezone
from sqlalchemy import select, text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import AsyncSessionLocal
from app.models.article import Article
from app.models.search_title_suggestion import SearchTitleSuggestion
# Configure root logging for this one-off script, then grab the script's
# own named logger.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level="INFO",
)
logger = logging.getLogger("news.backfill_search")
MAX_TITLE_LEN = 50  # keep in sync with the trigger created by the migration
BATCH_SIZE = 500  # number of articles fetched per query in backfill()


def _build_prefix_keys(text_value: str) -> list[str]:
    """Return every leading prefix of *text_value*, shortest first.

    The input is truncated to ``MAX_TITLE_LEN`` characters before the
    prefixes are generated, so at most ``MAX_TITLE_LEN`` keys are returned.
    ``None`` or an empty string yields an empty list.

    Example: ``"美联储"`` -> ``["美", "美联", "美联储"]``.
    """
    truncated = (text_value or "")[:MAX_TITLE_LEN]
    # range(1, 1) is empty, so an empty input naturally produces [].
    return [truncated[:end] for end in range(1, len(truncated) + 1)]
async def _process_article_batch(
    session: AsyncSession,
    articles: list[Article],
) -> int:
    """Rebuild the ``search_title_suggestions`` rows for one batch of articles.

    For each article the Chinese title (``title_zh``) is preferred and the
    source-language ``title`` is the fallback; an article whose titles are
    both missing or blank is skipped.  Existing suggestion rows for the
    selected articles are deleted, fresh rows are bulk-inserted, and the
    transaction is committed.

    Args:
        session: Open async session.  It is committed by this function when
            at least one row is written.
        articles: Articles to (re)index.

    Returns:
        The number of suggestion rows written.  When no article has a usable
        title, nothing is executed or committed and 0 is returned.
    """
    # Function-scope import: ``bindparam`` is not among the module's
    # top-level imports.
    from sqlalchemy import bindparam

    rows = []
    for art in articles:
        # Prefer the Chinese title; only look at the source title if needed.
        src_text = (art.title_zh or "").strip()
        lang = "zh"
        if not src_text:
            src_text = (art.title or "").strip()
            lang = "src"
        if not src_text:
            continue  # no usable title on this article
        rows.append(
            {
                "article_id": art.id,
                "title_lang": lang,
                "prefix_keys": _build_prefix_keys(src_text[:MAX_TITLE_LEN]),
                "published_at": art.published_at,
            }
        )
    if not rows:
        return 0

    # NOTE (from the original author): the table has no unique constraint on
    # article_id, so a native ON CONFLICT upsert is not available; existing
    # rows are deleted and then re-inserted instead.  article_id may also be
    # unindexed, so issue ONE set-based DELETE for the whole batch rather
    # than one statement (and one table scan) per row.
    delete_stmt = text(
        "DELETE FROM search_title_suggestions WHERE article_id IN :aids"
    ).bindparams(bindparam("aids", expanding=True))
    await session.execute(
        delete_stmt,
        {"aids": [row["article_id"] for row in rows]},
    )

    # Bulk insert (executemany) of the rebuilt rows.
    await session.execute(pg_insert(SearchTitleSuggestion), rows)
    await session.commit()
    return len(rows)
async def backfill() -> None:
    """Backfill ``search_title_suggestions`` for every article, in batches.

    Articles are walked in ascending ``id`` order, ``BATCH_SIZE`` at a time
    (keyset pagination on ``Article.id``), and each batch is handed to
    ``_process_article_batch``.  One progress line is logged per batch.
    Afterwards ``refresh_search_keywords()`` is invoked once in a separate
    session; a failure there is logged and not re-raised.
    """
    # Function-scope import: ``func`` is not among the module's top-level
    # imports.
    from sqlalchemy import func

    started = datetime.now(timezone.utc)
    async with AsyncSessionLocal() as session:
        # Let the database count instead of loading every id into memory.
        total_count = (
            await session.execute(select(func.count()).select_from(Article))
        ).scalar_one()
        logger.info("backfill start: %d articles to process", total_count)

        processed = 0  # suggestion rows written
        scanned = 0  # articles looked at (including ones skipped for no title)
        last_id = 0
        while True:
            batch = (
                await session.execute(
                    select(Article)
                    .where(Article.id > last_id)
                    .order_by(Article.id)
                    .limit(BATCH_SIZE)
                )
            ).scalars().all()
            if not batch:
                break

            # Read the pagination cursor BEFORE the batch helper commits: if
            # the session factory expires instances on commit, touching
            # ``batch[-1].id`` afterwards would need an implicit reload,
            # which is not possible under asyncio.
            last_id = batch[-1].id
            scanned += len(batch)

            processed += await _process_article_batch(session, list(batch))

            # Progress is measured in articles scanned so it can reach 100%
            # even when some articles are skipped for lacking a title.
            logger.info(
                "progress: %d / %d (%.1f%%)",
                scanned,
                total_count,
                scanned / total_count * 100 if total_count else 0,
            )

    elapsed = (datetime.now(timezone.utc) - started).total_seconds()
    logger.info("backfill done: %d rows in %.1fs", processed, elapsed)

    # Also refresh search_keywords once so the keyword table has data too.
    logger.info("triggering refresh_search_keywords()...")
    async with AsyncSessionLocal() as session:
        try:
            await session.execute(text("SELECT refresh_search_keywords()"))
            await session.commit()
            logger.info("refresh_search_keywords() done")
        except Exception as e:
            # Deliberately best-effort: per the log message, the worker will
            # run the refresh again later.
            logger.exception("refresh_search_keywords failed: %s (worker 03:00 会再跑)", e)
def main() -> int:
    """Run the backfill and return a process exit code.

    Returns:
        0 when ``backfill()`` completes, 1 when it is interrupted by
        ``KeyboardInterrupt``.  Any other exception propagates to the caller.
    """
    exit_code = 0
    try:
        asyncio.run(backfill())
    except KeyboardInterrupt:
        logger.warning("interrupted")
        exit_code = 1
    return exit_code
# Script entry point: exit with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())