fix(me): 把 Pydantic class 移到路由之前,避免 NameError on import
上一次的路由重排把 @router.post(/reads/by-category) 挪到了 mark_read 之前, 但 Pydantic class (CategoryReadRequest/Response/CategoryCountItem) 还在文件底部。 Python 装饰器在 import 阶段立即执行,@router.post(..., response_model=ClassName) 会立刻查找 ClassName — 找不到就 NameError: NameError: name 'CategoryReadResponse' is not defined 修法:把 class 和 _build_category_filter helper 一起挪到路由装饰器之前。 Python 函数体内的名字查找是延迟的(运行时),但装饰器参数是立即的。
This commit is contained in:
@@ -90,6 +90,91 @@ async def usage(
|
||||
# `/reads/{article_id}` **之前**声明,否则 FastAPI 会把 "by-category" / "category-count"
|
||||
# 当成 article_id 匹配到 mark_read,然后 int 转换失败返 422。
|
||||
# FastAPI 路由匹配是"先注册先匹配"(按装饰器执行顺序),不是按特异度。
|
||||
# ⚠️ Python 装饰器在 import 阶段立即执行,所以 @router.post(..., response_model=...)
|
||||
# 引用到的 Pydantic class 必须在路由函数定义前就存在于 module globals。
|
||||
# 下面的 CategoryReadRequest / CategoryReadResponse / CategoryCountItem 必须放在这里。
|
||||
def _unread_subquery(user_id: int):
|
||||
"""未读文章 id 子查询(per-user)— 复用 articles.py 的 NOT EXISTS 模式。"""
|
||||
return (
|
||||
select(ArticleRead.article_id)
|
||||
.where(ArticleRead.user_id == user_id, ArticleRead.article_id == Article.id)
|
||||
.exists()
|
||||
)
|
||||
|
||||
|
||||
def _escape_like(term: str) -> str:
|
||||
"""转义 ilike 的元字符,避免 category 含 % / _ 时被当通配符。"""
|
||||
return term.replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_")
|
||||
|
||||
|
||||
class CategoryReadRequest(BaseModel):
|
||||
category: str = Field(..., min_length=1, max_length=32, description="要批量已读的分类 tag")
|
||||
scope: Literal["all_unread", "filtered_unread"] = Field(
|
||||
default="filtered_unread",
|
||||
description="过滤范围:filtered_unread=遵守 sources/q;all_unread=忽略",
|
||||
)
|
||||
window_hours: int = Field(
|
||||
default=24, ge=1, le=168, description="只标 published_at 在最近 N 小时内的未读"
|
||||
)
|
||||
sources: list[str] | None = Field(default=None, description="源 slug 列表,过滤范围")
|
||||
q: str | None = Field(default=None, description="关键词过滤(标题/正文模糊)")
|
||||
|
||||
|
||||
class CategoryReadResponse(BaseModel):
|
||||
category: str
|
||||
matched: int # 命中的未读数
|
||||
marked: int # 实际新标已读数(去重后)
|
||||
article_ids: list[int] # 给前端做滑出动画
|
||||
|
||||
|
||||
class CategoryCountItem(BaseModel):
|
||||
category: str
|
||||
unread_count: int
|
||||
window_hours: int
|
||||
|
||||
|
||||
def _build_category_filter(
|
||||
category: str,
|
||||
window_hours: int,
|
||||
user_id: int,
|
||||
sources: list[str] | None = None,
|
||||
q: str | None = None,
|
||||
):
|
||||
"""构造"分类 + 时间窗 + 未读"三合一 filter。
|
||||
|
||||
- category 用 ilike 模糊匹配(逗号分隔串中含此 tag 即可)
|
||||
- window_hours 通过 published_at 过滤
|
||||
- 用 NOT EXISTS 排除当前用户已读
|
||||
- sources / q 可选,跟 articles.py 列表查询口径一致
|
||||
"""
|
||||
pattern = f"%{_escape_like(category)}%"
|
||||
# 用 text() 拼 interval,避免 make_interval 的 PG 函数参数绑定在某些
|
||||
# SQLAlchemy 版本下的兼容性问题。window_hours 是 int,只来自我们自己的
|
||||
# endpoint,不是用户原始字符串,安全。
|
||||
interval_sql = text(f"interval '{int(window_hours)} hours'")
|
||||
filters = [
|
||||
Article.category.ilike(pattern, escape="\\"),
|
||||
Article.published_at >= func.now() - interval_sql,
|
||||
not_(_unread_subquery(user_id)),
|
||||
]
|
||||
if sources:
|
||||
slugs = [s.strip() for s in sources if s.strip()]
|
||||
if slugs:
|
||||
filters.append(Source.slug.in_(slugs))
|
||||
if q and q.strip():
|
||||
like = f"%{q.strip()}%"
|
||||
filters.append(
|
||||
or_(
|
||||
Article.title.ilike(like),
|
||||
Article.body_text.ilike(like),
|
||||
Article.title_zh.ilike(like),
|
||||
Article.body_zh_text.ilike(like),
|
||||
Article.summary_zh.ilike(like),
|
||||
)
|
||||
)
|
||||
return filters
|
||||
|
||||
|
||||
@router.post("/reads/by-category", response_model=CategoryReadResponse)
|
||||
async def mark_category_read(
|
||||
body: CategoryReadRequest,
|
||||
@@ -273,84 +358,5 @@ async def list_reads(
|
||||
# - 返回 article_ids 给前端做乐观滑出动画
|
||||
|
||||
|
||||
def _unread_subquery(user_id: int):
|
||||
"""未读文章 id 子查询(per-user)— 复用 articles.py 的 NOT EXISTS 模式。"""
|
||||
return (
|
||||
select(ArticleRead.article_id)
|
||||
.where(ArticleRead.user_id == user_id, ArticleRead.article_id == Article.id)
|
||||
.exists()
|
||||
)
|
||||
|
||||
|
||||
def _escape_like(term: str) -> str:
|
||||
"""转义 ilike 的元字符,避免 category 含 % / _ 时被当通配符。"""
|
||||
return term.replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_")
|
||||
|
||||
|
||||
class CategoryReadRequest(BaseModel):
|
||||
category: str = Field(..., min_length=1, max_length=32, description="要批量已读的分类 tag")
|
||||
scope: Literal["all_unread", "filtered_unread"] = Field(
|
||||
default="filtered_unread",
|
||||
description="过滤范围:filtered_unread=遵守 sources/q;all_unread=忽略",
|
||||
)
|
||||
window_hours: int = Field(
|
||||
default=24, ge=1, le=168, description="只标 published_at 在最近 N 小时内的未读"
|
||||
)
|
||||
sources: list[str] | None = Field(default=None, description="源 slug 列表,过滤范围")
|
||||
q: str | None = Field(default=None, description="关键词过滤(标题/正文模糊)")
|
||||
|
||||
|
||||
class CategoryReadResponse(BaseModel):
|
||||
category: str
|
||||
matched: int # 命中的未读数
|
||||
marked: int # 实际新标已读数(去重后)
|
||||
article_ids: list[int] # 给前端做滑出动画
|
||||
|
||||
|
||||
class CategoryCountItem(BaseModel):
|
||||
category: str
|
||||
unread_count: int
|
||||
window_hours: int
|
||||
|
||||
|
||||
def _build_category_filter(
|
||||
category: str,
|
||||
window_hours: int,
|
||||
user_id: int,
|
||||
sources: list[str] | None = None,
|
||||
q: str | None = None,
|
||||
):
|
||||
"""构造"分类 + 时间窗 + 未读"三合一 filter。
|
||||
|
||||
- category 用 ilike 模糊匹配(逗号分隔串中含此 tag 即可)
|
||||
- window_hours 通过 published_at 过滤
|
||||
- 用 NOT EXISTS 排除当前用户已读
|
||||
- sources / q 可选,跟 articles.py 列表查询口径一致
|
||||
"""
|
||||
pattern = f"%{_escape_like(category)}%"
|
||||
# 用 text() 拼 interval,避免 make_interval 的 PG 函数参数绑定在某些
|
||||
# SQLAlchemy 版本下的兼容性问题。window_hours 是 int,只来自我们自己的
|
||||
# endpoint,不是用户原始字符串,安全。
|
||||
interval_sql = text(f"interval '{int(window_hours)} hours'")
|
||||
filters = [
|
||||
Article.category.ilike(pattern, escape="\\"),
|
||||
Article.published_at >= func.now() - interval_sql,
|
||||
not_(_unread_subquery(user_id)),
|
||||
]
|
||||
if sources:
|
||||
slugs = [s.strip() for s in sources if s.strip()]
|
||||
if slugs:
|
||||
filters.append(Source.slug.in_(slugs))
|
||||
if q and q.strip():
|
||||
like = f"%{q.strip()}%"
|
||||
filters.append(
|
||||
or_(
|
||||
Article.title.ilike(like),
|
||||
Article.body_text.ilike(like),
|
||||
Article.title_zh.ilike(like),
|
||||
Article.body_zh_text.ilike(like),
|
||||
Article.summary_zh.ilike(like),
|
||||
)
|
||||
)
|
||||
return filters
|
||||
|
||||
|
||||
Reference in New Issue
Block a user