"""LLM 智能增强服务(翻译后调)。 5 个独立任务(按顺序): 1. classify — 分类 + 黑名单 gate(命中则删文章,后 4 步跳过) 2. format — 排版译文(写入 body_zh_formatted,容器用 .article-body + 段落 .diary-para) 3. image — 生成插图(写入 image_ai_url,prompt 用正文第一段) 4. commentary_angel — 写 Angel 评论(写入 commentary) 5. commentary_meituan — 写美团评论(写入 commentary_meituan) 双 provider 评论:Angel + 美团 大模型(LongCat) 并行,各自独立 try/except, 任一失败不影响另一个。commentary_engine 字段记录实际写入的 provider。 排版容器 CSS(固定,不再让用户改): - 字体: system-ui 字体栈 - 字号: 19px(2026-06 调大,从 17 → 19,提升阅读舒适度) - 行高: 1.7 - 颜色: #3e3e3e - 段落: margin-bottom 1.5em(自动空一行);class 名固定为 diary-para 黑名单机制: - classify 任务合并 llm_settings.blocklist_tags(全局) + source.blocklist_tags(per-source) - 注入到 prompt 的 {blocklist} 占位符,LLM 返回 {"drop": true, "categories": [...]} 则删文章 - 合并去重后为空 → classify 任务只产出 categories 不产出 drop 设计: - 任务入口: enrich_article(article_id) - 任务间互不影响:每个任务独立 try/except + 写 status - 全部任务共走 LlmClient 的全局限速 - 若设置 enabled=False,只跳过(不调 LLM) - 用户提示词模板可能不包含全部占位符,用 _safe_format 容错 """ from __future__ import annotations import asyncio import logging from typing import Any, Mapping from sqlalchemy import select from app.database import AsyncSessionLocal from app.models.article import Article from app.models.llm_setting import LlmSetting from app.models.source import Source from app.schemas.llm import get_default_prompts from app.services.llm.client import LlmClient from app.services.llm.providers import ( PROVIDER_ANGEL, PROVIDER_COMMENTARY_DEFAULTS, PROVIDER_MEITUAN, is_provider_enabled, ) logger = logging.getLogger("news.llm.enrichment") # === 排版容器固定 CSS(项目级固定,不再让用户改)=== # 同时内联到 body_zh_formatted 的容器 div 的 style 属性上, # 保证分享/邮件/导出场景下样式不丢;前端全局 .article-body 类做兜底。 ARTICLE_BODY_FONT_FAMILY = ( "system-ui, -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, " "'Helvetica Neue', sans-serif" ) ARTICLE_BODY_FONT_SIZE = "19px" ARTICLE_BODY_LINE_HEIGHT = "1.7" ARTICLE_BODY_COLOR = "#3e3e3e" ARTICLE_BODY_P_MARGIN_BOTTOM = "1.5em" # === 排版段落 class 名(项目级固定,前端 .diary-para 兜底)=== DIARY_PARA_CLASS = "diary-para" # === 插图默认尺寸(适中,不再用 1024x768)=== # 写死到 enrichment 里,行为稳定;setting.image_size 仍可由用户在 UI 改, # 但默认行为不依赖它,避免意外被改成很大。 DEFAULT_IMAGE_SIZE = "512x384" DEFAULT_IMAGE_FIRST_PARA_CHARS = 400 # 提取第一段最多用这么多字 DEFAULT_IMAGE_MAX_TAGS = 5 # 分类标签上限(多标签) class _SafeDict(dict): """missing 返回 {key} 本身(占位符原样保留),不抛 KeyError。""" def __missing__(self, key: str) -> str: # type: ignore[override] return "{" + key + "}" def _safe_format(template: str, vars_: Mapping[str, Any]) -> str: """用 _SafeDict 跑 str.format,缺失的占位符保留原样而不是 KeyError。 用途:数据库里用户已存的 prompt 模板可能是旧版的(只支持部分占位符), 新代码传了更多变量也不应崩。 防御: - 模板里出现的非占位符 `{` / `}`(比如示例 JSON `{"k": "v"}`)会被先 escape 成 `{{` / `}}`, 避免 str.format 误解析为占位符/格式说明符而抛 ValueError。 - 用户显式写的 `{{` / `}}`(标准 str.format 转义语法)会被原样保留,不被重复 escape。 """ import re placeholder_re = re.compile(r"\{([A-Za-z_][A-Za-z0-9_.\[\]]*)\}") sentinels: list[str] = [] sentinel_map: dict[str, str] = {} user_escape: list[str] = [] def _stash(m: re.Match) -> str: name = m.group(1) s = f"\x00PH{len(sentinels)}\x00" sentinels.append(name) sentinel_map[s] = name return s def _stash_brace(s: str) -> str: sentinel = f"\x00UE{len(user_escape)}\x00" user_escape.append(s) return sentinel # 1) 先 stash 用户显式 {{ / }} staged = template.replace("{{", _stash_brace("{{")).replace("}}", _stash_brace("}}")) # 2) stash 合法占位符 staged = placeholder_re.sub(_stash, staged) # 3) escape 剩下的单个 { / }(示例 JSON 等字面量) escaped = staged.replace("{", "{{").replace("}", "}}") # 4) 还原占位符 final = escaped for s, name in sentinel_map.items(): final = final.replace(s, "{" + name + "}") # 5) 还原用户显式 {{ / }} for i, raw in enumerate(user_escape): final = final.replace(f"\x00UE{i}\x00", raw) try: return final.format_map(_SafeDict(vars_)) except (KeyError, IndexError, ValueError) as e: # 极端情况兜底:按原文返回 logger.warning("_safe_format 解析失败,按原文返回: %s", e) return template # === 获取当前设置(行锁 + 缓存刷新)=== async def get_setting() -> LlmSetting: """读 llm_settings 单行;不存在则用默认值插入。""" async with AsyncSessionLocal() as session: row = (await session.execute(select(LlmSetting).where(LlmSetting.id == 1))).scalar_one_or_none() if row is None: defaults = get_default_prompts() row = LlmSetting( id=1, format_prompt=defaults["format_prompt"], classify_prompt=defaults["classify_prompt"], commentary_prompt=defaults["commentary_prompt"], image_prompt_template=defaults["image_prompt_template"], ) session.add(row) await session.commit() await session.refresh(row) return row # === 双 provider 评论 === # Angel: commentary / commentary_status(沿用旧字段,完全不动) # 美团: commentary_meituan / commentary_meituan_status / commentary_meituan_model / commentary_meituan_error # commentary_engine 记录实际写入的 provider:angel / meituan / "angel,meituan" # === 单任务:format === async def _enrich_format(article: Article, setting: LlmSetting, client: LlmClient) -> None: template = setting.format_prompt or get_default_prompts()["format_prompt"] prompt = _safe_format(template, {"body": (article.body_zh_text or "")[:6000]}) text = await client.chat( system="你是中文新闻排版助手,只输出排版后的纯文本。", user=prompt, temperature=0.3, max_tokens=2000, ) # 段落 class 名固定为 diary-para(项目级固定,前端 .diary-para 兜底); # CSS 仍内联到 style,保证分享/导出场景不丢。 parts = [ f'

{p.strip()}

' for p in text.split("\n\n") if p.strip() ] if not parts: article.body_zh_formatted = None else: article.body_zh_formatted = _wrap_article_body("\n".join(parts)) article.format_status = "ok" def _wrap_article_body(inner_html: str) -> str: """把排版好的段落包到带固定 CSS 的
里。 CSS 同时内联到 style 属性(分享/导出样式不丢)+ class 名(前端全局类可覆盖)。 """ inline_style = ( f"font-family:{ARTICLE_BODY_FONT_FAMILY};" f"font-size:{ARTICLE_BODY_FONT_SIZE};" f"line-height:{ARTICLE_BODY_LINE_HEIGHT};" f"color:{ARTICLE_BODY_COLOR};" ) # 段落样式也内联,保证 v-html 渲染时一定生效 p_style = f"margin:0 0 {ARTICLE_BODY_P_MARGIN_BOTTOM} 0;" # 内层 HTML 的

形式;把 style 插到 class 后面 inner_with_p_style = inner_html.replace( f'

', f'

', ) return f'

{inner_with_p_style}
' # === 单任务:classify (含黑名单 drop gate) === async def _enrich_classify( article: Article, setting: LlmSetting, client: LlmClient, blocklist: list[str], ) -> tuple[bool, list[str]]: """分类 + 黑名单判断。 返回 (drop, categories): - drop=True → 整篇文章应删除(分类命中 blocklist) - categories → 写入 article.category 的多标签列表 """ template = setting.classify_prompt or get_default_prompts()["classify_prompt"] # 老 prompt 可能只支持 {title}/{summary},不支持 {body} / {blocklist} —— _safe_format 兜底 vars_ = { "title": (article.title_zh or article.title)[:200], "summary": (article.summary_zh or "")[:400], "body": (article.body_zh_text or "")[:1500], "blocklist": "、".join(blocklist) if blocklist else "(无)", } prompt = _safe_format(template, vars_) result = await client.classify_json( system="你是新闻分类助手,只返回 JSON。", user=prompt, ) cats_raw = result.get("categories") or result.get("tags") or [] cats: list[str] = [] if isinstance(cats_raw, list): cats = [str(c).strip() for c in cats_raw[:DEFAULT_IMAGE_MAX_TAGS] if str(c).strip()] # 兼容两种来源:LLM 自己判断的 drop 字段,或后端兜底检查命中 drop_flag = bool(result.get("drop")) if not drop_flag and blocklist: # 兜底:即使 LLM 没正确返回 drop 字段,我们也用本地匹配兜底 bl_set = {b.strip() for b in blocklist if b and b.strip()} drop_flag = any(c in bl_set for c in cats) return drop_flag, cats def _merge_blocklist(setting: LlmSetting, source: Source | None) -> list[str]: """合并全局 + per-source blocklist,去重保序。""" out: list[str] = [] seen: set[str] = set() sources: list[list[str]] = [setting.blocklist_tags or []] if source is not None: sources.append(source.blocklist_tags or []) for src in sources: for t in src: t = (t or "").strip() if t and t not in seen: seen.add(t) out.append(t) return out # === 单任务:image === async def _enrich_image(article: Article, setting: LlmSetting, client: LlmClient) -> None: template = setting.image_prompt_template or get_default_prompts()["image_prompt_template"] # 用正文第一段作为 prompt(英文 prompt 走 title 仍可工作,所以 title 也带上作 fallback) first_para = _first_paragraph(article.body_zh_text or "", max_chars=DEFAULT_IMAGE_FIRST_PARA_CHARS) if not first_para: first_para = (article.title_zh or article.title or "")[:200] title_for_prompt = (article.title_zh or article.title or "")[:200] # template 同时支持 {body} 和 {title} 两种占位符;老的只支持 {title} 也能跑 prompt = _safe_format(template, {"body": first_para, "title": title_for_prompt}) url = await client.generate_image(prompt, size=DEFAULT_IMAGE_SIZE) article.image_ai_url = url article.image_ai_status = "ok" def _first_paragraph(text: str, max_chars: int) -> str: """取正文第一段(按 \\n\\n 切)。如果首段超长就截断。""" if not text: return "" for p in text.split("\n\n"): p = p.strip() if p: return p[:max_chars] return "" # === 单任务:commentary(provider 通用版)=== # provider=PROVIDER_ANGEL → 写入 commentary / commentary_status(老字段,完全不动) # provider=PROVIDER_MEITUAN → 写入 commentary_meituan / commentary_meituan_status / commentary_meituan_model / commentary_meituan_error def _default_commentary_prompt() -> str: return get_default_prompts()["commentary_prompt"] async def _enrich_commentary_angel( article: Article, setting: LlmSetting, client: LlmClient ) -> None: """Angel 评论 — 写入老字段(向后兼容)。""" template = setting.commentary_prompt or _default_commentary_prompt() prompt = _safe_format( template, { "title": (article.title_zh or article.title)[:200], "body": (article.body_zh_text or "")[:3000], }, ) defaults = PROVIDER_COMMENTARY_DEFAULTS[PROVIDER_ANGEL] text = await client.chat( system=defaults["system"], user=prompt, temperature=defaults["temperature"], max_tokens=defaults["max_tokens"], ) article.commentary = text or None article.commentary_status = "ok" # 记录 provider(已存在的 "angel" / 追加为 "angel,meituan") engines = set(filter(None, (article.commentary_engine or "").split(","))) engines.add(PROVIDER_ANGEL) article.commentary_engine = ",".join(sorted(engines)) async def _enrich_commentary_meituan( article: Article, setting: LlmSetting, client: LlmClient ) -> None: """美团评论 — 写入 commentary_meituan 等新字段。""" # 优先用 setting.meituan_commentary_prompt,留空用默认 template = setting.meituan_commentary_prompt or _default_commentary_prompt() prompt = _safe_format( template, { "title": (article.title_zh or article.title)[:200], "body": (article.body_zh_text or "")[:3000], }, ) defaults = PROVIDER_COMMENTARY_DEFAULTS[PROVIDER_MEITUAN] try: text = await client.chat( system=defaults["system"], user=prompt, temperature=defaults["temperature"], max_tokens=defaults["max_tokens"], ) article.commentary_meituan = text or None article.commentary_meituan_status = "ok" article.commentary_meituan_error = None article.commentary_meituan_model = client.chat_model engines = set(filter(None, (article.commentary_engine or "").split(","))) engines.add(PROVIDER_MEITUAN) article.commentary_engine = ",".join(sorted(engines)) except Exception as e: # 美团 provider 失败,标 failed 但不影响 Angel article.commentary_meituan_status = "failed" article.commentary_meituan_error = f"{type(e).__name__}: {e}"[:1000] article.commentary_meituan = None raise # === 总编排:enrich_article === async def enrich_article(article_id: int) -> dict[str, str]: """对单篇文章做 5 项 LLM 增强。 顺序:classify(黑名单 gate) → format → image → commentary(angel + meituan 并行) - classify 命中 blocklist → 整篇文章 DELETE,后续任务直接 return - 任一任务失败,只标 status 不影响其他任务 - 双 provider 评论:Angel 和美团 用 asyncio.gather 并行,任一失败不影响另一个 返回 {task: status} 字典(用于日志)。 """ async with AsyncSessionLocal() as session: art = ( await session.execute(select(Article).where(Article.id == article_id)) ).scalar_one_or_none() if not art: logger.warning("enrich_article: id=%s not found", article_id) return {} if not (art.title_zh or art.body_zh_text): logger.info("enrich_article: id=%s no translation yet, skip", article_id) return {} # 拉取设置 setting = await get_setting() if not setting.enabled: logger.info("enrich_article: llm disabled, skip id=%s", article_id) return { "format": "skipped", "classify": "skipped", "image": "skipped", "commentary_angel": "skipped", "commentary_meituan": "skipped", } # 用配置生成 client(允许热改设置) client = LlmClient( chat_model=setting.chat_model, image_model=setting.image_model, interval_sec=setting.interval_sec, ) # 美团 provider client(可能为 None = 未配置) meituan_client = None if is_provider_enabled(PROVIDER_MEITUAN, setting): from app.services.llm.providers import get_meituan_client meituan_client = get_meituan_client(setting) results: dict[str, str] = {} async with AsyncSessionLocal() as session: art = ( await session.execute(select(Article).where(Article.id == article_id)) ).scalar_one_or_none() if not art: return {} # === 1) classify(黑名单 gate,优先执行)=== blocklist = _merge_blocklist(setting, art.source if art.source_id else None) try: drop, cats = await _enrich_classify(art, setting, client, blocklist) art.classify_status = "ok" if cats: art.category = ",".join(cats)[:64] or None if drop: # 命中 blocklist → 删文章,后续 4 步全跳 logger.info( "enrich_article id=%s dropped (blocklist hit, cats=%s, blocklist=%s)", article_id, cats, blocklist, ) await session.delete(art) await session.commit() return { "classify": "dropped", "format": "skipped", "image": "skipped", "commentary_angel": "skipped", "commentary_meituan": "skipped", } except Exception as e: logger.exception("enrich classify failed for article %s: %s", article_id, e) art.classify_status = "failed" results["classify"] = f"failed:{type(e).__name__}" # classify 失败也继续(format/image/commentary 还能跑) # === 2) format === try: await _enrich_format(art, setting, client) results["format"] = "ok" except Exception as e: logger.exception("enrich format failed for article %s: %s", article_id, e) art.format_status = "failed" results["format"] = f"failed:{type(e).__name__}" # === 3) image === try: await _enrich_image(art, setting, client) results["image"] = "ok" except Exception as e: logger.exception("enrich image failed for article %s: %s", article_id, e) art.image_ai_status = "failed" results["image"] = f"failed:{type(e).__name__}" # === 4 + 5) commentary_angel + commentary_meituan 并行 === # 关键:每个 provider 独立的 try/except,任一失败不影响另一个 # 但 gather 需要返回 tuple,这里用嵌套函数封装 async def _safe_angel() -> None: try: await _enrich_commentary_angel(art, setting, client) results["commentary_angel"] = "ok" except Exception as e: logger.exception("enrich commentary_angel failed for article %s: %s", article_id, e) art.commentary_status = "failed" results["commentary_angel"] = f"failed:{type(e).__name__}" async def _safe_meituan() -> None: if meituan_client is None: art.commentary_meituan_status = "n/a" results["commentary_meituan"] = "n/a" return try: await _enrich_commentary_meituan(art, setting, meituan_client) results["commentary_meituan"] = "ok" except Exception as e: logger.exception("enrich commentary_meituan failed for article %s: %s", article_id, e) # status 已在内部置 failed results["commentary_meituan"] = f"failed:{type(e).__name__}" await asyncio.gather(_safe_angel(), _safe_meituan()) await session.commit() logger.info("enrich_article id=%s: %s", article_id, results) return results # === 后台循环 === # 与 translation_loop 一样,常驻从队列里取文章 ENRICHMENT_INTERVAL_SEC = 2.0 # 没活时等待 ENRICHMENT_BATCH_SIZE = 8 # 每轮并发拉取候选,然后顺序处理(LLM 客户端本身有节流) async def enrichment_loop() -> None: """扫描已翻译但未 enrich 的文章(任一 *_status 为 pending/n/a 且 translation_status=ok)。 跟 translation_loop 一样常驻。 """ logger.info("enrichment_loop started") # 等一下让翻译先跑 await asyncio.sleep(10) while True: try: async with AsyncSessionLocal() as session: # 精准定位待 enrich 的文章:已翻译 + 任一 LLM 状态 ∈ {n/a, pending, failed} # (不能用 order_by id ASC + 内存过滤:已 enrich 的文章 id 可能更小,会占满 limit, # 让 enrichment_loop 永远看不到后面大 id 的 n/a 文章 — 真实踩过的坑) rows = ( await session.execute( select(Article) .where( Article.translation_status == "ok", Article.title_zh.is_not(None), # 任一 LLM 状态不是 ok(包括 NULL) ( (Article.classify_status.is_(None)) | (Article.classify_status != "ok") | (Article.format_status.is_(None)) | (Article.format_status != "ok") | (Article.commentary_status.is_(None)) | (Article.commentary_status != "ok") | (Article.image_ai_status.is_(None)) | (Article.image_ai_status != "ok") | (Article.commentary_meituan_status.is_(None)) | (Article.commentary_meituan_status.in_(("n/a", "pending", "failed"))) ), ) .order_by(Article.id.asc()) .limit(ENRICHMENT_BATCH_SIZE * 5) # 比 batch 略多 ) ).scalars() candidates = list(rows) # 过滤:任一 *_status 是 pending(包括 NULL 和 n/a) todo_ids: list[int] = [] for a in candidates: statuses = [ a.format_status or "pending", a.classify_status or "pending", a.image_ai_status or "pending", a.commentary_status or "pending", a.commentary_meituan_status or "pending", ] if any(s in ("pending", "failed", "n/a") for s in statuses): todo_ids.append(a.id) if len(todo_ids) >= ENRICHMENT_BATCH_SIZE: break if not todo_ids: await asyncio.sleep(ENRICHMENT_INTERVAL_SEC) continue # 并发 enrich 多篇(LlmClient 内部 interval_sec 已经做了限速,这里只并发不限并发上限) # 但为了不让 LLM API 同时打太多,加一层并发上限 sem = asyncio.Semaphore(3) async def _run_one(aid: int) -> None: async with sem: try: await enrich_article(aid) except Exception as e: logger.exception("enrich_article %s in loop failed: %s", aid, e) await asyncio.gather(*[_run_one(aid) for aid in todo_ids]) except Exception as e: logger.exception("enrichment_loop error: %s", e) await asyncio.sleep(ENRICHMENT_INTERVAL_SEC)