feat(ingest): API Push 短新闻接口层

- POST /api/v1/ingest:鉴权(X-Ingest-Token) + 限速(每 token 2 篇/秒,
  Redis 滑动桶,INGEST_RATE_PER_SEC 可调) + 三层去重(L1 external_id /
  L2 content_hash / L3 DB UNIQUE 兜底,均带 reason)
- 写入字段:is_short_news=True、translation/format/image_ai_status='n/a'、
  classify_status=(有 tags?'ok':'pending')、commentary_{angel,meituan}_status='pending'、
  body_zh_text=body_text(走统一路径,前端/prompt 不用改)
- services/fetchers/api_push.py:compute_content_hash + synthesize_url +
  normalize_published_at + build_initial_status 纯函数
- schemas/ingest.py:IngestPayload(title 1-200/body 1-5000/tags 去重去空) +
  IngestResponse(article_id/content_hash/status/reason/matched_external_id)
- admin.py:POST/GET/DELETE /admin/sources/{id}/ingest-tokens — owner 生成
  (raw_token 仅一次性返回)、列出、撤销
- schemas/article.py:ArticleListItem 加 is_short_news/source_ref;
  ArticleDetail 加 is_short_news/source_ref/external_id
- main.py:挂 ingest router;config.py + .env.example:ingest_rate_per_sec 默认 2

短新闻由 commit 1 enrichment_loop 自动接管 classify + 双 provider commentary,
跳过 format/image。
This commit is contained in:
xiaji
2026-06-14 16:04:45 +08:00
parent 3091f291b2
commit 07534eb144
8 changed files with 606 additions and 2 deletions

269
backend/app/api/ingest.py Normal file
View File

@@ -0,0 +1,269 @@
"""POST /api/v1/ingest — 外部推送短新闻入库。
鉴权:X-Ingest-Token 头(对应 api_tokens.purpose='ingest' 的 sha256 token)
限速:每个 token 2 篇/秒(Redis 滑动窗口,可在 .env 调 INGEST_RATE_PER_SEC)
去重:三层(L1 external_id / L2 content_hash / L3 DB UNIQUE 兜底)
返回:
- 201 新建
- 200 重复(任何一层命中,带 reason)
- 400 字段缺失/超长
- 401 token 无效/吊销/过期
- 413 body 超长(被 pydantic 自动捕获返 400;这里额外兜底)
- 429 触发限速
调用方契约见 docs/api-push.md。
"""
from __future__ import annotations
import logging
import time
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, Header, HTTPException, status
from fastapi.responses import JSONResponse
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.core.security import hash_api_token
from app.database import get_session
from app.models.api_token import TOKEN_PURPOSE_INGEST, ApiToken
from app.models.article import Article
from app.models.source import Source
from app.redis_client import get_redis
from app.schemas.ingest import IngestPayload, IngestResponse
from app.services.fetchers.api_push import (
build_initial_status,
compute_content_hash,
normalize_published_at,
synthesize_url,
)
logger = logging.getLogger("news.ingest")
router = APIRouter(prefix="/ingest", tags=["ingest"])
# === 限速:Redis 滑动窗口(每个 token 独立计数)===
# key = f"ingest:rl:{token_id}:{floor(now)}" # 1 秒桶
# 简单方案:固定窗口(每秒清零)。足够防止刷爆,不会严格滑动。
async def _rate_limit_check(token_id: int, max_per_sec: int) -> bool:
"""返回 True=放行,False=限速命中。"""
r = get_redis()
now = int(time.time())
key = f"ingest:rl:{token_id}:{now}"
pipe = r.pipeline()
pipe.incr(key, 1)
pipe.expire(key, 2) # key 2 秒后过期,自然清零
results = await pipe.execute()
count = results[0]
return int(count) <= max_per_sec
# === 鉴权 + 反查 source ===
async def _resolve_ingest_token(
session: AsyncSession,
raw_token: str,
) -> ApiToken:
"""验证 token + 反查 api_tokens 行,要求 purpose='ingest' + 未撤销 + 未过期。
返回 ApiToken(含 source_id)。
"""
h = hash_api_token(raw_token)
result = await session.execute(
select(ApiToken).where(
ApiToken.token_hash == h,
ApiToken.purpose == TOKEN_PURPOSE_INGEST,
ApiToken.revoked_at.is_(None),
)
)
tok = result.scalar_one_or_none()
if not tok:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "invalid ingest token")
if tok.expires_at and tok.expires_at < datetime.now(timezone.utc):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "ingest token expired")
if tok.source_id is None:
# 数据完整性:purpose=ingest 必须绑定 source_id
logger.error("ingest token id=%s missing source_id", tok.id)
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "token misconfigured")
return tok
# === 验证 source 状态 ===
async def _resolve_source(session: AsyncSession, source_id: int) -> Source:
src = (
await session.execute(select(Source).where(Source.id == source_id))
).scalar_one_or_none()
if not src:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"source {source_id} not found")
if not src.enabled:
raise HTTPException(status.HTTP_403_FORBIDDEN, f"source {src.slug} disabled")
if src.kind.value != "api_push":
# 防呆:token 绑定的 source 必须是 api_push 类型
raise HTTPException(
status.HTTP_500_INTERNAL_SERVER_ERROR,
f"source {src.slug} is not api_push kind",
)
return src
# === 主路由 ===
@router.post("", response_model=None)
async def ingest(
payload: IngestPayload,
x_ingest_token: str = Header(..., alias="X-Ingest-Token", description="ingest token"),
session: AsyncSession = Depends(get_session),
):
"""接收一条短新闻,落库或返重复。
- 鉴权:X-Ingest-Token → 反查 token → 拿到 source_id
- 限速:每个 token 2 篇/秒(可在 .env 调 INGEST_RATE_PER_SEC)
- 去重:三层(content_hash UNIQUE 是 DB 兜底)
- 入库后:enrichment_loop 自动跑 classify + 双 provider commentary
"""
# 1) 鉴权
tok = await _resolve_ingest_token(session, x_ingest_token)
# 2) 验证 source
src = await _resolve_source(session, tok.source_id)
# 3) 限速
if not await _rate_limit_check(tok.id, settings.ingest_rate_per_sec):
raise HTTPException(
status.HTTP_429_TOO_MANY_REQUESTS,
f"rate limit exceeded ({settings.ingest_rate_per_sec}/s per token)",
headers={"Retry-After": "1"},
)
# 4) 算 content_hash
content_hash = compute_content_hash(
external_id=payload.external_id,
title=payload.title,
body=payload.body,
)
# 5) L1:external_id 精确命中
if payload.external_id:
result = await session.execute(
select(Article.id, Article.content_hash).where(
Article.external_id == payload.external_id,
Article.source_id == src.id,
)
)
row = result.first()
if row:
return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"article_id": row[0],
"content_hash": row[1],
"status": "duplicate",
"reason": "external_id_match",
"matched_external_id": payload.external_id,
},
)
# 6) L2:content_hash 命中(SELECT 走索引,快)
result = await session.execute(
select(Article.id, Article.external_id).where(
Article.content_hash == content_hash,
)
)
row = result.first()
if row:
return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"article_id": row[0],
"content_hash": content_hash,
"status": "duplicate",
"reason": "content_hash_match",
"matched_external_id": row[1],
},
)
# 7) 入库
has_tags = bool(payload.tags)
init_status = build_initial_status(has_tags=has_tags)
article = Article(
source_id=src.id,
is_short_news=True,
external_id=payload.external_id,
source_ref=payload.source_ref,
content_hash=content_hash,
# url / url_hash:合成占位,保证 NOT NULL + UNIQUE 不冲突
url=synthesize_url(src.slug, content_hash),
url_hash=content_hash, # 短新闻无真实 url,用 content_hash 兜底
title=payload.title[:512], # 留个余量,避免 > 512 报错
body_text=payload.body[:65535], # 同步 DB 限制
body_html=None, # API Push 是纯文本
# 把 body 复制到 body_zh_text,让前端/prompt 走统一路径(短新闻无 translation)
body_zh_text=payload.body[:65535],
body_zh_html=None,
title_zh=None,
summary_zh=None,
lang_src="zh",
author=payload.author,
image_url=None,
image_ai_url=None,
published_at=normalize_published_at(payload.published_at),
# === enrich 状态字段 ===
translation_status=init_status["translation_status"],
format_status=init_status["format_status"],
image_ai_status=init_status["image_ai_status"],
classify_status=init_status["classify_status"],
commentary_status=init_status["commentary_status"],
commentary_meituan_status=init_status["commentary_meituan_status"],
# === 内容字段 ===
# 调用方传了 tags → 直接当 category,省一次 LLM 调用
category=",".join(payload.tags)[:64] if payload.tags else None,
)
session.add(article)
try:
await session.commit()
except IntegrityError:
# L3 兜底:并发场景下两个请求同时到,都通过 L2 但 DB UNIQUE 拦下
await session.rollback()
# 重新查一次,拿已有那条
result = await session.execute(
select(Article.id).where(Article.content_hash == content_hash)
)
existing_id = result.scalar_one_or_none()
if existing_id is None:
# 真出问题了,返 500 让调用方重试
logger.exception("ingest integrity error but no existing row found")
raise HTTPException(
status.HTTP_500_INTERNAL_SERVER_ERROR,
"ingest conflict, please retry",
)
logger.info(
"ingest db_unique caught: source=%s external_id=%s article_id=%s",
src.slug, payload.external_id, existing_id,
)
return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"article_id": existing_id,
"content_hash": content_hash,
"status": "duplicate",
"reason": "db_unique",
},
)
# 8) 刷新 token last_used_at(异步,不影响主流程)
tok.last_used_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(article)
logger.info(
"ingest created: source=%s external_id=%s article_id=%s hash=%s...",
src.slug, payload.external_id, article.id, content_hash[:12],
)
return IngestResponse(
article_id=article.id,
content_hash=content_hash,
status="created",
)