Files
diary-news/backend/app/api/auth.py

79 lines
2.8 KiB
Python
Raw Normal View History

"""登录/刷新/登出。"""
from __future__ import annotations
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Request, status
from jwt.exceptions import InvalidTokenError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.core.security import (
create_access_token,
create_refresh_token,
decode_token,
verify_password,
)
from app.database import get_session
from app.models.user import User
from app.schemas.auth import LoginRequest, RefreshRequest, TokenPair
from app.services.active_ip import check_or_register_login_ip, get_client_ip
router = APIRouter(prefix="/auth", tags=["auth"])
def _pair_for(user: User) -> TokenPair:
access = create_access_token(user.id, extra={"role": user.role.value})
refresh = create_refresh_token(user.id)
return TokenPair(
access_token=access,
refresh_token=refresh,
expires_in=settings.access_token_ttl_min * 60,
)
@router.post("/login", response_model=TokenPair)
async def login(
body: LoginRequest,
request: Request,
session: AsyncSession = Depends(get_session),
):
# === 先校验 IP 上限(在查 DB 前,避免密码错误也消耗 DB)===
# 注:这一步是公开的(未认证),不暴露"这个 IP 是不是 owner"的信息
client_ip = get_client_ip(request)
await check_or_register_login_ip(client_ip)
result = await session.execute(select(User).where(User.username == body.username))
user = result.scalars().first()
if not user or not user.is_active or not verify_password(body.password, user.password_hash):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid credentials")
user.last_login_at = datetime.now(timezone.utc)
await session.commit()
return _pair_for(user)
@router.post("/refresh", response_model=TokenPair)
async def refresh(
body: RefreshRequest,
request: Request,
session: AsyncSession = Depends(get_session),
):
# refresh 也要算 IP 占用(因为 refresh 拿到 access token 就能用)
# 但如果 IP 已经在 set 里,等于合法老用户,直接放过
client_ip = get_client_ip(request)
await check_or_register_login_ip(client_ip)
try:
payload = decode_token(body.refresh_token)
if payload.get("type") != "refresh":
raise InvalidTokenError("wrong type")
uid = int(payload["sub"])
except (InvalidTokenError, KeyError, ValueError):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid refresh token")
result = await session.execute(select(User).where(User.id == uid, User.is_active.is_(True)))
user = result.scalars().first()
if not user:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found")
return _pair_for(user)