feat(ingest): API Push 短新闻数据层

- alembic 0008:articles 加 is_short_news/external_id/source_ref/content_hash
  (UNIQUE);sources.kind 加 'api_push';api_tokens 加 purpose + source_id
- SourceKind.API_PUSH enum;Article/ApiToken model 加新字段
- enrichment_article 短新闻跳过 format/image;
  enrichment_loop SQL 加 is_short_news 路径(并入'可 enrich' 条件)
- 入库侧由 commit 2(ingest 接口)负责:写 body_zh_text=body_text,
  format/image/commentary_meituan_status='n/a',
  classify/commentary_status='pending'(带 tags 时 classify='ok')

无迁移爆炸半径:articles.url 保持 NOT NULL,短新闻合成 api-push:// 占位
This commit is contained in:
xiaji
2026-06-14 15:51:22 +08:00
parent f690f1f108
commit 3091f291b2
5 changed files with 194 additions and 23 deletions

View File

@@ -0,0 +1,116 @@
"""API Push 短新闻来源
新增字段:
- articles.is_short_news BOOL NOT NULL DEFAULT false (索引)
- articles.external_id VARCHAR(128) nullable (索引)
- articles.source_ref VARCHAR(64) nullable (索引)
- articles.content_hash VARCHAR(40) nullable UNIQUE (索引,内容去重核心 key)
- articles.url TEXT: unchanged, stays NOT NULL (short news gets a synthetic api-push:// placeholder url at ingest time)
- sources.kind ENUM 加 'api_push'
- api_tokens.purpose VARCHAR(16) NOT NULL DEFAULT 'mobile' (索引)
值域: mobile / ingest
- api_tokens.source_id INTEGER NULL FK sources.id ON DELETE CASCADE (索引,
ingest token 绑定的 source)
Revision ID: 0008
Revises: 0007
Create Date: 2026-06-14
"""
from __future__ import annotations
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# Revision identifiers for this migration (they mirror the "Revision ID" /
# "Revises" lines in the module docstring).
revision: str = "0008"
# Parent revision that this migration is applied on top of.
down_revision: Union[str, None] = "0007"
# No branch labels or cross-branch dependencies are declared.
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Apply revision 0008: schema support for API Push short news.

    Adds the ``api_push`` value to the ``source_kind`` enum, four indexed
    columns on ``articles`` and two indexed columns on ``api_tokens``.
    ``articles.url`` is intentionally left untouched (still NOT NULL).
    """
    # === 1) sources.kind: add 'api_push' ===
    # Alembic has no native operation for adding a PostgreSQL enum value, so
    # raw SQL is executed. ALTER TYPE ... ADD VALUE is rejected inside a
    # transaction block on PostgreSQL < 12, therefore it runs in an autocommit
    # block. NOTE: autocommit_block() commits whatever transaction is open
    # before it; IF NOT EXISTS keeps a re-run safe if a later step fails.
    with op.get_context().autocommit_block():
        op.execute("ALTER TYPE source_kind ADD VALUE IF NOT EXISTS 'api_push'")

    # === 2) articles: four new columns, each with its own index ===
    # 2.1) is_short_news -- server default so existing rows become false
    op.add_column(
        "articles",
        sa.Column(
            "is_short_news",
            sa.Boolean,
            nullable=False,
            server_default=sa.text("false"),
        ),
    )
    op.create_index("ix_articles_is_short_news", "articles", ["is_short_news"])

    # 2.2) external_id
    op.add_column(
        "articles",
        sa.Column("external_id", sa.String(128), nullable=True),
    )
    op.create_index("ix_articles_external_id", "articles", ["external_id"])

    # 2.3) source_ref
    op.add_column(
        "articles",
        sa.Column("source_ref", sa.String(64), nullable=True),
    )
    op.create_index("ix_articles_source_ref", "articles", ["source_ref"])

    # 2.4) content_hash -- UNIQUE index, the core content de-duplication key.
    # The column is nullable, so rows without a hash are not constrained.
    op.add_column(
        "articles",
        sa.Column("content_hash", sa.String(40), nullable=True),
    )
    op.create_index(
        "ix_articles_content_hash", "articles", ["content_hash"], unique=True
    )

    # NOTE: articles.url stays NOT NULL. Short news is expected to store a
    # synthetic "api-push://source-slug/content-hash" placeholder at ingest
    # time, which avoids touching downstream schemas (ArticleDetail.url etc.)
    # and keeps the blast radius of this migration small.

    # === 3) api_tokens: purpose + source_id ===
    # Existing tokens are back-filled with purpose='mobile' via the server default.
    op.add_column(
        "api_tokens",
        sa.Column(
            "purpose",
            sa.String(16),
            nullable=False,
            server_default="mobile",
        ),
    )
    op.create_index("ix_api_tokens_purpose", "api_tokens", ["purpose"])

    # source_id is NULL for existing rows; deleting a source cascades to the
    # tokens that reference it.
    op.add_column(
        "api_tokens",
        sa.Column(
            "source_id",
            sa.Integer,
            sa.ForeignKey("sources.id", ondelete="CASCADE"),
            nullable=True,
        ),
    )
    op.create_index("ix_api_tokens_source_id", "api_tokens", ["source_id"])
def downgrade() -> None:
    """Revert revision 0008 by dropping the added indexes and columns.

    Steps run in the reverse order of ``upgrade()``. The ``api_push`` enum
    value is NOT removed (see the note at the end).
    """
    # Once api_tokens.purpose is dropped, ingest-only tokens can no longer be
    # told apart from mobile tokens. Revoke them first so a rollback does not
    # silently widen what those credentials can access.
    # NOTE(review): assumes pre-0008 code honours api_tokens.revoked_at -- confirm.
    op.execute(
        "UPDATE api_tokens SET revoked_at = now() "
        "WHERE purpose = 'ingest' AND revoked_at IS NULL"
    )

    # === api_tokens (reverse of upgrade step 3) ===
    op.drop_index("ix_api_tokens_source_id", table_name="api_tokens")
    op.drop_column("api_tokens", "source_id")
    op.drop_index("ix_api_tokens_purpose", table_name="api_tokens")
    op.drop_column("api_tokens", "purpose")

    # === articles (reverse of upgrade step 2) ===
    op.drop_index("ix_articles_content_hash", table_name="articles")
    op.drop_column("articles", "content_hash")
    op.drop_index("ix_articles_source_ref", table_name="articles")
    op.drop_column("articles", "source_ref")
    op.drop_index("ix_articles_external_id", table_name="articles")
    op.drop_column("articles", "external_id")
    op.drop_index("ix_articles_is_short_news", table_name="articles")
    op.drop_column("articles", "is_short_news")

    # PostgreSQL has no native way to remove a value from an enum type; the
    # type would have to be rebuilt (CREATE TYPE ... + ALTER COLUMN + DROP TYPE).
    # The 'api_push' value is therefore left in place. Operators who need it
    # gone after a downgrade must do that rebuild manually; any sources rows
    # still using kind='api_push' are likewise left untouched here.

View File

@@ -1,14 +1,19 @@
"""API Token(给 Android 用,可独立撤销)。""" """API Token(给 Android + API Push 短新闻 ingest 用,可独立撤销)。"""
from __future__ import annotations from __future__ import annotations
from datetime import datetime from datetime import datetime
from sqlalchemy import DateTime, ForeignKey, String, func from sqlalchemy import DateTime, ForeignKey, Integer, String, func
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base from app.database import Base
# Token 用途
TOKEN_PURPOSE_MOBILE = "mobile" # Android 客户端
TOKEN_PURPOSE_INGEST = "ingest" # API Push 短新闻 /api/v1/ingest
class ApiToken(Base): class ApiToken(Base):
__tablename__ = "api_tokens" __tablename__ = "api_tokens"
@@ -19,6 +24,13 @@ class ApiToken(Base):
name: Mapped[str] = mapped_column(String(64), nullable=False) # "Xiaomi-14" name: Mapped[str] = mapped_column(String(64), nullable=False) # "Xiaomi-14"
token_hash: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True) token_hash: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True)
# 只存 hash,原始 token 一次性返回给用户 # 只存 hash,原始 token 一次性返回给用户
purpose: Mapped[str] = mapped_column(
String(16), default=TOKEN_PURPOSE_MOBILE, nullable=False, index=True
)
# ingest 专用:绑定的 source_id(purpose=ingest 时使用,mobile 时为 NULL)
source_id: Mapped[int | None] = mapped_column(
ForeignKey("sources.id", ondelete="CASCADE"), nullable=True, index=True
)
last_used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) last_used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
revoked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) revoked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))

View File

@@ -5,6 +5,7 @@ from datetime import datetime
from sqlalchemy import ( from sqlalchemy import (
BigInteger, BigInteger,
Boolean,
DateTime, DateTime,
Float, Float,
ForeignKey, ForeignKey,
@@ -36,6 +37,16 @@ class Article(Base):
url_hash: Mapped[str] = mapped_column(String(40), unique=True, nullable=False, index=True) url_hash: Mapped[str] = mapped_column(String(40), unique=True, nullable=False, index=True)
guid: Mapped[str | None] = mapped_column(String(255), index=True) # 源站给的 ID guid: Mapped[str | None] = mapped_column(String(255), index=True) # 源站给的 ID
# === API Push 短新闻特有 ===
is_short_news: Mapped[bool] = mapped_column(
Boolean, default=False, nullable=False, index=True
)
external_id: Mapped[str | None] = mapped_column(String(128), index=True) # 调用方幂等 key
source_ref: Mapped[str | None] = mapped_column(String(64), index=True) # 短新闻里再细分来源
content_hash: Mapped[str | None] = mapped_column(
String(40), unique=True, index=True
) # 内容指纹,核心去重 key(NULL 不参与 unique)
# === 原文内容 === # === 原文内容 ===
title: Mapped[str] = mapped_column(Text, nullable=False) title: Mapped[str] = mapped_column(Text, nullable=False)
body_html: Mapped[str | None] = mapped_column(Text) # 抽取后保留结构 body_html: Mapped[str | None] = mapped_column(Text) # 抽取后保留结构

View File

@@ -24,6 +24,7 @@ class SourceKind(str, enum.Enum):
RSS = "rss" RSS = "rss"
HTML_LIST = "html_list" HTML_LIST = "html_list"
TG_CHANNEL = "tg_channel" TG_CHANNEL = "tg_channel"
API_PUSH = "api_push" # 外部 POST /api/v1/ingest 推送短新闻
class Source(Base): class Source(Base):

View File

@@ -35,7 +35,7 @@ import asyncio
import logging import logging
from typing import Any, Mapping from typing import Any, Mapping
from sqlalchemy import select from sqlalchemy import and_, or_, select
from app.database import AsyncSessionLocal from app.database import AsyncSessionLocal
from app.models.article import Article from app.models.article import Article
@@ -419,6 +419,8 @@ async def enrich_article(article_id: int) -> dict[str, str]:
if not art: if not art:
logger.warning("enrich_article: id=%s not found", article_id) logger.warning("enrich_article: id=%s not found", article_id)
return {} return {}
# 短新闻(API Push):无 translation,但仍需 enrich(classify + commentary)
# 入库时已把 body_text 复制到 body_zh_text,所以这里可以走统一判断
if not (art.title_zh or art.body_zh_text): if not (art.title_zh or art.body_zh_text):
logger.info("enrich_article: id=%s no translation yet, skip", article_id) logger.info("enrich_article: id=%s no translation yet, skip", article_id)
return {} return {}
@@ -451,6 +453,11 @@ async def enrich_article(article_id: int) -> dict[str, str]:
if not art: if not art:
return {} return {}
# === 短新闻(API Push):跳过 format 和 image(短文不需要排版,用户明确不要配图)===
# 短新闻入表时 format_status / image_ai_status 已置 'n/a',这里再 ensure 一次
# 防止未来 ingest 路径忘了设 status。
is_short = bool(art.is_short_news)
# === 1) classify(黑名单 gate,优先执行)=== # === 1) classify(黑名单 gate,优先执行)===
blocklist = _merge_blocklist(setting, art.source if art.source_id else None) blocklist = _merge_blocklist(setting, art.source if art.source_id else None)
try: try:
@@ -476,23 +483,31 @@ async def enrich_article(article_id: int) -> dict[str, str]:
results["classify"] = f"failed:{type(e).__name__}" results["classify"] = f"failed:{type(e).__name__}"
# classify 失败也继续(format/image/commentary 还能跑) # classify 失败也继续(format/image/commentary 还能跑)
# === 2) format === # === 2) format(短新闻跳过)===
try: if is_short:
await _enrich_format(art, setting, client) art.format_status = "n/a"
results["format"] = "ok" results["format"] = "skipped"
except Exception as e: else:
logger.exception("enrich format failed for article %s: %s", article_id, e) try:
art.format_status = "failed" await _enrich_format(art, setting, client)
results["format"] = f"failed:{type(e).__name__}" results["format"] = "ok"
except Exception as e:
logger.exception("enrich format failed for article %s: %s", article_id, e)
art.format_status = "failed"
results["format"] = f"failed:{type(e).__name__}"
# === 3) image === # === 3) image(短新闻跳过)===
try: if is_short:
await _enrich_image(art, setting, client) art.image_ai_status = "n/a"
results["image"] = "ok" results["image"] = "skipped"
except Exception as e: else:
logger.exception("enrich image failed for article %s: %s", article_id, e) try:
art.image_ai_status = "failed" await _enrich_image(art, setting, client)
results["image"] = f"failed:{type(e).__name__}" results["image"] = "ok"
except Exception as e:
logger.exception("enrich image failed for article %s: %s", article_id, e)
art.image_ai_status = "failed"
results["image"] = f"failed:{type(e).__name__}"
# === 4 + 5) commentary_angel + commentary_meituan 并行 === # === 4 + 5) commentary_angel + commentary_meituan 并行 ===
# 关键:每个 provider 独立的 try/except,任一失败不影响另一个 # 关键:每个 provider 独立的 try/except,任一失败不影响另一个
@@ -535,6 +550,8 @@ ENRICHMENT_BATCH_SIZE = 8 # 每轮并发拉取候选,然后顺序处理(LLM 客
async def enrichment_loop() -> None: async def enrichment_loop() -> None:
"""扫描已翻译但未 enrich 的文章(任一 *_status 为 pending/n/a 且 translation_status=ok)。 """扫描已翻译但未 enrich 的文章(任一 *_status 为 pending/n/a 且 translation_status=ok)。
短新闻(API Push)例外:translation_status='n/a',但 is_short_news=True 也要被捞出来。
跟 translation_loop 一样常驻。 跟 translation_loop 一样常驻。
""" """
logger.info("enrichment_loop started") logger.info("enrichment_loop started")
@@ -543,16 +560,30 @@ async def enrichment_loop() -> None:
while True: while True:
try: try:
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
# 精准定位待 enrich 的文章:已翻译 + 任一 LLM 状态 ∈ {n/a, pending, failed} # 精准定位待 enrich 的文章:
# - 长新闻:translation_status='ok' + title_zh 非空 + 任一 LLM 状态 != 'ok'
# - 短新闻:is_short_news=True + body_zh_text 非空(入库时已从 body_text 复制)
# + 任一 LLM 状态 != 'ok'
# (不能用 order_by id ASC + 内存过滤:已 enrich 的文章 id 可能更小,会占满 limit, # (不能用 order_by id ASC + 内存过滤:已 enrich 的文章 id 可能更小,会占满 limit,
# 让 enrichment_loop 永远看不到后面大 id 的 n/a 文章 — 真实踩过的坑) # 让 enrichment_loop 永远看不到后面大 id 的 n/a 文章 — 真实踩过的坑)
rows = ( rows = (
await session.execute( await session.execute(
select(Article) select(Article)
.where( .where(
Article.translation_status == "ok", # === "可 enrich" 条件 ===
Article.title_zh.is_not(None), or_(
# 任一 LLM 状态不是 ok(包括 NULL) # 长新闻
and_(
Article.translation_status == "ok",
Article.title_zh.is_not(None),
),
# 短新闻(API Push)
and_(
Article.is_short_news.is_(True),
Article.body_zh_text.is_not(None),
),
),
# === "未 enrich" 条件:任一 LLM 状态不是 ok ===
( (
(Article.classify_status.is_(None)) (Article.classify_status.is_(None))
| (Article.classify_status != "ok") | (Article.classify_status != "ok")