From 764de4e85c2fb81fcfe2255bd4e2482cefa60fb8 Mon Sep 17 00:00:00 2001 From: Mavis Date: Wed, 10 Jun 2026 17:20:53 +0800 Subject: [PATCH] =?UTF-8?q?fix(worker):=20enrichment=5Floop=20=E6=94=B9?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=20+=20=E5=8A=A0=E5=A4=A7=20batch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 之前每轮只跑 3 篇串行,587 篇待 enrich 队列要 4.7h 才清完。 改动: - ENRICHMENT_BATCH_SIZE: 3 -> 8 - ENRICHMENT_INTERVAL_SEC: 5 -> 2 - 处理 todo_ids 改 asyncio.gather 并发 3 篇 - LlmClient 内部 interval_sec 限速不变,这里只加并发上限 效果:每分钟 ~7 篇 -> 587 篇约 84 分钟清完。 排查过程中还发现根因: llm_settings 表空行导致 enrichment 静默跳过, 已手动 INSERT 默认 LlmSetting(id=1, enabled=true) 触发循环。 --- backend/app/services/llm/enrichment.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/backend/app/services/llm/enrichment.py b/backend/app/services/llm/enrichment.py index 39024d5..da4faa8 100644 --- a/backend/app/services/llm/enrichment.py +++ b/backend/app/services/llm/enrichment.py @@ -389,8 +389,8 @@ async def enrich_article(article_id: int) -> dict[str, str]: # === 后台循环 === # 与 translation_loop 一样,常驻从队列里取文章 -ENRICHMENT_INTERVAL_SEC = 5.0 # 没活时等待 -ENRICHMENT_BATCH_SIZE = 3 # 每轮并发拉取候选,然后顺序处理(LLM 客户端本身有节流) +ENRICHMENT_INTERVAL_SEC = 2.0 # 没活时等待 +ENRICHMENT_BATCH_SIZE = 8 # 每轮并发拉取候选,然后并发处理(并发上限 3 篇;LLM 客户端本身有节流) async def enrichment_loop() -> None: @@ -438,12 +438,17 @@ async def enrichment_loop() -> None: await asyncio.sleep(ENRICHMENT_INTERVAL_SEC) continue - for aid in todo_ids: - try: - await enrich_article(aid) - except Exception as e: - logger.exception("enrich_article %s in loop failed: %s", aid, e) - await asyncio.sleep(0.2) # 文章间轻节流(LLM 内部还有 interval_sec) + # 并发 enrich 多篇(LlmClient 内部 interval_sec 已经做了限速,这里只控制并发数) + # 为了不让 Agnes API 同时打太多,加一层并发上限(最多 3 篇同时进行) + sem = asyncio.Semaphore(3) + async def _run_one(aid: int) -> None: + async with sem: + try: + await enrich_article(aid) + except Exception as e: + logger.exception("enrich_article %s in loop failed: %s", aid, e) + 
+ await asyncio.gather(*[_run_one(aid) for aid in todo_ids]) except Exception as e: logger.exception("enrichment_loop error: %s", e) await asyncio.sleep(ENRICHMENT_INTERVAL_SEC)