Files
diary-news/backend/app/workers/pipeline.py
Mavis 9862a92423 perf: 翻译独立后台循环(1 篇/秒)+ Semaphore 1
之前 fetch_one_source 入库后立即调翻译(可能并发触发腾讯 TMT 限速)
改为独立 translation_loop 后台循环:
- 完全不和 RSS 抓取并行
- 1 篇/秒节拍(Semaphore 1 + sleep 1.0)
- 没活时空闲 5 秒再轮询
- pending/failed 都重试
2026-06-08 00:27:09 +08:00

315 lines
11 KiB
Python

"""核心 pipeline:
- 抓取(去重 + 入库)
- 翻译(分块 + 配额管理)
- 手动 run_once / fetch_one_source / translate_article
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from app.config import settings
from app.database import AsyncSessionLocal
from app.models.article import Article
from app.models.source import Source, SourceKind
from app.services.fetchers import get_fetcher
from app.services.fetchers.base import FetchedItem, url_hash
from app.services.translation.service import service as translation_service
# Module-wide logger for the fetch/translate pipeline.
logger = logging.getLogger("news.pipeline")
# Maximum number of body characters of a single article that are sent for translation.
TRANSLATE_BODY_MAX = 8000 # per-article cap on translated body characters
# NOTE: despite the name, this is ONE semaphore shared by every
# fetch_one_source() call, so it caps the total number of concurrent source
# fetches at 2 (it is not a per-source limit).
SEM_PER_SOURCE = asyncio.Semaphore(2) # global fetch concurrency limit
# === 抓取 + 入库 ===
async def fetch_one_source(source_id: int) -> None:
    """Fetch one source and store any articles that are not yet in the database.

    The whole operation runs under ``SEM_PER_SOURCE``. A missing or disabled
    source is logged and skipped. A fetcher error is logged and recorded via
    ``_mark_failure``; an empty or successful fetch is recorded via
    ``_mark_success``. Translation is not triggered here.

    Args:
        source_id: Primary key of the ``Source`` row to fetch.
    """
    lookup = select(Source).where(Source.id == source_id)

    async with SEM_PER_SOURCE:
        # Short-lived session: only used to load the source row.
        async with AsyncSessionLocal() as db:
            source = (await db.execute(lookup)).scalar_one_or_none()
            if not source or not source.enabled:
                logger.info("source %s disabled or missing", source_id)
                return

        try:
            fetcher = get_fetcher(
                source.kind.value,
                url=source.url,
                headers=source.headers_json,
            )
            fetched = await fetcher.fetch()
        except Exception as exc:
            logger.exception("fetch failed for %s: %s", source.slug, exc)
            await _mark_failure(source_id, f"fetch: {type(exc).__name__}: {exc}")
            return

        if not fetched:
            # Nothing in the feed still counts as a successful poll.
            await _mark_success(source_id, n_new=0)
            return

        inserted = await _bulk_insert(source, fetched)
        await _mark_success(source_id, n_new=inserted)
        logger.info("source %s: %d new articles (translation deferred to background loop)", source.slug, inserted)
async def _mark_failure(source_id: int, status: str) -> None:
    """Record a failed fetch on the source row and back off when failures pile up.

    Sets ``last_status``, increments ``consecutive_failures`` and stamps
    ``last_fetched_at`` with the current UTC time. Once the failure count
    reaches ``settings.fetch_fail_pause_threshold`` the fetch interval is
    doubled, capped at 720 minutes. Does nothing if the source row is gone.

    Args:
        source_id: Primary key of the ``Source`` row.
        status: Text stored verbatim in ``last_status``.
    """
    lookup = select(Source).where(Source.id == source_id)
    async with AsyncSessionLocal() as db:
        source = (await db.execute(lookup)).scalar_one_or_none()
        if not source:
            return

        source.last_status = status
        source.consecutive_failures += 1
        source.last_fetched_at = datetime.now(timezone.utc)

        threshold_reached = source.consecutive_failures >= settings.fetch_fail_pause_threshold
        if threshold_reached:
            # Back-off: double the polling interval, capped at 720 minutes.
            doubled = source.fetch_interval_min * 2
            source.fetch_interval_min = min(720, doubled)
            logger.warning(
                "source %s paused, interval bumped to %dm",
                source.slug,
                source.fetch_interval_min,
            )

        await db.commit()
async def _mark_success(source_id: int, n_new: int) -> None:
    """Record a successful fetch on the source row.

    Stores ``ok:new=<n_new>`` in ``last_status``, resets
    ``consecutive_failures`` to zero and stamps ``last_fetched_at`` with the
    current UTC time. Does nothing if the source row is gone.

    NOTE(review): ``fetch_interval_min`` is not touched here, so an interval
    that ``_mark_failure`` doubled stays doubled after recovery — confirm
    that this is intended.

    Args:
        source_id: Primary key of the ``Source`` row.
        n_new: Number of newly inserted articles to report in the status.
    """
    lookup = select(Source).where(Source.id == source_id)
    async with AsyncSessionLocal() as db:
        found = await db.execute(lookup)
        source = found.scalar_one_or_none()
        if not source:
            return

        now_utc = datetime.now(timezone.utc)
        source.last_status = f"ok:new={n_new}"
        source.consecutive_failures = 0
        source.last_fetched_at = now_utc
        await db.commit()
async def _bulk_insert(src: Source, items: list[FetchedItem]) -> int:
    """Insert fetched items as articles, skipping ones that already exist.

    De-duplication relies on PostgreSQL ``ON CONFLICT DO NOTHING`` on
    ``url_hash``. Items without a title or URL are dropped. Titles are cut to
    512 characters and both body fields to 65535 characters. Every new row
    starts with ``translation_status == "pending"``.

    Args:
        src: Source the items belong to; supplies ``id`` and the fallback
            language ``language_src``.
        items: Items returned by the fetcher.

    Returns:
        The number of rows actually inserted.
    """
    rows = [
        {
            "source_id": src.id,
            "url": item.url,
            "url_hash": url_hash(item.url),
            "guid": item.guid,
            "title": item.title[:512],
            "body_html": (item.body_html or "")[:65535],
            "body_text": (item.body_text or "")[:65535],
            "lang_src": item.lang or src.language_src,
            "author": item.author,
            "image_url": item.image_url,
            "published_at": item.published_at,
            "translation_status": "pending",
        }
        for item in items or ()
        if item.title and item.url
    ]
    if not rows:
        return 0

    insert_new = (
        pg_insert(Article)
        .values(rows)
        .on_conflict_do_nothing(index_elements=["url_hash"])
        .returning(Article.id)
    )
    async with AsyncSessionLocal() as db:
        outcome = await db.execute(insert_new)
        # RETURNING yields one row per article that was really inserted.
        inserted = len(outcome.all())
        await db.commit()
    return inserted
# === 翻译 ===
async def _translate_recent_for_source(source_id: int, max_n: int = 20) -> None:
    """Translate the newest pending articles of one source, one after another.

    Articles are ordered by ``published_at`` descending (NULLs last), then by
    ``id`` descending; only rows whose status is ``"pending"`` are selected.

    Args:
        source_id: Primary key of the source whose articles are translated.
        max_n: Maximum number of articles handled in this call.
    """
    newest_pending = (
        select(Article)
        .where(Article.source_id == source_id, Article.translation_status == "pending")
        .order_by(Article.published_at.desc().nullslast(), Article.id.desc())
        .limit(max_n)
    )
    async with AsyncSessionLocal() as db:
        found = await db.execute(newest_pending)
        pending_ids = [article.id for article in found.scalars()]

    # Sequential on purpose: each article is awaited before the next starts.
    for pending_id in pending_ids:
        await translate_article(pending_id)
async def translate_article(article_id: int) -> None:
    """Translate one article's title and body into Chinese and store the result.

    Only articles whose status is ``"pending"`` or ``"failed"`` are handled.
    The body is cut to ``TRANSLATE_BODY_MAX`` characters, split with
    ``_chunk_text`` and translated chunk by chunk. On any exception the
    article is marked ``"failed"``; otherwise the translated fields, engine
    label, character count and timestamp are written back.

    NOTE(review): an article with a title but no body always ends up as
    ``"partial"`` because the translated body is empty — confirm intended.
    NOTE(review): when both title and body are empty the function returns
    without changing the status, so the row stays ``"pending"``/``"failed"``.

    Args:
        article_id: Primary key of the ``Article`` row.
    """
    by_id = select(Article).where(Article.id == article_id)

    # Step 1: snapshot the fields we need, then release the session.
    async with AsyncSessionLocal() as db:
        article = (await db.execute(by_id)).scalar_one_or_none()
        if not article:
            return
        if article.translation_status not in ("pending", "failed"):
            return
        title = article.title
        body_text = (article.body_text or "")[:TRANSLATE_BODY_MAX]
        lang_src = article.lang_src or "auto"

    if not (body_text or title):
        return

    target = "zh"

    # Step 2: call the translation service (title first, then body chunks).
    try:
        tr_title = await translation_service.translate(title, source=lang_src, target=target)
        pieces = _chunk_text(body_text, max_chars=settings.tencent_tmt_max_chars_per_req)
        translated = [
            await translation_service.translate(piece, source=lang_src, target=target)
            for piece in pieces
        ]
        total_chars = tr_title.chars + sum(part.chars for part in translated)
        tr_body = "\n\n".join(part.text for part in translated)
        engine_label = "tencent"
        if tr_title.text and tr_body:
            status = "ok"
        else:
            status = "partial"
    except Exception as exc:
        logger.exception("translate article %s failed: %s", article_id, exc)
        async with AsyncSessionLocal() as db:
            failed = (await db.execute(by_id)).scalar_one_or_none()
            if failed:
                failed.translation_status = "failed"
                await db.commit()
        return

    # Step 3: write the result back in a fresh session.
    async with AsyncSessionLocal() as db:
        article = (await db.execute(by_id)).scalar_one_or_none()
        if not article:
            return
        article.title_zh = tr_title.text or None
        article.body_zh_text = tr_body or None
        article.body_zh_html = _wrap_html(tr_body) if tr_body else None
        article.translation_status = status
        article.translation_engine = engine_label
        article.translation_chars = total_chars
        article.translated_at = datetime.now(timezone.utc)
        await db.commit()
        logger.info("article %s translated: %d chars, %s", article_id, total_chars, engine_label)
def _chunk_text(text: str, max_chars: int) -> list[str]:
    """Pack the paragraphs of ``text`` into chunks for the translation API.

    Paragraphs are separated by blank lines (``"\\n\\n"``); empty ones are
    dropped. Consecutive paragraphs are joined with a blank line while the
    joined length stays within ``max_chars``. A paragraph longer than
    ``max_chars`` is first broken up by ``_split_long_para`` and its pieces
    are packed the same way (so they are also joined with a blank line).

    Args:
        text: Plain text to split; may be empty.
        max_chars: Target upper bound for the length of one chunk.

    Returns:
        The chunks in original order; an empty list for empty input.
    """
    if not text:
        return []

    chunks: list[str] = []
    pending = ""
    for raw in text.split("\n\n"):
        para = raw.strip()
        if not para:
            continue
        # An oversized paragraph is handled piece by piece; a normal one as-is.
        pieces = _split_long_para(para, max_chars) if len(para) > max_chars else [para]
        for piece in pieces:
            # +2 accounts for the "\n\n" separator that joining would add.
            if len(pending) + len(piece) + 2 > max_chars:
                if pending:
                    chunks.append(pending)
                pending = piece
            else:
                pending = (pending + "\n\n" + piece).strip() if pending else piece

    if pending:
        chunks.append(pending)
    return chunks
def _split_long_para(para: str, max_chars: int) -> list[str]:
    """Split one oversized paragraph into pieces of at most ``max_chars`` characters.

    A piece ends at a sentence terminator (ASCII ``. ! ?`` or CJK ``。 ! ?``)
    once it is at least ``max_chars // 2`` characters long. If no terminator
    shows up before the piece reaches ``max_chars``, the piece is cut at the
    last whitespace in its second half, or hard-cut at ``max_chars`` when
    there is none. This guarantees that no piece exceeds the limit, which a
    terminator-only split cannot do for text without punctuation.

    Args:
        para: The paragraph to split.
        max_chars: Maximum length of one piece; values below 1 are treated as 1.

    Returns:
        The non-empty, whitespace-stripped pieces in order. If nothing
        remains after stripping, a single-element list with ``para`` cut to
        ``max_chars`` is returned.
    """
    limit = max(1, max_chars)
    soft = limit // 2
    terminators = ".!?。!?"

    parts: list[str] = []
    start = 0  # index of the first character of the piece being built
    for i, ch in enumerate(para):
        length = i - start + 1
        if ch in terminators and length >= soft:
            cut = i + 1
        elif length >= limit:
            # No sentence end in time: prefer a whitespace break in the second
            # half of the piece so words are not cut, else cut right here.
            ws = next(
                (j for j in range(i, start + soft, -1) if para[j].isspace()),
                -1,
            )
            cut = ws + 1 if ws != -1 else i + 1
        else:
            continue
        piece = para[start:cut].strip()
        if piece:
            parts.append(piece)
        start = cut

    tail = para[start:].strip()
    if tail:
        parts.append(tail)
    return parts or [para[:max_chars]]
def _wrap_html(text: str) -> str:
    """Wrap translated plain text into HTML paragraphs.

    The text is split on blank lines (``"\\n\\n"``); every non-empty paragraph
    is stripped, HTML-escaped and wrapped in ``<p>...</p>``. Escaping matters
    because the input is plain text: a literal ``<`` or ``&`` would otherwise
    produce broken markup or let article content inject tags.

    Args:
        text: Plain text with paragraphs separated by blank lines.

    Returns:
        The ``<p>`` elements joined by newlines, or ``""`` if there is no
        non-empty paragraph.
    """
    from html import escape

    paragraphs = (p.strip() for p in text.split("\n\n"))
    # quote=False: quotes are harmless in element content, so leave them readable.
    return "\n".join(f"<p>{escape(p, quote=False)}</p>" for p in paragraphs if p)
# === 全量跑(供测试 / 手动触发) ===
async def run_once() -> None:
    """Fetch every enabled source once (for tests and manual triggering).

    All enabled sources are fetched concurrently through
    ``fetch_one_source``. An exception raised by one source does not stop the
    others; each such exception is logged with its traceback instead of being
    dropped silently.
    """
    # Only the ids are needed, so no ORM instance outlives the session.
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(Source.id).where(Source.enabled.is_(True)))
        source_ids = list(result.scalars())

    logger.info("run_once: %d enabled sources", len(source_ids))

    outcomes = await asyncio.gather(
        *(fetch_one_source(sid) for sid in source_ids),
        return_exceptions=True,
    )
    # return_exceptions=True hands exceptions back as results; surface them.
    for sid, outcome in zip(source_ids, outcomes):
        if isinstance(outcome, BaseException):
            logger.error("run_once: source %s raised %r", sid, outcome, exc_info=outcome)
# === 翻译后台循环 ===
# Pace: one article per second. NOTE(review): the original comment states that
# the translation service already serialises calls with its own Semaphore(1);
# that is not visible from this module — confirm.
TRANSLATION_INTERVAL_SEC = 1.0
TRANSLATION_IDLE_INTERVAL_SEC = 5.0
TRANSLATION_BATCH_SIZE = 1  # at most this many articles per polling round
# After an attempt, an article is not selected again for this many seconds.
TRANSLATION_RETRY_COOLDOWN_SEC = 300.0


async def translation_loop() -> None:
    """Run forever as the stand-alone translation worker.

    Each round selects up to ``TRANSLATION_BATCH_SIZE`` articles whose status
    is ``"pending"`` or ``"failed"`` (oldest ``fetched_at`` first), translates
    them with ``translate_article`` and sleeps ``TRANSLATION_INTERVAL_SEC``
    after each one. When nothing is selectable, or the round itself raises,
    it sleeps ``TRANSLATION_IDLE_INTERVAL_SEC`` instead.

    Every attempted article is put on an in-memory cooldown of
    ``TRANSLATION_RETRY_COOLDOWN_SEC``. Without it, an article that keeps
    failing (or that ``translate_article`` leaves untouched) would be the
    oldest candidate in every round and would block all newer articles while
    hitting the translation API once per second. Failed articles are still
    retried, just not before the cooldown has passed. The cooldown state is
    lost on restart.
    """
    logger.info("translation_loop started (interval=%.1fs)", TRANSLATION_INTERVAL_SEC)
    clock = asyncio.get_running_loop().time  # monotonic event-loop clock
    retry_after: dict[int, float] = {}  # article id -> earliest next attempt

    while True:
        try:
            # Forget cooldowns that have expired so the map stays small.
            now = clock()
            retry_after = {aid: due for aid, due in retry_after.items() if due > now}

            conditions = [Article.translation_status.in_(("pending", "failed"))]
            if retry_after:
                conditions.append(Article.id.notin_(list(retry_after)))
            candidates = (
                select(Article.id)
                .where(*conditions)
                .order_by(Article.fetched_at.asc(), Article.id.asc())
                .limit(TRANSLATION_BATCH_SIZE)
            )
            async with AsyncSessionLocal() as session:
                aids = list((await session.execute(candidates)).scalars())

            if not aids:
                # Nothing to do (or everything is cooling down): poll less often.
                await asyncio.sleep(TRANSLATION_IDLE_INTERVAL_SEC)
                continue

            for aid in aids:
                # Set the cooldown before the attempt so it applies on every outcome.
                retry_after[aid] = clock() + TRANSLATION_RETRY_COOLDOWN_SEC
                try:
                    await translate_article(aid)
                except Exception as e:
                    logger.exception("translate_article %s failed: %s", aid, e)
                # One-article-per-interval pacing.
                await asyncio.sleep(TRANSLATION_INTERVAL_SEC)
        except Exception as e:
            logger.exception("translation_loop error: %s", e)
            await asyncio.sleep(TRANSLATION_IDLE_INTERVAL_SEC)