320 lines
11 KiB
Python
320 lines
11 KiB
Python
"""核心 pipeline:
|
|
- 抓取(去重 + 入库)
|
|
- 翻译(分块 + 配额管理)
|
|
- 手动 run_once / fetch_one_source / translate_article
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
|
|
from app.config import settings
|
|
from app.database import AsyncSessionLocal
|
|
from app.models.article import Article
|
|
from app.models.source import Source, SourceKind
|
|
from app.services.fetchers import get_fetcher
|
|
from app.services.fetchers.base import FetchedItem, url_hash
|
|
from app.services.translation.service import service as translation_service
|
|
|
|
logger = logging.getLogger("news.pipeline")

TRANSLATE_BODY_MAX = 8000  # max characters of a single article body sent for translation
# Acquired by every fetch_one_source() call, so at most 2 fetches run at the
# same time across ALL sources (the limit is global, not per source, despite
# the name).
SEM_PER_SOURCE = asyncio.Semaphore(2)
|
|
|
|
|
|
# === 抓取 + 入库 ===
|
|
async def fetch_one_source(source_id: int) -> None:
    """Fetch one source and store any articles it returns.

    Runs under ``SEM_PER_SOURCE``. The source row is loaded by id; a missing
    or disabled source is logged and skipped. A fetcher is built from the
    source's kind, URL and headers and awaited. Any exception raised while
    building or running the fetcher is logged and recorded through
    ``_mark_failure``. Otherwise the items are inserted with ``_bulk_insert``
    and the outcome is recorded through ``_mark_success``. Translation is not
    triggered here.

    Args:
        source_id: Primary key of the ``Source`` row to fetch.
    """
    async with SEM_PER_SOURCE:
        lookup = select(Source).where(Source.id == source_id)
        async with AsyncSessionLocal() as session:
            found = await session.execute(lookup)
            src = found.scalar_one_or_none()
            if not src or not src.enabled:
                logger.info("source %s disabled or missing", source_id)
                return

        try:
            fetcher = get_fetcher(
                src.kind.value,
                url=src.url,
                headers=src.headers_json,
            )
            items = await fetcher.fetch()
        except Exception as exc:
            logger.exception("fetch failed for %s: %s", src.slug, exc)
            failure = f"fetch: {type(exc).__name__}: {exc}"
            await _mark_failure(source_id, failure)
            return

        if items:
            n_new = await _bulk_insert(src, items)
            await _mark_success(source_id, n_new=n_new)
            logger.info(
                "source %s: %d new articles (translation deferred to background loop)",
                src.slug,
                n_new,
            )
        else:
            # Nothing came back: still a successful fetch, with zero new rows.
            await _mark_success(source_id, n_new=0)
|
|
|
|
|
|
async def _mark_failure(source_id: int, status: str) -> None:
    """Record a failed fetch on the source row.

    Stores ``status`` as the last status, increments the consecutive-failure
    counter and stamps ``last_fetched_at`` with the current UTC time. Once the
    counter reaches ``settings.fetch_fail_pause_threshold`` the fetch interval
    is doubled, capped at 720 minutes, and a warning is logged. Does nothing
    when the source row does not exist.

    Args:
        source_id: Primary key of the ``Source`` row.
        status: Text stored in ``last_status``.
    """
    by_id = select(Source).where(Source.id == source_id)
    async with AsyncSessionLocal() as session:
        source = (await session.execute(by_id)).scalar_one_or_none()
        if not source:
            return

        source.last_status = status
        source.consecutive_failures += 1
        source.last_fetched_at = datetime.now(timezone.utc)

        threshold = settings.fetch_fail_pause_threshold
        if source.consecutive_failures >= threshold:
            # Back off: double the interval, never beyond 720 minutes.
            source.fetch_interval_min = min(720, source.fetch_interval_min * 2)
            logger.warning(
                "source %s paused, interval bumped to %dm",
                source.slug,
                source.fetch_interval_min,
            )

        await session.commit()
|
|
|
|
|
|
async def _mark_success(source_id: int, n_new: int) -> None:
    """Record a successful fetch on the source row.

    Sets ``last_status`` to ``"ok:new=<n_new>"``, resets the
    consecutive-failure counter to zero and stamps ``last_fetched_at`` with
    the current UTC time. Does nothing when the source row does not exist.

    Args:
        source_id: Primary key of the ``Source`` row.
        n_new: Number of newly inserted articles to report in the status text.
    """
    by_id = select(Source).where(Source.id == source_id)
    async with AsyncSessionLocal() as session:
        source = (await session.execute(by_id)).scalar_one_or_none()
        if source:
            source.last_status = f"ok:new={n_new}"
            source.consecutive_failures = 0
            source.last_fetched_at = datetime.now(timezone.utc)
            await session.commit()
|
|
|
|
|
|
async def _bulk_insert(src: Source, items: list[FetchedItem]) -> int:
|
|
"""用 PG ON CONFLICT DO NOTHING 去重;返回新插入行数。"""
|
|
if not items:
|
|
return 0
|
|
rows = []
|
|
for it in items:
|
|
if not it.title or not it.url:
|
|
continue
|
|
rows.append(
|
|
{
|
|
"source_id": src.id,
|
|
"url": it.url,
|
|
"url_hash": url_hash(it.url),
|
|
"guid": it.guid,
|
|
"title": it.title[:512],
|
|
"body_html": (it.body_html or "")[:65535],
|
|
"body_text": (it.body_text or "")[:65535],
|
|
"lang_src": it.lang or src.language_src,
|
|
"author": it.author,
|
|
"image_url": it.image_url,
|
|
"published_at": it.published_at,
|
|
"translation_status": "pending",
|
|
}
|
|
)
|
|
if not rows:
|
|
return 0
|
|
|
|
async with AsyncSessionLocal() as session:
|
|
stmt = (
|
|
pg_insert(Article)
|
|
.values(rows)
|
|
.on_conflict_do_nothing(index_elements=["url_hash"])
|
|
.returning(Article.id)
|
|
)
|
|
result = await session.execute(stmt)
|
|
inserted_ids = [r[0] for r in result.all()]
|
|
await session.commit()
|
|
return len(inserted_ids)
|
|
|
|
|
|
# === 翻译 ===
|
|
async def _translate_recent_for_source(source_id: int, max_n: int = 20) -> None:
    """Translate the newest pending articles of one source, one after another.

    Selects up to ``max_n`` articles of the source whose status is
    ``"pending"``, newest ``published_at`` first (NULLs last, ties broken by
    descending id), then calls ``translate_article`` for each id in turn.

    Args:
        source_id: Primary key of the source whose articles are translated.
        max_n: Maximum number of articles to pick.
    """
    pending = (
        select(Article)
        .where(Article.source_id == source_id, Article.translation_status == "pending")
        .order_by(Article.published_at.desc().nullslast(), Article.id.desc())
        .limit(max_n)
    )
    async with AsyncSessionLocal() as session:
        result = await session.execute(pending)
        article_ids = [article.id for article in result.scalars()]

    # The session is released before the (slow) translation calls start.
    for article_id in article_ids:
        await translate_article(article_id)
|
|
|
|
|
|
async def translate_article(article_id: int) -> None:
    """Translate one article's title and body into Chinese and store the result.

    Only articles whose ``translation_status`` is ``"pending"`` or ``"failed"``
    are processed. The body is truncated to ``TRANSLATE_BODY_MAX`` characters,
    split with ``_chunk_text`` and translated chunk by chunk; the translated
    chunks are rejoined with blank lines. On success the row gets status
    ``"ok"`` (both title and body translations are non-empty) or ``"partial"``.
    If any translation call raises, the error is logged and the row is set to
    ``"failed"``.

    An article with neither title nor body is left untouched.

    Args:
        article_id: Primary key of the ``Article`` row to translate.
    """
    async with AsyncSessionLocal() as session:
        art = (
            await session.execute(select(Article).where(Article.id == article_id))
        ).scalar_one_or_none()
        if not art:
            return
        if art.translation_status not in ("pending", "failed"):
            return
        title = art.title
        body_text = (art.body_text or "")[:TRANSLATE_BODY_MAX]
        # Source-language priority: article.lang_src > source.language_src > "auto".
        # (The article normally already carries the feed's language from
        # ingestion; this is a second fallback.)
        # NOTE(review): `art.source` is a relationship access. Under
        # AsyncSession that needs the relationship to be eagerly loaded,
        # otherwise the implicit lazy load raises - confirm on the model.
        if not art.lang_src and art.source and art.source.language_src:
            lang_src = art.source.language_src
        else:
            lang_src = art.lang_src or "auto"

    if not body_text and not title:
        return

    target = "zh"
    engine_label = "tencent"
    total_chars = 0
    title_zh = ""
    try:
        # Skip the API call for an empty title: there is nothing to translate
        # and a failure there would fail the whole article.
        if title:
            tr_title = await translation_service.translate(title, source=lang_src, target=target)
            total_chars += tr_title.chars
            title_zh = tr_title.text

        # Body: split into request-sized chunks, translate, then reassemble.
        translated_chunks: list[str] = []
        for chunk in _chunk_text(body_text, max_chars=settings.tencent_tmt_max_chars_per_req):
            tr = await translation_service.translate(chunk, source=lang_src, target=target)
            total_chars += tr.chars
            translated_chunks.append(tr.text)
        tr_body = "\n\n".join(translated_chunks)
    except Exception as e:
        logger.exception("translate article %s failed: %s", article_id, e)
        async with AsyncSessionLocal() as session:
            art = (
                await session.execute(select(Article).where(Article.id == article_id))
            ).scalar_one_or_none()
            if art:
                art.translation_status = "failed"
                await session.commit()
        return

    status = "ok" if (title_zh and tr_body) else "partial"

    # Write the result back in a fresh session.
    async with AsyncSessionLocal() as session:
        art = (
            await session.execute(select(Article).where(Article.id == article_id))
        ).scalar_one_or_none()
        if not art:
            # The row vanished while translating; do not report a success.
            logger.warning("article %s no longer exists, translation discarded", article_id)
            return
        art.title_zh = title_zh or None
        art.body_zh_text = tr_body or None
        art.body_zh_html = _wrap_html(tr_body) if tr_body else None
        art.translation_status = status
        art.translation_engine = engine_label
        art.translation_chars = total_chars
        art.translated_at = datetime.now(timezone.utc)
        await session.commit()
    logger.info("article %s translated: %d chars, %s", article_id, total_chars, engine_label)
|
|
|
|
|
|
def _chunk_text(text: str, max_chars: int) -> list[str]:
|
|
if not text:
|
|
return []
|
|
paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
|
|
chunks: list[str] = []
|
|
cur = ""
|
|
for p in paragraphs:
|
|
if len(p) > max_chars:
|
|
# 单段过长:按句号切
|
|
sentences = _split_long_para(p, max_chars)
|
|
for s in sentences:
|
|
if len(cur) + len(s) + 2 > max_chars:
|
|
if cur:
|
|
chunks.append(cur)
|
|
cur = s
|
|
else:
|
|
cur = (cur + "\n\n" + s).strip() if cur else s
|
|
else:
|
|
if len(cur) + len(p) + 2 > max_chars:
|
|
if cur:
|
|
chunks.append(cur)
|
|
cur = p
|
|
else:
|
|
cur = (cur + "\n\n" + p).strip() if cur else p
|
|
if cur:
|
|
chunks.append(cur)
|
|
return chunks
|
|
|
|
|
|
def _split_long_para(para: str, max_chars: int) -> list[str]:
|
|
parts: list[str] = []
|
|
cur = ""
|
|
for ch in para:
|
|
cur += ch
|
|
if ch in ".!?。!?" and len(cur) >= max_chars // 2:
|
|
parts.append(cur.strip())
|
|
cur = ""
|
|
if cur.strip():
|
|
parts.append(cur.strip())
|
|
if not parts:
|
|
return [para[:max_chars]]
|
|
return parts
|
|
|
|
|
|
def _wrap_html(text: str) -> str:
|
|
"""把译文包成 HTML 段落。"""
|
|
from bs4 import BeautifulSoup
|
|
|
|
parts = [f"<p>{p.strip()}</p>" for p in text.split("\n\n") if p.strip()]
|
|
return "\n".join(parts) if parts else ""
|
|
|
|
|
|
# === 全量跑(供测试 / 手动触发) ===
|
|
async def run_once() -> None:
    """Fetch every enabled source once (for tests and manual triggering).

    Loads all sources whose ``enabled`` flag is true and runs
    ``fetch_one_source`` for each of them concurrently. A failure in one
    source does not stop the others; each failure is logged together with the
    id of the source it belongs to.
    """
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(Source).where(Source.enabled.is_(True)))
        # Read the ids while the session is still open.
        source_ids = [source.id for source in result.scalars()]

    logger.info("run_once: %d enabled sources", len(source_ids))
    outcomes = await asyncio.gather(
        *(fetch_one_source(source_id) for source_id in source_ids),
        return_exceptions=True,
    )
    # return_exceptions=True hands exceptions back as results; report them
    # instead of dropping them silently.
    for source_id, outcome in zip(source_ids, outcomes):
        if isinstance(outcome, BaseException):
            logger.error(
                "run_once: source %s failed: %r", source_id, outcome, exc_info=outcome
            )
|
|
|
|
|
|
# === 翻译后台循环 ===
|
|
# One article per second. (Per the original note, the translation service
# already serialises calls with an internal Semaphore(1); this value only sets
# the pause between articles.)
TRANSLATION_INTERVAL_SEC = 1.0
TRANSLATION_IDLE_INTERVAL_SEC = 5.0  # sleep when no article is waiting, and after a loop error
TRANSLATION_BATCH_SIZE = 1  # at most 1 article is picked per round
|
|
|
|
|
|
async def translation_loop(max_attempts: int = 3) -> None:
    """Run forever as the background translation worker.

    Each round picks up to ``TRANSLATION_BATCH_SIZE`` articles whose status is
    ``"pending"`` or ``"failed"`` (oldest ``fetched_at`` first) and calls
    ``translate_article`` for each, sleeping ``TRANSLATION_INTERVAL_SEC`` after
    every article. With nothing to do, or after an unexpected error, it sleeps
    ``TRANSLATION_IDLE_INTERVAL_SEC`` instead.

    A translated article is never selected again, so an article that keeps
    coming back in consecutive rounds is one that cannot be completed: it
    keeps failing, ``translate_article`` raises before updating it, or it has
    nothing to translate. After ``max_attempts`` consecutive attempts such an
    article is skipped for the rest of this process, so it cannot block the
    articles queued behind it. Skipped ids are kept in memory only; a restart
    gives them a fresh set of attempts.

    This function does no coordination with the fetch path.

    Args:
        max_attempts: Consecutive attempts allowed per article before it is
            skipped; values below 1 are treated as 1.
    """
    logger.info("translation_loop started (interval=%.1fs)", TRANSLATION_INTERVAL_SEC)
    max_attempts = max(1, max_attempts)
    attempts: dict[int, int] = {}  # consecutive selections of the current batch's ids
    given_up: set[int] = set()  # ids skipped until the process restarts
    while True:
        try:
            stmt = select(Article).where(
                Article.translation_status.in_(("pending", "failed"))
            )
            if given_up:
                stmt = stmt.where(Article.id.not_in(tuple(given_up)))
            stmt = stmt.order_by(Article.fetched_at.asc(), Article.id.asc()).limit(
                TRANSLATION_BATCH_SIZE
            )
            async with AsyncSessionLocal() as session:
                aids = [a.id for a in (await session.execute(stmt)).scalars()]

            if not aids:
                # Nothing to do: wait a little longer.
                attempts.clear()
                await asyncio.sleep(TRANSLATION_IDLE_INTERVAL_SEC)
                continue

            # Only ids selected again right away keep their counter, so the
            # dict never holds more than one batch.
            attempts = {aid: attempts.get(aid, 0) + 1 for aid in aids}

            for aid in aids:
                if attempts[aid] > max_attempts:
                    given_up.add(aid)
                    logger.warning(
                        "article %s not translated after %d attempts, skipping until restart",
                        aid,
                        max_attempts,
                    )
                    continue
                try:
                    await translate_article(aid)
                except Exception as e:
                    logger.exception("translate_article %s failed: %s", aid, e)
                # Pacing: one article per interval.
                await asyncio.sleep(TRANSLATION_INTERVAL_SEC)
        except Exception as e:
            logger.exception("translation_loop error: %s", e)
            await asyncio.sleep(TRANSLATION_IDLE_INTERVAL_SEC)
|