"""创建用户(默认 owner)。""" from __future__ import annotations import argparse import asyncio import sys from getpass import getpass from sqlalchemy import select from app.core.security import hash_password from app.database import AsyncSessionLocal from app.models.user import User, UserRole async def main(username: str, password: str, email: str | None, role: UserRole) -> int: async with AsyncSessionLocal() as session: exists = (await session.execute(select(User).where(User.username == username))).scalar_one_or_none() if exists: print(f"user '{username}' already exists (id={exists.id})", file=sys.stderr) return 1 u = User( username=username, email=email, password_hash=hash_password(password), role=role, is_active=True, ) session.add(u) await session.commit() await session.refresh(u) print(f"created user id={u.id} username={u.username} role={u.role.value}") return 0 def cli() -> None: p = argparse.ArgumentParser() p.add_argument("--username", required=True) p.add_argument("--password", default=None, help="缺省则交互输入") p.add_argument("--email", default=None) p.add_argument("--role", choices=["owner", "member"], default="member") args = p.parse_args() password = args.password if not password: pw1 = getpass("password: ") pw2 = getpass("password (again): ") if pw1 != pw2 or len(pw1) < 6: print("passwords differ or too short", file=sys.stderr) sys.exit(2) password = pw1 rc = asyncio.run(main(args.username, password, args.email, UserRole(args.role))) sys.exit(rc) if __name__ == "__main__": cli()