Files
diary-news/backend/app/services/translation/service.py
Mavis 921e674a30 feat(translate): 加 Agnes 翻译 fallback,buffer 改 0.5
腾讯 TMT 月度配额快满时(腾讯后台口径已用 2M/5M),翻译降级链:

1. tencent TMT(主,按月配额)
2. tencent MaaS u2(第二级,翻译专用,无配额)
3. agnes 通用 LLM(第三级,质量次之但够用)
4. local NLLB(最后兜底)

新增 backend/app/services/translation/agnes.py: AgnesTranslator
复用 LlmClient 做限速 + 重试,系统 prompt 强约束只输出译文,
去除 "以下是翻译" 等常见 LLM 翻译前缀。

service.py 改动:
- fallback 链 maas -> agnes -> local
- cache 接受 agnes 结果(30天)
- add_usage 只算 tencent TMT

buffer 调整: TENCENT_TMT_QUOTA_BUFFER 0.05 -> 0.5
腾讯云后台按请求字节计费,与我们 redis 字符累加口径差约 2.5x;
按腾讯后台口径 redis 累加到 1M 字符即触发降级(对应腾讯约 2.5M 字节 =50% 用量),
留足 buffer,避免月底真爆。
2026-06-10 17:44:47 +08:00

218 lines
8.7 KiB
Python

"""翻译服务门面:配额检查 + 缓存 + 引擎选择 + 月度计数。
引擎链路(优先级降序):
1. tencent TMT(主,按月配额;快满时主动切走)
2. tencent_maas(备用,OpenAI 兼容,无配额;主失败/TMT 配额耗尽时启用)
3. agnes(第三级,通用 LLM 做翻译;MaaS 不可用时启用 — 质量次之但够用)
4. local(最后兜底,需 settings.local_translate_enabled=true)
注:
- TMT 是按月计费的(腾讯云后台可能计费口径是请求字节,我们 redis 累加的是字符数,
差异约 2-3x);用户从腾讯云后台看"已用 2M"时,我们 redis 显示约 80 万字符
- 用户决策:以腾讯云后台数字为准,快满时降级
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
from datetime import datetime, timezone
from typing import Protocol
from app.config import settings
from app.redis_client import get_redis
from app.services.translation.agnes import AgnesTranslator
from app.services.translation.base import BaseTranslator, TranslationResult
from app.services.translation.local import LocalTranslator
from app.services.translation.tencent import TencentTranslator
from app.services.translation.tencent_maas import TencentMaaSTranslator
logger = logging.getLogger("news.translate.service")
# 缓存 key
def _cache_key(text: str, src: str, tgt: str) -> str:
h = hashlib.sha1(f"{src}|{tgt}|{text}".encode()).hexdigest()
return f"translation:cache:{h}"
def _month_key() -> str:
now = datetime.now(timezone.utc)
return f"translation:month:{now:%Y%m}"
class TranslationService:
def __init__(self):
self._tencent: BaseTranslator | None = None
self._tencent_maas: BaseTranslator | None = None
self._agnes: BaseTranslator | None = None
self._local: BaseTranslator | None = None
# 串行:1 个并发;避免触发腾讯 TMT 限速
self._sem = asyncio.Semaphore(1)
def _primary(self) -> BaseTranslator | None:
"""主引擎:腾讯 TMT(初始化失败返回 None 表示不可用)。"""
if self._tencent is None:
try:
self._tencent = TencentTranslator()
except Exception as e:
logger.warning("tencent TMT init failed: %s", e)
self._tencent = None
return self._tencent
def _maas(self) -> BaseTranslator | None:
"""备用引擎:腾讯 MaaS(OpenAI 兼容,无配额)。"""
if self._tencent_maas is None and settings.tencent_maas_api_key:
try:
self._tencent_maas = TencentMaaSTranslator()
except Exception as e:
logger.warning("tencent MaaS init failed: %s", e)
self._tencent_maas = None
return self._tencent_maas
def _agnes(self) -> BaseTranslator | None:
"""第三级:Agnes 通用 LLM 翻译(在 MaaS 不可用时启用)。
质量比 TMT/MaaS u2 差,但通用 LLM 也能翻,够用。
"""
if self._agnes is None and settings.agnes_api_key:
try:
self._agnes = AgnesTranslator()
except Exception as e:
logger.warning("Agnes init failed: %s", e)
self._agnes = None
return self._agnes
def _local_translator(self) -> BaseTranslator | None:
"""最后兜底:本地模型(需开关)。"""
if self._local is None and settings.local_translate_enabled:
try:
self._local = LocalTranslator()
except Exception as e:
logger.warning("local translator init failed: %s", e)
self._local = None
return self._local
# 兼容旧调用点:返回第一个可用的 fallback(优先 maas,次 agnes,再 local)
def _fallback(self) -> BaseTranslator | None:
for getter in (self._maas, self._agnes, self._local_translator):
f = getter()
if f is not None:
return f
return None
async def can_use_tencent(self, chars: int) -> bool:
if not settings.tencentcloud_secret_id:
return False
r = get_redis()
used = int(await r.get(_month_key()) or 0)
buffered = int(
settings.tencent_tmt_quota_month * (1 - settings.tencent_tmt_quota_buffer)
)
return (used + chars) <= buffered
async def add_usage(self, chars: int) -> None:
r = get_redis()
key = _month_key()
now = datetime.now(timezone.utc)
# 下个月第一天
if now.month == 12:
next_month = now.replace(year=now.year + 1, month=1, day=1)
else:
next_month = now.replace(month=now.month + 1, day=1, hour=0, minute=0, second=0, microsecond=0)
ttl = max(60, int((next_month - now).total_seconds()) + 86400) # +1 天 buffer
async with r.pipeline(transaction=False) as pipe:
pipe.incrby(key, chars)
pipe.expire(key, ttl)
await pipe.execute()
async def translate(
self, text: str, source: str = "auto", target: str = "zh"
) -> TranslationResult:
if not text.strip():
return TranslationResult(text=text, engine="skip", chars=0)
chars = len(text)
# 1) 缓存
r = get_redis()
ck = _cache_key(text, source, target)
cached = await r.get(ck)
if cached is not None:
return TranslationResult(text=cached, engine="cache", chars=chars, cached=True)
# 2) 选引擎(主 → maas 备用 → local 兜底)
use_tencent = await self.can_use_tencent(chars)
if use_tencent:
engine: BaseTranslator | None = self._primary()
if engine is None:
# TMT 配了 key 但初始化失败 → 直接走 maas
logger.warning("TMT unavailable, falling back to MaaS")
engine = self._maas()
else:
engine = None
if engine is None:
# 配额耗尽 / TMT 不可用:走备用链(maas → agnes → local)
engine = self._fallback()
if engine is None:
# 全无可用:返回原文 + 标记
return TranslationResult(
text=text + "\n\n[本条未翻译:所有翻译通道不可用]",
engine="skip",
chars=chars,
)
logger.info(
"tencent quota exhausted, fallback to %s for %d chars",
engine.name, chars,
)
# 3) 调用(失败时降级)
async with self._sem:
res: TranslationResult | None = None
try:
res = await engine.translate(text, source=source, target=target)
except Exception as e:
logger.exception("translate failed with %s: %s", engine.name, e)
# 按 maas → local 顺序找一个不同的 fallback
fb = self._maas() if engine.name != "tencent_maas" else None
if fb is None and settings.local_translate_enabled and engine.name != "local":
fb = self._local_translator()
if fb is not None:
try:
res = await fb.translate(text, source=source, target=target)
logger.info("fallback %s succeeded after %s failed", fb.name, engine.name)
except Exception as e2:
logger.exception("fallback %s also failed: %s", fb.name, e2)
res = None
if res is None:
raise RuntimeError(f"translation failed for {chars} chars (engine={engine.name})")
# 注:engine 已经设好但运行时降级需要重新判断 fallback 链
# 上面 translate() 调用失败时,会重试 _fallback() 里下一个可用引擎
# 这里 engine 已经在 _fallback() 中按顺序选了一个最合适的,直接使用即可
# 4) 写缓存 — 只缓存真实翻译结果;失败/降级文本不缓存(避免污染 30 天)
if res.engine in ("tencent", "tencent_maas", "agnes", "nllb") and not res.cached:
if "[翻译失败" not in res.text and "[本条未翻译" not in res.text:
try:
await r.set(ck, res.text, ex=60 * 60 * 24 * 30) # 30 天
except Exception:
pass
# 5) 计数(只在 tencent TMT 上计;maas / agnes / local 都不计腾讯云配额)
if res.engine == "tencent":
try:
await self.add_usage(res.chars or chars)
except Exception as e:
logger.warning("add_usage failed: %s", e)
return res
# 全局单例
service = TranslationService()
# 让后端 worker 直接调
class _Protocol(Protocol):
async def translate(self, text: str, source: str = "auto", target: str = "zh") -> TranslationResult: ...