"""Smoke test: /admin/users 接口全链路(owner 端用户管理)。 用法: python scripts/smoke_users.py # 默认 http://localhost/api/v1 python scripts/smoke_users.py --base http://news.xiaji.com/api/v1 python scripts/smoke_users.py --owner-user owner --owner-pass YOUR_PASS 期望输出: ALL PASS (6/6) 验证: 1. owner 登录成功,拿 access_token 2. GET /admin/users 列出全部用户(应该 >=1) 3. POST /admin/users 创建 _smoke_test_user_(role 强制 member) 4. GET /admin/users 列表包含新用户 5. DELETE /admin/users/{id} 软删除(返回 is_active=False + detail) 6. 新用户登录被拒(is_active=False → 401) 跑完会自动清理(如果创建成功但删除失败,会留个 _smoke_test_user_*,手动 DELETE 即可) """ from __future__ import annotations import argparse import json import sys import time from getpass import getpass import httpx def _err(msg: str) -> None: print(f" [FAIL] {msg}", file=sys.stderr) def _ok(msg: str) -> None: print(f" [OK] {msg}") def main() -> int: p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) p.add_argument("--base", default="http://localhost/api/v1", help="API base URL (default: http://localhost/api/v1)") p.add_argument("--owner-user", default="owner") p.add_argument("--owner-pass", default=None, help="owner 密码(省略则交互输入)") args = p.parse_args() owner_pass = args.owner_pass if not owner_pass: owner_pass = getpass("owner password: ") base = args.base.rstrip("/") suffix = str(int(time.time())) test_user = f"_smoke_test_user_{suffix}" test_pass = "smoke_pass_123" passed = 0 total = 6 created_id: int | None = None headers = {"Content-Type": "application/json"} with httpx.Client(timeout=15) as c: # === 1. owner 登录 === print(f"\n=== 1. owner 登录 ===") try: r = c.post(f"{base}/auth/login", json={"username": args.owner_user, "password": owner_pass}, headers=headers) except Exception as e: _err(f"登录请求失败: {e}") return 1 if r.status_code != 200: _err(f"登录失败 (status={r.status_code}): {r.text}") return 1 token = r.json().get("access_token") if not token: _err(f"登录响应无 access_token: {r.text}") return 1 _ok(f"登录成功,token 前 8 字符: {token[:8]}...") headers["Authorization"] = f"Bearer {token}" passed += 1 # === 2. GET /admin/users 列出全部 === print(f"\n=== 2. GET /admin/users ===") r = c.get(f"{base}/admin/users", headers=headers) if r.status_code != 200: _err(f"列表失败 (status={r.status_code}): {r.text}") return 1 users = r.json() if not isinstance(users, list): _err(f"响应不是列表: {r.text}") return 1 if len(users) < 1: _err(f"用户列表为空(至少要有 owner 自己): {r.text}") return 1 # 确认 owner 在列表里 if not any(u["username"] == args.owner_user for u in users): _err(f"owner '{args.owner_user}' 不在列表里") return 1 # 确认没返回 password_hash if any("password_hash" in u for u in users): _err(f"列表泄露 password_hash!") return 1 _ok(f"列表 OK,共 {len(users)} 个用户,且无 password_hash 泄露") passed += 1 # === 3. POST /admin/users 创建新用户 === print(f"\n=== 3. POST /admin/users 创建 {test_user} ===") r = c.post(f"{base}/admin/users", json={"username": test_user, "password": test_pass, "display_name": "smoke test user"}, headers=headers) if r.status_code != 201: _err(f"创建失败 (status={r.status_code}): {r.text}") return 1 new_u = r.json() if new_u.get("username") != test_user: _err(f"用户名不对: {new_u}") return 1 if new_u.get("role") != "member": _err(f"role 不是 member(可能被越权): {new_u}") return 1 if new_u.get("is_active") is not True: _err(f"is_active 不是 true: {new_u}") return 1 if "password_hash" in new_u: _err(f"创建响应泄露 password_hash!") return 1 created_id = new_u["id"] _ok(f"创建成功 id={created_id} role=member") passed += 1 # === 4. GET /admin/users 再次列表,确认新用户在 === print(f"\n=== 4. GET /admin/users 确认新用户在 ===") r = c.get(f"{base}/admin/users", headers=headers) if r.status_code != 200: _err(f"列表失败: {r.text}") return 1 users = r.json() if not any(u["username"] == test_user for u in users): _err(f"新用户 '{test_user}' 不在列表里") return 1 _ok(f"确认新用户在列表里") passed += 1 # === 5. DELETE /admin/users/{id} 软删除 === print(f"\n=== 5. DELETE /admin/users/{created_id} 软删除 ===") r = c.delete(f"{base}/admin/users/{created_id}", headers=headers) if r.status_code != 200: _err(f"软删除失败 (status={r.status_code}): {r.text}") return 1 result = r.json() if result.get("is_active") is not False: _err(f"软删除后 is_active 不是 False: {result}") return 1 _ok(f"软删除成功: {result.get('detail')}") passed += 1 # === 6. 新用户登录被拒(inactive)=== print(f"\n=== 6. 新用户登录被拒(is_active=False → 401) ===") r2 = c.post(f"{base}/auth/login", json={"username": test_user, "password": test_pass}, headers={"Content-Type": "application/json"}) if r2.status_code != 401: _err(f"已禁用用户登录应该被拒(401),实际 status={r2.status_code}: {r2.text}") return 1 _ok(f"已禁用用户登录被拒: {r2.json().get('title', r2.text)}") passed += 1 print(f"\n=== ALL PASS ({passed}/{total}) ===") return 0 if passed == total else 1 if __name__ == "__main__": sys.exit(main())