fix: API 全部改用显式两步走 await session.execute + result.scalars()

之前 (await ...).scalars() 链式在 SQLAlchemy 2.0 async 下报
'coroutine' has no attribute 'scalars' 错误。改为先 await 拿 result
再 .scalars(),这是 SQLAlchemy 2.0 推荐的 async 写法。
This commit is contained in:
Mavis
2026-06-07 23:22:56 +08:00
parent 2e75985a3c
commit 5109d6f824
8 changed files with 147 additions and 36 deletions

View File

@@ -29,7 +29,9 @@ router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(requir
# === Source CRUD === # === Source CRUD ===
@router.get("/sources", response_model=list[SourceOut]) @router.get("/sources", response_model=list[SourceOut])
async def list_sources_all(session: AsyncSession = Depends(get_session)): async def list_sources_all(session: AsyncSession = Depends(get_session)):
rows = (await session.execute(select(Source).order_by(Source.id))).scalars() result = await session.execute(select(Source).order_by(Source.id))
rows = result.scalars()
return [SourceOut.model_validate(s) for s in rows] return [SourceOut.model_validate(s) for s in rows]
@@ -65,7 +67,7 @@ async def update_source(
body: SourceUpdate, body: SourceUpdate,
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
): ):
src = (await session.execute(select(Source).where(Source.id == source_id))).scalar_one_or_none() result = await session.execute(select(Source).where(Source.id == source_id))).scalar_one_or_none()
if not src: if not src:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Source not found") raise HTTPException(status.HTTP_404_NOT_FOUND, "Source not found")
for k, v in body.model_dump(exclude_unset=True).items(): for k, v in body.model_dump(exclude_unset=True).items():
@@ -155,7 +157,8 @@ class HealthOut(BaseModel):
@router.get("/health", response_model=list[HealthOut]) @router.get("/health", response_model=list[HealthOut])
async def health(session: AsyncSession = Depends(get_session)): async def health(session: AsyncSession = Depends(get_session)):
rows = (await session.execute(select(Source).order_by(Source.priority.desc()))).scalars() result = await session.execute(select(Source).order_by(Source.priority.desc()))
rows = result.scalars()
out: list[HealthOut] = [] out: list[HealthOut] = []
for s in rows: for s in rows:
c24 = ( c24 = (

View File

@@ -34,11 +34,8 @@ def _pair_for(user: User) -> TokenPair:
@router.post("/login", response_model=TokenPair) @router.post("/login", response_model=TokenPair)
async def login(body: LoginRequest, session: AsyncSession = Depends(get_session)): async def login(body: LoginRequest, session: AsyncSession = Depends(get_session)):
user = ( result = await session.execute(select(User).where(User.username == body.username))
await session.execute(select(User).where(User.username == body.username)) user = result.scalars().first()
.scalars()
.first()
)
if not user or not user.is_active or not verify_password(body.password, user.password_hash): if not user or not user.is_active or not verify_password(body.password, user.password_hash):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid credentials") raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid credentials")
user.last_login_at = datetime.now(timezone.utc) user.last_login_at = datetime.now(timezone.utc)
@@ -55,11 +52,8 @@ async def refresh(body: RefreshRequest, session: AsyncSession = Depends(get_sess
uid = int(payload["sub"]) uid = int(payload["sub"])
except (InvalidTokenError, KeyError, ValueError): except (InvalidTokenError, KeyError, ValueError):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid refresh token") raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid refresh token")
user = ( result = await session.execute(select(User).where(User.id == uid, User.is_active.is_(True)))
await session.execute(select(User).where(User.id == uid, User.is_active.is_(True))) user = result.scalars().first()
.scalars()
.first()
)
if not user: if not user:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found") raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found")
return _pair_for(user) return _pair_for(user)

View File

@@ -65,9 +65,8 @@ async def list_mine(
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
): ):
rows = ( result = await session.execute(
await session.execute( select(Bookmark).where(Bookmark.user_id == user.id).order_by(Bookmark.created_at.desc())
select(Bookmark).where(Bookmark.user_id == user.id).order_by(Bookmark.created_at.desc()) )
) rows = result.scalars()
).scalars()
return [BookmarkOut.model_validate(b) for b in rows] return [BookmarkOut.model_validate(b) for b in rows]

View File

@@ -19,7 +19,6 @@ async def list_sources(
user: User = Depends(get_current_user), # noqa: ARG001 user: User = Depends(get_current_user), # noqa: ARG001
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
): ):
rows = ( result = await session.execute(select(Source).order_by(Source.priority.desc(), Source.name))
await session.execute(select(Source).order_by(Source.priority.desc(), Source.name)) rows = result.scalars()
).scalars()
return [SourceOut.model_validate(s) for s in rows] return [SourceOut.model_validate(s) for s in rows]

View File

@@ -38,13 +38,12 @@ async def list_mine(
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
): ):
rows = ( result = await session.execute(
await session.execute( select(Subscription)
select(Subscription) .where(Subscription.user_id == user.id)
.where(Subscription.user_id == user.id) .order_by(Subscription.created_at.desc())
.order_by(Subscription.created_at.desc()) )
) rows = result.scalars()
).scalars()
return [SubscriptionOut.model_validate(s) for s in rows] return [SubscriptionOut.model_validate(s) for s in rows]

63
scripts/_fix_scalars.py Normal file
View File

@@ -0,0 +1,63 @@
"""批量修 API 文件:把 `(await ...).scalars()` 改成显式两步走。"""
import re
from pathlib import Path
api_dir = Path("D:/selftools/diary-news/backend/app/api")
files = list(api_dir.glob("*.py"))
# 模式 1: user = (await ...).scalars().first() (多行括号形式)
# 模式 2: rows = (await ...).scalars() (单行)
# 都改成 result = await ...; user = result.scalars().first() 这种
for f in files:
src = f.read_text(encoding="utf-8")
orig = src
changed = False
# 模式 1:跨行的( await ... ).scalars().first()
# 匹配:任意前缀(空白)+ ( 多行 await session.execute(...) ) .scalars() .first()
pat1 = re.compile(
r'(\s+)([\w_]+)\s*=\s*\(\s*\n'
r'(\s+)await\s+session\.execute\((.*?)\)\s*\n'
r'\s+\)\s*'
r'\.scalars\(\)\s*'
r'\.first\(\)',
re.DOTALL,
)
def repl1(m):
indent = m.group(1)
var = m.group(2)
inner_indent = m.group(3)
exec_arg = m.group(4)
return (
f"{indent}result = await session.execute({exec_arg})\n"
f"{indent}{var} = result.scalars().first()"
)
new = pat1.sub(repl1, src)
if new != src:
changed = True
src = new
# 模式 2:单行 (await session.execute(...)).scalars()
pat2 = re.compile(
r'(\s+)([\w_]+)\s*=\s*\(await\s+session\.execute\((.*?)\)\)\s*\.scalars\(\)',
re.DOTALL,
)
def repl2(m):
indent = m.group(1)
var = m.group(2)
exec_arg = m.group(3)
return (
f"{indent}result = await session.execute({exec_arg})\n"
f"{indent}{var} = result.scalars()"
)
new = pat2.sub(repl2, src)
if new != src:
changed = True
src = new
if changed:
f.write_text(src, encoding="utf-8")
print(f"[ok] {f.name}")
else:
print(f" {f.name}")

View File

@@ -16,13 +16,14 @@ def run(cmd, t=60):
return out return out
run("cd /srv/news && sudo -u news git pull --rebase 2>&1 | tail -3") run("cd /srv/news && sudo -u news git pull --rebase 2>&1 | tail -3")
run("cd /srv/news && sg docker -c 'docker compose up -d --force-recreate caddy' 2>&1 | tail -10") # 重启 caddy + api
time.sleep(5) run("cd /srv/news && sg docker -c 'docker compose up -d --force-recreate caddy api' 2>&1 | tail -8")
time.sleep(8)
# 测试路径 print("\n=== 验证 ===")
print("\n=== healthz 测试 ===") run("curl -s -o /dev/null -w 'healthz: %{http_code}\\n' http://localhost/api/v1/healthz")
run("curl -s -o /dev/null -w 'healthz: %{http_code}\\n' http://localhost/healthz") run("curl -s http://localhost/api/v1/healthz 2>&1")
run("curl -s -o /dev/null -w 'api/healthz: %{http_code}\\n' http://localhost/api/healthz") run("curl -s -o /dev/null -w 'articles (no auth): %{http_code}\\n' http://localhost/api/v1/articles")
run("curl -s -o /dev/null -w 'api/v1/articles: %{http_code}\\n' http://localhost/api/v1/articles") run("curl -s http://localhost/api/v1/articles 2>&1")
run("curl -s http://localhost/api/healthz 2>&1") run("curl -s -o /dev/null -w 'login OPTIONS: %{http_code}\\n' -X OPTIONS http://localhost/api/v1/auth/login")
c.close() c.close()

53
scripts/_smoke_test.py Normal file
View File

@@ -0,0 +1,53 @@
import os, paramiko, json, time
PW = os.environ["REMOTE_PASS"]
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect("207.57.129.228", port=19717, username="root", password=PW, timeout=15, allow_agent=False, look_for_keys=False)
def run(cmd, t=60):
si, so, se = c.exec_command(cmd, timeout=t)
out = so.read().decode("utf-8", "replace")
err = se.read().decode("utf-8", "replace")
rc = so.channel.recv_exit_status()
print(f"$ {cmd}")
if out: print(out, end="")
if err: print("[err]", err, end="", file=__import__("sys").stderr)
print(f" rc={rc}")
return out
# 1) 取 owner 密码
owner_pw = run("cat /root/.owner_pass").strip()
print(f"\n=== owner 密码: {owner_pw} ===\n")
# 2) 登录拿 token
login_cmd = f"""curl -s -X POST http://localhost/api/v1/auth/login -H 'Content-Type: application/json' -d '{{"username":"owner","password":"{owner_pw}"}}'"""
login_resp = run(login_cmd)
token = json.loads(login_resp).get("access_token", "")
print(f"\n=== token(前 40): {token[:40]}... ===\n")
# 3) 触发一次抓取
print("=== 触发一次抓取 ===")
run("cd /srv/news && sg docker -c \"docker compose exec -T worker python -c 'import asyncio; from app.workers.pipeline import run_once; asyncio.run(run_once())'\" 2>&1 | tail -30", t=180)
# 4) 等翻译完成,看文章数
time.sleep(10)
print("\n=== 查文章数 ===")
run("cd /srv/news && sg docker -c \"docker compose exec -T postgres psql -U news -d news -c 'SELECT count(*) FROM articles; SELECT count(*) FROM articles WHERE translation_status = '\\''ok'\\'';'\" 2>&1 | tail -10")
# 5) 拿一条翻译好的文章
print("\n=== 拉一条文章 ===")
list_cmd = f"curl -s -H 'Authorization: Bearer {token}' http://localhost/api/v1/articles?limit=1"
art_resp = run(list_cmd)
try:
data = json.loads(art_resp)
items = data.get("items", [])
if items:
art = items[0]
print(f" id: {art['id']}")
print(f" source: {art['source']['name']}")
print(f" title (en): {art['title'][:80]}")
print(f" title (zh): {art.get('title_zh', '(none)')[:80] if art.get('title_zh') else '(none)'}")
print(f" status: {art['translation_status']}")
except Exception as e:
print(f"parse err: {e}\n raw: {art_resp[:300]}")
c.close()