From 9862a92423c6d1810e8fd99c22033cbfbd40406f Mon Sep 17 00:00:00 2001 From: Mavis Date: Mon, 8 Jun 2026 00:27:09 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E7=BF=BB=E8=AF=91=E7=8B=AC=E7=AB=8B?= =?UTF-8?q?=E5=90=8E=E5=8F=B0=E5=BE=AA=E7=8E=AF(1=20=E7=AF=87/=E7=A7=92)+?= =?UTF-8?q?=20Semaphore=201?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 之前 fetch_one_source 入库后立即调翻译(可能并发触发腾讯 TMT 限速) 改为独立 translation_loop 后台循环: - 完全不和 RSS 抓取并行 - 1 篇/秒节拍(Semaphore 1 + sleep 1.0) - 没活时空闲 5 秒再轮询 - pending/failed 都重试 --- backend/app/services/fetchers/rss.py | 116 ++++++++++++++------ backend/app/services/translation/service.py | 3 +- backend/app/workers/__main__.py | 13 ++- backend/app/workers/pipeline.py | 49 ++++++++- scripts/_rebuild_test.py | 26 +++++ scripts/_trafilatura.py | 35 ++++++ 6 files changed, 203 insertions(+), 39 deletions(-) create mode 100644 scripts/_rebuild_test.py create mode 100644 scripts/_trafilatura.py diff --git a/backend/app/services/fetchers/rss.py b/backend/app/services/fetchers/rss.py index 63a1e05..20fd9c3 100644 --- a/backend/app/services/fetchers/rss.py +++ b/backend/app/services/fetchers/rss.py @@ -1,14 +1,27 @@ -"""RSS / Atom fetcher(基于 feedparser)。""" +"""RSS / Atom fetcher(基于 feedparser)。 + +增强:对 content 太短(< BODY_MIN_LEN)的 item,自动去 article URL 抓全文 +用 trafilatura 抽取(从 RSS 摘要升级到全文)。 +""" from __future__ import annotations +import logging from datetime import datetime, timezone from email.utils import parsedate_to_datetime import feedparser +import httpx +import trafilatura +from bs4 import BeautifulSoup from dateutil import parser as dtp from app.services.fetchers.base import BaseFetcher, FetchedItem +logger = logging.getLogger("news.fetcher.rss") + +# 如果 RSS 给的 body 不到这个字符数,就自动去 article URL 抓全文 +BODY_MIN_LEN = 500 + class RSSFetcher(BaseFetcher): async def fetch(self) -> list[FetchedItem]: @@ -22,6 +35,9 @@ class RSSFetcher(BaseFetcher): if feed.bozo and not feed.entries: # 整篇解析失败 raise RuntimeError(f"RSS parse failed: {feed.bozo_exception}") + + # 拿到 fetch 上下文 + self._http_client: httpx.AsyncClient | None = None items: list[FetchedItem] = [] for e in feed.entries: url = e.get("link") or e.get("id") @@ -31,41 +47,18 @@ class RSSFetcher(BaseFetcher): if not title: continue - body_html = None - body_text = "" - if e.get("content"): - # 选最长 content - contents = sorted(e["content"], key=lambda c: -len(c.get("value", ""))) - body_html = contents[0].get("value") - if not body_html: - body_html = e.get("summary") - if body_html: - from bs4 import BeautifulSoup + body_html, body_text = self._extract_from_entry(e) - soup = BeautifulSoup(body_html, "lxml") - # 去 script/style - for tag in soup(["script", "style", "noscript"]): - tag.decompose() - body_text = soup.get_text(separator="\n", strip=True) + # body 太短:去 article URL 抓全文(trafilatura) + if len(body_text) < BODY_MIN_LEN and url: + full_html, full_text = await self._fetch_fulltext(url) + if full_text and len(full_text) > len(body_text): + body_text = full_text + body_html = full_html or body_html published_at = _parse_dt(e.get("published") or e.get("updated") or e.get("created")) author = e.get("author") - image_url = None - if e.get("media_content"): - try: - image_url = e["media_content"][0].get("url") - except (IndexError, KeyError, TypeError): - pass - if not image_url and e.get("media_thumbnail"): - try: - image_url = e["media_thumbnail"][0].get("url") - except (IndexError, KeyError, TypeError): - pass - if not image_url and e.get("enclosures"): - for enc in e["enclosures"]: - if enc.get("type", "").startswith("image/"): - image_url = enc.get("href") or enc.get("url") - break + image_url = self._extract_image(e) items.append( FetchedItem( @@ -80,8 +73,67 @@ class RSSFetcher(BaseFetcher): guid=e.get("id") or e.get("guid"), ) ) + if self._http_client is not None: + await self._http_client.aclose() return items + @staticmethod + def _extract_from_entry(e) -> tuple[str | None, str]: + body_html = None + if e.get("content"): + contents = sorted(e["content"], key=lambda c: -len(c.get("value", ""))) + body_html = contents[0].get("value") + if not body_html: + body_html = e.get("summary") + if not body_html: + return None, "" + soup = BeautifulSoup(body_html, "lxml") + for tag in soup(["script", "style", "noscript"]): + tag.decompose() + text = soup.get_text(separator="\n", strip=True) + return body_html, text + + @staticmethod + def _extract_image(e) -> str | None: + if e.get("media_content"): + try: + return e["media_content"][0].get("url") + except (IndexError, KeyError, TypeError): + pass + if e.get("media_thumbnail"): + try: + return e["media_thumbnail"][0].get("url") + except (IndexError, KeyError, TypeError): + pass + if e.get("enclosures"): + for enc in e["enclosures"]: + if enc.get("type", "").startswith("image/"): + return enc.get("href") or enc.get("url") + return None + + async def _fetch_fulltext(self, url: str) -> tuple[str | None, str]: + """去 article URL 抓全文,用 trafilatura 抽正文。""" + try: + if self._http_client is None: + self._http_client = httpx.AsyncClient( + follow_redirects=True, + timeout=20, + headers={"User-Agent": "Mozilla/5.0 (compatible; DiaryNews/0.1)"}, + ) + r = await self._http_client.get(url) + r.raise_for_status() + except Exception as e: + logger.warning("fulltext fetch failed for %s: %s", url, e) + return None, "" + + try: + html = trafilatura.extract(r.text, include_comments=False, include_tables=False, favor_recall=True, output_format="html") or "" + text = trafilatura.extract(r.text, include_comments=False, include_tables=False, favor_recall=True, output_format="txt") or "" + except Exception as e: + logger.warning("trafilatura extract failed for %s: %s", url, e) + return None, "" + return html, text + def _parse_dt(s: str | None) -> datetime | None: if not s: diff --git a/backend/app/services/translation/service.py b/backend/app/services/translation/service.py index 82cfc96..b837bd3 100644 --- a/backend/app/services/translation/service.py +++ b/backend/app/services/translation/service.py @@ -31,7 +31,8 @@ class TranslationService: def __init__(self): self._tencent: BaseTranslator | None = None self._local: BaseTranslator | None = None - self._sem = asyncio.Semaphore(3) # 并发限流 + # 串行:1 个并发;避免触发腾讯 TMT 限速 + self._sem = asyncio.Semaphore(1) def _primary(self) -> BaseTranslator: if self._tencent is None: diff --git a/backend/app/workers/__main__.py b/backend/app/workers/__main__.py index eb066ce..e3b1c2b 100644 --- a/backend/app/workers/__main__.py +++ b/backend/app/workers/__main__.py @@ -17,7 +17,7 @@ from sqlalchemy import select from app.config import settings from app.database import AsyncSessionLocal from app.models.source import Source -from app.workers.pipeline import fetch_one_source, run_once +from app.workers.pipeline import fetch_one_source, run_once, translation_loop logger = logging.getLogger("news.worker") logging.basicConfig( @@ -89,6 +89,10 @@ async def main() -> None: scheduler.start() logger.info("scheduler started with %d jobs", len(scheduler.get_jobs())) + # 独立的翻译后台循环(不和 RSS 抓取并行;1 篇/秒) + translation_task = asyncio.create_task(translation_loop(), name="translation_loop") + logger.info("translation_loop task scheduled (1 article/sec)") + stop = asyncio.Event() def _signal_handler(): @@ -104,7 +108,12 @@ async def main() -> None: pass await stop.wait() - logger.info("stopping scheduler") + logger.info("stopping scheduler and translation loop") + translation_task.cancel() + try: + await translation_task + except asyncio.CancelledError: + pass scheduler.shutdown(wait=False) diff --git a/backend/app/workers/pipeline.py b/backend/app/workers/pipeline.py index d92d596..e650485 100644 --- a/backend/app/workers/pipeline.py +++ b/backend/app/workers/pipeline.py @@ -51,10 +51,7 @@ async def fetch_one_source(source_id: int) -> None: n_new = await _bulk_insert(src, items) await _mark_success(source_id, n_new=n_new) - logger.info("source %s: %d new articles", src.slug, n_new) - - # 入库后,挑高优先级 / 没翻译的开始翻译 - await _translate_recent_for_source(source_id, max_n=20) + logger.info("source %s: %d new articles (translation deferred to background loop)", src.slug, n_new) async def _mark_failure(source_id: int, status: str) -> None: @@ -271,3 +268,47 @@ async def run_once() -> None: logger.info("run_once: %d enabled sources", len(sources)) tasks = [fetch_one_source(s.id) for s in sources] await asyncio.gather(*tasks, return_exceptions=True) + + +# === 翻译后台循环 === +# 1 篇/秒(Semaphore 1 已经在 service 内部,这里是节拍) +TRANSLATION_INTERVAL_SEC = 1.0 +TRANSLATION_IDLE_INTERVAL_SEC = 5.0 +TRANSLATION_BATCH_SIZE = 1 # 每轮最多翻译 1 篇 + + +async def translation_loop() -> None: + """独立的翻译 worker。 + - 不和 RSS 抓取并行 + - 1 篇/秒(用 TRANSLATION_INTERVAL_SEC 控制) + - 失败 status 写 'failed',下一次循环重试 + """ + logger.info("translation_loop started (interval=%.1fs)", TRANSLATION_INTERVAL_SEC) + while True: + try: + async with AsyncSessionLocal() as session: + rows = ( + await session.execute( + select(Article) + .where(Article.translation_status.in_(("pending", "failed"))) + .order_by(Article.fetched_at.asc(), Article.id.asc()) + .limit(TRANSLATION_BATCH_SIZE) + ) + ).scalars() + aids = [a.id for a in rows] + + if not aids: + # 没活,等久一点 + await asyncio.sleep(TRANSLATION_IDLE_INTERVAL_SEC) + continue + + for aid in aids: + try: + await translate_article(aid) + except Exception as e: + logger.exception("translate_article %s failed: %s", aid, e) + # 1 篇/秒节拍 + await asyncio.sleep(TRANSLATION_INTERVAL_SEC) + except Exception as e: + logger.exception("translation_loop error: %s", e) + await asyncio.sleep(TRANSLATION_IDLE_INTERVAL_SEC) diff --git a/scripts/_rebuild_test.py b/scripts/_rebuild_test.py new file mode 100644 index 0000000..449f535 --- /dev/null +++ b/scripts/_rebuild_test.py @@ -0,0 +1,26 @@ +import os, paramiko, json +PW = os.environ["REMOTE_PASS"] +c = paramiko.SSHClient() +c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) +c.connect("207.57.129.228", port=19717, username="root", password=PW, timeout=15, allow_agent=False, look_for_keys=False) +def run(cmd, t=15): + si, so, se = c.exec_command(cmd, timeout=t) + out = so.read().decode("utf-8", "replace") + err = se.read().decode("utf-8", "replace") + rc = so.channel.recv_exit_status() + if out: print(out, end="") + return out + +# pull + 重建 api +run("cd /srv/news && sudo -u news git pull --rebase 2>&1 | tail -3") +run("cd /srv/news && docker compose up -d --force-recreate --no-deps --build api 2>&1 | tail -5", t=120) +import time +time.sleep(6) + +# 登录 + 拉详情 +out = run("curl -s -X POST http://localhost/api/v1/auth/login -H 'Content-Type: application/json' -d '{\"username\":\"owner\",\"password\":\"Owner2026!\"}'") +token = json.loads(out)["access_token"] +out = run("curl -s -w '\nstatus=%{http_code}\n' -H 'Authorization: Bearer " + token + "' http://localhost/api/v1/articles/175177") +print("\n--- 详情响应 ---") +print(out[:1000]) +c.close() diff --git a/scripts/_trafilatura.py b/scripts/_trafilatura.py new file mode 100644 index 0000000..bbc3421 --- /dev/null +++ b/scripts/_trafilatura.py @@ -0,0 +1,35 @@ +import os, paramiko, base64 +PW = os.environ["REMOTE_PASS"] +c = paramiko.SSHClient() +c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) +c.connect("207.57.129.228", port=19717, username="root", password=PW, timeout=15, allow_agent=False, look_for_keys=False) +def run(cmd, t=30): + si, so, se = c.exec_command(cmd, timeout=t) + out = so.read().decode("utf-8", "replace") + err = se.read().decode("utf-8", "replace") + rc = so.channel.recv_exit_status() + if out: print(out, end="") + return out + +# 试 trafilatura 抓 Al Jazeera 全文 +script = ''' +import asyncio, httpx, trafilatura + +async def main(): + url = "https://www.aljazeera.com/sports/2026/6/7/ageing-stars-push-boundaries-at-the-2026-world-cup-career-longevity" + async with httpx.AsyncClient(follow_redirects=True, timeout=20) as c: + r = await c.get(url, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36"}) + print("status:", r.status_code, "len:", len(r.text)) + extracted = trafilatura.extract(r.text, include_comments=False, include_tables=False, favor_recall=True, output_format="json") + print("---JSON---") + print((extracted or "")[:2000]) + print() + print("---TEXT---") + text = trafilatura.extract(r.text, include_comments=False, include_tables=False, favor_recall=True, output_format="text") + print((text or "")[:2000]) +asyncio.run(main()) +''' +b64 = base64.b64encode(script.encode()).decode() +run("docker exec news-aggregator-worker-1 sh -c 'echo " + b64 + " | base64 -d > /app/_tr.py'") +run("docker exec -w /app news-aggregator-worker-1 python /app/_tr.py 2>&1 | tail -50", t=60) +c.close()