配 zhipu.py(zhipu 模块)一起使用: - service.py: 新增 _zhipu_translator(),引擎选择链路 spark → zhipu → tencent → maas → agnes → local - config.py: 新增 zhipu_api_key / zhipu_base_url / zhipu_model / zhipu_interval_sec - .env.example: 补 ZHIPU_* 字段说明 留空 zhipu_api_key = spark 不可用时直接降级 tencent(向后兼容)
261 lines
11 KiB
Python
261 lines
11 KiB
Python
"""翻译服务门面:配额检查 + 缓存 + 引擎选择 + 月度计数。
|
|
|
|
引擎链路(优先级降序):
|
|
1. spark(主,Lite 免费;spark_api_password 配了才用)
|
|
2. zhipu(第二序位,GLM-4-Flash 免费;zhipu_api_key 配了才用)
|
|
3. tencent TMT(第三级,按月配额;快满时主动切走)
|
|
4. tencent_maas(备用,OpenAI 兼容,无配额;主失败/TMT 配额耗尽时启用)
|
|
5. agnes(第四级,通用 LLM 做翻译;MaaS 不可用时启用 — 质量次之但够用)
|
|
6. local(最后兜底,需 settings.local_translate_enabled=true)
|
|
|
|
注:
|
|
- TMT 是按月计费的(腾讯云后台可能计费口径是请求字节,我们 redis 累加的是字符数,
|
|
差异约 2-3x);用户从腾讯云后台看"已用 2M"时,我们 redis 显示约 80 万字符
|
|
- 用户决策:以腾讯云后台数字为准,快满时降级
|
|
- spark / zhipu 都是免费模型,默认优先;不可用时降级到 tencent(继续吃配额)。
|
|
想要完全绕开 tencent,把 TENCENTCLOUD_SECRET_ID 留空即可。
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Protocol
|
|
|
|
from app.config import settings
|
|
from app.redis_client import get_redis
|
|
from app.services.translation.agnes import AgnesTranslator
|
|
from app.services.translation.base import BaseTranslator, TranslationResult
|
|
from app.services.translation.local import LocalTranslator
|
|
from app.services.translation.spark import SparkTranslator
|
|
from app.services.translation.tencent import TencentTranslator
|
|
from app.services.translation.tencent_maas import TencentMaaSTranslator
|
|
from app.services.translation.zhipu import ZhipuTranslator
|
|
|
|
logger = logging.getLogger("news.translate.service")
|
|
|
|
|
|
# 缓存 key
|
|
def _cache_key(text: str, src: str, tgt: str) -> str:
|
|
h = hashlib.sha1(f"{src}|{tgt}|{text}".encode()).hexdigest()
|
|
return f"translation:cache:{h}"
|
|
|
|
|
|
def _month_key() -> str:
|
|
now = datetime.now(timezone.utc)
|
|
return f"translation:month:{now:%Y%m}"
|
|
|
|
|
|
class TranslationService:
|
|
def __init__(self):
|
|
self._spark: BaseTranslator | None = None
|
|
self._zhipu: BaseTranslator | None = None
|
|
self._tencent: BaseTranslator | None = None
|
|
self._tencent_maas: BaseTranslator | None = None
|
|
self._agnes: BaseTranslator | None = None
|
|
self._local: BaseTranslator | None = None
|
|
# 串行:1 个并发;避免触发腾讯 TMT 限速
|
|
self._sem = asyncio.Semaphore(1)
|
|
|
|
def _spark_translator(self) -> BaseTranslator | None:
|
|
"""主引擎:星火 Spark(Lite 免费)。配了 spark_api_password 才启用。"""
|
|
if self._spark is None and settings.spark_api_password:
|
|
try:
|
|
self._spark = SparkTranslator()
|
|
except Exception as e:
|
|
logger.warning("spark init failed: %s", e)
|
|
self._spark = None
|
|
return self._spark
|
|
|
|
def _zhipu_translator(self) -> BaseTranslator | None:
|
|
"""第二序位引擎:智谱 GLM(免费)。配了 zhipu_api_key 才启用。"""
|
|
if self._zhipu is None and settings.zhipu_api_key:
|
|
try:
|
|
self._zhipu = ZhipuTranslator()
|
|
except Exception as e:
|
|
logger.warning("zhipu init failed: %s", e)
|
|
self._zhipu = None
|
|
return self._zhipu
|
|
|
|
def _primary(self) -> BaseTranslator | None:
|
|
"""第三级:腾讯 TMT(初始化失败返回 None 表示不可用)。"""
|
|
if self._tencent is None:
|
|
try:
|
|
self._tencent = TencentTranslator()
|
|
except Exception as e:
|
|
logger.warning("tencent TMT init failed: %s", e)
|
|
self._tencent = None
|
|
return self._tencent
|
|
|
|
def _maas(self) -> BaseTranslator | None:
|
|
"""备用引擎:腾讯 MaaS(OpenAI 兼容,无配额)。"""
|
|
if self._tencent_maas is None and settings.tencent_maas_api_key:
|
|
try:
|
|
self._tencent_maas = TencentMaaSTranslator()
|
|
except Exception as e:
|
|
logger.warning("tencent MaaS init failed: %s", e)
|
|
self._tencent_maas = None
|
|
return self._tencent_maas
|
|
|
|
def _agnes(self) -> BaseTranslator | None:
|
|
"""第三级:Agnes 通用 LLM 翻译(在 MaaS 不可用时启用)。
|
|
质量比 TMT/MaaS u2 差,但通用 LLM 也能翻,够用。
|
|
"""
|
|
if self._agnes is None and settings.agnes_api_key:
|
|
try:
|
|
self._agnes = AgnesTranslator()
|
|
except Exception as e:
|
|
logger.warning("Agnes init failed: %s", e)
|
|
self._agnes = None
|
|
return self._agnes
|
|
|
|
def _local_translator(self) -> BaseTranslator | None:
|
|
"""最后兜底:本地模型(需开关)。"""
|
|
if self._local is None and settings.local_translate_enabled:
|
|
try:
|
|
self._local = LocalTranslator()
|
|
except Exception as e:
|
|
logger.warning("local translator init failed: %s", e)
|
|
self._local = None
|
|
return self._local
|
|
|
|
# 兼容旧调用点:返回第一个可用的 fallback(优先 maas,次 agnes,再 local)
|
|
def _fallback(self) -> BaseTranslator | None:
|
|
for getter in (self._maas, self._agnes, self._local_translator):
|
|
f = getter()
|
|
if f is not None:
|
|
return f
|
|
return None
|
|
|
|
async def can_use_tencent(self, chars: int) -> bool:
|
|
if not settings.tencentcloud_secret_id:
|
|
return False
|
|
r = get_redis()
|
|
used = int(await r.get(_month_key()) or 0)
|
|
buffered = int(
|
|
settings.tencent_tmt_quota_month * (1 - settings.tencent_tmt_quota_buffer)
|
|
)
|
|
return (used + chars) <= buffered
|
|
|
|
async def add_usage(self, chars: int) -> None:
|
|
r = get_redis()
|
|
key = _month_key()
|
|
now = datetime.now(timezone.utc)
|
|
# 下个月第一天
|
|
if now.month == 12:
|
|
next_month = now.replace(year=now.year + 1, month=1, day=1)
|
|
else:
|
|
next_month = now.replace(month=now.month + 1, day=1, hour=0, minute=0, second=0, microsecond=0)
|
|
ttl = max(60, int((next_month - now).total_seconds()) + 86400) # +1 天 buffer
|
|
async with r.pipeline(transaction=False) as pipe:
|
|
pipe.incrby(key, chars)
|
|
pipe.expire(key, ttl)
|
|
await pipe.execute()
|
|
|
|
async def translate(
|
|
self, text: str, source: str = "auto", target: str = "zh"
|
|
) -> TranslationResult:
|
|
if not text.strip():
|
|
return TranslationResult(text=text, engine="skip", chars=0)
|
|
|
|
chars = len(text)
|
|
# 1) 缓存
|
|
r = get_redis()
|
|
ck = _cache_key(text, source, target)
|
|
cached = await r.get(ck)
|
|
if cached is not None:
|
|
return TranslationResult(text=cached, engine="cache", chars=chars, cached=True)
|
|
|
|
# 2) 选引擎
|
|
# 优先级:spark → zhipu → tencent(配额)→ maas → agnes → local
|
|
engine: BaseTranslator | None = None
|
|
if self._spark_translator() is not None:
|
|
engine = self._spark_translator()
|
|
elif self._zhipu_translator() is not None:
|
|
engine = self._zhipu_translator()
|
|
elif await self.can_use_tencent(chars):
|
|
engine = self._primary()
|
|
if engine is None:
|
|
logger.warning("TMT unavailable, falling back to MaaS")
|
|
engine = self._maas()
|
|
else:
|
|
engine = None
|
|
|
|
if engine is None:
|
|
# spark / zhipu 不可用 + 配额耗尽 / TMT 不可用:走备用链(maas → agnes → local)
|
|
engine = self._fallback()
|
|
if engine is None:
|
|
# 全无可用:返回原文 + 标记
|
|
return TranslationResult(
|
|
text=text + "\n\n[本条未翻译:所有翻译通道不可用]",
|
|
engine="skip",
|
|
chars=chars,
|
|
)
|
|
logger.info(
|
|
"primary engines unavailable, fallback to %s for %d chars",
|
|
engine.name, chars,
|
|
)
|
|
|
|
# 3) 调用(失败时降级)
|
|
async with self._sem:
|
|
res: TranslationResult | None = None
|
|
try:
|
|
res = await engine.translate(text, source=source, target=target)
|
|
except Exception as e:
|
|
logger.exception("translate failed with %s: %s", engine.name, e)
|
|
# 失败时按 zhipu → tencent → maas → local 顺序找一个不同的 fallback
|
|
# spark / zhipu 失败时也要走 tencent(继续吃配额,因优先级只是降低不是禁用)
|
|
fb: BaseTranslator | None = None
|
|
if engine.name == "spark":
|
|
if self._zhipu_translator() is not None:
|
|
fb = self._zhipu_translator()
|
|
if fb is None and await self.can_use_tencent(chars):
|
|
fb = self._primary()
|
|
if fb is None:
|
|
fb = self._maas() if engine.name != "tencent_maas" else None
|
|
elif engine.name == "zhipu":
|
|
if await self.can_use_tencent(chars):
|
|
fb = self._primary()
|
|
if fb is None:
|
|
fb = self._maas() if engine.name != "tencent_maas" else None
|
|
elif engine.name == "tencent":
|
|
fb = self._maas() if engine.name != "tencent_maas" else None
|
|
if fb is None and settings.local_translate_enabled and engine.name != "local":
|
|
fb = self._local_translator()
|
|
if fb is not None:
|
|
try:
|
|
res = await fb.translate(text, source=source, target=target)
|
|
logger.info("fallback %s succeeded after %s failed", fb.name, engine.name)
|
|
except Exception as e2:
|
|
logger.exception("fallback %s also failed: %s", fb.name, e2)
|
|
res = None
|
|
if res is None:
|
|
raise RuntimeError(f"translation failed for {chars} chars (engine={engine.name})")
|
|
|
|
# 4) 写缓存 — 只缓存真实翻译结果;失败/降级文本不缓存(避免污染 30 天)
|
|
if res.engine in ("spark", "zhipu", "tencent", "tencent_maas", "agnes", "nllb") and not res.cached:
|
|
if "[翻译失败" not in res.text and "[本条未翻译" not in res.text:
|
|
try:
|
|
await r.set(ck, res.text, ex=60 * 60 * 24 * 30) # 30 天
|
|
except Exception:
|
|
pass
|
|
|
|
# 5) 计数(只在 tencent TMT 上计;spark / maas / agnes / local 都不计腾讯云配额)
|
|
if res.engine == "tencent":
|
|
try:
|
|
await self.add_usage(res.chars or chars)
|
|
except Exception as e:
|
|
logger.warning("add_usage failed: %s", e)
|
|
|
|
return res
|
|
|
|
|
|
# 全局单例
|
|
service = TranslationService()
|
|
|
|
|
|
# 让后端 worker 直接调
|
|
class _Protocol(Protocol):
|
|
async def translate(self, text: str, source: str = "auto", target: str = "zh") -> TranslationResult: ...
|