refactor(search): 只展示 keyword 续接词,去掉 titles 段
产品决定:搜索建议只展示 ts_stat 高频词续接(如'美'→美国/美军/美国政府), 不要真实文章 id 提示(用户认为这种'文章#566871'是噪音,没连续性)。 改动: - SearchSuggestionsResponse 去 title,只剩 query + keywords - SearchService 只查 search_keywords,fallback 路径也只针对 keywords - Feed.vue: 删掉 suggestTitles 状态 + SuggestTitleOption 类型联合, renderSuggestion 简化成 '词' 标签 + 词文本 + 右侧 weight 数字 - 0011 迁移: 删 search_title_suggestions 表 + 3 索引 + trigger + 函数 (trigger 在每篇文章 INSERT/UPDATE 都会跑,删了能省掉无用性能损耗) - 删除: app/models/search_title_suggestion.py + backfill_search_suggestions.py 替换成: app/scripts/refresh_search_keywords.py(只跑一次词频刷新)
This commit is contained in:
@@ -1,156 +0,0 @@
|
||||
"""回灌 search_title_suggestions 表。
|
||||
|
||||
- 迁移 0009 给 articles 加了 trigger,新写入的会自动维护
|
||||
- 但迁移前已有的 articles 没经过 trigger,需要这个脚本一次性回填
|
||||
- 同时可以手动跑一次 refresh_search_keywords()(可选,worker 也会跑)
|
||||
|
||||
用法:
|
||||
cd backend
|
||||
python -m app.scripts.backfill_search_suggestions
|
||||
# 或 docker:
|
||||
docker compose exec api python -m app.scripts.backfill_search_suggestions
|
||||
|
||||
设计:
|
||||
- 用 batch INSERT,避免逐行 trigger 重复触发(虽然 trigger 已经在迁移里创建,
|
||||
重复执行对已存在的条目会先 DELETE 再 INSERT,等价于刷新,无害)
|
||||
- 进度条:每 1000 篇打一行
|
||||
- 失败:有 article 字段异常不会阻塞其他
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlalchemy import select, text
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.database import AsyncSessionLocal
|
||||
from app.models.article import Article
|
||||
from app.models.search_title_suggestion import SearchTitleSuggestion
|
||||
|
||||
logger = logging.getLogger("news.backfill_search")
|
||||
logging.basicConfig(
|
||||
level="INFO",
|
||||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
||||
)
|
||||
|
||||
|
||||
MAX_TITLE_LEN = 50 # 跟迁移里的 trigger 一致
|
||||
BATCH_SIZE = 500
|
||||
|
||||
|
||||
def _build_prefix_keys(text_value: str) -> list[str]:
|
||||
"""从 '美联储宣布...' 生成 ['美','美联储','美联储宣',...,'美联储宣布...']"""
|
||||
text_value = (text_value or "")[:MAX_TITLE_LEN]
|
||||
if not text_value:
|
||||
return []
|
||||
return [text_value[:n] for n in range(1, len(text_value) + 1)]
|
||||
|
||||
|
||||
async def _process_article_batch(
|
||||
session: AsyncSession,
|
||||
articles: list[Article],
|
||||
) -> int:
|
||||
"""处理一批 articles,UPSERT 到 search_title_suggestions。
|
||||
|
||||
返回成功插入/更新的条数。
|
||||
"""
|
||||
rows = []
|
||||
for art in articles:
|
||||
if art.title_zh and len(art.title_zh.strip()) > 0:
|
||||
src_text = art.title_zh.strip()[:MAX_TITLE_LEN]
|
||||
lang = "zh"
|
||||
elif art.title and len(art.title.strip()) > 0:
|
||||
src_text = art.title.strip()[:MAX_TITLE_LEN]
|
||||
lang = "src"
|
||||
else:
|
||||
continue
|
||||
|
||||
rows.append(
|
||||
{
|
||||
"article_id": art.id,
|
||||
"title_lang": lang,
|
||||
"prefix_keys": _build_prefix_keys(src_text),
|
||||
"published_at": art.published_at,
|
||||
}
|
||||
)
|
||||
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
# 用 PG 原生 ON CONFLICT 实现 UPSERT(基于 article_id 唯一约束)
|
||||
# 注意:表没建 unique on article_id,所以先 DELETE 再 INSERT
|
||||
# 性能:批量 DELETE 在 article_id 上没索引,可能慢;临时加索引:
|
||||
# CREATE INDEX IF NOT EXISTS tmp_idx ON search_title_suggestions(article_id);
|
||||
# 简化:每个 batch 内逐条 DELETE 再 INSERT(慢但稳)
|
||||
# 替代方案:直接 TRUNCATE + 全量重灌(回填场景下更简单)
|
||||
for r in rows:
|
||||
await session.execute(
|
||||
text("DELETE FROM search_title_suggestions WHERE article_id = :aid"),
|
||||
{"aid": r["article_id"]},
|
||||
)
|
||||
# bulk insert
|
||||
await session.execute(pg_insert(SearchTitleSuggestion), rows)
|
||||
await session.commit()
|
||||
return len(rows)
|
||||
|
||||
|
||||
async def backfill() -> None:
|
||||
"""主流程:分批拉 articles,回灌 search_title_suggestions。"""
|
||||
started = datetime.now(timezone.utc)
|
||||
async with AsyncSessionLocal() as session:
|
||||
# 总数
|
||||
total = (await session.execute(select(Article.id))).all()
|
||||
total_count = len(total)
|
||||
logger.info("backfill start: %d articles to process", total_count)
|
||||
|
||||
processed = 0
|
||||
last_id = 0
|
||||
while True:
|
||||
rows = (
|
||||
await session.execute(
|
||||
select(Article)
|
||||
.where(Article.id > last_id)
|
||||
.order_by(Article.id)
|
||||
.limit(BATCH_SIZE)
|
||||
)
|
||||
).scalars().all()
|
||||
if not rows:
|
||||
break
|
||||
n = await _process_article_batch(session, list(rows))
|
||||
processed += n
|
||||
last_id = rows[-1].id
|
||||
logger.info(
|
||||
"progress: %d / %d (%.1f%%)",
|
||||
processed, total_count,
|
||||
processed / total_count * 100 if total_count else 0,
|
||||
)
|
||||
|
||||
elapsed = (datetime.now(timezone.utc) - started).total_seconds()
|
||||
logger.info("backfill done: %d rows in %.1fs", processed, elapsed)
|
||||
|
||||
# 顺便触发一次 search_keywords 刷新(让词频表也有数据)
|
||||
logger.info("triggering refresh_search_keywords()...")
|
||||
async with AsyncSessionLocal() as session:
|
||||
try:
|
||||
await session.execute(text("SELECT refresh_search_keywords()"))
|
||||
await session.commit()
|
||||
logger.info("refresh_search_keywords() done")
|
||||
except Exception as e:
|
||||
logger.exception("refresh_search_keywords failed: %s (worker 03:00 会再跑)", e)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
try:
|
||||
asyncio.run(backfill())
|
||||
except KeyboardInterrupt:
|
||||
logger.warning("interrupted")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
48
backend/app/scripts/refresh_search_keywords.py
Normal file
48
backend/app/scripts/refresh_search_keywords.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""刷新 search_keywords(立即跑一次,不依赖 worker 03:00 调度)。
|
||||
|
||||
历史:
|
||||
- 最初版本是回灌 search_title_suggestions(articles trigger 维护的真实标题)
|
||||
- 0011 迁移删了 search_title_suggestions(产品决定只展示 keyword 续接词)
|
||||
- 现在脚本只做一件事:立即跑一次 refresh_search_keywords()
|
||||
|
||||
用法:
|
||||
docker compose exec api python -m app.scripts.refresh_search_keywords
|
||||
# 预期: search_keywords refreshed
|
||||
|
||||
性能:ts_stat 1545 篇文章全量聚合 ~88s(每天 worker 03:00 会自动跑一次,通常不需要手动)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from app.database import AsyncSessionLocal
|
||||
|
||||
logger = logging.getLogger("news.refresh_keywords")
|
||||
logging.basicConfig(
|
||||
level="INFO",
|
||||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
||||
)
|
||||
|
||||
|
||||
async def refresh() -> None:
|
||||
async with AsyncSessionLocal() as s:
|
||||
await s.execute(text("SELECT refresh_search_keywords()"))
|
||||
await s.commit()
|
||||
logger.info("search_keywords refreshed")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
try:
|
||||
asyncio.run(refresh())
|
||||
except KeyboardInterrupt:
|
||||
logger.warning("interrupted")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user