"""Admin API(仅 owner)。 - 源管理 CRUD - 手动触发抓取 / 重译 - 源健康看板 - 翻译配额管理 """ from __future__ import annotations from datetime import datetime, timedelta, timezone from typing import Any from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status from pydantic import BaseModel from sqlalchemy import func, select from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from app.core.deps import require_owner from app.database import get_session from app.models.article import Article from app.models.source import Source from app.models.user import User from app.schemas.source import SourceIn, SourceOut, SourceUpdate router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(require_owner)]) # === Source CRUD === @router.get("/sources", response_model=list[SourceOut]) async def list_sources_all(session: AsyncSession = Depends(get_session)): result = await session.execute(select(Source).order_by(Source.id)) rows = result.scalars() return [SourceOut.model_validate(s) for s in rows] @router.post("/sources", response_model=SourceOut, status_code=status.HTTP_201_CREATED) async def create_source(body: SourceIn, session: AsyncSession = Depends(get_session)): src = Source( name=body.name, slug=body.slug, kind=body.kind, url=str(body.url), detail_selector=body.detail_selector, region=body.region, language_src=body.language_src, priority=body.priority, fetch_interval_min=body.fetch_interval_min, translate_to=body.translate_to, enabled=body.enabled, headers_json=body.headers_json, ) session.add(src) try: await session.commit() except IntegrityError as e: await session.rollback() raise HTTPException(status.HTTP_409_CONFLICT, f"slug '{body.slug}' already exists") from e await session.refresh(src) return SourceOut.model_validate(src) @router.patch("/sources/{source_id}", response_model=SourceOut) async def update_source( source_id: int, body: SourceUpdate, session: AsyncSession = Depends(get_session), ): result = await session.execute(select(Source).where(Source.id == source_id)) src = result.scalar_one_or_none() if not src: raise HTTPException(status.HTTP_404_NOT_FOUND, "Source not found") for k, v in body.model_dump(exclude_unset=True).items(): setattr(src, k, v) await session.commit() await session.refresh(src) return SourceOut.model_validate(src) @router.delete("/sources/{source_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_source(source_id: int, session: AsyncSession = Depends(get_session)): result = await session.execute(select(Source).where(Source.id == source_id)) src = result.scalar_one_or_none() if not src: raise HTTPException(status.HTTP_404_NOT_FOUND, "Source not found") await session.delete(src) await session.commit() return None # === 手动触发 === class TriggerResponse(BaseModel): triggered: bool detail: str = "" @router.post("/refresh/{source_id}", response_model=TriggerResponse) async def refresh_source( source_id: int, background: BackgroundTasks, session: AsyncSession = Depends(get_session), ): result = await session.execute(select(Source).where(Source.id == source_id)) src = result.scalar_one_or_none() if not src: raise HTTPException(status.HTTP_404_NOT_FOUND, "Source not found") if not src.enabled: raise HTTPException(status.HTTP_400_BAD_REQUEST, "Source disabled") # 走 background,不等结果 from app.workers.pipeline import fetch_one_source background.add_task(fetch_one_source, source_id) return TriggerResponse(triggered=True, detail=f"queued fetch for {src.slug}") async def _run_fetch(source_id: int) -> None: """(deprecated) 走 background 用的薄包装,见 refresh_source。""" from app.workers.pipeline import fetch_one_source await fetch_one_source(source_id) @router.post("/translation/rerun/{article_id}", response_model=TriggerResponse) async def rerun_translation( article_id: int, background: BackgroundTasks, session: AsyncSession = Depends(get_session), ): result = await session.execute(select(Article).where(Article.id == article_id)) art = result.scalar_one_or_none() if not art: raise HTTPException(status.HTTP_404_NOT_FOUND, "Article not found") art.translation_status = "pending" art.title_zh = None art.body_zh_text = None art.body_zh_html = None art.translated_at = None art.translation_engine = None await session.commit() from app.workers.pipeline import translate_article background.add_task(translate_article, article_id) return TriggerResponse(triggered=True, detail=f"queued translation for article {article_id}") # === 健康看板 === class HealthOut(BaseModel): source_id: int slug: str name: str enabled: bool last_fetched_at: datetime | None last_status: str | None consecutive_failures: int fetch_interval_min: int article_count_24h: int @router.get("/health", response_model=list[HealthOut]) async def health(session: AsyncSession = Depends(get_session)): result = await session.execute(select(Source).order_by(Source.priority.desc())) rows = result.scalars() out: list[HealthOut] = [] for s in rows: c24 = ( await session.execute( select(func.count(Article.id)).where( Article.source_id == s.id, Article.fetched_at >= datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=24), ) ) ).scalar_one() out.append( HealthOut( source_id=s.id, slug=s.slug, name=s.name, enabled=s.enabled, last_fetched_at=s.last_fetched_at, last_status=s.last_status, consecutive_failures=s.consecutive_failures, fetch_interval_min=s.fetch_interval_min, article_count_24h=c24 or 0, ) ) return out # === 翻译配额(管理员视图) === class QuotaReset(BaseModel): used_chars: int = 0 @router.post("/translation/quota/reset") async def reset_quota(payload: QuotaReset) -> dict[str, Any]: from app.redis_client import get_redis r = get_redis() now = datetime.now(timezone.utc) key = f"translation:month:{now:%Y%m}" await r.set(key, payload.used_chars) return {"key": key, "value": payload.used_chars}