feat(auth): 限制同时在线 IP 数 (默认 30, 第 31 拒绝)

背景: 防 token 泄漏被滥用 + 限共享账号人数。

- 新增 app/services/active_ip.py:
    Redis ZSET 'active_ips' 存 IP,score=last_seen_unix
    登录/refresh 时 check_or_register_login_ip():
        IP 已在 set → 刷新 score,放行(老用户重连)
        IP 不在 set + ZCARD < 30 → 加入,放行
        IP 不在 set + ZCARD >= 30 → raise 429
    每个已认证请求 _resolve_user() 调 touch_ip_dependency()
        滑动 TTL,30 天没活动自动从 set 剔除
- get_client_ip() 取真实 IP,优先级 X-Forwarded-For > X-Real-IP > client.host
    trust_x_forwarded_for 默认 True(生产 Caddy/Nginx 后面)
- config 加 3 个开关:
    site_max_active_ips: int = 30
    site_active_ip_idle_days: int = 30
    trust_x_forwarded_for: bool = True
- admin.py 加 3 个端点:
    GET  /admin/active-ips       — 看当前活跃 IP 列表 + last_seen
    POST /admin/active-ips/kick  — 强制踢出指定 IP(body={ip})
    DELETE /admin/active-ips/{ip}— 简写踢出
- 注: refresh 也算 IP 占用(拿到 access token 就能用)
    但已存在的 IP 直接放行,不会踢自己
This commit is contained in:
xiaji
2026-06-13 18:22:40 +08:00
parent 0df6e79e56
commit b500613d22
5 changed files with 259 additions and 4 deletions

View File

@@ -205,3 +205,58 @@ async def reset_quota(payload: QuotaReset) -> dict[str, Any]:
key = f"translation:month:{now:%Y%m}"
await r.set(key, payload.used_chars)
return {"key": key, "value": payload.used_chars}
# === 活跃 IP 看板(防 token 泄漏 / 排查异地登录)===
class ActiveIpItem(BaseModel):
ip: str
last_seen_unix: int
last_seen_iso: str
idle_seconds: int
class ActiveIpList(BaseModel):
items: list[ActiveIpItem]
limit: int
idle_days: int
count: int
@router.get("/active-ips", response_model=ActiveIpList)
async def list_active_ips_admin():
from app.config import settings
from app.services.active_ip import list_active_ips
items = await list_active_ips()
return ActiveIpList(
items=[ActiveIpItem(**i) for i in items],
limit=settings.site_max_active_ips,
idle_days=settings.site_active_ip_idle_days,
count=len(items),
)
class KickIpRequest(BaseModel):
ip: str
class KickIpResponse(BaseModel):
kicked: bool
ip: str
@router.post("/active-ips/kick", response_model=KickIpResponse)
async def kick_active_ip(payload: KickIpRequest):
"""owner 强制剔除某个 IP。下次该 IP 登录时会再次被算作"新 IP""""
from app.services.active_ip import remove_active_ip
ok = await remove_active_ip(payload.ip)
return KickIpResponse(kicked=ok, ip=payload.ip)
@router.delete("/active-ips/{ip}", response_model=KickIpResponse)
async def delete_active_ip(ip: str):
from app.services.active_ip import remove_active_ip
ok = await remove_active_ip(ip)
return KickIpResponse(kicked=ok, ip=ip)