From 3091f291b23e08e2acc87cb43b56659686074ab8 Mon Sep 17 00:00:00 2001 From: xiaji Date: Sun, 14 Jun 2026 15:51:22 +0800 Subject: [PATCH] =?UTF-8?q?feat(ingest):=20API=20Push=20=E7=9F=AD=E6=96=B0?= =?UTF-8?q?=E9=97=BB=E6=95=B0=E6=8D=AE=E5=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - alembic 0008:articles 加 is_short_news/external_id/source_ref/content_hash (UNIQUE);sources.kind 加 'api_push';api_tokens 加 purpose + source_id - SourceKind.API_PUSH enum;Article/ApiToken model 加新字段 - enrich_article 短新闻跳过 format/image; enrichment_loop SQL 加 is_short_news 路径(并入'可 enrich' 条件) - 入库侧由 commit 2(ingest 接口)负责:写 body_zh_text=body_text, format/image/commentary_meituan_status='n/a', classify/commentary_status='pending'(带 tags 时 classify='ok') 无迁移爆炸半径:articles.url 保持 NOT NULL,短新闻合成 api-push:// 占位 --- backend/alembic/versions/0008_api_push.py | 116 ++++++++++++++++++++++ backend/app/models/api_token.py | 16 ++- backend/app/models/article.py | 11 ++ backend/app/models/source.py | 1 + backend/app/services/llm/enrichment.py | 73 ++++++++++---- 5 files changed, 194 insertions(+), 23 deletions(-) create mode 100644 backend/alembic/versions/0008_api_push.py diff --git a/backend/alembic/versions/0008_api_push.py b/backend/alembic/versions/0008_api_push.py new file mode 100644 index 0000000..6471142 --- /dev/null +++ b/backend/alembic/versions/0008_api_push.py @@ -0,0 +1,116 @@ +"""API Push 短新闻来源 + +新增字段: +- articles.is_short_news BOOL NOT NULL DEFAULT false (索引) +- articles.external_id VARCHAR(128) nullable (索引) +- articles.source_ref VARCHAR(64) nullable (索引) +- articles.content_hash VARCHAR(40) nullable UNIQUE (索引,内容去重核心 key) +- articles.url 不变,保持 NOT NULL(短新闻入库时合成 api-push:// 占位 url) +- sources.kind ENUM 加 'api_push' 值 +- api_tokens.purpose VARCHAR(16) NOT NULL DEFAULT 'mobile' (索引) + 值域: mobile / ingest +- api_tokens.source_id INTEGER NULL FK sources.id ON DELETE CASCADE (索引, + ingest token 绑定的 source) + +Revision ID: 0008 +Revises: 0007 
+Create Date: 2026-06-14 +""" +from __future__ import annotations + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + + +revision: str = "0008" +down_revision: Union[str, None] = "0007" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # === 1) sources.kind 加 'api_push' === + # PG enum 加 value 必须用 ALTER TYPE,alembic 没有原生 op,直接 execute + op.execute("ALTER TYPE source_kind ADD VALUE IF NOT EXISTS 'api_push'") + + # === 2) articles:4 个字段 === + # 2.1) is_short_news + op.add_column( + "articles", + sa.Column( + "is_short_news", + sa.Boolean, + nullable=False, + server_default=sa.text("false"), + ), + ) + op.create_index("ix_articles_is_short_news", "articles", ["is_short_news"]) + + # 2.2) external_id + op.add_column( + "articles", + sa.Column("external_id", sa.String(128), nullable=True), + ) + op.create_index("ix_articles_external_id", "articles", ["external_id"]) + + # 2.3) source_ref + op.add_column( + "articles", + sa.Column("source_ref", sa.String(64), nullable=True), + ) + op.create_index("ix_articles_source_ref", "articles", ["source_ref"]) + + # 2.4) content_hash (UNIQUE,核心去重 key) + op.add_column( + "articles", + sa.Column("content_hash", sa.String(40), nullable=True), + ) + op.create_index("ix_articles_content_hash", "articles", ["content_hash"], unique=True) + + # 注:articles.url 保持 NOT NULL。短新闻入库时会合成 "api-push://source-slug/content-hash" + # 作为占位,避免改动下游 schema(ArticleDetail.url 等)引出更大爆炸半径。 + + # === 3) api_tokens.purpose + source_id === + op.add_column( + "api_tokens", + sa.Column( + "purpose", + sa.String(16), + nullable=False, + server_default="mobile", + ), + ) + op.create_index("ix_api_tokens_purpose", "api_tokens", ["purpose"]) + op.add_column( + "api_tokens", + sa.Column( + "source_id", + sa.Integer, + sa.ForeignKey("sources.id", ondelete="CASCADE"), + nullable=True, + ), + ) + op.create_index("ix_api_tokens_source_id", "api_tokens", ["source_id"]) + + +def downgrade() -> None: + # === 反向顺序 === 
+ op.drop_index("ix_api_tokens_source_id", table_name="api_tokens") + op.drop_column("api_tokens", "source_id") + op.drop_index("ix_api_tokens_purpose", table_name="api_tokens") + op.drop_column("api_tokens", "purpose") + + op.drop_index("ix_articles_content_hash", table_name="articles") + op.drop_column("articles", "content_hash") + op.drop_index("ix_articles_source_ref", table_name="articles") + op.drop_column("articles", "source_ref") + op.drop_index("ix_articles_external_id", table_name="articles") + op.drop_column("articles", "external_id") + op.drop_index("ix_articles_is_short_news", table_name="articles") + op.drop_column("articles", "is_short_news") + + # PG enum remove value 没有原生支持,需要重建类型。 + # 这里只保留注释,提醒运维:downgrade 后如需去掉 api_push enum 值, + # 需手工 ALTER TYPE (CREATE TYPE ... + ALTER COLUMN + DROP TYPE)。 \ No newline at end of file diff --git a/backend/app/models/api_token.py b/backend/app/models/api_token.py index fc39e43..1729ae8 100644 --- a/backend/app/models/api_token.py +++ b/backend/app/models/api_token.py @@ -1,14 +1,19 @@ -"""API Token(给 Android 用,可独立撤销)。""" +"""API Token(给 Android + API Push 短新闻 ingest 用,可独立撤销)。""" from __future__ import annotations from datetime import datetime -from sqlalchemy import DateTime, ForeignKey, String, func +from sqlalchemy import DateTime, ForeignKey, Integer, String, func from sqlalchemy.orm import Mapped, mapped_column from app.database import Base +# Token 用途 +TOKEN_PURPOSE_MOBILE = "mobile" # Android 客户端 +TOKEN_PURPOSE_INGEST = "ingest" # API Push 短新闻 /api/v1/ingest + + class ApiToken(Base): __tablename__ = "api_tokens" @@ -19,6 +24,13 @@ class ApiToken(Base): name: Mapped[str] = mapped_column(String(64), nullable=False) # "Xiaomi-14" token_hash: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True) # 只存 hash,原始 token 一次性返回给用户 + purpose: Mapped[str] = mapped_column( + String(16), default=TOKEN_PURPOSE_MOBILE, nullable=False, index=True + ) + # ingest 专用:绑定的 source_id(purpose=ingest 
时使用,mobile 时为 NULL) + source_id: Mapped[int | None] = mapped_column( + ForeignKey("sources.id", ondelete="CASCADE"), nullable=True, index=True + ) last_used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) revoked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) diff --git a/backend/app/models/article.py b/backend/app/models/article.py index d0b8039..060807f 100644 --- a/backend/app/models/article.py +++ b/backend/app/models/article.py @@ -5,6 +5,7 @@ from datetime import datetime from sqlalchemy import ( BigInteger, + Boolean, DateTime, Float, ForeignKey, @@ -36,6 +37,16 @@ class Article(Base): url_hash: Mapped[str] = mapped_column(String(40), unique=True, nullable=False, index=True) guid: Mapped[str | None] = mapped_column(String(255), index=True) # 源站给的 ID + # === API Push 短新闻特有 === + is_short_news: Mapped[bool] = mapped_column( + Boolean, default=False, nullable=False, index=True + ) + external_id: Mapped[str | None] = mapped_column(String(128), index=True) # 调用方幂等 key + source_ref: Mapped[str | None] = mapped_column(String(64), index=True) # 短新闻里再细分来源 + content_hash: Mapped[str | None] = mapped_column( + String(40), unique=True, index=True + ) # 内容指纹,核心去重 key(NULL 不参与 unique) + # === 原文内容 === title: Mapped[str] = mapped_column(Text, nullable=False) body_html: Mapped[str | None] = mapped_column(Text) # 抽取后保留结构 diff --git a/backend/app/models/source.py b/backend/app/models/source.py index 1f7ed62..bfff757 100644 --- a/backend/app/models/source.py +++ b/backend/app/models/source.py @@ -24,6 +24,7 @@ class SourceKind(str, enum.Enum): RSS = "rss" HTML_LIST = "html_list" TG_CHANNEL = "tg_channel" + API_PUSH = "api_push" # 外部 POST /api/v1/ingest 推送短新闻 class Source(Base): diff --git a/backend/app/services/llm/enrichment.py b/backend/app/services/llm/enrichment.py index c2dc8e5..1605efc 100644 --- a/backend/app/services/llm/enrichment.py +++ 
b/backend/app/services/llm/enrichment.py @@ -35,7 +35,7 @@ import asyncio import logging from typing import Any, Mapping -from sqlalchemy import select +from sqlalchemy import and_, or_, select from app.database import AsyncSessionLocal from app.models.article import Article @@ -419,6 +419,8 @@ async def enrich_article(article_id: int) -> dict[str, str]: if not art: logger.warning("enrich_article: id=%s not found", article_id) return {} + # 短新闻(API Push):无 translation,但仍需 enrich(classify + commentary) + # 入库时已把 body_text 复制到 body_zh_text,所以这里可以走统一判断 if not (art.title_zh or art.body_zh_text): logger.info("enrich_article: id=%s no translation yet, skip", article_id) return {} @@ -451,6 +453,11 @@ async def enrich_article(article_id: int) -> dict[str, str]: if not art: return {} + # === 短新闻(API Push):跳过 format 和 image(短文不需要排版,用户明确不要配图)=== + # 短新闻入表时 format_status / image_ai_status 已置 'n/a',这里再 ensure 一次 + # 防止未来 ingest 路径忘了设 status。 + is_short = bool(art.is_short_news) + # === 1) classify(黑名单 gate,优先执行)=== blocklist = _merge_blocklist(setting, art.source if art.source_id else None) try: @@ -476,23 +483,31 @@ async def enrich_article(article_id: int) -> dict[str, str]: results["classify"] = f"failed:{type(e).__name__}" # classify 失败也继续(format/image/commentary 还能跑) - # === 2) format === - try: - await _enrich_format(art, setting, client) - results["format"] = "ok" - except Exception as e: - logger.exception("enrich format failed for article %s: %s", article_id, e) - art.format_status = "failed" - results["format"] = f"failed:{type(e).__name__}" + # === 2) format(短新闻跳过)=== + if is_short: + art.format_status = "n/a" + results["format"] = "skipped" + else: + try: + await _enrich_format(art, setting, client) + results["format"] = "ok" + except Exception as e: + logger.exception("enrich format failed for article %s: %s", article_id, e) + art.format_status = "failed" + results["format"] = f"failed:{type(e).__name__}" - # === 3) image === - try: - await _enrich_image(art, 
setting, client) - results["image"] = "ok" - except Exception as e: - logger.exception("enrich image failed for article %s: %s", article_id, e) - art.image_ai_status = "failed" - results["image"] = f"failed:{type(e).__name__}" + # === 3) image(短新闻跳过)=== + if is_short: + art.image_ai_status = "n/a" + results["image"] = "skipped" + else: + try: + await _enrich_image(art, setting, client) + results["image"] = "ok" + except Exception as e: + logger.exception("enrich image failed for article %s: %s", article_id, e) + art.image_ai_status = "failed" + results["image"] = f"failed:{type(e).__name__}" # === 4 + 5) commentary_angel + commentary_meituan 并行 === # 关键:每个 provider 独立的 try/except,任一失败不影响另一个 @@ -535,6 +550,8 @@ ENRICHMENT_BATCH_SIZE = 8 # 每轮并发拉取候选,然后顺序处理(LLM 客 async def enrichment_loop() -> None: """扫描已翻译但未 enrich 的文章(任一 *_status 为 pending/n/a 且 translation_status=ok)。 + 短新闻(API Push)例外:translation_status='n/a',但 is_short_news=True 也要被捞出来。 + 跟 translation_loop 一样常驻。 """ logger.info("enrichment_loop started") @@ -543,16 +560,30 @@ async def enrichment_loop() -> None: while True: try: async with AsyncSessionLocal() as session: - # 精准定位待 enrich 的文章:已翻译 + 任一 LLM 状态 ∈ {n/a, pending, failed} + # 精准定位待 enrich 的文章: + # - 长新闻:translation_status='ok' + title_zh 非空 + 任一 LLM 状态 != 'ok' + # - 短新闻:is_short_news=True + body_zh_text 非空(入库时已从 body_text 复制) + # + 任一 LLM 状态 != 'ok' # (不能用 order_by id ASC + 内存过滤:已 enrich 的文章 id 可能更小,会占满 limit, # 让 enrichment_loop 永远看不到后面大 id 的 n/a 文章 — 真实踩过的坑) rows = ( await session.execute( select(Article) .where( - Article.translation_status == "ok", - Article.title_zh.is_not(None), - # 任一 LLM 状态不是 ok(包括 NULL) + # === "可 enrich" 条件 === + or_( + # 长新闻 + and_( + Article.translation_status == "ok", + Article.title_zh.is_not(None), + ), + # 短新闻(API Push) + and_( + Article.is_short_news.is_(True), + Article.body_zh_text.is_not(None), + ), + ), + # === "未 enrich" 条件:任一 LLM 状态不是 ok === ( (Article.classify_status.is_(None)) | (Article.classify_status != "ok")