"""讯飞星火 Spark Lite 翻译后端(WebSocket 协议,用户 6/11 要求改回 wss)。 - 端点:wss://spark-api.xf-yun.com/v1.1/chat - 模型:Spark Lite(v1.1 通用轻量版,限时免费) - 鉴权:URL QueryString 带 authorization(HMAC-SHA256 签名) - 需要 APPID + APIKey + APISecret - 签名算法见 _build_auth_url() 设计上独立于 LlmClient(不在通用 OpenAI 协议内), 鉴权 URL 每次调用前重算(因为 date 是当前时间)。 """ from __future__ import annotations import asyncio import base64 import hashlib import hmac import json import logging import random from datetime import datetime from urllib.parse import urlencode import websockets from app.config import settings from app.services.translation.base import BaseTranslator, TranslationResult logger = logging.getLogger("news.translate.spark") # 讯飞 v1.1 域 _SPARK_HOST = "spark-api.xf-yun.com" _SPARK_PATH = "/v1.1/chat" # 讯飞做翻译的 system / user prompt 包装 _SYSTEM_PROMPT = ( "你是一个翻译助手。请将用户输入的英文或日文文本翻译成简体中文。" "严格遵守:不要输出分析、不要输出注释、不要添加任何包裹文字,只输出译文本身。" ) def _build_auth_url(api_key: str, api_secret: str) -> str: """构造带鉴权 query 的 WebSocket URL。 算法来自讯飞开放平台官方文档: 1) date = 当前 GMT 时间 (RFC 1123 格式) 2) signature_origin = "host: {host}\\ndate: {date}\\n GET {path} HTTP/1.1" 3) signature_sha = HMAC-SHA256(api_secret, signature_origin) 4) signature = base64(signature_sha) 5) authorization_origin = "api_key=\\"{api_key}\\", algorithm=\\"hmac-sha256\\", " "headers=\\"host date request-line\\", signature=\\"{signature}\\"" 6) authorization = base64(authorization_origin) 7) 最终 URL: wss://{host}{path}?host={host}&date={urlencoded_date}&authorization={urlencoded_authorization} """ now = datetime.utcnow() date = now.strftime("%a, %d %b %Y %H:%M:%S GMT") signature_origin = f"host: {_SPARK_HOST}\ndate: {date}\n GET {_SPARK_PATH} HTTP/1.1" signature_sha = hmac.new( api_secret.encode("utf-8"), signature_origin.encode("utf-8"), digestmod=hashlib.sha256, ).digest() signature = base64.b64encode(signature_sha).decode("utf-8") authorization_origin = ( f'api_key="{api_key}", algorithm="hmac-sha256", ' f'headers="host date request-line", signature="{signature}"' ) authorization = base64.b64encode(authorization_origin.encode("utf-8")).decode("utf-8") # 注意 date 里带冒号 / 逗号,必须 url-encode return f"wss://{_SPARK_HOST}{_SPARK_PATH}?{urlencode({'host': _SPARK_HOST, 'date': date, 'authorization': authorization})}" class SparkTranslator(BaseTranslator): """讯飞星火 Spark Lite 翻译后端(WebSocket)。""" name = "spark" def __init__(self): if not settings.spark_appid or not settings.spark_api_key or not settings.spark_api_secret: raise RuntimeError("讯飞星火 APPID / APIKey / APISecret 未配置(需要 WSS 鉴权三件套)") self.appid = settings.spark_appid self.api_key = settings.spark_api_key self.api_secret = settings.spark_api_secret self.domain = settings.spark_domain self.interval_sec = settings.spark_interval_sec def is_configured(self) -> bool: return bool(self.appid and self.api_key and self.api_secret) async def translate( self, text: str, source: str = "auto", target: str = "zh" ) -> TranslationResult: if not text.strip(): return TranslationResult(text=text, engine=self.name, chars=0) if not self.is_configured(): raise RuntimeError("讯飞星火未配置(APPID/APIKey/APISecret)") # 长度截断:Spark Lite 单轮 8K context,留余量 max_input = 4000 truncated = text[:max_input] if len(text) > max_input else text # 构造请求体 payload = { "header": {"app_id": self.appid, "uid": "translator"}, "parameter": { "chat": { "domain": self.domain, "temperature": 0.1, # 翻译低温度 "max_tokens": 4096, } }, "payload": { "message": { "text": [ {"role": "system", "content": _SYSTEM_PROMPT}, {"role": "user", "content": truncated}, ] } }, } # 鉴权 URL(每次重算) auth_url = _build_auth_url(self.api_key, self.api_secret) # 简单重试 1 次 last_exc: Exception | None = None for attempt in range(2): try: content = await self._send_once(auth_url, payload) # 节流(避免被限流 — 2 秒/次) await asyncio.sleep(self.interval_sec) return TranslationResult( text=content, engine=self.name, chars=len(text), cached=False ) except Exception as e: last_exc = e logger.warning("spark attempt %s failed: %s", attempt, e) if attempt == 0: await asyncio.sleep(0.5 + random.random()) else: raise assert last_exc is not None raise last_exc async def _send_once(self, auth_url: str, payload: dict) -> str: """单次 WebSocket 调用,聚合流式响应,返回完整文本。""" # websockets 13+ 异步上下文(关闭旧版 serve / connect 双 API 模糊) async with websockets.connect(auth_url, ping_interval=None) as ws: await ws.send(json.dumps(payload, ensure_ascii=False)) collected: list[str] = [] while True: raw = await ws.recv() data = json.loads(raw) # 错误响应(header.code != 0) header = data.get("header", {}) code = header.get("code", 0) if code != 0: msg = header.get("message", "") raise RuntimeError(f"Spark 错误 {code}: {msg}") # 流式增量 choices = data.get("payload", {}).get("choices", {}) status = choices.get("status", 0) text_parts = choices.get("text", []) if text_parts: for t in text_parts: content = t.get("content", "") if content: collected.append(content) # status=2 表示结束 if status == 2: break full = "".join(collected).strip() if not full: raise RuntimeError("Spark 返回空 content") return full