"""最简单的方式:把脚本内容写到容器内,再 docker exec 跑。""" import os, paramiko PW = os.environ["REMOTE_PASS"] c = paramiko.SSHClient() c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) c.connect("207.57.129.228", port=19717, username="root", password=PW, timeout=15, allow_agent=False, look_for_keys=False) def run(cmd, t=60): si, so, se = c.exec_command(cmd, timeout=t) out = so.read().decode("utf-8", "replace") err = se.read().decode("utf-8", "replace") rc = so.channel.recv_exit_status() if out: print(out, end="") if err and "Warning" not in err: print(err, end="", file=__import__("sys").stderr) return out rpw = run("grep ^REDIS_PASSWORD /srv/news/.env | cut -d= -f2").strip() # 1) 重置 run("docker exec news-aggregator-redis-1 redis-cli -a '" + rpw + "' DEL translation:month:202606 2>/dev/null") print("--- usage 重置 0 ---") # 2) 把脚本写到 server 本地 /tmp(用 heredoc 一次性写完) script_lines = [ "import asyncio", "from app.services.translation.service import service", "from app.redis_client import get_redis", "async def main():", " r = get_redis(); await r.ping()", " print('before:', await r.get('translation:month:202606') or 0, flush=True)", " res1 = await service.translate('Breaking news from Reuters today.', source='en', target='zh')", " print(' call 1: engine=', res1.engine, 'chars=', res1.chars, 'text=', res1.text[:40], flush=True)", " print('after 1:', await r.get('translation:month:202606') or 0, flush=True)", " res2 = await service.translate('The market fell sharply after the announcement.', source='en', target='zh')", " print(' call 2: engine=', res2.engine, 'chars=', res2.chars, flush=True)", " print('after 2:', await r.get('translation:month:2026') or 0, flush=True)" if False else " print('after 2:', await r.get('translation:month:202606') or 0, flush=True)", " res3 = await service.translate('Breaking news from Reuters today.', source='en', target='zh')", " print(' call 3 (cache): cached=', res3.cached, 'engine=', res3.engine, flush=True)", " print('after 3:', await r.get('translation:month:202606') or 0, flush=True)", "asyncio.run(main())", ] script = "\n".join(script_lines) # 写到 server /tmp local = "D:/selftools/diary-news/scripts/_tscript.py" with open(local, "w", encoding="utf-8") as f: f.write(script) # 复制到 server si, so, se = c.exec_command("cat > /tmp/_t.py", timeout=10) with open(local, "r", encoding="utf-8") as f: si.write(f.read().encode()) si.channel.shutdown_write() so.read() print("--- script 写到 /tmp/_t.py ---") # 复制到 worker 容器 run("docker cp /tmp/_t.py news-aggregator-worker-1:/app/_t.py") print("--- 跑 ---") run("docker exec -w /app news-aggregator-worker-1 python /app/_t.py 2>&1 | tail -15", t=30) # /me/usage out = run("curl -s -X POST http://localhost/api/v1/auth/login -H 'Content-Type: application/json' -d '{\"username\":\"owner\",\"password\":\"Owner2026!\"}'") token = __import__("json").loads(out)["access_token"] u = __import__("json").loads(run("curl -s -H 'Authorization: Bearer " + token + "' 'http://localhost/api/v1/me/usage'")) print("\n--- /me/usage ---") print(" ", u) c.close()