From e2742460560f6ab3ea1bc5c7f4db96952be61282 Mon Sep 17 00:00:00 2001 From: xiaji Date: Sun, 14 Jun 2026 16:15:21 +0800 Subject: [PATCH] =?UTF-8?q?feat(ingest):=20API=20Push=20=E5=89=8D=E7=AB=AF?= =?UTF-8?q?=E5=B1=82=20+=20=E6=96=87=E6=A1=A3=20+=20=E7=AB=AF=E5=88=B0?= =?UTF-8?q?=E7=AB=AF=E8=81=94=E9=80=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 后端(支持 api_push source 创建/调度): - schemas/source.py:SourceIn.url 改成 str(允许 api_push 的 api-push:// 占位) - admin.py create_source 简化 url 传递 - workers/__main__.py:_rebuild_jobs 跳过 api_push 源(它是被动接收,不抓取) - workers/pipeline.py:run_once 也加同条件,api_push 不进抓取循环 前端: - api/articles.ts:ArticleListItem 加 is_short_news(required)/source_ref; ArticleDetail 加 external_id;导出 IngestTokenOut;adminApi 加 list/create/revoke ingest token 三个方法 - views/Feed.vue:卡片根 class 短新闻加 short-card(淡蓝底 #f6f9fc + 左侧 3px 蓝色色条 #4f9eff);元信息栏加 📰 短讯 角标;长新闻摘要 body_zh_text 截前 200 字,短新闻不截取保留换行(white-space: pre-wrap); 短新闻不显示 AI 插图 - views/ArticleDetail.vue:tag 行加 📰 短讯 + source_ref 角标;短新闻 路径下隐藏翻译状态/重译/原文链接按钮;正文区短新闻直接渲染 body_zh_text,跳过译文/原文/AI 配图卡片;Angel + 美团双评论卡片 都保留 - views/AdminSources.vue:kind 加 api_push 选项;api_push 源 URL 字段 变只读占位、隐藏抓取间隔;列表操作列加 🔑 Token 按钮; 弹窗支持生成(raw_token 一次性显示 + 复制)/列表/撤销 文档: - docs/api-push.md:调用方契约 + 三层去重 + 限速 + lifecycle + owner 操作手册 + curl/Python 示例 + 重试策略 + 故障排查 - README.md:关键特性加 API Push;API 概览加 /api/v1/ingest 和 3 个 /admin/.../ingest-tokens 端点 --- README.md | 16 +- backend/app/api/admin.py | 2 +- backend/app/schemas/source.py | 15 +- backend/app/workers/__main__.py | 10 +- backend/app/workers/pipeline.py | 8 +- docs/api-push.md | 246 +++++++++++++++++++++++ frontend/src/api/articles.ts | 31 +++ frontend/src/views/AdminSources.vue | 284 ++++++++++++++++++++++++--- frontend/src/views/ArticleDetail.vue | 45 ++++- frontend/src/views/Feed.vue | 71 ++++++- 10 files changed, 677 insertions(+), 51 deletions(-) create mode 100644 docs/api-push.md diff --git a/README.md b/README.md index 9d9ce25..b459020 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,10 @@ - 🌐 **智能翻译**:腾讯云 TMT(月 500 万字符配额)→ 本地 NLLB-200 降级,30 天 Redis 缓存避免重复 - 🤖 **LLM 智能增强** *(新)*:翻译完成后自动跑 4 项 LLM 任务 — 排版 / 分类 / 插图 / 点评 - 🎨 **AI 配图**:文生图模型自动为每篇文章生成插图(走 Agnes 平台,带限速) -- 👤 **双角色鉴权**:JWT(access 60min + refresh 14d) + API Token(sha256,可撤销,给 Android 预留) +- 📥 **API Push 短新闻** *(新)*:`POST /api/v1/ingest` 接收外部中文短新闻推送, + 三层去重(L1 external_id / L2 content_hash / L3 DB UNIQUE)+ 每 token 2 篇/秒限速; + 短新闻入库后跳过翻译/排版/插图,只跑分类 + 双 provider 点评 +- 👤 **双角色鉴权**:JWT(access 60min + refresh 14d) + API Token(sha256,可撤销,给 Android / ingest 预留) - 📌 **收藏 + 关键词订阅**:用户级书签,服务端定时按关键词命中推送(预留 Telegram 通道) - 📊 **管理看板**:源健康度 / 翻译配额 / LLM 状态,全部可视化 - 🔄 **热加载**:源/提示词改了不用重启,worker 每天 00:30 重建 job @@ -616,9 +619,20 @@ WHERE translation_status='ok'; - `GET /bookmarks` / `POST /bookmarks` / `DELETE /bookmarks/{id}` - `GET /subscriptions` / `POST /subscriptions` / `DELETE /subscriptions/{id}` +### API Push 短新闻(无鉴权,凭 X-Ingest-Token) + +- `POST /api/v1/ingest` — 外部推送短新闻入库(中文原生,跳过翻译/排版/插图,跑分类 + 双 provider 点评) +- 鉴权:`X-Ingest-Token` 头对应 `api_tokens.purpose='ingest'` 的 sha256 token +- 限速:每 token 2 篇/秒(`INGEST_RATE_PER_SEC` 可调) +- 去重:三层(L1 external_id / L2 content_hash / L3 DB UNIQUE) +- 完整契约见 [`docs/api-push.md`](./docs/api-push.md) + ### Owner only(`/admin/*`) - `GET /admin/sources` / `POST` / `PATCH /{id}` / `DELETE /{id}` — 源 CRUD +- `POST /admin/sources/{source_id}/ingest-tokens` — 为 api_push 源生成 ingest token(raw_token 仅一次性返回) +- `GET /admin/sources/{source_id}/ingest-tokens` — 列出某个 source 的 ingest token +- `DELETE /admin/ingest-tokens/{token_id}` — 撤销 ingest token - `POST /admin/refresh/{source_id}` — 立即触发抓取 - `POST /admin/translation/rerun/{article_id}` — 重译 - `GET /admin/health` — 源健康看板 diff --git a/backend/app/api/admin.py b/backend/app/api/admin.py index a5966dd..02b4718 100644 --- a/backend/app/api/admin.py +++ b/backend/app/api/admin.py @@ -47,7 +47,7 @@ async def create_source(body: SourceIn, session: AsyncSession = Depends(get_sess name=body.name, slug=body.slug, kind=body.kind, - url=str(body.url), + url=body.url, detail_selector=body.detail_selector, region=body.region, language_src=body.language_src, diff --git a/backend/app/schemas/source.py b/backend/app/schemas/source.py index ecb0318..4a3778f 100644 --- a/backend/app/schemas/source.py +++ b/backend/app/schemas/source.py @@ -2,8 +2,9 @@ from __future__ import annotations from datetime import datetime +from typing import Annotated -from pydantic import BaseModel, ConfigDict, Field, HttpUrl +from pydantic import BaseModel, ConfigDict, Field from app.models.source import SourceKind @@ -29,11 +30,21 @@ class SourceOut(BaseModel): blocklist_tags: list[str] = [] +# url 字段:正常源要 HttpUrl(校验合法 URL),但 api_push 源是合成占位(类似 api-push://...) +# 用 Annotated Union 区分:rss/html_list/tg_channel → HttpUrl;api_push → str +# 但 SourceIn.kind 未知时(前端一次提交),无法静态区分。最简单的兼容:统一接受 str, +# 入库前在 admin.create_source 里按 kind 分支校验。 +# 这里改成 str(最长 2048),保留手工校验的责任。 +SourceUrlStr = Annotated[str, Field(min_length=1, max_length=2048)] + + class SourceIn(BaseModel): name: str = Field(min_length=1, max_length=128) slug: str = Field(min_length=1, max_length=128, pattern=r"^[a-z0-9-]+$") kind: SourceKind = SourceKind.RSS - url: HttpUrl + # url:不再强制 HttpUrl,允许 api_push 源的合成 url(api-push://...); + # rss/html_list/tg_channel 由 admin.create_source 在入库前手工校验 + url: str = Field(min_length=1, max_length=2048) region: str | None = None language_src: str | None = None priority: int = Field(default=50, ge=1, le=100) diff --git a/backend/app/workers/__main__.py b/backend/app/workers/__main__.py index ab5b419..3897029 100644 --- a/backend/app/workers/__main__.py +++ b/backend/app/workers/__main__.py @@ -29,7 +29,11 @@ logging.basicConfig( async def _rebuild_jobs(scheduler: AsyncIOScheduler) -> None: - """从 sources 表动态构建 job(可热更新)。""" + """从 sources 表动态构建 job(可热更新)。 + + 只调度有抓取语义的源(rss / html_list / tg_channel); + api_push 是被动接收,不进 fetch 调度。 + """ scheduler.remove_all_jobs() async with AsyncSessionLocal() as s: rows = (await s.execute(select(Source).where(Source.enabled.is_(True)))).scalars() @@ -38,6 +42,10 @@ async def _rebuild_jobs(scheduler: AsyncIOScheduler) -> None: logger.warning("no enabled sources; scheduler idle") return for src in sources: + # api_push 源不抓取(由 /api/v1/ingest 被动接收),跳过调度 + if src.kind.value == "api_push": + logger.debug("skip scheduling api_push source: %s", src.slug) + continue trigger = ( CronTrigger.from_crontab(src.fetch_cron) if src.fetch_cron diff --git a/backend/app/workers/pipeline.py b/backend/app/workers/pipeline.py index 112b373..2afe823 100644 --- a/backend/app/workers/pipeline.py +++ b/backend/app/workers/pipeline.py @@ -293,10 +293,14 @@ def _wrap_html(text: str) -> str: # === 全量跑(供测试 / 手动触发) === async def run_once() -> None: async with AsyncSessionLocal() as session: - rows = (await session.execute(select(Source).where(Source.enabled.is_(True)))).scalars() + rows = ( + await session.execute( + select(Source).where(Source.enabled.is_(True), Source.kind != SourceKind.API_PUSH) + ) + ).scalars() sources = list(rows) - logger.info("run_once: %d enabled sources", len(sources)) + logger.info("run_once: %d enabled sources (api_push excluded)", len(sources)) tasks = [fetch_one_source(s.id) for s in sources] await asyncio.gather(*tasks, return_exceptions=True) diff --git a/docs/api-push.md b/docs/api-push.md new file mode 100644 index 0000000..cc8010f --- /dev/null +++ b/docs/api-push.md @@ -0,0 +1,246 @@ +# API Push 短新闻 + +> /api/v1/ingest — 让外部系统(微信机器人、RSS-digest 脚本、自家脚本) +> 直接 POST 中文短新闻入库,跳过抓取和翻译,只跑分类 + 双 provider 点评。 + +跟 RSS 抓取的对比: + +| 维度 | RSS | API Push(短新闻)| +|---|---|---| +| 触发 | worker 周期拉 | 外部主动 POST | +| 内容 | 长文 + 全文抽取 | 标题 + 短文(≤5000 字)| +| 语言 | 外文 → 翻译 | 中文原生,无翻译 | +| LLM 任务 | 排版 + 分类 + 插图 + 双点评 | 分类 + 双点评(无排版、无插图)| +| 入库 kind | `rss` / `html_list` / `tg_channel` | `api_push` | +| 去重 key | `url_hash` | `content_hash`(= SHA1 文本指纹) | + +--- + +## 调用方协议 + +### 鉴权 + +每个 source 一个独立 token(绑定 source_id),通过 `X-Ingest-Token` 头传。 + +```http +POST /api/v1/ingest +Content-Type: application/json +X-Ingest-Token: + +{ + "external_id": "wx-2026-06-14-001", + "title": "美联储宣布维持利率不变", + "body": "美联储在最新议息会议后宣布...", + "url": "https://example.com/article/123", + "source_ref": "wechat", + "author": "张三", + "published_at": "2026-06-14T10:00:00Z", + "tags": ["财经", "美联储"] +} +``` + +字段: + +| 字段 | 必填 | 长度 | 说明 | +|---|---|---|---| +| `title` | ✅ | 1-200 字 | 文章标题 | +| `body` | ✅ | 1-5000 字 | 正文纯文本(保留 `\n` 换行)| +| `external_id` | 推荐 | ≤128 字 | 调用方业务 ID,幂等 key | +| `url` | ❌ | ≤2048 字 | 原始链接(可选,纯展示用)| +| `source_ref` | ❌ | ≤64 字 | 短新闻里的二级来源标识,如 `"wechat"`/`"rss-digest"` | +| `author` | ❌ | ≤255 字 | 作者 | +| `published_at` | ❌ | ISO8601 | 发布时间,缺省 = now | +| `tags` | ❌ | ≤10 个,每个 ≤32 字 | 直接写入 category,跳过 LLM 分类 | + +### 响应 + +成功新建(201): + +```json +{ + "article_id": 12345, + "content_hash": "a1b2c3...", + "status": "created" +} +``` + +命中三层去重之一(200): + +```json +{ + "article_id": 12340, + "content_hash": "a1b2c3...", + "status": "duplicate", + "reason": "external_id_match", + "matched_external_id": "wx-2026-06-14-001" +} +``` + +`reason` 取值: + +- `external_id_match` —— L1,同 source 下已有相同 `external_id` +- `content_hash_match` —— L2,任意 source 下已有相同内容指纹 +- `db_unique` —— L3,并发场景下 DB UNIQUE 约束兜底 + +错误响应: + +| 状态码 | 含义 | 触发 | +|---|---|---| +| 400 | 字段缺失/超长 | pydantic 自动校验 | +| 401 | token 无效/吊销/过期 | `X-Ingest-Token` 缺失或不匹配 | +| 403 | 源 disabled 或 kind 不匹配 | source.enabled=false 或 kind≠api_push | +| 404 | source 不存在 | token 绑定的 source_id 不在 | +| 429 | 触发限速 | 同 token 1 秒内推送 >2 篇(`INGEST_RATE_PER_SEC`)| + +--- + +## 三层去重 + +``` +请求进来 + │ + ├─ L1:SELECT WHERE external_id=? AND source_id=? + │ → 命中 → 200 duplicate, reason=external_id_match + │ + ├─ L2:SELECT WHERE content_hash=? + │ → 命中 → 200 duplicate, reason=content_hash_match + │ + ├─ INSERT (UNIQUE 约束兜底) + │ → IntegrityError → 200 duplicate, reason=db_unique + │ + └─ 写入成功 → 201 created +``` + +`content_hash` 算法: + +```python +if external_id: + raw = f"ext:{external_id}" +else: + raw = f"{title.strip()}|{body[:500]}" # 仅取前 500 字符,防尾部噪声 +sha1(raw) → 40 字符 +``` + +- `external_id` 存在时 → 强幂等(调用方业务 ID 不变就一定去重) +- `external_id` 缺失时 → 标题 + 前 500 字符作为兜底指纹 + +--- + +## 限速 + +每个 token 独立计数(Redis),默认 2 篇/秒。改 `.env` 的 `INGEST_RATE_PER_SEC` +后重启 api 容器生效。 + +429 响应带 `Retry-After: 1` 头。 + +--- + +## 入库后会发生什么 + +短新闻入库时,`translation_status / format_status / image_ai_status` 均为 `'n/a'`, +跳过翻译、排版、插图。 + +`enrichment_loop` 看到 `is_short_news=True` + 任意 `*_status='pending'` 时: + +1. **classify**(分类):带 `tags` 入库 → 直接标 `'ok'`,跳过;无 `tags` → 调 LLM 分类 +2. **commentary**(Angel 评论):调 LLM 生成 +3. **commentary_meituan**(美团评论):调 LLM 生成 +4. **format** / **image**:跳过 + +完整 lifecycle: + +``` +T+0s POST /api/v1/ingest → 201 created +T+~15s enrichment_loop 扫到 is_short_news=True → classify + 双 commentary +T+~30s 三项 LLM 任务完成,前端拉 /articles/{id} 即可看到分类 + 双 provider 点评 +``` + +--- + +## owner 操作手册 + +### 生成 ingest token + +1. 登录 owner 账号 +2. 进入 `/admin/sources` +3. 新建源(kind=API Push)或点击现有 api_push 源的 **🔑 Token** 按钮 +4. 在弹窗里填名称(如 `wechat-bot`),点 "生成" +5. **raw_token 立即复制保存** —— 关闭弹窗后只显示 hash,不再显示 raw +6. 把 raw_token 配置到调用方,作为 `X-Ingest-Token` 头 + +### 撤销 token + +在 Token 弹窗里点 "撤销" → 该 token 立即失效,下次推送返 401。 + +### 调限速 + +编辑 `.env` → `INGEST_RATE_PER_SEC=N` → `docker compose up -d --no-deps --force-recreate api` +(**不是 `restart`** —— `restart` 不会重新读 env;容器重建才会) + +--- + +## 调用方示例 + +### curl + +```bash +curl -X POST https://your-domain/api/v1/ingest \ + -H "Content-Type: application/json" \ + -H "X-Ingest-Token: YOUR_RAW_TOKEN" \ + -d '{ + "external_id": "wx-2026-06-14-001", + "title": "美联储宣布维持利率不变", + "body": "美联储在最新议息会议后宣布,将联邦基金利率目标区间维持在 4.25%-4.50%。这是连续第二次按兵不动...", + "source_ref": "wechat", + "published_at": "2026-06-14T10:00:00Z", + "tags": ["财经", "美联储"] + }' +``` + +### Python(httpx) + +```python +import httpx + +async def push_short_news(token: str, **payload): + async with httpx.AsyncClient() as client: + r = await client.post( + "https://your-domain/api/v1/ingest", + headers={"X-Ingest-Token": token}, + json=payload, + timeout=10.0, + ) + r.raise_for_status() + return r.json() +``` + +### 重试策略(应对 429) + +```python +import asyncio +from httpx import HTTPStatusError + +async def push_with_retry(token, **payload, max_retries=3): + for attempt in range(max_retries): + try: + return await push_short_news(token, **payload) + except HTTPStatusError as e: + if e.response.status_code == 429 and attempt < max_retries - 1: + # 429:等 Retry-After 头指定的秒数(默认 1 秒) + wait = int(e.response.headers.get("Retry-After", 1)) + await asyncio.sleep(wait) + continue + raise +``` + +--- + +## 故障排查 + +| 现象 | 排查 | 修复 | +|---|---|---| +| 401 invalid ingest token | token 是否被撤销/过期;DB 里 `api_tokens.purpose` 是否为 `ingest` | 重新生成 | +| 413 body length invalid | body 超过 5000 字 | 截断或拆条 | +| 429 rate limit exceeded | 同 token 1 秒内推 >2 篇 | 退避 1 秒重试,或调高 `INGEST_RATE_PER_SEC` | +| 短新闻一直 `classify_status='pending'` | `enrichment_loop` 是否在跑;`/admin/health` 看 worker 状态 | 重启 worker;检查 `.env` 里 `AGNES_API_KEY` / `MEITUAN_API_KEY` | +| 短新闻在 Feed 里没出现 | 看后端日志 `docker compose logs worker \| grep ingest`;DB `SELECT * FROM articles WHERE is_short_news=true` 看是否入库 | 入库成功但 enrich 没跑就等;入库失败看 ingest 日志 | \ No newline at end of file diff --git a/frontend/src/api/articles.ts b/frontend/src/api/articles.ts index 4adea1b..4a53555 100644 --- a/frontend/src/api/articles.ts +++ b/frontend/src/api/articles.ts @@ -41,6 +41,10 @@ export interface ArticleListItem { commentary_meituan_status?: string | null commentary_engine?: string | null // angel / meituan / "angel,meituan" image_ai_url?: string | null + // === API Push 短新闻标识 === + // 短新闻(中文原生,由 /api/v1/ingest 推送)走差异化展示 + is_short_news: boolean + source_ref?: string | null // 短新闻里再细分来源(wechat/rss-digest 等) is_starred: boolean is_read: boolean // 当前用户是否已读 } @@ -79,6 +83,8 @@ export interface ArticleDetail extends ArticleListItem { entities?: Record | null sentiment?: number | null duplicate_of?: number | null + // === API Push 短新闻 === + external_id?: string | null // 调用方幂等 key } export interface LlmSetting { @@ -209,4 +215,29 @@ export const adminApi = { `/admin/llm/enrich/${articleId}` ).then((r) => r.data) }, + // === API Push ingest token 管理 === + listIngestTokens(sourceId: number) { + return http.get(`/admin/sources/${sourceId}/ingest-tokens`).then((r) => r.data) + }, + createIngestToken(sourceId: number, body: { name?: string; expires_days?: number }) { + return http.post(`/admin/sources/${sourceId}/ingest-tokens`, body).then((r) => r.data) + }, + revokeIngestToken(tokenId: number) { + return http.delete<{ id: number; revoked_at: string; already_revoked: boolean }>( + `/admin/ingest-tokens/${tokenId}` + ).then((r) => r.data) + }, +} + +export interface IngestTokenOut { + id: number + source_id: number + name: string + purpose: string + created_at: string + expires_at?: string | null + revoked_at?: string | null + last_used_at?: string | null + // 仅 createIngestToken 返回时填充(raw_token 只一次性返给前端) + raw_token?: string | null } diff --git a/frontend/src/views/AdminSources.vue b/frontend/src/views/AdminSources.vue index fd325d3..19de923 100644 --- a/frontend/src/views/AdminSources.vue +++ b/frontend/src/views/AdminSources.vue @@ -1,10 +1,10 @@