Files
diary-news/backend/app/scripts/create_user.py

57 lines
1.8 KiB
Python

"""创建用户(默认 owner)。"""
from __future__ import annotations
import argparse
import asyncio
import sys
from getpass import getpass
from sqlalchemy import select
from app.core.security import hash_password
from app.database import AsyncSessionLocal
from app.models.user import User, UserRole
async def main(username: str, password: str, email: str | None, role: UserRole) -> int:
async with AsyncSessionLocal() as session:
exists = (await session.execute(select(User).where(User.username == username))).scalar_one_or_none()
if exists:
print(f"user '{username}' already exists (id={exists.id})", file=sys.stderr)
return 1
u = User(
username=username,
email=email,
password_hash=hash_password(password),
role=role,
is_active=True,
)
session.add(u)
await session.commit()
await session.refresh(u)
print(f"created user id={u.id} username={u.username} role={u.role.value}")
return 0
def cli() -> None:
p = argparse.ArgumentParser()
p.add_argument("--username", required=True)
p.add_argument("--password", default=None, help="缺省则交互输入")
p.add_argument("--email", default=None)
p.add_argument("--role", choices=["owner", "member"], default="owner")
args = p.parse_args()
password = args.password
if not password:
pw1 = getpass("password: ")
pw2 = getpass("password (again): ")
if pw1 != pw2 or len(pw1) < 6:
print("passwords differ or too short", file=sys.stderr)
sys.exit(2)
password = pw1
rc = asyncio.run(main(args.username, password, args.email, UserRole(args.role)))
sys.exit(rc)
if __name__ == "__main__":
cli()