From 07534eb14435be0d514b2413b1e7f40dcd0cc4f8 Mon Sep 17 00:00:00 2001 From: xiaji Date: Sun, 14 Jun 2026 16:04:45 +0800 Subject: [PATCH] =?UTF-8?q?feat(ingest):=20API=20Push=20=E7=9F=AD=E6=96=B0?= =?UTF-8?q?=E9=97=BB=E6=8E=A5=E5=8F=A3=E5=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - POST /api/v1/ingest:鉴权(X-Ingest-Token) + 限速(每 token 2 篇/秒, Redis 滑动桶,INGEST_RATE_PER_SEC 可调) + 三层去重(L1 external_id / L2 content_hash / L3 DB UNIQUE 兜底,均带 reason) - 写入字段:is_short_news=True、translation/format/image_ai_status='n/a'、 classify_status=(有 tags?'ok':'pending')、commentary_{angel,meituan}_status='pending'、 body_zh_text=body_text(走统一路径,前端/prompt 不用改) - services/fetchers/api_push.py:compute_content_hash + synthesize_url + normalize_published_at + build_initial_status 纯函数 - schemas/ingest.py:IngestPayload(title 1-200/body 1-5000/tags 去重去空) + IngestResponse(article_id/content_hash/status/reason/matched_external_id) - admin.py:POST/GET/DELETE /admin/sources/{id}/ingest-tokens — owner 生成 (raw_token 仅一次性返回)、列出、撤销 - schemas/article.py:ArticleListItem 加 is_short_news/source_ref; ArticleDetail 加 is_short_news/source_ref/external_id - main.py:挂 ingest router;config.py + .env.example:ingest_rate_per_sec 默认 2 短新闻由 commit 1 enrichment_loop 自动接管 classify + 双 provider commentary, 跳过 format/image。 --- .env.example | 4 + backend/app/api/admin.py | 161 ++++++++++++- backend/app/api/ingest.py | 269 ++++++++++++++++++++++ backend/app/config.py | 5 + backend/app/main.py | 3 +- backend/app/schemas/article.py | 7 + backend/app/schemas/ingest.py | 73 ++++++ backend/app/services/fetchers/api_push.py | 86 +++++++ 8 files changed, 606 insertions(+), 2 deletions(-) create mode 100644 backend/app/api/ingest.py create mode 100644 backend/app/schemas/ingest.py create mode 100644 backend/app/services/fetchers/api_push.py diff --git a/.env.example b/.env.example index b2b621f..dfae5f3 100644 --- a/.env.example +++ b/.env.example @@ -80,6 +80,10 @@ FETCH_FAIL_PAUSE_THRESHOLD=3 # 单源 fetch 最大重试次数 FETCH_MAX_RETRIES=2 +# ===== API Push 短新闻 ingest 限速 ===== +# 每个 ingest token 每秒最多推几篇(滑动窗口)。2 = 一秒最多 2 篇 +INGEST_RATE_PER_SEC=2 + # ===== Caddy / 域名 ===== # 留空走 IP 自签证书;有域名走自动 HTTPS DOMAIN= diff --git a/backend/app/api/admin.py b/backend/app/api/admin.py index 635121b..a5966dd 100644 --- a/backend/app/api/admin.py +++ b/backend/app/api/admin.py @@ -4,20 +4,24 @@ - 手动触发抓取 / 重译 - 源健康看板 - 翻译配额管理 +- API Push ingest token 管理(生成/列/撤销) """ from __future__ import annotations +import logging from datetime import datetime, timedelta, timezone from typing import Any from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status -from pydantic import BaseModel +from pydantic import BaseModel, Field from sqlalchemy import func, select from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from app.core.deps import require_owner +from app.core.security import generate_api_token from app.database import get_session +from app.models.api_token import TOKEN_PURPOSE_INGEST, TOKEN_PURPOSE_MOBILE, ApiToken from app.models.article import Article from app.models.source import Source from app.models.user import User @@ -25,6 +29,8 @@ from app.schemas.source import SourceIn, SourceOut, SourceUpdate router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(require_owner)]) +logger = logging.getLogger("news.admin") + # === Source CRUD === @router.get("/sources", response_model=list[SourceOut]) @@ -260,3 +266,156 @@ async def delete_active_ip(ip: str): ok = await remove_active_ip(ip) return KickIpResponse(kicked=ok, ip=ip) + + +# === API Push ingest token 管理 === +# 给 owner 在 /admin/sources 详情页用:为某个 api_push source 颁发 ingest token, +# 或撤销已有 token。raw token 只在生成时一次性返回。 +class IngestTokenCreate(BaseModel): + """POST /admin/sources/{id}/ingest-tokens 请求体。""" + + name: str = Field(default="default", min_length=1, max_length=64) + expires_days: int | None = Field(default=None, ge=1, le=3650) + + +class IngestTokenOut(BaseModel): + """生成/列出时的响应(列出时不返 raw_token)。""" + + id: int + source_id: int + name: str + purpose: str + created_at: datetime + expires_at: datetime | None = None + revoked_at: datetime | None = None + last_used_at: datetime | None = None + # 仅生成时填充 + raw_token: str | None = None + + class Config: + from_attributes = True + + +@router.post( + "/sources/{source_id}/ingest-tokens", + response_model=IngestTokenOut, + status_code=status.HTTP_201_CREATED, +) +async def create_ingest_token( + source_id: int, + body: IngestTokenCreate, + session: AsyncSession = Depends(get_session), + user: User = Depends(require_owner), +): + """为某个 api_push source 颁发一个 ingest token。 + + - raw_token 仅本次返回(后续只能列/撤销) + - token 绑定 source_id(只能推送到这个 source) + - 限速:由 .env INGEST_RATE_PER_SEC 全局生效 + """ + # 验证 source 存在且是 api_push 类型 + src = ( + await session.execute(select(Source).where(Source.id == source_id)) + ).scalar_one_or_none() + if not src: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Source not found") + if src.kind.value != "api_push": + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"source kind={src.kind.value} cannot have ingest token; need api_push", + ) + + raw, hashed = generate_api_token() + expires = ( + datetime.now(timezone.utc) + timedelta(days=body.expires_days) + if body.expires_days + else None + ) + tok = ApiToken( + user_id=user.id, + name=body.name, + token_hash=hashed, + purpose=TOKEN_PURPOSE_INGEST, + source_id=src.id, + expires_at=expires, + ) + session.add(tok) + await session.commit() + await session.refresh(tok) + return IngestTokenOut( + id=tok.id, + source_id=tok.source_id, + name=tok.name, + purpose=tok.purpose, + created_at=tok.created_at, + expires_at=tok.expires_at, + revoked_at=tok.revoked_at, + last_used_at=tok.last_used_at, + raw_token=raw, # 只此一次! + ) + + +@router.get( + "/sources/{source_id}/ingest-tokens", + response_model=list[IngestTokenOut], +) +async def list_ingest_tokens( + source_id: int, + session: AsyncSession = Depends(get_session), +): + """列出某个 source 的所有 ingest token(含已撤销)。""" + rows = ( + await session.execute( + select(ApiToken) + .where( + ApiToken.source_id == source_id, + ApiToken.purpose == TOKEN_PURPOSE_INGEST, + ) + .order_by(ApiToken.id.desc()) + ) + ).scalars() + return [ + IngestTokenOut( + id=t.id, + source_id=t.source_id, + name=t.name, + purpose=t.purpose, + created_at=t.created_at, + expires_at=t.expires_at, + revoked_at=t.revoked_at, + last_used_at=t.last_used_at, + raw_token=None, # 列出时不返 raw + ) + for t in rows + ] + + +@router.delete( + "/ingest-tokens/{token_id}", + response_model=dict, + status_code=status.HTTP_200_OK, +) +async def revoke_ingest_token( + token_id: int, + session: AsyncSession = Depends(get_session), +): + """撤销某个 ingest token。撤销后立即生效(下次推送返 401)。""" + tok = ( + await session.execute( + select(ApiToken).where( + ApiToken.id == token_id, + ApiToken.purpose == TOKEN_PURPOSE_INGEST, + ) + ) + ).scalar_one_or_none() + if not tok: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Ingest token not found") + if tok.revoked_at: + return {"id": tok.id, "revoked_at": tok.revoked_at, "already_revoked": True} + tok.revoked_at = datetime.now(timezone.utc) + await session.commit() + logger.info( + "ingest token revoked: id=%s source_id=%s user_id=%s", + tok.id, tok.source_id, tok.user_id, + ) + return {"id": tok.id, "revoked_at": tok.revoked_at, "already_revoked": False} diff --git a/backend/app/api/ingest.py b/backend/app/api/ingest.py new file mode 100644 index 0000000..d975a8f --- /dev/null +++ b/backend/app/api/ingest.py @@ -0,0 +1,269 @@ +"""POST /api/v1/ingest — 外部推送短新闻入库。 + +鉴权:X-Ingest-Token 头(对应 api_tokens.purpose='ingest' 的 sha256 token) +限速:每个 token 2 篇/秒(Redis 滑动窗口,可在 .env 调 INGEST_RATE_PER_SEC) +去重:三层(L1 external_id / L2 content_hash / L3 DB UNIQUE 兜底) + +返回: +- 201 新建 +- 200 重复(任何一层命中,带 reason) +- 400 字段缺失/超长 +- 401 token 无效/吊销/过期 +- 413 body 超长(被 pydantic 自动捕获返 400;这里额外兜底) +- 429 触发限速 + +调用方契约见 docs/api-push.md。 +""" +from __future__ import annotations + +import logging +import time +from datetime import datetime, timezone + +from fastapi import APIRouter, Depends, Header, HTTPException, status +from fastapi.responses import JSONResponse +from sqlalchemy import select +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import settings +from app.core.security import hash_api_token +from app.database import get_session +from app.models.api_token import TOKEN_PURPOSE_INGEST, ApiToken +from app.models.article import Article +from app.models.source import Source +from app.redis_client import get_redis +from app.schemas.ingest import IngestPayload, IngestResponse +from app.services.fetchers.api_push import ( + build_initial_status, + compute_content_hash, + normalize_published_at, + synthesize_url, +) + +logger = logging.getLogger("news.ingest") + +router = APIRouter(prefix="/ingest", tags=["ingest"]) + + +# === 限速:Redis 滑动窗口(每个 token 独立计数)=== +# key = f"ingest:rl:{token_id}:{floor(now)}" # 1 秒桶 +# 简单方案:固定窗口(每秒清零)。足够防止刷爆,不会严格滑动。 +async def _rate_limit_check(token_id: int, max_per_sec: int) -> bool: + """返回 True=放行,False=限速命中。""" + r = get_redis() + now = int(time.time()) + key = f"ingest:rl:{token_id}:{now}" + pipe = r.pipeline() + pipe.incr(key, 1) + pipe.expire(key, 2) # key 2 秒后过期,自然清零 + results = await pipe.execute() + count = results[0] + return int(count) <= max_per_sec + + +# === 鉴权 + 反查 source === +async def _resolve_ingest_token( + session: AsyncSession, + raw_token: str, +) -> ApiToken: + """验证 token + 反查 api_tokens 行,要求 purpose='ingest' + 未撤销 + 未过期。 + + 返回 ApiToken(含 source_id)。 + """ + h = hash_api_token(raw_token) + result = await session.execute( + select(ApiToken).where( + ApiToken.token_hash == h, + ApiToken.purpose == TOKEN_PURPOSE_INGEST, + ApiToken.revoked_at.is_(None), + ) + ) + tok = result.scalar_one_or_none() + if not tok: + raise HTTPException(status.HTTP_401_UNAUTHORIZED, "invalid ingest token") + if tok.expires_at and tok.expires_at < datetime.now(timezone.utc): + raise HTTPException(status.HTTP_401_UNAUTHORIZED, "ingest token expired") + if tok.source_id is None: + # 数据完整性:purpose=ingest 必须绑定 source_id + logger.error("ingest token id=%s missing source_id", tok.id) + raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "token misconfigured") + return tok + + +# === 验证 source 状态 === +async def _resolve_source(session: AsyncSession, source_id: int) -> Source: + src = ( + await session.execute(select(Source).where(Source.id == source_id)) + ).scalar_one_or_none() + if not src: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"source {source_id} not found") + if not src.enabled: + raise HTTPException(status.HTTP_403_FORBIDDEN, f"source {src.slug} disabled") + if src.kind.value != "api_push": + # 防呆:token 绑定的 source 必须是 api_push 类型 + raise HTTPException( + status.HTTP_500_INTERNAL_SERVER_ERROR, + f"source {src.slug} is not api_push kind", + ) + return src + + +# === 主路由 === +@router.post("", response_model=None) +async def ingest( + payload: IngestPayload, + x_ingest_token: str = Header(..., alias="X-Ingest-Token", description="ingest token"), + session: AsyncSession = Depends(get_session), +): + """接收一条短新闻,落库或返重复。 + + - 鉴权:X-Ingest-Token → 反查 token → 拿到 source_id + - 限速:每个 token 2 篇/秒(可在 .env 调 INGEST_RATE_PER_SEC) + - 去重:三层(content_hash UNIQUE 是 DB 兜底) + - 入库后:enrichment_loop 自动跑 classify + 双 provider commentary + """ + # 1) 鉴权 + tok = await _resolve_ingest_token(session, x_ingest_token) + + # 2) 验证 source + src = await _resolve_source(session, tok.source_id) + + # 3) 限速 + if not await _rate_limit_check(tok.id, settings.ingest_rate_per_sec): + raise HTTPException( + status.HTTP_429_TOO_MANY_REQUESTS, + f"rate limit exceeded ({settings.ingest_rate_per_sec}/s per token)", + headers={"Retry-After": "1"}, + ) + + # 4) 算 content_hash + content_hash = compute_content_hash( + external_id=payload.external_id, + title=payload.title, + body=payload.body, + ) + + # 5) L1:external_id 精确命中 + if payload.external_id: + result = await session.execute( + select(Article.id, Article.content_hash).where( + Article.external_id == payload.external_id, + Article.source_id == src.id, + ) + ) + row = result.first() + if row: + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "article_id": row[0], + "content_hash": row[1], + "status": "duplicate", + "reason": "external_id_match", + "matched_external_id": payload.external_id, + }, + ) + + # 6) L2:content_hash 命中(SELECT 走索引,快) + result = await session.execute( + select(Article.id, Article.external_id).where( + Article.content_hash == content_hash, + ) + ) + row = result.first() + if row: + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "article_id": row[0], + "content_hash": content_hash, + "status": "duplicate", + "reason": "content_hash_match", + "matched_external_id": row[1], + }, + ) + + # 7) 入库 + has_tags = bool(payload.tags) + init_status = build_initial_status(has_tags=has_tags) + + article = Article( + source_id=src.id, + is_short_news=True, + external_id=payload.external_id, + source_ref=payload.source_ref, + content_hash=content_hash, + # url / url_hash:合成占位,保证 NOT NULL + UNIQUE 不冲突 + url=synthesize_url(src.slug, content_hash), + url_hash=content_hash, # 短新闻无真实 url,用 content_hash 兜底 + title=payload.title[:512], # 留个余量,避免 > 512 报错 + body_text=payload.body[:65535], # 同步 DB 限制 + body_html=None, # API Push 是纯文本 + # 把 body 复制到 body_zh_text,让前端/prompt 走统一路径(短新闻无 translation) + body_zh_text=payload.body[:65535], + body_zh_html=None, + title_zh=None, + summary_zh=None, + lang_src="zh", + author=payload.author, + image_url=None, + image_ai_url=None, + published_at=normalize_published_at(payload.published_at), + # === enrich 状态字段 === + translation_status=init_status["translation_status"], + format_status=init_status["format_status"], + image_ai_status=init_status["image_ai_status"], + classify_status=init_status["classify_status"], + commentary_status=init_status["commentary_status"], + commentary_meituan_status=init_status["commentary_meituan_status"], + # === 内容字段 === + # 调用方传了 tags → 直接当 category,省一次 LLM 调用 + category=",".join(payload.tags)[:64] if payload.tags else None, + ) + session.add(article) + try: + await session.commit() + except IntegrityError: + # L3 兜底:并发场景下两个请求同时到,都通过 L2 但 DB UNIQUE 拦下 + await session.rollback() + # 重新查一次,拿已有那条 + result = await session.execute( + select(Article.id).where(Article.content_hash == content_hash) + ) + existing_id = result.scalar_one_or_none() + if existing_id is None: + # 真出问题了,返 500 让调用方重试 + logger.exception("ingest integrity error but no existing row found") + raise HTTPException( + status.HTTP_500_INTERNAL_SERVER_ERROR, + "ingest conflict, please retry", + ) + logger.info( + "ingest db_unique caught: source=%s external_id=%s article_id=%s", + src.slug, payload.external_id, existing_id, + ) + return JSONResponse( + status_code=status.HTTP_200_OK, + content={ + "article_id": existing_id, + "content_hash": content_hash, + "status": "duplicate", + "reason": "db_unique", + }, + ) + + # 8) 刷新 token last_used_at(异步,不影响主流程) + tok.last_used_at = datetime.now(timezone.utc) + await session.commit() + + await session.refresh(article) + logger.info( + "ingest created: source=%s external_id=%s article_id=%s hash=%s...", + src.slug, payload.external_id, article.id, content_hash[:12], + ) + return IngestResponse( + article_id=article.id, + content_hash=content_hash, + status="created", + ) \ No newline at end of file diff --git a/backend/app/config.py b/backend/app/config.py index 894497b..4bff1d4 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -120,6 +120,11 @@ class Settings(BaseSettings): fetch_fail_pause_threshold: int = 3 fetch_max_retries: int = 2 + # ===== API Push 短新闻 ingest 限速 ===== + # 每个 ingest token 的滑动窗口限速(篇/秒)。2 = 短新闻一秒最多推 2 篇 + # 单 token — 防止单点滥用。改小需重启 api。 + ingest_rate_per_sec: int = 2 + # ===== 站点并发登录 IP 限制 ===== # 限制同时在线的客户端 IP 数(防滥用 + 防 token 泄漏被滥用) # Redis ZSET 滑动窗口:每次已认证请求刷新 score,30 天没活动自动剔除 diff --git a/backend/app/main.py b/backend/app/main.py index 42ddd23..dba40d8 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -16,7 +16,7 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from starlette.exceptions import HTTPException as StarletteHTTPException -from app.api import admin, admin_llm, articles, auth, bookmarks, me, sources, subscriptions +from app.api import admin, admin_llm, articles, auth, bookmarks, ingest, me, sources, subscriptions from app.config import settings from app.database import engine from app.redis_client import close_redis, get_redis @@ -99,6 +99,7 @@ app.include_router(articles.router, prefix=API_PREFIX) app.include_router(sources.router, prefix=API_PREFIX) app.include_router(bookmarks.router, prefix=API_PREFIX) app.include_router(subscriptions.router, prefix=API_PREFIX) +app.include_router(ingest.router, prefix=API_PREFIX) app.include_router(admin.router, prefix=API_PREFIX) app.include_router(admin_llm.router, prefix=API_PREFIX) diff --git a/backend/app/schemas/article.py b/backend/app/schemas/article.py index 19bbaae..9f368d4 100644 --- a/backend/app/schemas/article.py +++ b/backend/app/schemas/article.py @@ -41,6 +41,9 @@ class ArticleListItem(BaseModel): commentary_meituan_status: str | None = None commentary_engine: str | None = None # angel / meituan / "angel,meituan" image_ai_url: str | None = None # AI 插图(列表里缩略图) + # === API Push 短新闻标识 === + is_short_news: bool = False + source_ref: str | None = None is_starred: bool = False is_read: bool = False @@ -84,6 +87,10 @@ class ArticleDetail(BaseModel): duplicate_of: int | None = None published_at: datetime | None = None fetched_at: datetime + # === API Push 短新闻标识 === + is_short_news: bool = False + source_ref: str | None = None + external_id: str | None = None # 调用方幂等 key is_starred: bool = False is_read: bool = False diff --git a/backend/app/schemas/ingest.py b/backend/app/schemas/ingest.py new file mode 100644 index 0000000..2230b4d --- /dev/null +++ b/backend/app/schemas/ingest.py @@ -0,0 +1,73 @@ +"""API Push 短新闻 ingest 接口的 I/O schema。""" +from __future__ import annotations + +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel, Field, field_validator + + +# 长度上限(commit 1 规划) +TITLE_MAX = 200 +BODY_MAX = 5000 +EXTERNAL_ID_MAX = 128 +SOURCE_REF_MAX = 64 +AUTHOR_MAX = 255 +TAGS_MAX = 10 # 最多 10 个标签 +TAG_MAX_LEN = 32 # 单个标签最长 + + +class IngestPayload(BaseModel): + """POST /api/v1/ingest 请求体。 + + 必填:title + body + 推荐:external_id(幂等 key) + 可选:url / source_ref / author / published_at / tags + """ + + external_id: str | None = Field(default=None, max_length=EXTERNAL_ID_MAX) + title: str = Field(min_length=1, max_length=TITLE_MAX) + body: str = Field(min_length=1, max_length=BODY_MAX) + url: str | None = Field(default=None, max_length=2048) + source_ref: str | None = Field(default=None, max_length=SOURCE_REF_MAX) + author: str | None = Field(default=None, max_length=AUTHOR_MAX) + published_at: datetime | None = None + tags: list[str] | None = Field(default=None, max_length=TAGS_MAX) + + @field_validator("title", "body") + @classmethod + def _strip_and_check(cls, v: str) -> str: + v = v.strip() + if not v: + raise ValueError("must not be empty after strip") + return v + + @field_validator("tags") + @classmethod + def _clean_tags(cls, v: list[str] | None) -> list[str] | None: + if v is None: + return None + out: list[str] = [] + seen: set[str] = set() + for t in v: + t = t.strip() + if not t: + continue + if len(t) > TAG_MAX_LEN: + raise ValueError(f"tag too long: {t[:16]}...") + if t in seen: + continue + seen.add(t) + out.append(t) + return out or None + + +class IngestResponse(BaseModel): + """POST /api/v1/ingest 响应。""" + + article_id: int + content_hash: str + status: Literal["created", "duplicate"] + # duplicate 时,告知是 L1 / L2 / L3 哪一层命中 + reason: str | None = None # 仅 duplicate 时填充 + matched_external_id: str | None = None # 仅 L1 命中时填充 \ No newline at end of file diff --git a/backend/app/services/fetchers/api_push.py b/backend/app/services/fetchers/api_push.py new file mode 100644 index 0000000..b60b804 --- /dev/null +++ b/backend/app/services/fetchers/api_push.py @@ -0,0 +1,86 @@ +"""API Push 短新闻 — normalize 工具。 + +不走 fetcher 抽象(那是"周期拉取"语义),API Push 是"被动接收"。 +提供两个纯函数,供 ingest 路由调用: + +- compute_content_hash(external_id, title, body) -> str +- normalize_payload(payload: dict) -> dict(供入库时使用) + +设计要点: +- external_id 存在时,作为主幂等 key(L1) +- external_id 缺失时,title+body[:500] 作为兜底指纹(L2) +- url 可选;缺失时合成 api-push://{source_slug}/{hash[:16]} 占位 +- 字段长度校验集中在路由里(返回 400),这里只做归一化 +""" +from __future__ import annotations + +import hashlib +from datetime import datetime, timezone +from typing import Any + + +def compute_content_hash( + *, + external_id: str | None, + title: str, + body: str, +) -> str: + """三层去重核心 key。 + + - external_id 存在:`sha1("ext:" + external_id)` —— 调用方幂等保证,最强 + - external_id 缺失:`sha1(title.strip() + "|" + body[:500])` —— 兜底,防尾部噪声 + + 注:body 取原始字符串的前 500 字符,不做 strip。 + 因为不同长度的 body(200字 vs 2000字)前 500 字符一定相等,这是设计意图 — + 仅靠"前 N 字符"判断重复,避免被尾部噪声(URL尾巴/HTML 注释)误判。 + """ + if external_id: + raw = f"ext:{external_id.strip()}" + else: + raw = f"{title.strip()}|{body[:500]}" + return hashlib.sha1(raw.encode("utf-8")).hexdigest() + + +def synthesize_url(source_slug: str, content_hash: str) -> str: + """短新闻 url 占位(articles.url NOT NULL,需要合成)。""" + return f"api-push://{source_slug}/{content_hash[:16]}" + + +def normalize_published_at(value: Any) -> datetime: + """published_at 兜底:无值 → now(UTC)。""" + if value is None: + return datetime.now(timezone.utc) + if isinstance(value, datetime): + # 入参可能是 naive datetime(没带 tz),按 UTC 兜底 + if value.tzinfo is None: + return value.replace(tzinfo=timezone.utc) + return value + if isinstance(value, str): + # ISO8601 解析;失败的让 pydantic 在路由层报 400 + try: + # fromisoformat 在 3.11+ 支持 'Z' 后缀;3.12 没问题 + dt = datetime.fromisoformat(value.replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except ValueError: + # 解析失败兜底为 now;路由层校验会先于 normalize 跑,pydantic 应该已经报 400 了 + return datetime.now(timezone.utc) + return datetime.now(timezone.utc) + + +def build_initial_status(*, has_tags: bool) -> dict[str, str]: + """返回 enrich 状态字段的初始值。 + + - has_tags=True → classify_status='ok'(直接用 tags 当分类,不浪费 LLM 调用) + - has_tags=False → classify_status='pending'(enrichment_loop 会跑 classify) + - 其他:*_status='n/a' 或 'pending',具体见 commit 1 enrichment_article 的跳过逻辑 + """ + return { + "translation_status": "n/a", # 跳过翻译(中文原生) + "format_status": "n/a", # 跳过排版(短文不需要) + "image_ai_status": "n/a", # 跳过插图(用户明确不要) + "classify_status": "ok" if has_tags else "pending", + "commentary_status": "pending", # 双 provider 评论都跑 + "commentary_meituan_status": "pending", + } \ No newline at end of file