"""通用依赖:获取当前用户、要求 owner。""" from __future__ import annotations from datetime import datetime, timezone from fastapi import Depends, HTTPException, Request, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jwt.exceptions import InvalidTokenError from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.security import decode_token, hash_api_token from app.database import get_session from app.models.api_token import ApiToken from app.models.user import User, UserRole from app.services.active_ip import touch_ip_dependency _bearer = HTTPBearer(auto_error=False) async def _resolve_user( request: Request, creds: HTTPAuthorizationCredentials | None = Depends(_bearer), session: AsyncSession = Depends(get_session), ) -> User: if creds is None or not creds.credentials: raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Missing credentials") token = creds.credentials # 1) 先试 API Token(sha256 比较) h = hash_api_token(token) result = await session.execute( select(ApiToken).where(ApiToken.token_hash == h, ApiToken.revoked_at.is_(None)) ) api_row = result.scalars().first() if api_row: if api_row.expires_at and api_row.expires_at < datetime.now(timezone.utc): raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Token expired") result = await session.execute(select(User).where(User.id == api_row.user_id)) user = result.scalars().first() if user and user.is_active: api_row.last_used_at = datetime.now(timezone.utc) await session.commit() # 顺带刷新活跃 IP(防 token 泄漏后被滥用,绑定客户端 IP) await touch_ip_dependency(request) return user # 2) 试 JWT try: payload = decode_token(token) if payload.get("type") != "access": raise InvalidTokenError("wrong type") uid = int(payload["sub"]) except (InvalidTokenError, KeyError, ValueError): raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token") result = await session.execute( select(User).where(User.id == uid, User.is_active.is_(True)) ) user = result.scalars().first() if user is None: raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found or inactive") # 已认证 → 刷新当前 IP 活跃时间 await touch_ip_dependency(request) return user async def get_current_user(user: User = Depends(_resolve_user)) -> User: return user async def require_owner(user: User = Depends(get_current_user)) -> User: if user.role != UserRole.OWNER: raise HTTPException(status.HTTP_403_FORBIDDEN, "Owner only") return user