import os, paramiko, base64 PW = os.environ["REMOTE_PASS"] c = paramiko.SSHClient() c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) c.connect("207.57.129.228", port=19717, username="root", password=PW, timeout=15, allow_agent=False, look_for_keys=False) def run(cmd, t=60): si, so, se = c.exec_command(cmd, timeout=t) out = so.read().decode("utf-8", "replace") err = se.read().decode("utf-8", "replace") rc = so.channel.recv_exit_status() if out: print(out, end="") if err: print("[err]", err, end="", file=__import__("sys").stderr) return out # 在 API 容器里直接读 redis script = ( "import asyncio\n" "from app.redis_client import get_redis\n" "async def main():\n" " r = get_redis()\n" " await r.ping()\n" " v = await r.get('translation:month:202606')\n" " print('api sees:', v)\n" " # 手动设一个值再读\n" " await r.set('translation:month:202606', 999)\n" " v2 = await r.get('translation:month:202606')\n" " print('after set 999:', v2)\n" "asyncio.run(main())\n" ) b64 = base64.b64encode(script.encode()).decode() run(f"docker exec news-aggregator-api-1 sh -c 'echo {b64} | base64 -d > /app/_test_redis.py'") print("--- api 容器读 redis ---") run("docker exec -w /app news-aggregator-api-1 python /app/_test_redis.py", t=15) # 立即调 /me/usage import json out = run("curl -s -X POST http://localhost/api/v1/auth/login -H 'Content-Type: application/json' -d '{\"username\":\"owner\",\"password\":\"Owner2026!\"}'") token = json.loads(out)["access_token"] u = json.loads(run(f"curl -s -H 'Authorization: Bearer {token}' 'http://localhost/api/v1/me/usage'")) print(f"--- /me/usage: {u}") c.close()