后端(支持 api_push source 创建/调度): - schemas/source.py:SourceIn.url 改成 str(允许 api_push 的 api-push:// 占位) - admin.py create_source 简化 url 传递 - workers/__main__.py:_rebuild_jobs 跳过 api_push 源(它是被动接收,不抓取) - workers/pipeline.py:run_once 也加同条件,api_push 不进抓取循环 前端: - api/articles.ts:ArticleListItem 加 is_short_news(required)/source_ref; ArticleDetail 加 external_id;导出 IngestTokenOut;adminApi 加 list/create/revoke ingest token 三个方法 - views/Feed.vue:卡片根 class 短新闻加 short-card(淡蓝底 #f6f9fc + 左侧 3px 蓝色色条 #4f9eff);元信息栏加 📰 短讯 角标;长新闻摘要 body_zh_text 截前 200 字,短新闻不截取保留换行(white-space: pre-wrap); 短新闻不显示 AI 插图 - views/ArticleDetail.vue:tag 行加 📰 短讯 + source_ref 角标;短新闻 路径下隐藏翻译状态/重译/原文链接按钮;正文区短新闻直接渲染 body_zh_text,跳过译文/原文/AI 配图卡片;Angel + 美团双评论卡片 都保留 - views/AdminSources.vue:kind 加 api_push 选项;api_push 源 URL 字段 变只读占位、隐藏抓取间隔;列表操作列加 🔑 Token 按钮; 弹窗支持生成(raw_token 一次性显示 + 复制)/列表/撤销 文档: - docs/api-push.md:调用方契约 + 三层去重 + 限速 + lifecycle + owner 操作手册 + curl/Python 示例 + 重试策略 + 故障排查 - README.md:关键特性加 API Push;API 概览加 /api/v1/ingest 和 3 个 /admin/.../ingest-tokens 端点
7.4 KiB
7.4 KiB
API Push 短新闻
/api/v1/ingest — 让外部系统(微信机器人、RSS-digest 脚本、自家脚本) 直接 POST 中文短新闻入库,跳过抓取和翻译,只跑分类 + 双 provider 点评。
跟 RSS 抓取的对比:
| 维度 | RSS | API Push(短新闻) |
|---|---|---|
| 触发 | worker 周期拉 | 外部主动 POST |
| 内容 | 长文 + 全文抽取 | 标题 + 短文(≤5000 字) |
| 语言 | 外文 → 翻译 | 中文原生,无翻译 |
| LLM 任务 | 排版 + 分类 + 插图 + 双点评 | 分类 + 双点评(无排版、无插图) |
| 入库 kind | rss / html_list / tg_channel |
api_push |
| 去重 key | url_hash |
content_hash(= SHA1 文本指纹) |
调用方协议
鉴权
每个 source 一个独立 token(绑定 source_id),通过 X-Ingest-Token 头传。
POST /api/v1/ingest
Content-Type: application/json
X-Ingest-Token: <per-source-token>
{
"external_id": "wx-2026-06-14-001",
"title": "美联储宣布维持利率不变",
"body": "美联储在最新议息会议后宣布...",
"url": "https://example.com/article/123",
"source_ref": "wechat",
"author": "张三",
"published_at": "2026-06-14T10:00:00Z",
"tags": ["财经", "美联储"]
}
字段:
| 字段 | 必填 | 长度 | 说明 |
|---|---|---|---|
title |
✅ | 1-200 字 | 文章标题 |
body |
✅ | 1-5000 字 | 正文纯文本(保留 \n 换行) |
external_id |
推荐 | ≤128 字 | 调用方业务 ID,幂等 key |
url |
❌ | ≤2048 字 | 原始链接(可选,纯展示用) |
source_ref |
❌ | ≤64 字 | 短新闻里的二级来源标识,如 "wechat"/"rss-digest" |
author |
❌ | ≤255 字 | 作者 |
published_at |
❌ | ISO8601 | 发布时间,缺省 = now |
tags |
❌ | ≤10 个,每个 ≤32 字 | 直接写入 category,跳过 LLM 分类 |
响应
成功新建(201):
{
"article_id": 12345,
"content_hash": "a1b2c3...",
"status": "created"
}
命中三层去重之一(200):
{
"article_id": 12340,
"content_hash": "a1b2c3...",
"status": "duplicate",
"reason": "external_id_match",
"matched_external_id": "wx-2026-06-14-001"
}
reason 取值:
external_id_match—— L1,同 source 下已有相同external_idcontent_hash_match—— L2,任意 source 下已有相同内容指纹db_unique—— L3,并发场景下 DB UNIQUE 约束兜底
错误响应:
| 状态码 | 含义 | 触发 |
|---|---|---|
| 400 | 字段缺失/超长 | pydantic 自动校验 |
| 401 | token 无效/吊销/过期 | X-Ingest-Token 缺失或不匹配 |
| 403 | 源 disabled 或 kind 不匹配 | source.enabled=false 或 kind≠api_push |
| 404 | source 不存在 | token 绑定的 source_id 不在 |
| 429 | 触发限速 | 同 token 1 秒内推送 >2 篇(INGEST_RATE_PER_SEC) |
三层去重
请求进来
│
├─ L1:SELECT WHERE external_id=? AND source_id=?
│ → 命中 → 200 duplicate, reason=external_id_match
│
├─ L2:SELECT WHERE content_hash=?
│ → 命中 → 200 duplicate, reason=content_hash_match
│
├─ INSERT (UNIQUE 约束兜底)
│ → IntegrityError → 200 duplicate, reason=db_unique
│
└─ 写入成功 → 201 created
content_hash 算法:
if external_id:
raw = f"ext:{external_id}"
else:
raw = f"{title.strip()}|{body[:500]}" # 仅取前 500 字符,防尾部噪声
sha1(raw) → 40 字符
external_id存在时 → 强幂等(调用方业务 ID 不变就一定去重)external_id缺失时 → 标题 + 前 500 字符作为兜底指纹
限速
每个 token 独立计数(Redis),默认 2 篇/秒。改 .env 的 INGEST_RATE_PER_SEC
后重启 api 容器生效。
429 响应带 Retry-After: 1 头。
入库后会发生什么
短新闻入库时,translation_status / format_status / image_ai_status 均为 'n/a',
跳过翻译、排版、插图。
enrichment_loop 看到 is_short_news=True + 任意 *_status='pending' 时:
- classify(分类):带
tags入库 → 直接标'ok',跳过;无tags→ 调 LLM 分类 - commentary(Angel 评论):调 LLM 生成
- commentary_meituan(美团评论):调 LLM 生成
- format / image:跳过
完整 lifecycle:
T+0s POST /api/v1/ingest → 201 created
T+~15s enrichment_loop 扫到 is_short_news=True → classify + 双 commentary
T+~30s 三项 LLM 任务完成,前端拉 /articles/{id} 即可看到分类 + 双 provider 点评
owner 操作手册
生成 ingest token
- 登录 owner 账号
- 进入
/admin/sources - 新建源(kind=API Push)或点击现有 api_push 源的 🔑 Token 按钮
- 在弹窗里填名称(如
wechat-bot),点 "生成" - raw_token 立即复制保存 —— 关闭弹窗后只显示 hash,不再显示 raw
- 把 raw_token 配置到调用方,作为
X-Ingest-Token头
撤销 token
在 Token 弹窗里点 "撤销" → 该 token 立即失效,下次推送返 401。
调限速
编辑 .env → INGEST_RATE_PER_SEC=N → docker compose up -d --no-deps --force-recreate api
(不是 restart —— restart 不会重新读 env;容器重建才会)
调用方示例
curl
curl -X POST https://your-domain/api/v1/ingest \
-H "Content-Type: application/json" \
-H "X-Ingest-Token: YOUR_RAW_TOKEN" \
-d '{
"external_id": "wx-2026-06-14-001",
"title": "美联储宣布维持利率不变",
"body": "美联储在最新议息会议后宣布,将联邦基金利率目标区间维持在 4.25%-4.50%。这是连续第二次按兵不动...",
"source_ref": "wechat",
"published_at": "2026-06-14T10:00:00Z",
"tags": ["财经", "美联储"]
}'
Python(httpx)
import httpx
async def push_short_news(token: str, **payload):
async with httpx.AsyncClient() as client:
r = await client.post(
"https://your-domain/api/v1/ingest",
headers={"X-Ingest-Token": token},
json=payload,
timeout=10.0,
)
r.raise_for_status()
return r.json()
重试策略(应对 429)
import asyncio
from httpx import HTTPStatusError
async def push_with_retry(token, **payload, max_retries=3):
for attempt in range(max_retries):
try:
return await push_short_news(token, **payload)
except HTTPStatusError as e:
if e.response.status_code == 429 and attempt < max_retries - 1:
# 429:等 Retry-After 头指定的秒数(默认 1 秒)
wait = int(e.response.headers.get("Retry-After", 1))
await asyncio.sleep(wait)
continue
raise
故障排查
| 现象 | 排查 | 修复 |
|---|---|---|
| 401 invalid ingest token | token 是否被撤销/过期;DB 里 api_tokens.purpose 是否为 ingest |
重新生成 |
| 413 body length invalid | body 超过 5000 字 | 截断或拆条 |
| 429 rate limit exceeded | 同 token 1 秒内推 >2 篇 | 退避 1 秒重试,或调高 INGEST_RATE_PER_SEC |
短新闻一直 classify_status='pending' |
enrichment_loop 是否在跑;/admin/health 看 worker 状态 |
重启 worker;检查 .env 里 AGNES_API_KEY / MEITUAN_API_KEY |
| 短新闻在 Feed 里没出现 | 看后端日志 docker compose logs worker | grep ingest;DB SELECT * FROM articles WHERE is_short_news=true 看是否入库 |
入库成功但 enrich 没跑就等;入库失败看 ingest 日志 |