"""登录/刷新/登出。""" from __future__ import annotations from datetime import datetime, timezone from fastapi import APIRouter, Depends, HTTPException, status from jwt.exceptions import InvalidTokenError from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.core.security import ( create_access_token, create_refresh_token, decode_token, verify_password, ) from app.database import get_session from app.models.user import User from app.schemas.auth import LoginRequest, RefreshRequest, TokenPair router = APIRouter(prefix="/auth", tags=["auth"]) def _pair_for(user: User) -> TokenPair: access = create_access_token(user.id, extra={"role": user.role.value}) refresh = create_refresh_token(user.id) return TokenPair( access_token=access, refresh_token=refresh, expires_in=settings.access_token_ttl_min * 60, ) @router.post("/login", response_model=TokenPair) async def login(body: LoginRequest, session: AsyncSession = Depends(get_session)): result = await session.execute(select(User).where(User.username == body.username)) user = result.scalars().first() if not user or not user.is_active or not verify_password(body.password, user.password_hash): raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid credentials") user.last_login_at = datetime.now(timezone.utc) await session.commit() return _pair_for(user) @router.post("/refresh", response_model=TokenPair) async def refresh(body: RefreshRequest, session: AsyncSession = Depends(get_session)): try: payload = decode_token(body.refresh_token) if payload.get("type") != "refresh": raise InvalidTokenError("wrong type") uid = int(payload["sub"]) except (InvalidTokenError, KeyError, ValueError): raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid refresh token") result = await session.execute(select(User).where(User.id == uid, User.is_active.is_(True))) user = result.scalars().first() if not user: raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found") return _pair_for(user)