"""POST /api/v1/ingest — accept short news items pushed by external callers.

Authentication: ``X-Ingest-Token`` header. Its sha256 hash must match an
``api_tokens`` row with ``purpose='ingest'`` that is neither revoked nor
expired; that row's ``source_id`` decides which source the item belongs to.

Rate limit: ``settings.ingest_rate_per_sec`` requests per second per token
(``INGEST_RATE_PER_SEC`` in .env). Implemented as a fixed 1-second Redis
window, not a true sliding window.

Deduplication, three layers:
    L1  ``external_id`` match within the token's source
    L2  ``content_hash`` match
    L3  DB UNIQUE constraint as the backstop for concurrent inserts

Responses:
    201  created
    200  duplicate (any layer hit; body carries ``reason``)
    401  token invalid / revoked / expired
    403  bound source is disabled
    404  bound source does not exist
    429  rate limit hit (``Retry-After: 1``)
    500  token/source misconfigured, or an insert conflict whose winning row
         cannot be found (caller should retry)

Payload validation (missing / over-long fields) is performed by FastAPI and
pydantic before this handler runs. The caller contract lists 400 for that;
NOTE(review): confirm the app maps validation errors to 400 rather than
FastAPI's default 422. No 413 (oversized body) handling exists in this module.

Caller contract: docs/api-push.md.
"""
from __future__ import annotations

import logging
import time
from datetime import datetime, timezone

from fastapi import APIRouter, Depends, Header, HTTPException, status
from fastapi.responses import JSONResponse
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.core.security import hash_api_token
from app.database import get_session
from app.models.api_token import TOKEN_PURPOSE_INGEST, ApiToken
from app.models.article import Article
from app.models.source import Source
from app.redis_client import get_redis
from app.schemas.ingest import IngestPayload, IngestResponse
from app.services.fetchers.api_push import (
    build_initial_status,
    compute_content_hash,
    normalize_published_at,
    synthesize_url,
)

logger = logging.getLogger("news.ingest")

router = APIRouter(prefix="/ingest", tags=["ingest"])


# === Rate limiting: fixed 1-second Redis window, one counter per token ===
# key = f"ingest:rl:{token_id}:{int(now)}" — a new key every second, so the
# count effectively resets each second. Good enough to stop floods, but not a
# strict sliding window: up to 2x the limit can land across a second boundary.


async def _rate_limit_check(token_id: int, max_per_sec: int) -> bool:
    """Count this request against ``token_id`` and say whether it may proceed.

    Returns True to allow the request, False when the per-second limit is hit.
    """
    r = get_redis()
    now = int(time.time())
    key = f"ingest:rl:{token_id}:{now}"
    pipe = r.pipeline()
    pipe.incr(key, 1)
    # Expire after 2 seconds so old per-second buckets clean themselves up.
    pipe.expire(key, 2)
    results = await pipe.execute()
    count = results[0]
    return int(count) <= max_per_sec


# === Authentication: resolve the token to its api_tokens row ===


async def _resolve_ingest_token(
    session: AsyncSession,
    raw_token: str,
) -> ApiToken:
    """Validate ``raw_token`` and return its ``ApiToken`` row (carries source_id).

    The row must have purpose='ingest', no ``revoked_at``, and an
    ``expires_at`` that is either unset or in the future.

    Raises:
        HTTPException 401: unknown / revoked / expired token.
        HTTPException 500: ingest token not bound to a source.
    """
    h = hash_api_token(raw_token)
    result = await session.execute(
        select(ApiToken).where(
            ApiToken.token_hash == h,
            ApiToken.purpose == TOKEN_PURPOSE_INGEST,
            ApiToken.revoked_at.is_(None),
        )
    )
    tok = result.scalar_one_or_none()
    if not tok:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "invalid ingest token")
    if tok.expires_at and tok.expires_at < datetime.now(timezone.utc):
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "ingest token expired")
    if tok.source_id is None:
        # Data integrity: an ingest token must be bound to a source_id.
        logger.error("ingest token id=%s missing source_id", tok.id)
        raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "token misconfigured")
    return tok


# === Source validation ===


async def _resolve_source(session: AsyncSession, source_id: int) -> Source:
    """Load the token's source and require it to exist, be enabled and be api_push.

    Raises:
        HTTPException 404: source missing.
        HTTPException 403: source disabled.
        HTTPException 500: source is not of the api_push kind.
    """
    src = (
        await session.execute(select(Source).where(Source.id == source_id))
    ).scalar_one_or_none()
    if not src:
        raise HTTPException(status.HTTP_404_NOT_FOUND, f"source {source_id} not found")
    if not src.enabled:
        raise HTTPException(status.HTTP_403_FORBIDDEN, f"source {src.slug} disabled")
    if src.kind.value != "api_push":
        # Guard: the source an ingest token is bound to must be api_push.
        raise HTTPException(
            status.HTTP_500_INTERNAL_SERVER_ERROR,
            f"source {src.slug} is not api_push kind",
        )
    return src


# === Helpers for the main route ===


def _duplicate_response(
    article_id: int,
    content_hash: str,
    reason: str,
    **extra: object,
) -> JSONResponse:
    """Build the 200 "duplicate" response; ``extra`` keys are appended as-is."""
    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content={
            "article_id": article_id,
            "content_hash": content_hash,
            "status": "duplicate",
            "reason": reason,
            **extra,
        },
    )


def _build_article(
    source_id: int,
    source_slug: str,
    payload: IngestPayload,
    content_hash: str,
) -> Article:
    """Create the (not yet added) Article row for a pushed short news item."""
    init_status = build_initial_status(has_tags=bool(payload.tags))
    return Article(
        source_id=source_id,
        is_short_news=True,
        external_id=payload.external_id,
        source_ref=payload.source_ref,
        content_hash=content_hash,
        # url / url_hash: synthesized placeholders so the NOT NULL + UNIQUE
        # columns are satisfied without colliding with real articles.
        url=synthesize_url(source_slug, content_hash),
        url_hash=content_hash,  # short news has no real URL; reuse content_hash
        title=payload.title[:512],  # truncate so the column limit is never exceeded
        body_text=payload.body[:65535],  # kept in sync with the DB column limit
        body_html=None,  # API push is plain text
        # Copy the body into body_zh_text so frontend / prompts follow the same
        # path as translated articles (short news has no translation step).
        body_zh_text=payload.body[:65535],
        body_zh_html=None,
        title_zh=None,
        summary_zh=None,
        lang_src="zh",
        author=payload.author,
        image_url=None,
        image_ai_url=None,
        published_at=normalize_published_at(payload.published_at),
        # === enrichment status fields ===
        translation_status=init_status["translation_status"],
        format_status=init_status["format_status"],
        image_ai_status=init_status["image_ai_status"],
        classify_status=init_status["classify_status"],
        commentary_status=init_status["commentary_status"],
        commentary_meituan_status=init_status["commentary_meituan_status"],
        # === content fields ===
        # Caller-supplied tags become the category directly, saving an LLM call.
        category=",".join(payload.tags)[:64] if payload.tags else None,
    )


async def _find_existing_after_conflict(
    session: AsyncSession,
    source_id: int,
    external_id: str | None,
    content_hash: str,
) -> int | None:
    """Return the id of the row that won a concurrent insert, or None if not found.

    Looks up by content_hash first. If nothing matches and an external_id was
    sent, falls back to (source_id, external_id), so a conflict on the same
    external_id with different content is reported as a duplicate instead of
    a 500. NOTE(review): the fallback only matters if the DB has a unique
    constraint involving external_id; confirm against the Article schema.
    """
    existing_id = (
        await session.execute(
            select(Article.id).where(Article.content_hash == content_hash)
        )
    ).scalar_one_or_none()
    if existing_id is None and external_id:
        existing_id = (
            await session.execute(
                select(Article.id).where(
                    Article.external_id == external_id,
                    Article.source_id == source_id,
                )
            )
        ).scalars().first()
    return existing_id


# === Main route ===


@router.post("", response_model=None, status_code=status.HTTP_201_CREATED)
async def ingest(
    payload: IngestPayload,
    x_ingest_token: str = Header(..., alias="X-Ingest-Token", description="ingest token"),
    session: AsyncSession = Depends(get_session),
):
    """Accept one short news item: insert it (201) or report a duplicate (200).

    - Auth: X-Ingest-Token -> api_tokens row -> bound source_id.
    - Rate limit: settings.ingest_rate_per_sec per token per second.
    - Dedup: external_id (L1), content_hash (L2), DB UNIQUE constraint (L3).
    - After insert, enrichment_loop (outside this module) runs classification and
      commentary; this handler only sets the initial *_status fields.
    """
    # 1) Authenticate.
    tok = await _resolve_ingest_token(session, x_ingest_token)

    # 2) Validate the bound source.
    src = await _resolve_source(session, tok.source_id)
    # Snapshot plain values now: a rollback expires every ORM instance, and
    # lazily reloading an expired attribute under AsyncSession raises
    # MissingGreenlet instead of issuing the query.
    src_id = src.id
    src_slug = src.slug

    # 3) Rate limit.
    if not await _rate_limit_check(tok.id, settings.ingest_rate_per_sec):
        raise HTTPException(
            status.HTTP_429_TOO_MANY_REQUESTS,
            f"rate limit exceeded ({settings.ingest_rate_per_sec}/s per token)",
            headers={"Retry-After": "1"},
        )

    # 4) Compute content_hash.
    content_hash = compute_content_hash(
        external_id=payload.external_id,
        title=payload.title,
        body=payload.body,
    )

    # 5) L1: exact external_id match within this source. The response echoes
    #    the stored article's hash, not the freshly computed one.
    if payload.external_id:
        row = (
            await session.execute(
                select(Article.id, Article.content_hash).where(
                    Article.external_id == payload.external_id,
                    Article.source_id == src_id,
                )
            )
        ).first()
        if row:
            return _duplicate_response(
                row[0],
                row[1],
                "external_id_match",
                matched_external_id=payload.external_id,
            )

    # 6) L2: content_hash match (indexed lookup).
    row = (
        await session.execute(
            select(Article.id, Article.external_id).where(
                Article.content_hash == content_hash,
            )
        )
    ).first()
    if row:
        return _duplicate_response(
            row[0],
            content_hash,
            "content_hash_match",
            matched_external_id=row[1],
        )

    # 7) Insert. The token's last_used_at is updated in the same transaction,
    #    so the article insert and the bookkeeping commit or roll back together.
    article = _build_article(src_id, src_slug, payload, content_hash)
    session.add(article)
    tok.last_used_at = datetime.now(timezone.utc)
    try:
        await session.commit()
    except IntegrityError:
        # L3 backstop: two concurrent requests both passed L2 and the DB UNIQUE
        # constraint rejected the loser.
        await session.rollback()
        existing_id = await _find_existing_after_conflict(
            session, src_id, payload.external_id, content_hash
        )
        if existing_id is None:
            # Genuinely unexpected; return 500 so the caller retries.
            logger.exception("ingest integrity error but no existing row found")
            raise HTTPException(
                status.HTTP_500_INTERNAL_SERVER_ERROR,
                "ingest conflict, please retry",
            )
        logger.info(
            "ingest db_unique caught: source=%s external_id=%s article_id=%s",
            src_slug,
            payload.external_id,
            existing_id,
        )
        return _duplicate_response(existing_id, content_hash, "db_unique")

    # Reload the committed row so article.id is available even if the session
    # expires instances on commit.
    await session.refresh(article)

    logger.info(
        "ingest created: source=%s external_id=%s article_id=%s hash=%s...",
        src_slug,
        payload.external_id,
        article.id,
        content_hash[:12],
    )
    return IngestResponse(
        article_id=article.id,
        content_hash=content_hash,
        status="created",
    )