From 4db29ed4a1ba8c9fe45423ba220e75dad6a5d4bb Mon Sep 17 00:00:00 2001 From: xiaji Date: Wed, 17 Jun 2026 07:40:03 +0800 Subject: [PATCH] =?UTF-8?q?test(smoke):=20=E6=96=B0=E5=A2=9E=20/admin/user?= =?UTF-8?q?s=20=E7=AB=AF=E5=88=B0=E7=AB=AF=20smoke=20=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 参考 scripts/smoke_ingest.py 的风格,跑 6 步全链路验证: 1. owner 登录拿 access_token 2. GET /admin/users 列出全部用户(>=1 且无 password_hash 泄露) 3. POST /admin/users 创建 _smoke_test_user_(role 必须为 member) 4. GET /admin/users 确认新用户在列表里 5. DELETE /admin/users/{id} 软删除(is_active=False) 6. 已禁用用户登录被拒(401) 用法: python scripts/smoke_users.py # 默认 http://localhost/api/v1 python scripts/smoke_users.py --base https://xxx/api/v1 --owner-user owner # 密码交互输入(不回显) 期望输出: ALL PASS (6/6) --- scripts/smoke_users.py | 176 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 scripts/smoke_users.py diff --git a/scripts/smoke_users.py b/scripts/smoke_users.py new file mode 100644 index 0000000..99eaf5b --- /dev/null +++ b/scripts/smoke_users.py @@ -0,0 +1,176 @@ +"""Smoke test: /admin/users 接口全链路(owner 端用户管理)。 + +用法: + python scripts/smoke_users.py # 默认 http://localhost/api/v1 + python scripts/smoke_users.py --base http://news.xiaji.com/api/v1 + python scripts/smoke_users.py --owner-user owner --owner-pass YOUR_PASS + +期望输出: + ALL PASS (6/6) + +验证: + 1. owner 登录成功,拿 access_token + 2. GET /admin/users 列出全部用户(应该 >=1) + 3. POST /admin/users 创建 _smoke_test_user_(role 强制 member) + 4. GET /admin/users 列表包含新用户 + 5. DELETE /admin/users/{id} 软删除(返回 is_active=False + detail) + 6. 新用户登录被拒(is_active=False → 401) + +跑完会自动清理(如果创建成功但删除失败,会留个 _smoke_test_user_*,手动 DELETE 即可) +""" +from __future__ import annotations + +import argparse +import json +import sys +import time +from getpass import getpass + +import httpx + + +def _err(msg: str) -> None: + print(f" [FAIL] {msg}", file=sys.stderr) + + +def _ok(msg: str) -> None: + print(f" [OK] {msg}") + + +def main() -> int: + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--base", default="http://localhost/api/v1", + help="API base URL (default: http://localhost/api/v1)") + p.add_argument("--owner-user", default="owner") + p.add_argument("--owner-pass", default=None, + help="owner 密码(省略则交互输入)") + args = p.parse_args() + + owner_pass = args.owner_pass + if not owner_pass: + owner_pass = getpass("owner password: ") + + base = args.base.rstrip("/") + suffix = str(int(time.time())) + test_user = f"_smoke_test_user_{suffix}" + test_pass = "smoke_pass_123" + + passed = 0 + total = 6 + created_id: int | None = None + headers = {"Content-Type": "application/json"} + + with httpx.Client(timeout=15) as c: + # === 1. owner 登录 === + print(f"\n=== 1. owner 登录 ===") + try: + r = c.post(f"{base}/auth/login", + json={"username": args.owner_user, "password": owner_pass}, + headers=headers) + except Exception as e: + _err(f"登录请求失败: {e}") + return 1 + if r.status_code != 200: + _err(f"登录失败 (status={r.status_code}): {r.text}") + return 1 + token = r.json().get("access_token") + if not token: + _err(f"登录响应无 access_token: {r.text}") + return 1 + _ok(f"登录成功,token 前 8 字符: {token[:8]}...") + headers["Authorization"] = f"Bearer {token}" + passed += 1 + + # === 2. GET /admin/users 列出全部 === + print(f"\n=== 2. GET /admin/users ===") + r = c.get(f"{base}/admin/users", headers=headers) + if r.status_code != 200: + _err(f"列表失败 (status={r.status_code}): {r.text}") + return 1 + users = r.json() + if not isinstance(users, list): + _err(f"响应不是列表: {r.text}") + return 1 + if len(users) < 1: + _err(f"用户列表为空(至少要有 owner 自己): {r.text}") + return 1 + # 确认 owner 在列表里 + if not any(u["username"] == args.owner_user for u in users): + _err(f"owner '{args.owner_user}' 不在列表里") + return 1 + # 确认没返回 password_hash + if any("password_hash" in u for u in users): + _err(f"列表泄露 password_hash!") + return 1 + _ok(f"列表 OK,共 {len(users)} 个用户,且无 password_hash 泄露") + passed += 1 + + # === 3. POST /admin/users 创建新用户 === + print(f"\n=== 3. POST /admin/users 创建 {test_user} ===") + r = c.post(f"{base}/admin/users", + json={"username": test_user, "password": test_pass, + "display_name": "smoke test user"}, + headers=headers) + if r.status_code != 201: + _err(f"创建失败 (status={r.status_code}): {r.text}") + return 1 + new_u = r.json() + if new_u.get("username") != test_user: + _err(f"用户名不对: {new_u}") + return 1 + if new_u.get("role") != "member": + _err(f"role 不是 member(可能被越权): {new_u}") + return 1 + if new_u.get("is_active") is not True: + _err(f"is_active 不是 true: {new_u}") + return 1 + if "password_hash" in new_u: + _err(f"创建响应泄露 password_hash!") + return 1 + created_id = new_u["id"] + _ok(f"创建成功 id={created_id} role=member") + passed += 1 + + # === 4. GET /admin/users 再次列表,确认新用户在 === + print(f"\n=== 4. GET /admin/users 确认新用户在 ===") + r = c.get(f"{base}/admin/users", headers=headers) + if r.status_code != 200: + _err(f"列表失败: {r.text}") + return 1 + users = r.json() + if not any(u["username"] == test_user for u in users): + _err(f"新用户 '{test_user}' 不在列表里") + return 1 + _ok(f"确认新用户在列表里") + passed += 1 + + # === 5. DELETE /admin/users/{id} 软删除 === + print(f"\n=== 5. DELETE /admin/users/{created_id} 软删除 ===") + r = c.delete(f"{base}/admin/users/{created_id}", headers=headers) + if r.status_code != 200: + _err(f"软删除失败 (status={r.status_code}): {r.text}") + return 1 + result = r.json() + if result.get("is_active") is not False: + _err(f"软删除后 is_active 不是 False: {result}") + return 1 + _ok(f"软删除成功: {result.get('detail')}") + passed += 1 + + # === 6. 新用户登录被拒(inactive)=== + print(f"\n=== 6. 新用户登录被拒(is_active=False → 401) ===") + r2 = c.post(f"{base}/auth/login", + json={"username": test_user, "password": test_pass}, + headers={"Content-Type": "application/json"}) + if r2.status_code != 401: + _err(f"已禁用用户登录应该被拒(401),实际 status={r2.status_code}: {r2.text}") + return 1 + _ok(f"已禁用用户登录被拒: {r2.json().get('title', r2.text)}") + passed += 1 + + print(f"\n=== ALL PASS ({passed}/{total}) ===") + return 0 if passed == total else 1 + + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file