"""/articles 列表与详情。""" from __future__ import annotations from datetime import datetime from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import and_, desc, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.deps import get_current_user from app.database import get_session from app.models.article import Article from app.models.bookmark import Bookmark from app.models.source import Source from app.models.user import User from app.schemas.article import ( ArticleDetail, ArticleListItem, ArticleListResponse, SourceBrief, ) router = APIRouter(prefix="/articles", tags=["articles"]) @router.get("", response_model=ArticleListResponse) async def list_articles( since: datetime | None = Query(default=None, description="起时间 UTC"), until: datetime | None = Query(default=None, description="止时间 UTC"), source: str | None = Query(default=None, description="逗号分隔 source slug"), category: str | None = None, q: str | None = Query(default=None, description="标题/正文搜索"), lang: Annotated[str, Query(pattern=r"^(src|zh|both)$")] = "both", page: int = Query(default=1, ge=1, description="页码(从 1 开始)"), page_size: int = Query(default=50, ge=1, le=200, description="每页条数"), starred_only: bool = False, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): # 公共筛选条件(用于 list + count) filters = [Article.duplicate_of.is_(None)] # 默认过去 24h if since is None and until is None: since = _default_since_24h() if since: filters.append(Article.published_at >= since) if until: filters.append(Article.published_at <= until) if category: filters.append(Article.category == category) if source: slugs = [s.strip() for s in source.split(",") if s.strip()] if slugs: filters.append(Source.slug.in_(slugs)) if q: like = f"%{q}%" filters.append(or_(Article.title.ilike(like), Article.body_text.ilike(like))) if lang == "zh": filters.append(Article.title_zh.is_not(None)) # ===== count 总数 ===== count_stmt = select(func.count(Article.id)).join(Source, Source.id == Article.source_id) for f in filters: count_stmt = count_stmt.where(f) if starred_only: count_stmt = count_stmt.join( Bookmark, and_(Bookmark.article_id == Article.id, Bookmark.user_id == user.id) ) total: int = (await session.execute(count_stmt)).scalar_one() total_pages = max(1, (total + page_size - 1) // page_size) # 越界保护:page 超出范围时返回空数组,total 仍真实 if page > total_pages: return ArticleListResponse( items=[], page=page, page_size=page_size, total=total, total_pages=total_pages ) # ===== 当前页数据 ===== stmt = select(Article, Source).join(Source, Source.id == Article.source_id) for f in filters: stmt = stmt.where(f) if starred_only: stmt = stmt.join(Bookmark, and_(Bookmark.article_id == Article.id, Bookmark.user_id == user.id)) offset = (page - 1) * page_size stmt = ( stmt.order_by(desc(Article.published_at), desc(Article.id)) .offset(offset) .limit(page_size) ) rows = (await session.execute(stmt)).all() # 标记 is_starred(批量) ids = [a.id for a, _ in rows] starred_ids: set[int] = set() if ids: bm_rows = ( await session.execute( select(Bookmark.article_id).where( Bookmark.user_id == user.id, Bookmark.article_id.in_(ids) ) ) ).all() starred_ids = {b[0] for b in bm_rows} items = [] for art, src in rows: items.append( ArticleListItem( id=art.id, source=SourceBrief.model_validate(src), title=art.title, title_zh=art.title_zh, body_zh_text=art.body_zh_text, summary_zh=art.summary_zh, lang_src=art.lang_src, translation_status=art.translation_status, category=art.category, published_at=art.published_at, fetched_at=art.fetched_at, image_url=art.image_url, # 列表预览钩子:分类 + LLM 点评 + AI 插图 缩略图 commentary=art.commentary, commentary_status=art.commentary_status, image_ai_url=art.image_ai_url, is_starred=art.id in starred_ids, ) ) return ArticleListResponse( items=items, page=page, page_size=page_size, total=total, total_pages=total_pages, ) @router.get("/{article_id}", response_model=ArticleDetail) async def get_article( article_id: int, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): result = await session.execute( select(Article, Source) .join(Source, Source.id == Article.source_id) .where(Article.id == article_id) ) art = result.first() if not art: raise HTTPException(status.HTTP_404_NOT_FOUND, "Article not found") article, source = art is_starred = ( await session.execute( select(Bookmark.id).where( Bookmark.user_id == user.id, Bookmark.article_id == article.id ) ) ).first() is not None return ArticleDetail( id=article.id, source=SourceBrief.model_validate(source), url=article.url, title=article.title, body_html=article.body_html, body_text=article.body_text, title_zh=article.title_zh, body_zh_html=article.body_zh_html, body_zh_text=article.body_zh_text, body_zh_formatted=article.body_zh_formatted, summary_zh=article.summary_zh, lang_src=article.lang_src, author=article.author, image_url=article.image_url, image_ai_url=article.image_ai_url, translation_status=article.translation_status, translation_engine=article.translation_engine, translated_at=article.translated_at, category=article.category, format_status=article.format_status, classify_status=article.classify_status, image_ai_status=article.image_ai_status, commentary_status=article.commentary_status, commentary=article.commentary, entities=article.entities, sentiment=article.sentiment, duplicate_of=article.duplicate_of, published_at=article.published_at, fetched_at=article.fetched_at, is_starred=is_starred, ) def _default_since_24h() -> datetime: from datetime import timedelta return datetime.utcnow() - timedelta(hours=24)