diff --git a/backend/app/api/me.py b/backend/app/api/me.py index c0c7c99..100897c 100644 --- a/backend/app/api/me.py +++ b/backend/app/api/me.py @@ -90,6 +90,91 @@ async def usage( # `/reads/{article_id}` **之前**声明,否则 FastAPI 会把 "by-category" / "category-count" # 当成 article_id 匹配到 mark_read,然后 int 转换失败返 422。 # FastAPI 路由匹配是"先注册先匹配"(按装饰器执行顺序),不是按特异度。 +# ⚠️ Python 装饰器在 import 阶段立即执行,所以 @router.post(..., response_model=...) +# 引用到的 Pydantic class 必须在路由函数定义前就存在于 module globals。 +# 下面的 CategoryReadRequest / CategoryReadResponse / CategoryCountItem 必须放在这里。 +def _unread_subquery(user_id: int): + """未读文章 id 子查询(per-user)— 复用 articles.py 的 NOT EXISTS 模式。""" + return ( + select(ArticleRead.article_id) + .where(ArticleRead.user_id == user_id, ArticleRead.article_id == Article.id) + .exists() + ) + + +def _escape_like(term: str) -> str: + """转义 ilike 的元字符,避免 category 含 % / _ 时被当通配符。""" + return term.replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_") + + +class CategoryReadRequest(BaseModel): + category: str = Field(..., min_length=1, max_length=32, description="要批量已读的分类 tag") + scope: Literal["all_unread", "filtered_unread"] = Field( + default="filtered_unread", + description="过滤范围:filtered_unread=遵守 sources/q;all_unread=忽略", + ) + window_hours: int = Field( + default=24, ge=1, le=168, description="只标 published_at 在最近 N 小时内的未读" + ) + sources: list[str] | None = Field(default=None, description="源 slug 列表,过滤范围") + q: str | None = Field(default=None, description="关键词过滤(标题/正文模糊)") + + +class CategoryReadResponse(BaseModel): + category: str + matched: int # 命中的未读数 + marked: int # 实际新标已读数(去重后) + article_ids: list[int] # 给前端做滑出动画 + + +class CategoryCountItem(BaseModel): + category: str + unread_count: int + window_hours: int + + +def _build_category_filter( + category: str, + window_hours: int, + user_id: int, + sources: list[str] | None = None, + q: str | None = None, +): + """构造"分类 + 时间窗 + 未读"三合一 filter。 + + - category 用 ilike 模糊匹配(逗号分隔串中含此 tag 即可) + - window_hours 通过 published_at 过滤 + - 用 NOT EXISTS 排除当前用户已读 + - sources / q 可选,跟 articles.py 列表查询口径一致 + """ + pattern = f"%{_escape_like(category)}%" + # 用 text() 拼 interval,避免 make_interval 的 PG 函数参数绑定在某些 + # SQLAlchemy 版本下的兼容性问题。window_hours 是 int,只来自我们自己的 + # endpoint,不是用户原始字符串,安全。 + interval_sql = text(f"interval '{int(window_hours)} hours'") + filters = [ + Article.category.ilike(pattern, escape="\\"), + Article.published_at >= func.now() - interval_sql, + not_(_unread_subquery(user_id)), + ] + if sources: + slugs = [s.strip() for s in sources if s.strip()] + if slugs: + filters.append(Source.slug.in_(slugs)) + if q and q.strip(): + like = f"%{q.strip()}%" + filters.append( + or_( + Article.title.ilike(like), + Article.body_text.ilike(like), + Article.title_zh.ilike(like), + Article.body_zh_text.ilike(like), + Article.summary_zh.ilike(like), + ) + ) + return filters + + @router.post("/reads/by-category", response_model=CategoryReadResponse) async def mark_category_read( body: CategoryReadRequest, @@ -273,84 +358,5 @@ async def list_reads( # - 返回 article_ids 给前端做乐观滑出动画 -def _unread_subquery(user_id: int): - """未读文章 id 子查询(per-user)— 复用 articles.py 的 NOT EXISTS 模式。""" - return ( - select(ArticleRead.article_id) - .where(ArticleRead.user_id == user_id, ArticleRead.article_id == Article.id) - .exists() - ) -def _escape_like(term: str) -> str: - """转义 ilike 的元字符,避免 category 含 % / _ 时被当通配符。""" - return term.replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_") - - -class CategoryReadRequest(BaseModel): - category: str = Field(..., min_length=1, max_length=32, description="要批量已读的分类 tag") - scope: Literal["all_unread", "filtered_unread"] = Field( - default="filtered_unread", - description="过滤范围:filtered_unread=遵守 sources/q;all_unread=忽略", - ) - window_hours: int = Field( - default=24, ge=1, le=168, description="只标 published_at 在最近 N 小时内的未读" - ) - sources: list[str] | None = Field(default=None, description="源 slug 列表,过滤范围") - q: str | None = Field(default=None, description="关键词过滤(标题/正文模糊)") - - -class CategoryReadResponse(BaseModel): - category: str - matched: int # 命中的未读数 - marked: int # 实际新标已读数(去重后) - article_ids: list[int] # 给前端做滑出动画 - - -class CategoryCountItem(BaseModel): - category: str - unread_count: int - window_hours: int - - -def _build_category_filter( - category: str, - window_hours: int, - user_id: int, - sources: list[str] | None = None, - q: str | None = None, -): - """构造"分类 + 时间窗 + 未读"三合一 filter。 - - - category 用 ilike 模糊匹配(逗号分隔串中含此 tag 即可) - - window_hours 通过 published_at 过滤 - - 用 NOT EXISTS 排除当前用户已读 - - sources / q 可选,跟 articles.py 列表查询口径一致 - """ - pattern = f"%{_escape_like(category)}%" - # 用 text() 拼 interval,避免 make_interval 的 PG 函数参数绑定在某些 - # SQLAlchemy 版本下的兼容性问题。window_hours 是 int,只来自我们自己的 - # endpoint,不是用户原始字符串,安全。 - interval_sql = text(f"interval '{int(window_hours)} hours'") - filters = [ - Article.category.ilike(pattern, escape="\\"), - Article.published_at >= func.now() - interval_sql, - not_(_unread_subquery(user_id)), - ] - if sources: - slugs = [s.strip() for s in sources if s.strip()] - if slugs: - filters.append(Source.slug.in_(slugs)) - if q and q.strip(): - like = f"%{q.strip()}%" - filters.append( - or_( - Article.title.ilike(like), - Article.body_text.ilike(like), - Article.title_zh.ilike(like), - Article.body_zh_text.ilike(like), - Article.summary_zh.ilike(like), - ) - ) - return filters -