"""LLM enrichment service (runs after an article has been translated).

Four independent tasks:
  1. format     -- typeset the translated body (writes ``body_zh_formatted``)
  2. classify   -- categorize the article     (writes ``category``)
  3. image      -- generate an illustration   (writes ``image_ai_url``)
  4. commentary -- write a short commentary   (writes ``commentary``)

Design:
  - Entry point: ``enrich_article(article_id)``.
  - Tasks do not affect each other: each one runs inside its own
    try/except and records its own ``*_status``.
  - All tasks of one article share a single ``LlmClient`` instance, which is
    expected to apply the rate limit (``interval_sec``).
  - When the settings row has ``enabled=False`` every task is skipped and the
    LLM is not called.
"""
from __future__ import annotations

import asyncio
import html
import logging
import time
from collections.abc import Iterable
from typing import Any

from sqlalchemy import or_, select

from app.database import AsyncSessionLocal
from app.models.article import Article
from app.models.llm_setting import LlmSetting
from app.schemas.llm import get_default_prompts
from app.services.llm.client import LlmClient

logger = logging.getLogger("news.llm.enrichment")

# Status values that mean "this task still has to be (re)run".  NULL is
# handled separately; the empty string is treated like "pending".
_INCOMPLETE_STATUSES = ("pending", "failed", "n/a", "")


# === Load the current settings ===
async def get_setting() -> LlmSetting:
    """Return the single ``llm_settings`` row (id=1), creating it if missing.

    A missing row is inserted with the default prompts from
    ``get_default_prompts()`` and refreshed before being returned.

    NOTE(review): no row lock is taken here, so two callers racing on the very
    first insert could both try to create id=1 -- confirm whether that can
    happen in this deployment.
    """
    async with AsyncSessionLocal() as session:
        row = (
            await session.execute(select(LlmSetting).where(LlmSetting.id == 1))
        ).scalar_one_or_none()
        if row is None:
            defaults = get_default_prompts()
            row = LlmSetting(
                id=1,
                format_prompt=defaults["format_prompt"],
                classify_prompt=defaults["classify_prompt"],
                commentary_prompt=defaults["commentary_prompt"],
                image_prompt_template=defaults["image_prompt_template"],
            )
            session.add(row)
            await session.commit()
            await session.refresh(row)
        return row


# === Single task: format ===
async def _enrich_format(article: Article, setting: LlmSetting, client: LlmClient) -> None:
    """Ask the LLM to typeset the translated body and store it as simple HTML.

    The prompt template is filled with ``str.format``, so literal braces in a
    customised template must be doubled (``{{`` / ``}}``).

    Writes ``article.body_zh_formatted`` (``None`` when the reply is empty)
    and sets ``article.format_status`` to ``"ok"``.  Exceptions propagate to
    the caller, which records the failure.
    """
    template = setting.format_prompt or get_default_prompts()["format_prompt"]
    prompt = template.format(body=(article.body_zh_text or "")[:6000])
    text = await client.chat(
        system="你是中文新闻排版助手,只输出排版后的纯文本。",
        user=prompt,
        temperature=0.3,
        max_tokens=2000,
    )
    # Minimal HTML wrapping: split on blank lines, one <p> per paragraph.
    # The reply is plain text derived from external articles, so it is escaped
    # before being embedded in markup.
    paragraphs = (chunk.strip() for chunk in (text or "").split("\n\n"))
    parts = [f"<p>{html.escape(p, quote=False)}</p>" for p in paragraphs if p]
    article.body_zh_formatted = "\n".join(parts) or None
    article.format_status = "ok"


# === Single task: classify ===
async def _enrich_classify(article: Article, setting: LlmSetting, client: LlmClient) -> None:
    """Ask the LLM for categories and store up to three of them.

    The JSON reply is read via ``result.get("categories")``.  The first three
    entries are stripped, blank ones dropped, joined with ``","`` and cut to
    32 characters.  ``article.category`` is left untouched when no usable
    category is returned; ``article.classify_status`` becomes ``"ok"`` either
    way.
    """
    template = setting.classify_prompt or get_default_prompts()["classify_prompt"]
    prompt = template.format(
        # "or ''" guards against both titles being None (same as the image task).
        title=(article.title_zh or article.title or "")[:200],
        summary=(article.summary_zh or "")[:400],
    )
    result = await client.classify_json(
        system="你是新闻分类助手,只返回 JSON。",
        user=prompt,
    )
    cats = result.get("categories") or []
    if isinstance(cats, list) and cats:
        names = [name for name in (str(c).strip() for c in cats[:3]) if name]
        if names:
            article.category = ",".join(names)[:32]
    article.classify_status = "ok"


# === Single task: image ===
async def _enrich_image(article: Article, setting: LlmSetting, client: LlmClient) -> None:
    """Generate an illustration and store its URL in ``article.image_ai_url``.

    The prompt uses the translated title when present, otherwise the original
    title.  Sets ``article.image_ai_status`` to ``"ok"`` on success.
    """
    template = setting.image_prompt_template or get_default_prompts()["image_prompt_template"]
    title_for_prompt = (article.title_zh or article.title or "")[:200]
    prompt = template.format(title=title_for_prompt)
    url = await client.generate_image(prompt, size=setting.image_size)
    article.image_ai_url = url
    article.image_ai_status = "ok"


# === Single task: commentary ===
async def _enrich_commentary(article: Article, setting: LlmSetting, client: LlmClient) -> None:
    """Ask the LLM for a commentary and store it in ``article.commentary``.

    An empty reply is stored as ``None``.  Sets ``article.commentary_status``
    to ``"ok"`` on success.
    """
    template = setting.commentary_prompt or get_default_prompts()["commentary_prompt"]
    prompt = template.format(
        title=(article.title_zh or article.title or "")[:200],
        body=(article.body_zh_text or "")[:3000],
    )
    text = await client.chat(
        system="你是资深新闻评论员。",
        user=prompt,
        temperature=0.6,
        max_tokens=600,
    )
    article.commentary = text or None
    article.commentary_status = "ok"


# (task name, Article status attribute, coroutine function), in run order.
_ENRICH_TASKS: tuple[tuple[str, str, Any], ...] = (
    ("format", "format_status", _enrich_format),
    ("classify", "classify_status", _enrich_classify),
    ("image", "image_ai_status", _enrich_image),
    ("commentary", "commentary_status", _enrich_commentary),
)


# === Orchestration: enrich_article ===
async def enrich_article(article_id: int, *, only_incomplete: bool = False) -> dict[str, str]:
    """Run the four LLM enrichment tasks for one article.

    Args:
        article_id: Primary key of the article to enrich.
        only_incomplete: When true, a task whose stored status is not one of
            pending / failed / n/a (or unset) is not re-run and its stored
            status is reported instead.  The default (false) re-runs all four
            tasks, as before.

    Returns:
        ``{task: status}`` for logging.  ``status`` is ``"ok"``,
        ``"failed:<ExceptionName>"`` or ``"skipped"`` (LLM disabled).  An
        empty dict means the article does not exist or has no translation yet.
    """
    # Pre-flight check in a short-lived session so nothing is held open while
    # the settings are loaded.
    async with AsyncSessionLocal() as session:
        art = (
            await session.execute(select(Article).where(Article.id == article_id))
        ).scalar_one_or_none()
        if not art:
            logger.warning("enrich_article: id=%s not found", article_id)
            return {}
        if not (art.title_zh or art.body_zh_text):
            logger.info("enrich_article: id=%s no translation yet, skip", article_id)
            return {}

    setting = await get_setting()
    if not setting.enabled:
        logger.info("enrich_article: llm disabled, skip id=%s", article_id)
        return {"format": "skipped", "classify": "skipped", "image": "skipped", "commentary": "skipped"}

    # Build the client from the stored settings on every call so that edits to
    # the settings take effect without a restart.
    client = LlmClient(
        chat_model=setting.chat_model,
        image_model=setting.image_model,
        interval_sec=setting.interval_sec,
    )

    results: dict[str, str] = {}
    async with AsyncSessionLocal() as session:
        art = (
            await session.execute(select(Article).where(Article.id == article_id))
        ).scalar_one_or_none()
        if not art:
            return {}

        # Tasks run one after another; each has its own try/except so one
        # failure does not prevent the others.  Coroutines are created only
        # when they are about to be awaited.
        for name, status_attr, task in _ENRICH_TASKS:
            current = getattr(art, status_attr) or "pending"
            if only_incomplete and current not in _INCOMPLETE_STATUSES:
                results[name] = current
                continue
            try:
                await task(art, setting, client)
                results[name] = "ok"
            except Exception as e:
                logger.exception("enrich %s failed for article %s: %s", name, article_id, e)
                results[name] = f"failed:{type(e).__name__}"
                setattr(art, status_attr, "failed")

        await session.commit()

    logger.info("enrich_article id=%s: %s", article_id, results)
    return results


# === Background loop ===
# Long-running worker, analogous to translation_loop.
ENRICHMENT_INTERVAL_SEC = 5.0  # idle wait when there is nothing to do
ENRICHMENT_BATCH_SIZE = 1
ENRICHMENT_RETRY_COOLDOWN_SEC = 300.0  # pause before retrying an article that made no full progress


async def _select_enrichment_candidates(limit: int, exclude_ids: Iterable[int] = ()) -> list[int]:
    """Return ids of translated articles that still have an incomplete task.

    The status filter is part of the SQL query (not applied after LIMIT), so
    already-enriched old articles cannot hide newer ones.  Oldest translations
    come first.
    """
    status_columns = (
        Article.format_status,
        Article.classify_status,
        Article.image_ai_status,
        Article.commentary_status,
    )
    needs_work = or_(
        *(or_(col.is_(None), col.in_(_INCOMPLETE_STATUSES)) for col in status_columns)
    )
    conditions = [
        Article.translation_status == "ok",
        Article.title_zh.is_not(None),
        needs_work,
    ]
    excluded = list(exclude_ids)
    if excluded:
        conditions.append(Article.id.not_in(excluded))

    stmt = (
        select(Article.id)
        .where(*conditions)
        .order_by(Article.translated_at.asc().nullslast(), Article.id.asc())
        .limit(limit)
    )
    async with AsyncSessionLocal() as session:
        return list((await session.execute(stmt)).scalars())


async def enrichment_loop() -> None:
    """Continuously enrich translated articles that have an incomplete task.

    An article qualifies when ``translation_status == "ok"``, ``title_zh`` is
    set and at least one ``*_status`` is unset / pending / failed / n/a.
    Articles that did not finish cleanly are put on an in-memory cooldown so a
    permanently failing article cannot block the queue.  Runs until cancelled.
    """
    logger.info("enrichment_loop started")
    # Give the translation worker a head start.
    await asyncio.sleep(10)

    cooldown_until: dict[int, float] = {}  # article id -> monotonic time of next retry
    while True:
        try:
            now = time.monotonic()
            cooldown_until = {aid: t for aid, t in cooldown_until.items() if t > now}

            todo_ids = await _select_enrichment_candidates(
                ENRICHMENT_BATCH_SIZE, exclude_ids=cooldown_until
            )
            if not todo_ids:
                await asyncio.sleep(ENRICHMENT_INTERVAL_SEC)
                continue

            llm_disabled = False
            for aid in todo_ids:
                try:
                    results = await enrich_article(aid, only_incomplete=True)
                except Exception as e:
                    logger.exception("enrich_article %s in loop failed: %s", aid, e)
                    results = {}

                if results and all(status == "skipped" for status in results.values()):
                    # LLM is switched off globally; back off instead of spinning.
                    llm_disabled = True
                    break
                if not results or any(status != "ok" for status in results.values()):
                    cooldown_until[aid] = time.monotonic() + ENRICHMENT_RETRY_COOLDOWN_SEC

            # Light throttle between articles.
            await asyncio.sleep(ENRICHMENT_INTERVAL_SEC if llm_disabled else 0.5)
        except Exception as e:
            logger.exception("enrichment_loop error: %s", e)
            await asyncio.sleep(ENRICHMENT_INTERVAL_SEC)