"""核心 pipeline: - 抓取(去重 + 入库) - 翻译(分块 + 配额管理) - 手动 run_once / fetch_one_source / translate_article """ from __future__ import annotations import asyncio import logging from datetime import datetime, timezone from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert as pg_insert from app.config import settings from app.database import AsyncSessionLocal from app.models.article import Article from app.models.source import Source, SourceKind from app.services.fetchers import get_fetcher from app.services.fetchers.base import FetchedItem, url_hash from app.services.translation.service import service as translation_service logger = logging.getLogger("news.pipeline") TRANSLATE_BODY_MAX = 8000 # 单篇正文最大翻译字符 SEM_PER_SOURCE = asyncio.Semaphore(2) # 同一源抓取并发 # === 抓取 + 入库 === async def fetch_one_source(source_id: int) -> None: async with SEM_PER_SOURCE: async with AsyncSessionLocal() as session: src = ( await session.execute(select(Source).where(Source.id == source_id)) ).scalar_one_or_none() if not src or not src.enabled: logger.info("source %s disabled or missing", source_id) return try: fetcher = get_fetcher(src.kind.value, url=src.url, headers=src.headers_json) items = await fetcher.fetch() except Exception as e: logger.exception("fetch failed for %s: %s", src.slug, e) await _mark_failure(source_id, f"fetch: {type(e).__name__}: {e}") return if not items: await _mark_success(source_id, n_new=0) return n_new = await _bulk_insert(src, items) await _mark_success(source_id, n_new=n_new) logger.info("source %s: %d new articles", src.slug, n_new) # 入库后,挑高优先级 / 没翻译的开始翻译 await _translate_recent_for_source(source_id, max_n=20) async def _mark_failure(source_id: int, status: str) -> None: async with AsyncSessionLocal() as session: src = ( await session.execute(select(Source).where(Source.id == source_id)) ).scalar_one_or_none() if not src: return src.last_status = status src.consecutive_failures += 1 src.last_fetched_at = datetime.now(timezone.utc) if src.consecutive_failures >= settings.fetch_fail_pause_threshold: # 退避:把 interval 翻倍,封顶 720 分钟 src.fetch_interval_min = min(720, src.fetch_interval_min * 2) logger.warning( "source %s paused, interval bumped to %dm", src.slug, src.fetch_interval_min, ) await session.commit() async def _mark_success(source_id: int, n_new: int) -> None: async with AsyncSessionLocal() as session: src = ( await session.execute(select(Source).where(Source.id == source_id)) ).scalar_one_or_none() if not src: return src.last_status = f"ok:new={n_new}" src.consecutive_failures = 0 src.last_fetched_at = datetime.now(timezone.utc) await session.commit() async def _bulk_insert(src: Source, items: list[FetchedItem]) -> int: """用 PG ON CONFLICT DO NOTHING 去重;返回新插入行数。""" if not items: return 0 rows = [] for it in items: if not it.title or not it.url: continue rows.append( { "source_id": src.id, "url": it.url, "url_hash": url_hash(it.url), "guid": it.guid, "title": it.title[:512], "body_html": (it.body_html or "")[:65535], "body_text": (it.body_text or "")[:65535], "lang_src": it.lang or src.language_src, "author": it.author, "image_url": it.image_url, "published_at": it.published_at, "translation_status": "pending", } ) if not rows: return 0 async with AsyncSessionLocal() as session: stmt = ( pg_insert(Article) .values(rows) .on_conflict_do_nothing(index_elements=["url_hash"]) .returning(Article.id) ) result = await session.execute(stmt) inserted_ids = [r[0] for r in result.all()] await session.commit() return len(inserted_ids) # === 翻译 === async def _translate_recent_for_source(source_id: int, max_n: int = 20) -> None: async with AsyncSessionLocal() as session: rows = ( await session.execute( select(Article) .where(Article.source_id == source_id, Article.translation_status == "pending") .order_by(Article.published_at.desc().nullslast(), Article.id.desc()) .limit(max_n) ) ).scalars() article_ids = [a.id for a in rows] for aid in article_ids: await translate_article(aid) async def translate_article(article_id: int) -> None: async with AsyncSessionLocal() as session: art = ( await session.execute(select(Article).where(Article.id == article_id)) ).scalar_one_or_none() if not art: return if art.translation_status not in ("pending", "failed"): return title = art.title body_text = (art.body_text or "")[:TRANSLATE_BODY_MAX] lang_src = art.lang_src or "auto" target = "zh" article_id_ref = art.id if not body_text and not title: return total_chars = 0 try: # title tr_title = await translation_service.translate(title, source=lang_src, target=target) total_chars += tr_title.chars # body 段落切分 + 重组 chunks = _chunk_text(body_text, max_chars=settings.tencent_tmt_max_chars_per_req) translated_chunks: list[str] = [] for ch in chunks: tr = await translation_service.translate(ch, source=lang_src, target=target) total_chars += tr.chars translated_chunks.append(tr.text) tr_body = "\n\n".join(translated_chunks) engine_label = "tencent" status = "ok" if (tr_title.text and tr_body) else "partial" except Exception as e: logger.exception("translate article %s failed: %s", article_id, e) async with AsyncSessionLocal() as session: art = ( await session.execute(select(Article).where(Article.id == article_id)) ).scalar_one_or_none() if art: art.translation_status = "failed" await session.commit() return # 写回 async with AsyncSessionLocal() as session: art = ( await session.execute(select(Article).where(Article.id == article_id_ref)) ).scalar_one_or_none() if art: art.title_zh = tr_title.text if tr_title.text else None art.body_zh_text = tr_body or None art.body_zh_html = _wrap_html(tr_body) if tr_body else None art.translation_status = status art.translation_engine = engine_label art.translation_chars = total_chars art.translated_at = datetime.now(timezone.utc) await session.commit() logger.info("article %s translated: %d chars, %s", article_id, total_chars, engine_label) def _chunk_text(text: str, max_chars: int) -> list[str]: if not text: return [] paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()] chunks: list[str] = [] cur = "" for p in paragraphs: if len(p) > max_chars: # 单段过长:按句号切 sentences = _split_long_para(p, max_chars) for s in sentences: if len(cur) + len(s) + 2 > max_chars: if cur: chunks.append(cur) cur = s else: cur = (cur + "\n\n" + s).strip() if cur else s else: if len(cur) + len(p) + 2 > max_chars: if cur: chunks.append(cur) cur = p else: cur = (cur + "\n\n" + p).strip() if cur else p if cur: chunks.append(cur) return chunks def _split_long_para(para: str, max_chars: int) -> list[str]: parts: list[str] = [] cur = "" for ch in para: cur += ch if ch in ".!?。!?" and len(cur) >= max_chars // 2: parts.append(cur.strip()) cur = "" if cur.strip(): parts.append(cur.strip()) if not parts: return [para[:max_chars]] return parts def _wrap_html(text: str) -> str: """把译文包成 HTML 段落。""" from bs4 import BeautifulSoup parts = [f"

{p.strip()}

" for p in text.split("\n\n") if p.strip()] return "\n".join(parts) if parts else "" # === 全量跑(供测试 / 手动触发) === async def run_once() -> None: async with AsyncSessionLocal() as session: rows = (await session.execute(select(Source).where(Source.enabled.is_(True)))).scalars() sources = list(rows) logger.info("run_once: %d enabled sources", len(sources)) tasks = [fetch_one_source(s.id) for s in sources] await asyncio.gather(*tasks, return_exceptions=True)