"""/articles 列表与详情。""" from __future__ import annotations import base64 import json from datetime import datetime from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import and_, desc, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.deps import get_current_user from app.database import get_session from app.models.article import Article from app.models.bookmark import Bookmark from app.models.source import Source from app.models.user import User from app.schemas.article import ( ArticleDetail, ArticleListItem, ArticleListResponse, SourceBrief, ) router = APIRouter(prefix="/articles", tags=["articles"]) def _encode_cursor(article: Article) -> str: payload = {"id": article.id, "ts": int(article.fetched_at.timestamp())} return base64.urlsafe_b64encode(json.dumps(payload).encode()).decode() def _decode_cursor(cur: str) -> tuple[int, datetime]: try: data = json.loads(base64.urlsafe_b64decode(cur.encode()).decode()) return int(data["id"]), datetime.fromtimestamp(int(data["ts"])) except Exception: raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor") @router.get("", response_model=ArticleListResponse) async def list_articles( since: datetime | None = Query(default=None, description="起时间 UTC"), until: datetime | None = Query(default=None, description="止时间 UTC"), source: str | None = Query(default=None, description="逗号分隔 source slug"), category: str | None = None, q: str | None = Query(default=None, description="标题/正文搜索"), lang: Annotated[str, Query(pattern=r"^(src|zh|both)$")] = "both", limit: int = Query(default=50, ge=1, le=200), cursor: str | None = None, starred_only: bool = False, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): stmt = ( select(Article, Source) .join(Source, Source.id == Article.source_id) .where(Article.duplicate_of.is_(None)) ) # 默认过去 24h if since is None and until is None and cursor is None: since = _default_since_24h() if since: stmt = stmt.where(Article.published_at >= since) if until: stmt = stmt.where(Article.published_at <= until) if category: stmt = stmt.where(Article.category == category) if source: slugs = [s.strip() for s in source.split(",") if s.strip()] if slugs: stmt = stmt.where(Source.slug.in_(slugs)) if q: like = f"%{q}%" stmt = stmt.where(or_(Article.title.ilike(like), Article.body_text.ilike(like))) # 语言过滤 if lang == "zh": stmt = stmt.where(Article.title_zh.is_not(None)) elif lang == "src": # 只要原文已有 pass if cursor: last_id, _ = _decode_cursor(cursor) stmt = stmt.where(Article.id < last_id) if starred_only: stmt = stmt.join(Bookmark, and_(Bookmark.article_id == Article.id, Bookmark.user_id == user.id)) stmt = stmt.order_by(desc(Article.published_at), desc(Article.id)).limit(limit + 1) result = await session.execute(stmt) rows = result.all() has_more = len(rows) > limit rows = rows[:limit] # 标记 is_starred(批量) ids = [a.id for a, _ in rows] starred_ids: set[int] = set() if ids: bm_rows = ( await session.execute( select(Bookmark.article_id).where( Bookmark.user_id == user.id, Bookmark.article_id.in_(ids) ) ) ).all() starred_ids = {b[0] for b in bm_rows} items = [] for art, src in rows: item = ArticleListItem( id=art.id, source=SourceBrief.model_validate(src), title=art.title, title_zh=art.title_zh, summary_zh=art.summary_zh, lang_src=art.lang_src, translation_status=art.translation_status, category=art.category, published_at=art.published_at, fetched_at=art.fetched_at, image_url=art.image_url, # 列表预览钩子:分类 + LLM 点评 + AI 插图 缩略图 commentary=art.commentary, commentary_status=art.commentary_status, image_ai_url=art.image_ai_url, is_starred=art.id in starred_ids, ) items.append(item) next_cursor = _encode_cursor(rows[-1][0]) if has_more and rows else None return ArticleListResponse(items=items, next_cursor=next_cursor, total=None) @router.get("/{article_id}", response_model=ArticleDetail) async def get_article( article_id: int, user: User = Depends(get_current_user), session: AsyncSession = Depends(get_session), ): result = await session.execute( select(Article, Source) .join(Source, Source.id == Article.source_id) .where(Article.id == article_id) ) art = result.first() if not art: raise HTTPException(status.HTTP_404_NOT_FOUND, "Article not found") article, source = art is_starred = ( await session.execute( select(Bookmark.id).where( Bookmark.user_id == user.id, Bookmark.article_id == article.id ) ) ).first() is not None return ArticleDetail( id=article.id, source=SourceBrief.model_validate(source), url=article.url, title=article.title, body_html=article.body_html, body_text=article.body_text, title_zh=article.title_zh, body_zh_html=article.body_zh_html, body_zh_text=article.body_zh_text, body_zh_formatted=article.body_zh_formatted, summary_zh=article.summary_zh, lang_src=article.lang_src, author=article.author, image_url=article.image_url, image_ai_url=article.image_ai_url, translation_status=article.translation_status, translation_engine=article.translation_engine, translated_at=article.translated_at, category=article.category, format_status=article.format_status, classify_status=article.classify_status, image_ai_status=article.image_ai_status, commentary_status=article.commentary_status, commentary=article.commentary, entities=article.entities, sentiment=article.sentiment, duplicate_of=article.duplicate_of, published_at=article.published_at, fetched_at=article.fetched_at, is_starred=is_starred, ) def _default_since_24h() -> datetime: from datetime import timedelta return datetime.utcnow() - timedelta(hours=24)