"""站点活跃 IP 限制(防 token 泄漏被滥用) - Redis SortedSet `active_ips` 存当前活跃客户端 IP - member = IP 字符串 - score = last_seen unix 秒 - 每次已认证请求: 刷新该 IP 的 score(滑动 TTL) - 登录时: 检查 ZCARD - 当前 IP 已在 set → 直接放行(自己人不踢) - 当前 IP 不在 set 且 ZCARD >= limit → 拒绝登录 - 当前 IP 不在 set 且 ZCARD < limit → 加入 set,放行 - 30 天没活动的 IP 自动从 set 剔除 - 整个 key 30 天无活动自动过期(EXPIRE) 注意: - 信任 X-Forwarded-For 必须显式开启(trust_x_forwarded_for=True), 否则攻击者伪造头绕过 IP 限制 - 不限流请求本身(只限流"新 IP 登录"),活跃 IP 持续刷新即可 - API token(api_tokens 表)也走这个限流(同一个 IP 拿到 token 后被滥用也防) """ from __future__ import annotations import logging import time from typing import Annotated from fastapi import Depends, HTTPException, Request, status from app.config import settings from app.redis_client import get_redis logger = logging.getLogger("news.auth.active_ip") REDIS_KEY = "active_ips" def get_client_ip(request: Request) -> str: """提取客户端真实 IP。 优先级: 1) X-Forwarded-For 第一个(被代理链信任时) 2) X-Real-IP(Nginx 风格) 3) request.client.host(直连) 仅在配置 trust_x_forwarded_for=True 时信任 1+2,否则只用 3。 """ import sys xff_raw = request.headers.get("x-forwarded-for") xri_raw = request.headers.get("x-real-ip") client_host = request.client.host if request.client else None print(f"[GET_CLIENT_IP] xff={xff_raw!r} xri={xri_raw!r} client_host={client_host!r} trust={settings.trust_x_forwarded_for}", file=sys.stderr, flush=True) if settings.trust_x_forwarded_for: xff = xff_raw if xff: ip = xff.split(",")[0].strip() if ip: print(f"[GET_CLIENT_IP] -> use xff split[0] = {ip!r}", file=sys.stderr, flush=True) return ip xri = xri_raw if xri: return xri.strip() if client_host: return client_host return "unknown" async def _prune_and_count(now: int) -> int: """清理过期 IP 并返回当前活跃数。""" r = get_redis() idle_cutoff = now - settings.site_active_ip_idle_days * 86400 # 删 score < idle_cutoff await r.zremrangebyscore(REDIS_KEY, 0, idle_cutoff - 1) return await r.zcard(REDIS_KEY) async def check_or_register_login_ip(ip: str) -> None: """登录/refresh 时调用。 - 如果 IP 已在 set 里:刷新 score,放行 - 如果 IP 不在 set 里,且活跃数 < limit:加入 set,放行 - 如果 IP 不在 set 里,且活跃数 >= limit:raise 429 注意:不限制 owner/非 owner(整个站点一个池,所有用户共享) """ r = get_redis() now = int(time.time()) limit = settings.site_max_active_ips import sys print(f"[CHECK] ip={ip!r} limit={limit}", file=sys.stderr, flush=True) # 看 IP 是否已在 set existing = await r.zscore(REDIS_KEY, ip) if existing is not None: # 已在 set,刷新 score 即可(滑动 TTL) await r.zadd(REDIS_KEY, {ip: now}) await r.expire(REDIS_KEY, settings.site_active_ip_idle_days * 86400) return # 不在 set — 先清理过期再计数 count = await _prune_and_count(now) if count >= limit: # 第 (limit+1) 个新 IP,拒绝 logger.warning( "active IP limit reached: %d >= %d, reject login from %s", count, limit, ip, ) raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=( f"站点同时在线 IP 数已达上限({limit} 个)。" f"如有疑问请等待空闲 IP 退出后重试,或联系 owner 调整 site_max_active_ips。" ), headers={"Retry-After": "3600"}, ) # 通过:加入 set await r.zadd(REDIS_KEY, {ip: now}) await r.expire(REDIS_KEY, settings.site_active_ip_idle_days * 86400) logger.info("active IP registered: %s (total now %d)", ip, count + 1) async def touch_active_ip(ip: str) -> None: """每次已认证请求时调用,刷新该 IP 的 score(滑动 TTL)。 已废弃 IP 不主动删,等下次 login 时 prune。 """ r = get_redis() now = int(time.time()) # NX:仅当 member 不存在时设置 — 但我们要 UPDATE score,不能用 NX # 正常 ZADD 即可:member 存在就更新 score await r.zadd(REDIS_KEY, {ip: now}) # 同时顺手 prune,避免 set 无限膨胀 await _prune_and_count(now) await r.expire(REDIS_KEY, settings.site_active_ip_idle_days * 86400) async def list_active_ips() -> list[dict]: """给 owner 看:当前活跃 IP + last_seen。 按 last_seen 倒序。 """ r = get_redis() now = int(time.time()) await _prune_and_count(now) rows = await r.zrange(REDIS_KEY, 0, -1, withscores=True, desc=True) return [ { "ip": ip, "last_seen_unix": int(score), "last_seen_iso": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(score)), "idle_seconds": int(now - score), } for ip, score in rows ] async def remove_active_ip(ip: str) -> bool: """owner 强制踢人。""" r = get_redis() removed = await r.zrem(REDIS_KEY, ip) return bool(removed) # === FastAPI 依赖 === async def touch_ip_dependency(request: Request) -> None: """每个已认证请求的依赖:刷新当前 IP 的 last_seen。 不会失败(吞掉异常),避免 Redis 抖动影响正常请求。 """ try: import sys ip = get_client_ip(request) # 临时诊断:看实际 IP 提取 xff = request.headers.get("x-forwarded-for") xri = request.headers.get("x-real-ip") client_host = request.client.host if request.client else None print(f"[TOUCH_DEP] ip={ip!r} xff={xff!r} xri={xri!r} client_host={client_host!r}", file=sys.stderr, flush=True) await touch_active_ip(ip) except Exception as e: # noqa: BLE001 logger.debug("touch_active_ip failed (ignored): %s", e)