"""Worker 入口:启动调度器 + 异步任务。 `docker compose exec worker python -m app.workers` """ from __future__ import annotations import asyncio import logging import signal from datetime import datetime, timezone from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger from sqlalchemy import select from app.config import settings from app.database import AsyncSessionLocal from app.models.source import Source from app.services.llm.enrichment import enrichment_loop from app.workers.pipeline import fetch_one_source, run_once, translation_loop logger = logging.getLogger("news.worker") logging.basicConfig( level=settings.log_level, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) async def _rebuild_jobs(scheduler: AsyncIOScheduler) -> None: """从 sources 表动态构建 job(可热更新)。""" scheduler.remove_all_jobs() async with AsyncSessionLocal() as s: rows = (await s.execute(select(Source).where(Source.enabled.is_(True)))).scalars() sources = list(rows) if not sources: logger.warning("no enabled sources; scheduler idle") return for src in sources: trigger = ( CronTrigger.from_crontab(src.fetch_cron) if src.fetch_cron else IntervalTrigger(minutes=src.fetch_interval_min) ) scheduler.add_job( fetch_one_source, trigger=trigger, args=[src.id], id=f"src:{src.slug}", replace_existing=True, max_instances=1, coalesce=True, misfire_grace_time=300, ) logger.info("scheduled %s every %s", src.slug, src.fetch_cron or f"{src.fetch_interval_min}m") async def _daily_rebuild() -> None: """每天 00:30 重建 job 列表(支持运行时新增源)。""" scheduler = AsyncIOScheduler() # 临时实例,只为重建用 # 实际用全局 scheduler 实例 pass def build_scheduler() -> AsyncIOScheduler: sched = AsyncIOScheduler(timezone="Asia/Hong_Kong") return sched async def main() -> None: scheduler = build_scheduler() await _rebuild_jobs(scheduler) # 每天 00:30 重建一次 scheduler.add_job( _rebuild_jobs, trigger=CronTrigger(hour=0, minute=30), args=[scheduler], id="rebuild_jobs", replace_existing=True, ) # 启动时立即跑一次 scheduler.add_job( run_once, trigger=IntervalTrigger(minutes=0), id="startup_run", next_run_time=datetime.now(timezone.utc), ) scheduler.start() logger.info("scheduler started with %d jobs", len(scheduler.get_jobs())) # 独立的翻译后台循环(不和 RSS 抓取并行;1 篇/秒) translation_task = asyncio.create_task(translation_loop(), name="translation_loop") logger.info("translation_loop task scheduled (1 article/sec)") # 独立的 LLM 增强后台循环(翻译完成后,跑 4 项 LLM 任务) enrichment_task = asyncio.create_task(enrichment_loop(), name="enrichment_loop") logger.info("enrichment_loop task scheduled (scans translated articles)") stop = asyncio.Event() def _signal_handler(): logger.info("shutdown signal received") stop.set() loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler(sig, _signal_handler) except NotImplementedError: # Windows 等不支持 pass await stop.wait() logger.info("stopping scheduler and background loops") for t in (translation_task, enrichment_task): t.cancel() try: await t except asyncio.CancelledError: pass scheduler.shutdown(wait=False) if __name__ == "__main__": asyncio.run(main())