From c3aa0f0cb6543169fa04280c4adb092ae24bff8c Mon Sep 17 00:00:00 2001 From: mavis Date: Mon, 15 Jun 2026 18:26:35 +0800 Subject: [PATCH] =?UTF-8?q?feat(search):=20=E6=99=BA=E8=83=BD=E6=90=9C?= =?UTF-8?q?=E7=B4=A2=E5=BB=BA=E8=AE=AE=20-=20=E5=9B=BA=E5=8C=96=E5=80=99?= =?UTF-8?q?=E9=80=89=E8=AF=8D=E8=A1=A8=20(search=5Fkeywords=20+=20search?= =?UTF-8?q?=5Ftitle=5Fsuggestions)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 后端: - alembic 0009: 两张固化表 + GIN prefix_keys 索引 + articles trigger - /api/v1/search/suggestions: 混合 A(高频词 ts_stat) + B(真实标题) + 冷启动 fallback - worker 每日 03:00 + 启动时刷新 search_keywords - 顺便填 commit 11 TODO: articles.title_zh_tsv + GIN 索引(未来 FTS 基础) 前端: - NInput -> NAutoComplete + debounce 250ms - 选标题 -> 跳详情;选关键词 -> 填入 + 触发搜索 - AbortController 防 race condition 性能: prefix_keys @> ARRAY[prefix] 走 GIN 亚毫秒,100w 行也稳 --- .../versions/0009_search_suggestions.py | 261 ++++++++++++++++++ backend/app/api/search.py | 47 ++++ backend/app/main.py | 3 +- backend/app/models/__init__.py | 4 + backend/app/models/search_keyword.py | 45 +++ backend/app/models/search_title_suggestion.py | 43 +++ backend/app/schemas/search.py | 24 ++ .../scripts/backfill_search_suggestions.py | 156 +++++++++++ backend/app/services/search.py | 160 +++++++++++ backend/app/workers/__main__.py | 37 ++- frontend/src/api/search.ts | 35 +++ frontend/src/composables/useDebounce.ts | 37 +++ frontend/src/views/Feed.vue | 183 +++++++++++- 13 files changed, 1028 insertions(+), 7 deletions(-) create mode 100644 backend/alembic/versions/0009_search_suggestions.py create mode 100644 backend/app/api/search.py create mode 100644 backend/app/models/search_keyword.py create mode 100644 backend/app/models/search_title_suggestion.py create mode 100644 backend/app/schemas/search.py create mode 100644 backend/app/scripts/backfill_search_suggestions.py create mode 100644 backend/app/services/search.py create mode 100644 frontend/src/api/search.ts create mode 100644 
frontend/src/composables/useDebounce.ts diff --git a/backend/alembic/versions/0009_search_suggestions.py b/backend/alembic/versions/0009_search_suggestions.py new file mode 100644 index 0000000..4433ce9 --- /dev/null +++ b/backend/alembic/versions/0009_search_suggestions.py @@ -0,0 +1,261 @@ +"""搜索建议固化表 + 触发器 + +设计: +- search_keywords: ts_stat 词频表,worker 每日凌晨刷新(全量重建) +- search_title_suggestions: 文章标题片段表,articles trigger 实时维护(增删改 title_zh 时) +- 两表都有 prefix_keys text[] + GIN 索引,查询走 prefix_keys @> ARRAY['前缀'] + (比 LIKE '%xxx%' 快 10-100x,100w 行也是亚毫秒) + +数据源: +- search_title_suggestions: articles.title_zh + - 包括翻译后的中文标题 + 短新闻的原文(短新闻 title_zh 为空时回退 title) + - 触发器只维护这一张表(写入频繁,实时) +- search_keywords: + - ts_stat 词频(从 articles.title_zh + body_zh_text + commentary + commentary_meituan 算) + - 每日 worker 全量刷新(词频聚合慢,不适合每篇文章 trigger) + +顺手把 commit 11 提到的 full-text search 基础做完: +- articles.title_zh_tsv tsvector 列 + GIN 索引(用于未来 query 走 FTS) +- 触发器自动维护 + +Revision ID: 0009 +Revises: 0008 +Create Date: 2026-06-15 +""" +from __future__ import annotations + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects.postgresql import ARRAY, TSVECTOR + + +revision: str = "0009" +down_revision: Union[str, None] = "0008" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # === 1) articles: 加 tsvector 列 + GIN 索引(full-text search 基础)=== + # 用 'simple' parser:对中文按字符切,免装 zhparser 扩展; + # simple parser 对英文也 OK(按空格切),通用。 + # future:搜索 ?q=xxx 可以改走 to_tsquery,这里只先建基础设施。 + op.add_column( + "articles", + sa.Column( + "title_zh_tsv", + TSVECTOR, + sa.Computed( + "to_tsvector('simple', coalesce(title_zh, ''))", + persisted=True, + ), + ), + ) + op.create_index( + "ix_articles_title_zh_tsv", + "articles", + ["title_zh_tsv"], + postgresql_using="gin", + ) + + # === 2) search_keywords: 词频候选词表 === + op.create_table( + "search_keywords", + sa.Column("id", sa.BigInteger, primary_key=True), + sa.Column("keyword", sa.Text, 
nullable=False), + # ts_stat / title_extract / manual + sa.Column("source", sa.String(32), nullable=False), + # 词频或文章数(权重) + sa.Column("weight", sa.Integer, nullable=False, server_default="0"), + # 预计算的前缀数组,['美','美联储','美联储宣'] for '美联储宣布...' + # 查询: prefix_keys @> ARRAY['美'] 走 GIN 索引 + sa.Column("prefix_keys", ARRAY(sa.Text), nullable=False), + sa.Column( + "last_seen_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.UniqueConstraint("keyword", "source", name="uq_search_keywords_kw_src"), + ) + op.create_index( + "ix_search_keywords_prefix", + "search_keywords", + ["prefix_keys"], + postgresql_using="gin", + ) + op.create_index( + "ix_search_keywords_source_weight", + "search_keywords", + ["source", "weight"], + ) + op.create_index( + "ix_search_keywords_keyword_btree", + "search_keywords", + ["keyword"], + ) + + # === 3) search_title_suggestions: 真实文章标题片段表 === + op.create_table( + "search_title_suggestions", + sa.Column("id", sa.BigInteger, primary_key=True), + sa.Column( + "article_id", + sa.BigInteger, + sa.ForeignKey("articles.id", ondelete="CASCADE"), + nullable=False, + ), + # 用的字段:'title_zh' / 'title' (短新闻回退) + sa.Column("title_lang", sa.String(8), nullable=False, server_default="zh"), + sa.Column("prefix_keys", ARRAY(sa.Text), nullable=False), + sa.Column("published_at", sa.DateTime(timezone=True), nullable=True), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + ) + op.create_index( + "ix_search_title_suggestions_prefix", + "search_title_suggestions", + ["prefix_keys"], + postgresql_using="gin", + ) + op.create_index( + "ix_search_title_suggestions_article", + "search_title_suggestions", + ["article_id"], + ) + op.create_index( + "ix_search_title_suggestions_published", + "search_title_suggestions", + ["published_at"], + ) + + 
# === 4) articles 触发器:title_zh 写时自动维护 search_title_suggestions === + # 触发器函数:删除旧条目,按优先级(title_zh > title)插新条目 + # 限制:标题字符数 > 50 时只取前 50 字符(prefix_keys 数组不会爆炸) + op.execute( + """ + CREATE OR REPLACE FUNCTION rebuild_title_suggestions() RETURNS TRIGGER AS $$ + DECLARE + src_text text; + src_lang text; + max_len int := 50; + BEGIN + -- 先删掉该文章旧条目 + DELETE FROM search_title_suggestions WHERE article_id = NEW.id; + + -- 优先用 title_zh(翻译后),没有再回退 title(短新闻路径) + IF NEW.title_zh IS NOT NULL AND length(NEW.title_zh) > 0 THEN + src_text := NEW.title_zh; + src_lang := 'zh'; + ELSIF NEW.title IS NOT NULL AND length(NEW.title) > 0 THEN + src_text := NEW.title; + src_lang := 'src'; + ELSE + RETURN NEW; + END IF; + + -- 截断到 max_len 字符(prefix_keys 长度可控) + src_text := substring(src_text, 1, max_len); + + -- 插入一条,prefix_keys 是从第 1 字到全词的所有前缀 + INSERT INTO search_title_suggestions + (article_id, title_lang, prefix_keys, published_at) + SELECT + NEW.id, + src_lang, + ARRAY( + SELECT substring(src_text, 1, n) + FROM generate_series(1, length(src_text)) AS n + ), + NEW.published_at; + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """ + ) + + op.execute( + """ + CREATE TRIGGER trg_articles_rebuild_title_suggestions + AFTER INSERT OR UPDATE OF title_zh, title, published_at ON articles + FOR EACH ROW EXECUTE FUNCTION rebuild_title_suggestions(); + """ + ) + + # === 5) articles 删除时清理 === + # 用 ON DELETE CASCADE 即可,不用单独触发器 + + # === 6) search_keywords 刷新函数(给 worker 调用) === + # 设计:全量 truncate + insert(用 ts_stat + 标题聚合) + # 调用方式: SELECT refresh_search_keywords(); + op.execute( + """ + CREATE OR REPLACE FUNCTION refresh_search_keywords() RETURNS void AS $$ + BEGIN + TRUNCATE search_keywords; + + -- A) ts_stat 词频(title_zh + body_zh_text + commentary) + INSERT INTO search_keywords (keyword, source, weight, prefix_keys) + SELECT + word, + 'ts_stat', + nentry::int, + ARRAY( + SELECT substring(word, 1, n) + FROM generate_series(1, length(word)) AS n + ) + FROM ts_stat( + 'simple', + ( + SELECT 
to_tsvector( + 'simple', + coalesce(title_zh, '') || ' ' || + coalesce(body_zh_text, '') || ' ' || + coalesce(commentary, '') || ' ' || + coalesce(commentary_meituan, '') + ) + FROM articles + WHERE title_zh IS NOT NULL + OR body_zh_text IS NOT NULL + OR commentary IS NOT NULL + OR commentary_meituan IS NOT NULL + ) + ) + WHERE length(word) >= 2; -- 过滤单字噪音(中文标点/单字停用词) + + END; + $$ LANGUAGE plpgsql; + """ + ) + + +def downgrade() -> None: + op.execute("DROP TRIGGER IF EXISTS trg_articles_rebuild_title_suggestions ON articles") + op.execute("DROP FUNCTION IF EXISTS rebuild_title_suggestions()") + op.execute("DROP FUNCTION IF EXISTS refresh_search_keywords()") + + op.drop_index("ix_search_title_suggestions_published", table_name="search_title_suggestions") + op.drop_index("ix_search_title_suggestions_article", table_name="search_title_suggestions") + op.drop_index("ix_search_title_suggestions_prefix", table_name="search_title_suggestions") + op.drop_table("search_title_suggestions") + + op.drop_index("ix_search_keywords_keyword_btree", table_name="search_keywords") + op.drop_index("ix_search_keywords_source_weight", table_name="search_keywords") + op.drop_index("ix_search_keywords_prefix", table_name="search_keywords") + op.drop_table("search_keywords") + + op.drop_index("ix_articles_title_zh_tsv", table_name="articles") + op.drop_column("articles", "title_zh_tsv") diff --git a/backend/app/api/search.py b/backend/app/api/search.py new file mode 100644 index 0000000..4f16aa9 --- /dev/null +++ b/backend/app/api/search.py @@ -0,0 +1,47 @@ +"""/api/v1/search/* — 搜索建议(autocomplete)。 + +- GET /api/v1/search/suggestions?q=prefix + 返回:{"query", "titles": [...], "keywords": [...]} + - titles: 真实文章标题(按 published_at DESC),B 方案 + - keywords: 高频词(按 weight DESC),A 方案 + - 冷启动:任一表空时自动 fallback 到实时 ILIKE / ts_stat +- 鉴权:跟 articles 一致(需要登录) + +性能:两个查询都走 GIN 数组索引(prefix_keys @> ARRAY['美']),亚毫秒。 +""" +from __future__ import annotations + +from fastapi import APIRouter, Depends, Query 
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.deps import get_current_user
+from app.database import get_session
+from app.models.user import User
+from app.schemas.search import (
+    SearchKeywordItem,
+    SearchSuggestionsResponse,
+    SearchTitleSuggestionItem,
+)
+from app.services.search import SearchService
+
+router = APIRouter(prefix="/search", tags=["search"])
+
+
+@router.get("/suggestions", response_model=SearchSuggestionsResponse)
+async def get_suggestions(
+    q: str = Query(..., min_length=1, max_length=20, description="搜索前缀"),
+    limit: int = Query(10, ge=1, le=20, description="每组最多返回多少"),
+    _user: User = Depends(get_current_user),  # login required, same as articles
+    session: AsyncSession = Depends(get_session),
+):
+    """Search suggestions: take a prefix, return two candidate groups (article titles + frequent words).
+
+    Usage: called by the frontend search box on input change, debounced (250 ms in the frontend composable).
+    """
+    svc = SearchService(session)
+    raw = await svc.suggestions(q=q, limit=limit)
+    return SearchSuggestionsResponse(
+        query=raw["query"],
+        titles=[SearchTitleSuggestionItem(**t) for t in raw["titles"]],
+        keywords=[SearchKeywordItem(**k) for k in raw["keywords"]],
+    )
diff --git a/backend/app/main.py b/backend/app/main.py
index dba40d8..114f6be 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -16,7 +16,7 @@ from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import JSONResponse
 from starlette.exceptions import HTTPException as StarletteHTTPException
 
-from app.api import admin, admin_llm, articles, auth, bookmarks, ingest, me, sources, subscriptions
+from app.api import admin, admin_llm, articles, auth, bookmarks, ingest, me, search, sources, subscriptions
 from app.config import settings
 from app.database import engine
 from app.redis_client import close_redis, get_redis
@@ -100,6 +100,7 @@ app.include_router(sources.router, prefix=API_PREFIX)
 app.include_router(bookmarks.router, prefix=API_PREFIX)
 app.include_router(subscriptions.router, prefix=API_PREFIX)
 app.include_router(ingest.router, prefix=API_PREFIX)
+app.include_router(search.router, prefix=API_PREFIX) app.include_router(admin.router, prefix=API_PREFIX) app.include_router(admin_llm.router, prefix=API_PREFIX) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index fc2d0a8..b17bef6 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -7,6 +7,8 @@ from app.models.article import Article # noqa: F401 from app.models.article_read import ArticleRead # noqa: F401 from app.models.bookmark import Bookmark # noqa: F401 from app.models.llm_setting import LlmSetting # noqa: F401 +from app.models.search_keyword import SearchKeyword # noqa: F401 +from app.models.search_title_suggestion import SearchTitleSuggestion # noqa: F401 from app.models.source import Source, SourceKind # noqa: F401 from app.models.subscription import Subscription # noqa: F401 from app.models.user import User, UserRole # noqa: F401 @@ -17,6 +19,8 @@ __all__ = [ "ArticleRead", "Bookmark", "LlmSetting", + "SearchKeyword", + "SearchTitleSuggestion", "Source", "SourceKind", "Subscription", diff --git a/backend/app/models/search_keyword.py b/backend/app/models/search_keyword.py new file mode 100644 index 0000000..06ff9e7 --- /dev/null +++ b/backend/app/models/search_keyword.py @@ -0,0 +1,45 @@ +"""搜索建议候选词表(固化,worker 每日 ts_stat 刷新)。 + +- 数据源:articles.title_zh + body_zh_text + commentary + commentary_meituan +- 用途:/api/v1/search/suggestions 返回"高频词"建议(A 方案) +- 刷新:每日凌晨 worker 调 refresh_search_keywords() 全量重建 +- 查询:prefix_keys @> ARRAY['美'] 走 GIN 索引(亚毫秒) +""" +from __future__ import annotations + +from datetime import datetime + +from sqlalchemy import BigInteger, DateTime, Integer, String, Text, func +from sqlalchemy.dialects.postgresql import ARRAY +from sqlalchemy.orm import Mapped, mapped_column + +from app.database import Base + + +class SearchKeyword(Base): + __tablename__ = "search_keywords" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + + keyword: Mapped[str] = 
mapped_column(Text, nullable=False) + # ts_stat / title_extract / manual + source: Mapped[str] = mapped_column(String(32), nullable=False) + # 词频或文章数(权重,排序用) + weight: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + # 预计算前缀数组,['美','美联储','美联储宣'] for '美联储宣布...' + prefix_keys: Mapped[list[str]] = mapped_column(ARRAY(Text), nullable=False) + + last_seen_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + + __table_args__ = ( + # db-level unique 留给 alembic 迁移创建(__table_args__ 只是 ORM 侧参考) + # 实际 UNIQUE 约束在 0009 迁移里建 + ) + + def __repr__(self) -> str: + return f"" diff --git a/backend/app/models/search_title_suggestion.py b/backend/app/models/search_title_suggestion.py new file mode 100644 index 0000000..70aae04 --- /dev/null +++ b/backend/app/models/search_title_suggestion.py @@ -0,0 +1,43 @@ +"""搜索建议 - 真实文章标题片段表(articles 写入 trigger 自动维护)。 + +- 数据源:articles.title_zh(优先)/ articles.title(短新闻回退) +- 用途:/api/v1/search/suggestions 返回"真实文章标题"建议(B 方案) +- 维护:PG trigger(articles INSERT/UPDATE OF title_zh/title/published_at 触发) +- 查询:prefix_keys @> ARRAY['美'] 走 GIN 索引,按 published_at DESC 排序 +""" +from __future__ import annotations + +from datetime import datetime + +from sqlalchemy import BigInteger, DateTime, ForeignKey, String, func +from sqlalchemy.dialects.postgresql import ARRAY, TEXT +from sqlalchemy.orm import Mapped, mapped_column + +from app.database import Base + + +class SearchTitleSuggestion(Base): + __tablename__ = "search_title_suggestions" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + + article_id: Mapped[int] = mapped_column( + BigInteger, + ForeignKey("articles.id", ondelete="CASCADE"), + nullable=False, + ) + + # 该条用的是哪边的文本:'zh' (title_zh) / 'src' (title 短新闻回退) + title_lang: Mapped[str] = mapped_column(String(8), nullable=False, default="zh") + + 
# 预计算前缀数组(从第 1 字到全词) + prefix_keys: Mapped[list[str]] = mapped_column(ARRAY(TEXT), nullable=False) + + published_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + + def __repr__(self) -> str: + return f"" diff --git a/backend/app/schemas/search.py b/backend/app/schemas/search.py new file mode 100644 index 0000000..9032900 --- /dev/null +++ b/backend/app/schemas/search.py @@ -0,0 +1,24 @@ +"""搜索建议 schema。""" +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel + + +class SearchTitleSuggestionItem(BaseModel): + id: int # article_id + published_at: datetime | None = None + lang: str # 'zh' / 'src' + + +class SearchKeywordItem(BaseModel): + word: str + weight: int + source: str # 'ts_stat' / 'title_extract' / 'manual' / 'ts_stat_live' + + +class SearchSuggestionsResponse(BaseModel): + query: str + titles: list[SearchTitleSuggestionItem] = [] + keywords: list[SearchKeywordItem] = [] diff --git a/backend/app/scripts/backfill_search_suggestions.py b/backend/app/scripts/backfill_search_suggestions.py new file mode 100644 index 0000000..d90b0a8 --- /dev/null +++ b/backend/app/scripts/backfill_search_suggestions.py @@ -0,0 +1,156 @@ +"""回灌 search_title_suggestions 表。 + +- 迁移 0009 给 articles 加了 trigger,新写入的会自动维护 +- 但迁移前已有的 articles 没经过 trigger,需要这个脚本一次性回填 +- 同时可以手动跑一次 refresh_search_keywords()(可选,worker 也会跑) + +用法: + cd backend + python -m app.scripts.backfill_search_suggestions + # 或 docker: + docker compose exec api python -m app.scripts.backfill_search_suggestions + +设计: +- 用 batch INSERT,避免逐行 trigger 重复触发(虽然 trigger 已经在迁移里创建, + 重复执行对已存在的条目会先 DELETE 再 INSERT,等价于刷新,无害) +- 进度条:每 1000 篇打一行 +- 失败:有 article 字段异常不会阻塞其他 +""" +from __future__ import annotations + +import asyncio +import logging +import sys +from datetime import datetime, timezone + +from sqlalchemy import select, text +from 
sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import AsyncSessionLocal +from app.models.article import Article +from app.models.search_title_suggestion import SearchTitleSuggestion + +logger = logging.getLogger("news.backfill_search") +logging.basicConfig( + level="INFO", + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) + + +MAX_TITLE_LEN = 50 # 跟迁移里的 trigger 一致 +BATCH_SIZE = 500 + + +def _build_prefix_keys(text_value: str) -> list[str]: + """从 '美联储宣布...' 生成 ['美','美联储','美联储宣',...,'美联储宣布...']""" + text_value = (text_value or "")[:MAX_TITLE_LEN] + if not text_value: + return [] + return [text_value[:n] for n in range(1, len(text_value) + 1)] + + +async def _process_article_batch( + session: AsyncSession, + articles: list[Article], +) -> int: + """处理一批 articles,UPSERT 到 search_title_suggestions。 + + 返回成功插入/更新的条数。 + """ + rows = [] + for art in articles: + if art.title_zh and len(art.title_zh.strip()) > 0: + src_text = art.title_zh.strip()[:MAX_TITLE_LEN] + lang = "zh" + elif art.title and len(art.title.strip()) > 0: + src_text = art.title.strip()[:MAX_TITLE_LEN] + lang = "src" + else: + continue + + rows.append( + { + "article_id": art.id, + "title_lang": lang, + "prefix_keys": _build_prefix_keys(src_text), + "published_at": art.published_at, + } + ) + + if not rows: + return 0 + + # 用 PG 原生 ON CONFLICT 实现 UPSERT(基于 article_id 唯一约束) + # 注意:表没建 unique on article_id,所以先 DELETE 再 INSERT + # 性能:批量 DELETE 在 article_id 上没索引,可能慢;临时加索引: + # CREATE INDEX IF NOT EXISTS tmp_idx ON search_title_suggestions(article_id); + # 简化:每个 batch 内逐条 DELETE 再 INSERT(慢但稳) + # 替代方案:直接 TRUNCATE + 全量重灌(回填场景下更简单) + for r in rows: + await session.execute( + text("DELETE FROM search_title_suggestions WHERE article_id = :aid"), + {"aid": r["article_id"]}, + ) + # bulk insert + await session.execute(pg_insert(SearchTitleSuggestion), rows) + await session.commit() + return len(rows) + + +async def 
backfill() -> None: + """主流程:分批拉 articles,回灌 search_title_suggestions。""" + started = datetime.now(timezone.utc) + async with AsyncSessionLocal() as session: + # 总数 + total = (await session.execute(select(Article.id))).all() + total_count = len(total) + logger.info("backfill start: %d articles to process", total_count) + + processed = 0 + last_id = 0 + while True: + rows = ( + await session.execute( + select(Article) + .where(Article.id > last_id) + .order_by(Article.id) + .limit(BATCH_SIZE) + ) + ).scalars().all() + if not rows: + break + n = await _process_article_batch(session, list(rows)) + processed += n + last_id = rows[-1].id + logger.info( + "progress: %d / %d (%.1f%%)", + processed, total_count, + processed / total_count * 100 if total_count else 0, + ) + + elapsed = (datetime.now(timezone.utc) - started).total_seconds() + logger.info("backfill done: %d rows in %.1fs", processed, elapsed) + + # 顺便触发一次 search_keywords 刷新(让词频表也有数据) + logger.info("triggering refresh_search_keywords()...") + async with AsyncSessionLocal() as session: + try: + await session.execute(text("SELECT refresh_search_keywords()")) + await session.commit() + logger.info("refresh_search_keywords() done") + except Exception as e: + logger.exception("refresh_search_keywords failed: %s (worker 03:00 会再跑)", e) + + +def main() -> int: + try: + asyncio.run(backfill()) + except KeyboardInterrupt: + logger.warning("interrupted") + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/backend/app/services/search.py b/backend/app/services/search.py new file mode 100644 index 0000000..427e7cf --- /dev/null +++ b/backend/app/services/search.py @@ -0,0 +1,160 @@ +"""搜索建议服务:混合 A(高频词)+ B(真实标题) + 冷启动 fallback。 + +- A: search_keywords(prefix_keys @> ARRAY['美'], ORDER BY weight DESC) +- B: search_title_suggestions(prefix_keys @> ARRAY['美'], ORDER BY published_at DESC) +- fallback: 任一表空时回退实时 ILIKE 查 articles(冷启动 / worker 没刷新过) +""" +from __future__ import annotations + +import 
logging + +from sqlalchemy import desc, select +from sqlalchemy.dialects.postgresql import ARRAY +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.article import Article +from app.models.search_keyword import SearchKeyword +from app.models.search_title_suggestion import SearchTitleSuggestion + +logger = logging.getLogger("news.search") + + +class SearchService: + """搜索建议 service。 + + 设计:输入 prefix,返回 { titles, keywords } 两组候选。 + - titles 真实文章标题(按 published_at DESC 排) + - keywords 高频词(按 weight DESC 排) + - 任一为空时回退实时 articles.title_zh ILIKE 查询(冷启动兜底) + """ + + def __init__(self, session: AsyncSession): + self.session = session + + async def suggestions( + self, + q: str, + limit: int = 10, + ) -> dict[str, list[dict]]: + """返回搜索建议。 + + Args: + q: 前缀(1-20 字符) + limit: 每组最多返回多少(默认 10,最大 20) + + Returns: + {"query": q, "titles": [...], "keywords": [...]} + titles 元素:{"id": article_id, "published_at": ...} + keywords 元素:{"word": ..., "weight": ...} + """ + q = q.strip() + if not q: + return {"query": q, "titles": [], "keywords": []} + + # 1) 查 search_title_suggestions(B 方案) + title_rows = await self.session.execute( + select( + SearchTitleSuggestion.article_id, + SearchTitleSuggestion.published_at, + SearchTitleSuggestion.title_lang, + ) + .where(SearchTitleSuggestion.prefix_keys.contains([q])) + .order_by(desc(SearchTitleSuggestion.published_at)) + .limit(limit) + ) + titles = [ + { + "id": row.article_id, + "published_at": row.published_at.isoformat() if row.published_at else None, + "lang": row.title_lang, + } + for row in title_rows.all() + ] + + # 2) 查 search_keywords(A 方案) + kw_rows = await self.session.execute( + select(SearchKeyword.keyword, SearchKeyword.weight, SearchKeyword.source) + .where(SearchKeyword.prefix_keys.contains([q])) + .order_by(desc(SearchKeyword.weight)) + .limit(limit) + ) + keywords = [ + {"word": row.keyword, "weight": row.weight, "source": row.source} + for row in kw_rows.all() + ] + + # 3) 冷启动 fallback:任一为空时,回退到实时 ILIKE 
articles + # (如果两张固化表都跑空了,说明刚建库或数据被 truncate) + if not titles: + titles = await self._fallback_titles(q, limit) + if not keywords: + keywords = await self._fallback_keywords(q, limit) + + return {"query": q, "titles": titles, "keywords": keywords} + + async def _fallback_titles(self, q: str, limit: int) -> list[dict]: + """回退:实时查 articles.title_zh / title(走 B-tree 索引,慢但能用)。 + + - 优先 title_zh LIKE(翻译后),没有再 LIKE title(短新闻) + - 限制 7 天内的文章,避免返回太老的(冷启动场景下用户预期) + """ + from datetime import datetime, timedelta, timezone + + from sqlalchemy import or_ + + since = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=7) + like = f"{q}%" + stmt = ( + select(Article.id, Article.published_at, Article.title_zh, Article.title) + .where( + Article.published_at >= since, + Article.duplicate_of.is_(None), + or_( + Article.title_zh.ilike(like), + Article.title.ilike(like), + ), + ) + .order_by(desc(Article.published_at)) + .limit(limit) + ) + rows = (await self.session.execute(stmt)).all() + return [ + { + "id": row.id, + "published_at": row.published_at.isoformat() if row.published_at else None, + "lang": "zh" if row.title_zh else "src", + } + for row in rows + ] + + async def _fallback_keywords(self, q: str, limit: int) -> list[dict]: + """回退:ts_stat 实时聚合(慢但能用)。 + + - 从 articles.title_zh + body_zh_text 实时 to_tsvector + - 适用:search_keywords 表空 + ts_stat 之前的全量聚合 + """ + from sqlalchemy import text + + sql = text( + """ + SELECT word, nentry::int AS weight + FROM ts_stat( + 'simple', + ( + SELECT to_tsvector( + 'simple', + coalesce(title_zh, '') || ' ' || coalesce(body_zh_text, '') + ) + FROM articles + WHERE title_zh IS NOT NULL OR body_zh_text IS NOT NULL + ) + ) + WHERE word LIKE :prefix + ORDER BY nentry DESC + LIMIT :lim + """ + ) + rows = ( + await self.session.execute(sql, {"prefix": f"{q}%", "lim": limit}) + ).all() + return [{"word": r.word, "weight": r.weight, "source": "ts_stat_live"} for r in rows] diff --git a/backend/app/workers/__main__.py 
b/backend/app/workers/__main__.py
index 3897029..88faea8 100644
--- a/backend/app/workers/__main__.py
+++ b/backend/app/workers/__main__.py
@@ -7,13 +7,13 @@ from __future__ import annotations
 import asyncio
 import logging
 import signal
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from apscheduler.triggers.cron import CronTrigger
 from apscheduler.triggers.date import DateTrigger
 from apscheduler.triggers.interval import IntervalTrigger
-from sqlalchemy import select
+from sqlalchemy import select, text
 
 from app.config import settings
 from app.database import AsyncSessionLocal
@@ -28,6 +28,22 @@ logging.basicConfig(
 )
 
 
+async def _refresh_search_keywords() -> None:
+    """Daily refresh of search_keywords (the ts_stat word-frequency table).
+
+    - calls the PG function refresh_search_keywords() (created by migration 0009)
+    - full truncate + insert: frequencies shift, so an incremental update does not fit
+    - a failure must not take the worker down; it is only logged
+    """
+    try:
+        async with AsyncSessionLocal() as s:
+            await s.execute(text("SELECT refresh_search_keywords()"))
+            await s.commit()
+        logger.info("search_keywords refreshed")
+    except Exception as e:
+        logger.exception("search_keywords refresh failed: %s", e)
+
+
 async def _rebuild_jobs(scheduler: AsyncIOScheduler) -> None:
     """从 sources 表动态构建 job(可热更新)。
 
@@ -95,6 +111,23 @@ async def main() -> None:
         id="startup_run",
     )
 
+    # === search suggestions ===
+    # Refresh search_keywords (ts_stat word frequencies) every day at 03:00.
+    scheduler.add_job(
+        _refresh_search_keywords,
+        trigger=CronTrigger(hour=3, minute=0),
+        id="refresh_search_keywords",
+        replace_existing=True,
+    )
+    # Also run once 10 seconds after startup, so search_keywords has data soon after the worker boots;
+    # the delay lets the DB become ready and keeps this job from competing with startup_run.
+    scheduler.add_job(
+        _refresh_search_keywords,
+        trigger=DateTrigger(run_date=datetime.now() + timedelta(seconds=10)),
+        id="startup_refresh_search_keywords",
+    )
+    logger.info("scheduled: refresh_search_keywords daily 03:00 + on startup (+10s)")
+
     scheduler.start()
     logger.info("scheduler started with %d jobs", len(scheduler.get_jobs()))
 
diff --git a/frontend/src/api/search.ts b/frontend/src/api/search.ts
new file mode 100644
index 0000000..95b618d
--- /dev/null
+++ b/frontend/src/api/search.ts
@@ -0,0 +1,35 @@
+import type { AxiosRequestConfig } from 'axios'
+import { http } from './client'
+
+export interface SearchTitleSuggestion {
+  id: number
+  published_at: string | null
+  lang: string // 'zh' / 'src'
+}
+
+export interface SearchKeyword {
+  word: string
+  weight: number
+  source: string // 'ts_stat' / 'title_extract' / 'manual' / 'ts_stat_live'
+}
+
+export interface SearchSuggestionsResponse {
+  query: string
+  titles: SearchTitleSuggestion[]
+  keywords: SearchKeyword[]
+}
+
+/** Search suggestions (autocomplete). `q` must be a 1-20 character prefix; `config` carries extra axios options. */
+export const searchApi = {
+  async suggestions(
+    q: string,
+    limit = 10,
+    config?: AxiosRequestConfig,
+  ): Promise<SearchSuggestionsResponse> {
+    const { data } = await http.get<SearchSuggestionsResponse>('/search/suggestions', {
+      ...config,
+      params: { ...config?.params, q, limit },
+    })
+    return data
+  },
+}
diff --git a/frontend/src/composables/useDebounce.ts b/frontend/src/composables/useDebounce.ts
new file mode 100644
index 0000000..4e65cc3
--- /dev/null
+++ b/frontend/src/composables/useDebounce.ts
@@ -0,0 +1,37 @@
+/**
+ * Generic debounce composable.
+ *
+ * Usage:
+ *   const debouncedFn = useDebounce((v: string) => doSomething(v), 250)
+ *   // inside a watch / input handler:
+ *   debouncedFn(value)
+ *
+ * Why not lodash.debounce: the project does not ship lodash and this case does not justify adding it;
+ * the implementation is about 15 lines with no dependency.
+ */
+import { onBeforeUnmount } from 'vue'
+
+export function useDebounce<T extends (...args: any[]) => any>(
+  fn: T,
+  delay = 250,
+): (...args: Parameters<T>) => void {
+  let timer: ReturnType<typeof setTimeout> | null = null
+
+  const debounced = (...args: Parameters<T>) => {
+    if (timer) clearTimeout(timer)
+    timer = setTimeout(() => {
+      timer = null
+      fn(...args)
+    }, delay)
+  }
+
+  // Clear any pending timer on unmount so the callback never runs after the component is gone.
+  onBeforeUnmount(() => {
+    if (timer) {
+      clearTimeout(timer)
+      timer = null
+    }
+  })
+
+  return debounced
+}
diff --git a/frontend/src/views/Feed.vue b/frontend/src/views/Feed.vue
index 9f414fa..d7aa780 100644 --- a/frontend/src/views/Feed.vue +++ b/frontend/src/views/Feed.vue @@ -1,11 +1,13 @@