Files
diary-news/backend/app/workers/__main__.py
mavis c3aa0f0cb6 feat(search): 智能搜索建议 - 固化候选词表 (search_keywords + search_title_suggestions)
后端:
- alembic 0009: 两张固化表 + GIN prefix_keys 索引 + articles trigger
- /api/v1/search/suggestions: 混合 A(高频词 ts_stat) + B(真实标题) + 冷启动 fallback
- worker 每日 03:00 + 启动时刷新 search_keywords
- 顺便填 commit 11 TODO: articles.title_zh_tsv + GIN 索引(未来 FTS 基础)

前端:
- NInput -> NAutoComplete + debounce 250ms
- 选标题 -> 跳详情;选关键词 -> 填入 + 触发搜索
- AbortController 防 race condition

性能: prefix_keys @> ARRAY[prefix] 走 GIN 亚毫秒,100w 行也稳
2026-06-15 18:26:35 +08:00

169 lines
5.6 KiB
Python

"""Worker 入口:启动调度器 + 异步任务。
`docker compose exec worker python -m app.workers`
"""
from __future__ import annotations
import asyncio
import logging
import signal
from datetime import datetime, timedelta, timezone
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select, text
from app.config import settings
from app.database import AsyncSessionLocal
from app.models.source import Source
from app.services.llm.enrichment import enrichment_loop
from app.workers.pipeline import fetch_one_source, run_once, translation_loop
# Module-level logger used by every function in this worker entry point.
logger = logging.getLogger("news.worker")
# Configure stdlib logging at import time; the level is read from
# settings.log_level.
logging.basicConfig(
level=settings.log_level,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
async def _refresh_search_keywords() -> None:
    """Refresh the ``search_keywords`` table (ts_stat word frequencies).

    - Runs the PostgreSQL function ``refresh_search_keywords()`` (per the
      original note, created by migration 0009) and commits.
    - Per the original note the refresh is a full truncate + insert, because
      word frequencies change and an incremental update is not suitable.
    - Best effort: any failure is logged and swallowed so the worker keeps
      running.
    """
    try:
        async with AsyncSessionLocal() as session:
            refresh_call = text("SELECT refresh_search_keywords()")
            await session.execute(refresh_call)
            await session.commit()
        logger.info("search_keywords refreshed")
    except Exception as exc:
        # Deliberately broad: a failed refresh must never take the worker down.
        logger.exception("search_keywords refresh failed: %s", exc)
async def _rebuild_jobs(scheduler: AsyncIOScheduler) -> None:
    """Rebuild the per-source fetch jobs from the ``sources`` table.

    Only enabled sources are loaded, and ``api_push`` sources are skipped
    because they are received passively (via /api/v1/ingest) rather than
    fetched. Every other source gets one job with id ``src:<slug>`` that calls
    ``fetch_one_source(src.id)``; it is driven by ``fetch_cron`` when that is
    set, otherwise it runs every ``fetch_interval_min`` minutes.

    Only jobs whose id starts with ``src:`` are removed before rebuilding, so
    jobs registered by other code on the same scheduler are left untouched.

    Args:
        scheduler: The scheduler whose ``src:*`` jobs are replaced.
    """
    source_job_prefix = "src:"

    # Query first: if the database is unavailable the exception propagates
    # before the existing schedule has been modified.
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(Source).where(Source.enabled.is_(True)))
        sources = list(result.scalars())

    # Drop only the fetch jobs owned by this function. remove_all_jobs() would
    # also delete unrelated jobs living on the same scheduler, including a
    # recurring job that invokes this very function, which would then never
    # fire again after its first run.
    for job in scheduler.get_jobs():
        if job.id.startswith(source_job_prefix):
            scheduler.remove_job(job.id)

    if not sources:
        logger.warning("no enabled sources; scheduler idle")
        return

    for src in sources:
        # api_push sources are not fetched (they are pushed to the ingest
        # endpoint), so they get no job.
        if src.kind.value == "api_push":
            logger.debug("skip scheduling api_push source: %s", src.slug)
            continue

        if src.fetch_cron:
            # A trigger instance does not inherit the scheduler's timezone;
            # without this it would be evaluated in the system local zone.
            trigger = CronTrigger.from_crontab(src.fetch_cron, timezone=scheduler.timezone)
        else:
            trigger = IntervalTrigger(minutes=src.fetch_interval_min)

        scheduler.add_job(
            fetch_one_source,
            trigger=trigger,
            args=[src.id],
            id=f"{source_job_prefix}{src.slug}",
            replace_existing=True,
            max_instances=1,
            coalesce=True,
            misfire_grace_time=300,
        )
        logger.info("scheduled %s every %s", src.slug, src.fetch_cron or f"{src.fetch_interval_min}m")
async def _daily_rebuild() -> None:
"""每天 00:30 重建 job 列表(支持运行时新增源)。"""
scheduler = AsyncIOScheduler()
# 临时实例,只为重建用
# 实际用全局 scheduler 实例
pass
def build_scheduler() -> AsyncIOScheduler:
    """Create the worker's asyncio scheduler, configured for Asia/Hong_Kong."""
    return AsyncIOScheduler(timezone="Asia/Hong_Kong")
async def main() -> None:
    """Run the worker until SIGINT/SIGTERM is received.

    Steps:
      1. Build the scheduler and register the per-source fetch jobs.
      2. Register the maintenance jobs: daily job rebuild (00:30), a one-off
         ``run_once`` at startup, and the ``search_keywords`` refresh (daily
         03:00 plus once 10 seconds after startup).
      3. Start the scheduler and the two background loops
         (``translation_loop`` and ``enrichment_loop``).
      4. Wait for a shutdown signal, then cancel the loops and shut the
         scheduler down.
    """
    scheduler = build_scheduler()
    await _rebuild_jobs(scheduler)

    # Trigger instances do not inherit the scheduler's timezone (they fall
    # back to the system local zone), so pass it explicitly; otherwise the
    # wall-clock times below would depend on the host's TZ setting.
    sched_tz = scheduler.timezone

    # Rebuild the job list once a day at 00:30.
    scheduler.add_job(
        _rebuild_jobs,
        trigger=CronTrigger(hour=0, minute=30, timezone=sched_tz),
        args=[scheduler],
        id="rebuild_jobs",
        replace_existing=True,
    )

    # Run once immediately at startup. A DateTrigger fires exactly once, which
    # avoids an IntervalTrigger being rejected by max_instances and spamming
    # the log.
    scheduler.add_job(
        run_once,
        trigger=DateTrigger(run_date=datetime.now(timezone.utc)),
        id="startup_run",
    )

    # === Search suggestions ===
    # Refresh search_keywords (ts_stat word frequencies) every day at 03:00.
    scheduler.add_job(
        _refresh_search_keywords,
        trigger=CronTrigger(hour=3, minute=0, timezone=sched_tz),
        id="refresh_search_keywords",
        replace_existing=True,
    )
    # Also refresh 10 seconds after startup so search_keywords is populated on
    # a cold start; the delay lets the DB become ready and avoids competing
    # with startup_run. Timezone-aware, consistent with startup_run above.
    scheduler.add_job(
        _refresh_search_keywords,
        trigger=DateTrigger(run_date=datetime.now(timezone.utc) + timedelta(seconds=10)),
        id="startup_refresh_search_keywords",
    )
    logger.info("scheduled: refresh_search_keywords daily 03:00 + on startup (+10s)")

    scheduler.start()
    logger.info("scheduler started with %d jobs", len(scheduler.get_jobs()))

    # Independent translation background loop (not run in parallel with RSS
    # fetching; 1 article per second).
    translation_task = asyncio.create_task(translation_loop(), name="translation_loop")
    logger.info("translation_loop task scheduled (1 article/sec)")

    # Independent LLM enrichment background loop (runs the LLM tasks once
    # translation has completed).
    enrichment_task = asyncio.create_task(enrichment_loop(), name="enrichment_loop")
    logger.info("enrichment_loop task scheduled (scans translated articles)")

    stop = asyncio.Event()

    def _signal_handler():
        logger.info("shutdown signal received")
        stop.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, _signal_handler)
        except NotImplementedError:
            # Not supported on some platforms (e.g. Windows).
            pass

    await stop.wait()
    logger.info("stopping scheduler and background loops")

    background_tasks = (translation_task, enrichment_task)
    try:
        for task in background_tasks:
            task.cancel()
        # return_exceptions=True: a loop that already crashed must not abort
        # the shutdown sequence by re-raising its exception here.
        outcomes = await asyncio.gather(*background_tasks, return_exceptions=True)
        for task, outcome in zip(background_tasks, outcomes):
            # CancelledError derives from BaseException, so only genuine
            # failures are reported.
            if isinstance(outcome, Exception):
                logger.error("background task %s had failed: %r", task.get_name(), outcome)
    finally:
        scheduler.shutdown(wait=False)
if __name__ == "__main__":
    # Script entry point (python -m app.workers): drive main() to completion.
    asyncio.run(main())