Files
diary-news/backend/app/workers/pipeline.py
xiaji 55e20e923a perf(translate): translation_loop 跳过中文源头,省 TMT 配额
中文 RSS 长新闻(原文就是中文)走 TMT 中翻中,纯粹浪费
月配额(500 万字符)且产生无意义译文。前端 commit 6
已经隐藏"译文"板块;本 commit 在后端拦截,从源头不跑翻译。

改动:
- translation_loop SQL 加 WHERE lang_src IS NULL OR lang_src NOT LIKE 'zh%'
  - lang_src 为 NULL 时仍走翻译(英文 RSS 没设 language_src 的合法场景)
  - LIKE 'zh%' 覆盖 zh / zh-CN / zh-Hans / zh-TW 等区域码
- translate_article() 函数内加防御性 guard:中文源直接返回,
  返回前先把 translation_status 改为 'n/a',避免反复入队
  (主路径 SQL 过滤已足够,这里是兜底,应对手动 reset status 的情况)

不影响:
- 短新闻(commit 1 已是 translation_status='n/a',根本不进队列)
- 外文 RSS(走翻译)
- 历史已被错误翻译的中文长新闻:保留 translation_status='ok'
  + body_zh_text 中文(空跑产生的) — commit 6 前端已隐藏,
  不影响用户感知;回滚存量不在本 commit 范围(独立 SQL 即可,
  风险与收益需要单独评估)
- enrichment_loop(commit 1 已经能扫到中文源头的 is_short_news,
  长新闻 lang_src=zh 仍能被 enrichment 处理,排版+插图+评论都跑)

范围:仅 backend/app/workers/pipeline.py,+20/-2 行。
2026-06-14 20:57:11 +08:00

370 lines
14 KiB
Python

"""核心 pipeline:
- 抓取(去重 + 入库)
- 翻译(分块 + 配额管理)
- 手动 run_once / fetch_one_source / translate_article
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
from sqlalchemy import or_, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from app.config import settings
from app.database import AsyncSessionLocal
from app.models.article import Article
from app.models.source import Source, SourceKind
from app.services.fetchers import get_fetcher
from app.services.fetchers.base import FetchedItem, url_hash
from app.services.translation.service import service as translation_service
logger = logging.getLogger("news.pipeline")
TRANSLATE_BODY_MAX = 8000  # max body characters sent for translation per article
# Concurrency limit for source fetching.
# NOTE(review): this is ONE module-level semaphore shared by every
# fetch_one_source call, so it caps concurrent fetches across ALL sources at 2,
# not per source as the name suggests.
# NOTE(review): on Python < 3.10 an asyncio.Semaphore created at import time
# binds to the event loop current at import; confirm the runtime version.
SEM_PER_SOURCE = asyncio.Semaphore(2)
# === Fetch + ingest ===
async def fetch_one_source(source_id: int) -> None:
    """Fetch one source, store its new articles and record the outcome.

    Translation is not done here: new rows are inserted with
    ``translation_status='pending'`` and picked up later by the background
    translation loop.

    Disabled or missing sources are skipped. Fetch errors are logged and
    recorded on the source via ``_mark_failure`` without raising. Insert
    errors are recorded the same way and then re-raised to the caller.
    """
    async with SEM_PER_SOURCE:
        # Load the source in a short-lived session and close it BEFORE the
        # network fetch, so no pooled DB connection / open transaction is held
        # for the whole HTTP request. The column attributes used below were
        # loaded by this SELECT and stay readable after the session closes.
        async with AsyncSessionLocal() as session:
            src = (
                await session.execute(select(Source).where(Source.id == source_id))
            ).scalar_one_or_none()
        if not src or not src.enabled:
            logger.info("source %s disabled or missing", source_id)
            return
        try:
            fetcher = get_fetcher(src.kind.value, url=src.url, headers=src.headers_json)
            items = await fetcher.fetch()
        except Exception as e:
            logger.exception("fetch failed for %s: %s", src.slug, e)
            await _mark_failure(source_id, f"fetch: {type(e).__name__}: {e}")
            return
        if not items:
            await _mark_success(source_id, n_new=0)
            return
        try:
            n_new = await _bulk_insert(src, items)
        except Exception as e:
            # Previously an insert error escaped without touching the source
            # row, so its status never reflected the problem. Record it, then
            # propagate exactly as before.
            logger.exception("insert failed for %s: %s", src.slug, e)
            await _mark_failure(source_id, f"insert: {type(e).__name__}: {e}")
            raise
        await _mark_success(source_id, n_new=n_new)
        logger.info(
            "source %s: %d new articles (translation deferred to background loop)",
            src.slug,
            n_new,
        )
async def _mark_failure(source_id: int, status: str) -> None:
    """Record a failed fetch on the source and apply interval backoff.

    Sets ``last_status`` to ``status``, increments ``consecutive_failures`` and
    stamps ``last_fetched_at`` (UTC). Once the failure count has reached
    ``settings.fetch_fail_pause_threshold``, ``fetch_interval_min`` is doubled
    (capped at 720 minutes) on every further failure.
    Does nothing if the source no longer exists.
    """
    async with AsyncSessionLocal() as session:
        query = select(Source).where(Source.id == source_id)
        source = (await session.execute(query)).scalar_one_or_none()
        if not source:
            return
        source.last_status = status
        source.consecutive_failures += 1
        source.last_fetched_at = datetime.now(timezone.utc)
        threshold = settings.fetch_fail_pause_threshold
        if source.consecutive_failures >= threshold:
            # Backoff: double the interval, never beyond 720 minutes.
            doubled = source.fetch_interval_min * 2
            source.fetch_interval_min = doubled if doubled < 720 else 720
            # NOTE(review): the message says "paused" but the source is only
            # slowed down, and _mark_success never touches fetch_interval_min,
            # so the interval is not restored after recovery — confirm intended.
            logger.warning(
                "source %s paused, interval bumped to %dm",
                source.slug,
                source.fetch_interval_min,
            )
        await session.commit()
async def _mark_success(source_id: int, n_new: int) -> None:
    """Record a successful fetch on the source.

    Sets ``last_status`` to ``"ok:new=<n_new>"``, resets
    ``consecutive_failures`` to 0 and stamps ``last_fetched_at`` (UTC).
    Does nothing if the source no longer exists.
    """
    async with AsyncSessionLocal() as session:
        lookup = select(Source).where(Source.id == source_id)
        source = (await session.execute(lookup)).scalar_one_or_none()
        if not source:
            return
        now_utc = datetime.now(timezone.utc)
        source.last_status = f"ok:new={n_new}"
        source.consecutive_failures = 0
        source.last_fetched_at = now_utc
        await session.commit()
def _article_row(src: Source, it: FetchedItem) -> dict:
    """Map one fetched item to an Article insert row (title/body length-capped)."""
    return {
        "source_id": src.id,
        "url": it.url,
        "url_hash": url_hash(it.url),
        "guid": it.guid,
        "title": it.title[:512],
        "body_html": (it.body_html or "")[:65535],
        "body_text": (it.body_text or "")[:65535],
        # Feed-provided language wins; fall back to the source's declared one.
        "lang_src": it.lang or src.language_src,
        "author": it.author,
        "image_url": it.image_url,
        "published_at": it.published_at,
        "translation_status": "pending",
    }


async def _bulk_insert(src: Source, items: list[FetchedItem]) -> int:
    """Insert fetched items for ``src``, skipping duplicates by ``url_hash``.

    Items lacking a title or URL are dropped. Deduplication uses PostgreSQL
    ``INSERT ... ON CONFLICT (url_hash) DO NOTHING RETURNING id``, so only rows
    that were actually inserted are counted.

    Returns:
        The number of newly inserted rows (0 when nothing qualified).
    """
    if not items:
        return 0
    rows = [_article_row(src, it) for it in items if it.title and it.url]
    if not rows:
        return 0
    stmt = (
        pg_insert(Article)
        .values(rows)
        .on_conflict_do_nothing(index_elements=["url_hash"])
        .returning(Article.id)
    )
    async with AsyncSessionLocal() as session:
        result = await session.execute(stmt)
        inserted = len(result.all())
        await session.commit()
    return inserted
# === Translation ===
async def _translate_recent_for_source(source_id: int, max_n: int = 20) -> None:
    """Translate up to ``max_n`` of the newest pending articles of one source.

    Articles are ordered by ``published_at`` descending (NULLs last), then by
    ``id`` descending, and translated one after another via translate_article.
    """
    newest_pending = (
        select(Article)
        .where(Article.source_id == source_id, Article.translation_status == "pending")
        .order_by(Article.published_at.desc().nullslast(), Article.id.desc())
        .limit(max_n)
    )
    async with AsyncSessionLocal() as session:
        found = (await session.execute(newest_pending)).scalars()
        pending_ids = [article.id for article in found]
    for pending_id in pending_ids:
        await translate_article(pending_id)
async def _resolve_lang_src(session, art: Article) -> str:
    """Return the language to translate from for ``art``.

    Priority: ``art.lang_src`` > the owning source's ``language_src`` > "auto".
    The source row is queried explicitly instead of touching the ``art.source``
    relationship, so no implicit lazy load is attempted on the AsyncSession.
    """
    if art.lang_src:
        return art.lang_src
    if art.source_id is not None:
        source_lang = (
            await session.execute(
                select(Source.language_src).where(Source.id == art.source_id)
            )
        ).scalar_one_or_none()
        if source_lang:
            return source_lang
    return "auto"


def _classify_translation(
    article_id: int, title_zh: str | None, body_zh: str, body_src: str
) -> str:
    """Return the translation_status for a finished translation.

    Strict check (second line of defence even if the service already raises on
    errors, so error text is never stored as 'ok'):
    - 'failed'  if the output contains a known error marker, or the body came
                back identical to the source text;
    - 'partial' if the title or the body translation is empty;
    - 'ok'      otherwise.
    """
    bad_markers = ("[翻译失败", "[本条未翻译", "AuthFailure", "TencentCloudSDKException")
    combined = (title_zh or "") + "\n" + (body_zh or "")
    has_marker = any(m in combined for m in bad_markers)
    body_untranslated = bool(body_zh) and body_zh == (body_src or "")
    if has_marker or body_untranslated:
        logger.warning(
            "article %s translation marked failed: marker=%s body_untranslated=%s",
            article_id, has_marker, body_untranslated,
        )
        return "failed"
    if not (title_zh and body_zh):
        return "partial"
    return "ok"


async def translate_article(article_id: int) -> None:
    """Translate one article's title and body to Chinese and store the result.

    Only articles whose ``translation_status`` is 'pending' or 'failed' are
    processed; others are left untouched. Articles whose resolved source
    language starts with "zh" (case-insensitive), or that have neither title
    nor body, are marked 'n/a' so the background loop stops selecting them.

    The body is truncated to TRANSLATE_BODY_MAX characters and sent in chunks
    of at most ``settings.tencent_tmt_max_chars_per_req``. On completion the
    translated title/body/HTML, engine label, character count and timestamp
    are written back with the status from ``_classify_translation``. Any
    exception during translation is logged and the article is marked
    'failed'; it is not re-raised.
    """
    async with AsyncSessionLocal() as session:
        art = (
            await session.execute(select(Article).where(Article.id == article_id))
        ).scalar_one_or_none()
        if not art:
            return
        if art.translation_status not in ("pending", "failed"):
            return
        lang_src = await _resolve_lang_src(session, art)
        # Defensive guard: Chinese-source articles need no translation.
        # translation_loop already filters on article.lang_src in SQL; this
        # also covers manual status resets and rows whose language only comes
        # from the source. Marking 'n/a' keeps them out of the queue.
        if lang_src.lower().startswith("zh"):
            logger.info(
                "translate_article id=%s skipped: lang_src=%s (中文源,无需翻译)",
                article_id, lang_src,
            )
            art.translation_status = "n/a"
            await session.commit()
            return
        title = art.title
        body_text = (art.body_text or "")[:TRANSLATE_BODY_MAX]
        if not body_text and not title:
            # Nothing to translate. Leaving the status pending/failed would make
            # translation_loop (oldest first, batch of 1) re-select this same
            # row on every pass and block the rest of the queue.
            art.translation_status = "n/a"
            await session.commit()
            return

    target = "zh"
    total_chars = 0
    try:
        tr_title = await translation_service.translate(title, source=lang_src, target=target)
        total_chars += tr_title.chars
        # Body: split into request-sized chunks, translate, re-join paragraphs.
        chunks = _chunk_text(body_text, max_chars=settings.tencent_tmt_max_chars_per_req)
        translated_chunks: list[str] = []
        last_engine: str | None = None
        for ch in chunks:
            tr = await translation_service.translate(ch, source=lang_src, target=target)
            total_chars += tr.chars
            translated_chunks.append(tr.text)
            last_engine = tr.engine
        tr_body = "\n\n".join(translated_chunks)
        # Engine label comes from the last body chunk when there is one: if the
        # service fell back to another engine, that is the one reported.
        engine_label = last_engine or tr_title.engine or "tencent"
        status = _classify_translation(article_id, tr_title.text, tr_body, body_text)
    except Exception as e:
        logger.exception("translate article %s failed: %s", article_id, e)
        async with AsyncSessionLocal() as session:
            art = (
                await session.execute(select(Article).where(Article.id == article_id))
            ).scalar_one_or_none()
            if art:
                art.translation_status = "failed"
                await session.commit()
        return

    # Write the results back in a fresh session (the read session is closed,
    # so no DB connection was held while waiting on the translation service).
    async with AsyncSessionLocal() as session:
        art = (
            await session.execute(select(Article).where(Article.id == article_id))
        ).scalar_one_or_none()
        if art:
            art.title_zh = tr_title.text if tr_title.text else None
            art.body_zh_text = tr_body or None
            art.body_zh_html = _wrap_html(tr_body) if tr_body else None
            art.translation_status = status
            art.translation_engine = engine_label
            art.translation_chars = total_chars
            art.translated_at = datetime.now(timezone.utc)
            await session.commit()
    logger.info("article %s translated: %d chars, %s", article_id, total_chars, engine_label)
def _chunk_text(text: str, max_chars: int) -> list[str]:
if not text:
return []
paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
chunks: list[str] = []
cur = ""
for p in paragraphs:
if len(p) > max_chars:
# 单段过长:按句号切
sentences = _split_long_para(p, max_chars)
for s in sentences:
if len(cur) + len(s) + 2 > max_chars:
if cur:
chunks.append(cur)
cur = s
else:
cur = (cur + "\n\n" + s).strip() if cur else s
else:
if len(cur) + len(p) + 2 > max_chars:
if cur:
chunks.append(cur)
cur = p
else:
cur = (cur + "\n\n" + p).strip() if cur else p
if cur:
chunks.append(cur)
return chunks
def _split_long_para(para: str, max_chars: int) -> list[str]:
parts: list[str] = []
cur = ""
for ch in para:
cur += ch
if ch in ".!?。!?" and len(cur) >= max_chars // 2:
parts.append(cur.strip())
cur = ""
if cur.strip():
parts.append(cur.strip())
if not parts:
return [para[:max_chars]]
return parts
def _wrap_html(text: str) -> str:
"""把译文包成 HTML 段落。"""
from bs4 import BeautifulSoup
parts = [f"<p>{p.strip()}</p>" for p in text.split("\n\n") if p.strip()]
return "\n".join(parts) if parts else ""
# === Full run (for tests / manual triggering) ===
async def run_once() -> None:
    """Fetch every enabled source once, concurrently.

    Sources of kind API_PUSH are excluded. Exceptions raised by individual
    fetch_one_source calls are collected by asyncio.gather and ignored.
    """
    enabled_pull_sources = select(Source).where(
        Source.enabled.is_(True), Source.kind != SourceKind.API_PUSH
    )
    async with AsyncSessionLocal() as session:
        found = (await session.execute(enabled_pull_sources)).scalars()
        source_ids = [source.id for source in found]
    logger.info("run_once: %d enabled sources (api_push excluded)", len(source_ids))
    await asyncio.gather(
        *(fetch_one_source(sid) for sid in source_ids), return_exceptions=True
    )
# === Background translation loop ===
# Pacing: about one article per second (TRANSLATION_INTERVAL_SEC).
# NOTE(review): the original comment states the translation service already
# serializes calls with an internal Semaphore(1); that code is not visible
# here — confirm.
TRANSLATION_INTERVAL_SEC = 1.0
TRANSLATION_IDLE_INTERVAL_SEC = 5.0  # wait used when there is no work and after a loop error
TRANSLATION_BATCH_SIZE = 1  # at most one article selected per pass
async def translation_loop() -> None:
    """Background translation worker; runs until cancelled.

    - Decoupled from RSS fetching: fetching only inserts 'pending' rows.
    - Paced at roughly one article per TRANSLATION_INTERVAL_SEC.
    - Selects 'pending' articles before 'failed' ones (oldest first within
      each group). Otherwise an article that keeps failing would always be
      the oldest candidate and, with a batch size of 1, starve all new work.
    - 'failed' articles are still retried, but after a retry the loop waits
      TRANSLATION_IDLE_INTERVAL_SEC to limit quota spent on repeat failures.
    - Chinese-source articles (lang_src starting with "zh", any case) are
      skipped; rows with NULL lang_src are still translated.
    """
    logger.info("translation_loop started (interval=%.1fs)", TRANSLATION_INTERVAL_SEC)
    while True:
        try:
            async with AsyncSessionLocal() as session:
                rows = (
                    await session.execute(
                        select(Article)
                        .where(
                            Article.translation_status.in_(("pending", "failed")),
                            # NULL lang_src is kept: e.g. an English feed whose
                            # language_src was never configured. ILIKE matches
                            # the case-insensitive guard in translate_article.
                            or_(
                                Article.lang_src.is_(None),
                                ~Article.lang_src.ilike("zh%"),
                            ),
                        )
                        .order_by(
                            # false (pending) sorts before true (failed).
                            (Article.translation_status == "failed").asc(),
                            Article.fetched_at.asc(),
                            Article.id.asc(),
                        )
                        .limit(TRANSLATION_BATCH_SIZE)
                    )
                ).scalars()
                batch = [(a.id, a.translation_status == "failed") for a in rows]
            if not batch:
                # Nothing to do; wait longer before polling again.
                await asyncio.sleep(TRANSLATION_IDLE_INTERVAL_SEC)
                continue
            for aid, is_retry in batch:
                try:
                    await translate_article(aid)
                except Exception as e:
                    logger.exception("translate_article %s failed: %s", aid, e)
                await asyncio.sleep(
                    TRANSLATION_IDLE_INTERVAL_SEC if is_retry else TRANSLATION_INTERVAL_SEC
                )
        except Exception as e:
            logger.exception("translation_loop error: %s", e)
            await asyncio.sleep(TRANSLATION_IDLE_INTERVAL_SEC)