feat: initial MVP - FastAPI backend + Vue3 frontend + docker-compose

- backend: FastAPI + SQLAlchemy 2.0(async) + asyncpg + Alembic
- 7 API routes: auth/me/articles/sources/bookmarks/subscriptions/admin
- models: User/Source/Article/Bookmark/Subscription/ApiToken
- services: RSS fetcher (feedparser) + Tencent TMT translator with quota + cache + local NLLB fallback
- workers: APScheduler + asyncio pipeline (fetch -> dedupe -> insert -> translate)
- seed scripts: create_user, seed_sources (5 RSS: Reuters/BBC/Al Jazeera/NHK/DW)
- frontend: Vue 3 + Vite + Naive UI + Pinia + vue-router
- pages: Login, Feed (24h), ArticleDetail, Sources, Bookmarks, AdminSources
- deploy: docker-compose (postgres/redis/api/worker/frontend/caddy)
- docs: README, DEPLOY, architecture, acceptance
This commit is contained in:
Mavis
2026-06-07 21:51:01 +08:00
commit 60b062daf2
81 changed files with 5540 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Background workers (fetch + translate + scheduler)."""

View File

@@ -0,0 +1,112 @@
"""Worker entry point: starts the scheduler and the async jobs.

Run with: `docker compose exec worker python -m app.workers`
"""
from __future__ import annotations
import asyncio
import logging
import signal
from datetime import datetime, timezone
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select
from app.config import settings
from app.database import AsyncSessionLocal
from app.models.source import Source
from app.workers.pipeline import fetch_one_source, run_once
# Logger used by the worker entry point.
logger = logging.getLogger("news.worker")
# Configure logging at import time; the level is read from settings.log_level.
logging.basicConfig(
    level=settings.log_level,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
async def _rebuild_jobs(scheduler: AsyncIOScheduler) -> None:
    """Rebuild the per-source fetch jobs from the enabled rows of the sources table.

    Only jobs whose id starts with ``src:`` are replaced. Other jobs registered
    on the same scheduler (for example the periodic job that calls this very
    function) are left untouched, so the rebuild keeps recurring.

    Args:
        scheduler: The scheduler whose per-source jobs are rebuilt.
    """
    job_prefix = "src:"

    # Query first: if the database is unavailable the exception propagates
    # while the previously scheduled jobs are still in place.
    async with AsyncSessionLocal() as s:
        rows = (await s.execute(select(Source).where(Source.enabled.is_(True)))).scalars()
        sources = list(rows)

    for job in scheduler.get_jobs():
        if job.id.startswith(job_prefix):
            scheduler.remove_job(job.id)

    if not sources:
        logger.warning("no enabled sources; scheduler idle")
        return

    for src in sources:
        try:
            if src.fetch_cron:
                # Evaluate cron expressions in the scheduler's configured timezone
                # instead of the process-local one.
                trigger = CronTrigger.from_crontab(src.fetch_cron, timezone=scheduler.timezone)
            else:
                trigger = IntervalTrigger(minutes=src.fetch_interval_min)
        except (ValueError, TypeError) as exc:
            # One misconfigured source must not prevent the others from being scheduled.
            logger.error("invalid schedule for source %s: %s", src.slug, exc)
            continue

        scheduler.add_job(
            fetch_one_source,
            trigger=trigger,
            args=[src.id],
            id=f"{job_prefix}{src.slug}",
            replace_existing=True,
            max_instances=1,
            coalesce=True,
            misfire_grace_time=300,
        )
        logger.info("scheduled %s every %s", src.slug, src.fetch_cron or f"{src.fetch_interval_min}m")
async def _daily_rebuild() -> None:
    """Placeholder for a daily job-list rebuild; intentionally does nothing.

    The periodic rebuild is registered in ``main()``, which schedules
    ``_rebuild_jobs`` against the scheduler instance that is actually running.
    This coroutine is kept only so the name stays importable; it no longer
    builds a throwaway scheduler that was never used.
    """
    return None
def build_scheduler() -> AsyncIOScheduler:
    """Create the worker's asyncio scheduler, configured for Asia/Hong_Kong time."""
    return AsyncIOScheduler(timezone="Asia/Hong_Kong")
async def main() -> None:
    """Run the worker: schedule the jobs, start the scheduler, wait for a shutdown signal.

    Jobs registered here:
      * one fetch job per enabled source (via ``_rebuild_jobs``);
      * ``rebuild_jobs``: re-reads the sources table every day at 00:30;
      * ``startup_run``: a single ``run_once`` pass right after start-up.

    The coroutine returns after SIGINT/SIGTERM has been received and the
    scheduler has been asked to shut down.
    """
    scheduler = build_scheduler()
    await _rebuild_jobs(scheduler)

    # Daily rebuild at 00:30, evaluated in the scheduler's own timezone rather
    # than whatever the process-local timezone happens to be.
    scheduler.add_job(
        _rebuild_jobs,
        trigger=CronTrigger(hour=0, minute=30, timezone=scheduler.timezone),
        args=[scheduler],
        id="rebuild_jobs",
        replace_existing=True,
    )

    # Run one full pass immediately. A one-shot "date" trigger is used because
    # APScheduler turns a zero-length interval into a 1-second interval, which
    # would re-run the full pass forever instead of once.
    scheduler.add_job(
        run_once,
        trigger="date",
        run_date=datetime.now(timezone.utc),
        id="startup_run",
        replace_existing=True,
        # The run date is already slightly in the past when the scheduler
        # starts; allow some slack so the job is not dropped as misfired.
        misfire_grace_time=60,
    )

    scheduler.start()
    logger.info("scheduler started with %d jobs", len(scheduler.get_jobs()))

    stop = asyncio.Event()

    def _signal_handler() -> None:
        logger.info("shutdown signal received")
        stop.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, _signal_handler)
        except NotImplementedError:
            # Event loops on platforms such as Windows do not support signal handlers.
            pass

    await stop.wait()
    logger.info("stopping scheduler")
    scheduler.shutdown(wait=False)
# Script entry point: run main() on a fresh event loop when executed as a module/script.
if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,274 @@
"""Core pipeline:
- fetching (dedupe + insert)
- translation (chunking + quota management)
- manual entry points: run_once / fetch_one_source / translate_article
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from app.config import settings
from app.database import AsyncSessionLocal
from app.models.article import Article
from app.models.source import Source, SourceKind
from app.services.fetchers import get_fetcher
from app.services.fetchers.base import FetchedItem, url_hash
from app.services.translation.service import service as translation_service
# Logger used by every pipeline step in this module.
logger = logging.getLogger("news.pipeline")
# Maximum number of body characters of one article that are sent for translation.
TRANSLATE_BODY_MAX = 8000
# Semaphore acquired by fetch_one_source(). It is a single module-level
# object shared by all sources, so it caps the number of concurrently running
# fetch_one_source() calls at 2 overall (not 2 per source).
SEM_PER_SOURCE = asyncio.Semaphore(2)
# === Fetch + insert ===
async def fetch_one_source(source_id: int) -> None:
    """Fetch one source, store its new articles and translate the pending ones.

    Steps: load the source row, skip it when missing or disabled, run the
    fetcher selected by the source kind, insert the fetched items, record the
    outcome on the source row and finally translate up to 20 pending articles
    of that source. A fetcher error is logged and recorded via
    ``_mark_failure``; it is not re-raised.

    Args:
        source_id: Primary key of the source to process.
    """
    async with SEM_PER_SOURCE:
        async with AsyncSessionLocal() as session:
            lookup = select(Source).where(Source.id == source_id)
            src = (await session.execute(lookup)).scalar_one_or_none()

        if not src or not src.enabled:
            logger.info("source %s disabled or missing", source_id)
            return

        try:
            fetcher = get_fetcher(src.kind.value, url=src.url, headers=src.headers_json)
            items = await fetcher.fetch()
        except Exception as e:
            logger.exception("fetch failed for %s: %s", src.slug, e)
            await _mark_failure(source_id, f"fetch: {type(e).__name__}: {e}")
            return

        if not items:
            # Nothing fetched still counts as a successful run.
            await _mark_success(source_id, n_new=0)
            return

        n_new = await _bulk_insert(src, items)
        await _mark_success(source_id, n_new=n_new)
        logger.info("source %s: %d new articles", src.slug, n_new)

        # After inserting, start translating this source's untranslated articles.
        await _translate_recent_for_source(source_id, max_n=20)
async def _mark_failure(source_id: int, status: str) -> None:
    """Record a failed fetch on the source row and back off after repeated failures.

    Stores ``status`` as the last status, increments the consecutive-failure
    counter and stamps the fetch time. Once the counter reaches
    ``settings.fetch_fail_pause_threshold`` the stored fetch interval is
    doubled, capped at 720 minutes. Does nothing when the source is missing.

    Args:
        source_id: Primary key of the source that failed.
        status: Text stored in ``last_status``.
    """
    async with AsyncSessionLocal() as db:
        query = select(Source).where(Source.id == source_id)
        source = (await db.execute(query)).scalar_one_or_none()
        if not source:
            return

        source.last_status = status
        source.consecutive_failures += 1
        source.last_fetched_at = datetime.now(timezone.utc)

        too_many_failures = source.consecutive_failures >= settings.fetch_fail_pause_threshold
        if too_many_failures:
            # Back-off: double the interval, with a ceiling of 720 minutes.
            doubled = source.fetch_interval_min * 2
            source.fetch_interval_min = min(720, doubled)
            logger.warning(
                "source %s paused, interval bumped to %dm",
                source.slug,
                source.fetch_interval_min,
            )

        await db.commit()
async def _mark_success(source_id: int, n_new: int) -> None:
    """Record a successful fetch on the source row.

    Writes ``ok:new=<n_new>`` as the last status, resets the
    consecutive-failure counter to zero and stamps the fetch time. Does
    nothing when the source is missing.

    Args:
        source_id: Primary key of the source that was fetched.
        n_new: Number of newly inserted articles to report in the status.
    """
    async with AsyncSessionLocal() as db:
        query = select(Source).where(Source.id == source_id)
        source = (await db.execute(query)).scalar_one_or_none()
        if not source:
            return

        fetched_at = datetime.now(timezone.utc)
        source.last_status = f"ok:new={n_new}"
        source.consecutive_failures = 0
        source.last_fetched_at = fetched_at
        await db.commit()
async def _bulk_insert(src: Source, items: list[FetchedItem]) -> int:
    """Insert fetched items as articles, skipping URLs that are already stored.

    Items lacking a title or a URL are dropped. Deduplication relies on
    PostgreSQL ``ON CONFLICT DO NOTHING`` against the ``url_hash`` column.

    Args:
        src: Source the items belong to; supplies id, default language and
            the ``translate_to`` value copied onto each article.
        items: Items returned by the fetcher.

    Returns:
        The number of rows that were actually inserted.
    """
    if not items:
        return 0

    payload = [
        {
            "source_id": src.id,
            "url": item.url,
            "url_hash": url_hash(item.url),
            "guid": item.guid,
            "title": item.title[:512],
            "body_html": (item.body_html or "")[:65535],
            "body_text": (item.body_text or "")[:65535],
            "lang_src": item.lang or src.language_src,
            "author": item.author,
            "image_url": item.image_url,
            "published_at": item.published_at,
            "translation_status": "pending",
            "translate_to": src.translate_to,
        }
        for item in items
        if item.title and item.url
    ]
    if not payload:
        return 0

    insert_stmt = pg_insert(Article).values(payload)
    insert_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["url_hash"])
    insert_stmt = insert_stmt.returning(Article.id)

    async with AsyncSessionLocal() as db:
        outcome = await db.execute(insert_stmt)
        # RETURNING yields one row per inserted article; conflicting rows yield nothing.
        new_ids = [row[0] for row in outcome.all()]
        await db.commit()

    return len(new_ids)
# === Translation ===
async def _translate_recent_for_source(source_id: int, max_n: int = 20) -> None:
    """Translate the newest pending articles of one source, one after another.

    Args:
        source_id: Source whose articles are considered.
        max_n: Upper bound on the number of articles picked per call.
    """
    newest_pending = (
        select(Article)
        .where(Article.source_id == source_id, Article.translation_status == "pending")
        .order_by(Article.published_at.desc().nullslast(), Article.id.desc())
        .limit(max_n)
    )
    async with AsyncSessionLocal() as db:
        found = (await db.execute(newest_pending)).scalars()
        # Keep only the ids; each translation opens its own sessions.
        pending_ids = [article.id for article in found]

    for pending_id in pending_ids:
        await translate_article(pending_id)
async def translate_article(article_id: int) -> None:
    """Translate the title and body of one article and store the result.

    Only articles whose status is ``pending`` or ``failed`` are processed.
    The body is cut to ``TRANSLATE_BODY_MAX`` characters, split into chunks
    and translated chunk by chunk. On any error the article is marked
    ``failed``; otherwise it is stored as ``ok`` when both the translated
    title and body are non-empty, else ``partial``.

    Args:
        article_id: Primary key of the article to translate.
    """
    # Snapshot the needed fields, then release the session before translating.
    async with AsyncSessionLocal() as db:
        record = (
            await db.execute(select(Article).where(Article.id == article_id))
        ).scalar_one_or_none()
        if not record:
            return
        if record.translation_status not in ("pending", "failed"):
            return
        title = record.title
        body_text = (record.body_text or "")[:TRANSLATE_BODY_MAX]
        lang_src = record.lang_src or "auto"
        # NOTE(review): target is fixed to "zh" here; the article's translate_to
        # column is not consulted — confirm this is intended.
        target = "zh"
        stored_id = record.id

    if not (body_text or title):
        return

    try:
        title_result = await translation_service.translate(title, source=lang_src, target=target)
        total_chars = title_result.chars

        # Body: split into chunks, translate each, then stitch back together.
        translated_parts: list[str] = []
        for piece in _chunk_text(body_text, max_chars=settings.tencent_tmt_max_chars_per_req):
            piece_result = await translation_service.translate(piece, source=lang_src, target=target)
            total_chars += piece_result.chars
            translated_parts.append(piece_result.text)
        body_zh = "\n\n".join(translated_parts)

        # NOTE(review): the label is a constant; the commit notes mention a local
        # NLLB fallback — confirm whether the engine actually used should be stored.
        engine_label = "tencent"
        if title_result.text and body_zh:
            status = "ok"
        else:
            status = "partial"
    except Exception as e:
        logger.exception("translate article %s failed: %s", article_id, e)
        async with AsyncSessionLocal() as db:
            record = (
                await db.execute(select(Article).where(Article.id == article_id))
            ).scalar_one_or_none()
            if record:
                record.translation_status = "failed"
                await db.commit()
        return

    # Write the translation back.
    async with AsyncSessionLocal() as db:
        record = (
            await db.execute(select(Article).where(Article.id == stored_id))
        ).scalar_one_or_none()
        if record:
            record.title_zh = title_result.text or None
            record.body_zh_text = body_zh or None
            record.body_zh_html = _wrap_html(body_zh) if body_zh else None
            record.translation_status = status
            record.translation_engine = engine_label
            record.translation_chars = total_chars
            record.translated_at = datetime.now(timezone.utc)
            await db.commit()
    logger.info("article %s translated: %d chars, %s", article_id, total_chars, engine_label)
def _chunk_text(text: str, max_chars: int) -> list[str]:
    """Split ``text`` into chunks of at most ``max_chars`` characters.

    Paragraphs (separated by a blank line) are packed greedily: consecutive
    paragraphs are joined with a blank line for as long as the chunk stays
    within the limit. A paragraph longer than the limit is first broken up
    with ``_split_long_para``; any piece that is still too long is cut into
    fixed-size slices so that no chunk ever exceeds the limit.

    Args:
        text: Plain text to split; may be empty.
        max_chars: Maximum chunk length. Values below 1 are treated as 1.

    Returns:
        The chunks in their original order; an empty list for empty text.
    """
    if not text:
        return []

    limit = max(1, max_chars)

    # Step 1: flatten the text into pieces that each fit into one chunk.
    pieces: list[str] = []
    for raw in text.split("\n\n"):
        para = raw.strip()
        if not para:
            continue
        if len(para) <= limit:
            pieces.append(para)
            continue
        for part in _split_long_para(para, max_chars):
            # Safety net: a part without usable break points may still be too long.
            pieces.extend(part[i:i + limit] for i in range(0, len(part), limit))

    # Step 2: pack the pieces greedily; +2 accounts for the "\n\n" separator.
    chunks: list[str] = []
    cur = ""
    for piece in pieces:
        if not cur:
            cur = piece
        elif len(cur) + len(piece) + 2 > limit:
            chunks.append(cur)
            cur = piece
        else:
            cur = f"{cur}\n\n{piece}"
    if cur:
        chunks.append(cur)
    return chunks
def _split_long_para(para: str, max_chars: int) -> list[str]:
    """Split an over-long paragraph into parts of at most ``max_chars`` characters.

    A part ends at the first sentence terminator (ASCII or full-width
    ``. ! ?`` and the ideographic full stop) found once the part is at least
    ``max_chars // 2`` characters long. When no terminator occurs before the
    limit is reached, the part is cut at the last space in the second half of
    the window, or exactly at the limit if there is none, so a part never
    exceeds ``max_chars``.

    Args:
        para: The paragraph to split.
        max_chars: Maximum part length. Values below 1 are treated as 1.

    Returns:
        The stripped, non-empty parts in order. If nothing remains after
        stripping, a single-element list with the truncated input is returned.
    """
    terminators = ".!?\u3002\uff01\uff1f"
    limit = max(1, max_chars)
    min_len = max(1, limit // 2)
    total = len(para)

    parts: list[str] = []
    pos = 0
    while pos < total:
        window_end = min(pos + limit, total)

        # Preferred: the earliest sentence end once the part is long enough.
        cut = -1
        for i in range(pos + min_len - 1, window_end):
            if para[i] in terminators:
                cut = i + 1
                break

        if cut == -1:
            if window_end == total:
                # The remainder fits into one part.
                cut = total
            else:
                # No sentence end within the limit: break at a space if possible,
                # otherwise hard-cut at the limit.
                space_at = para.rfind(" ", pos + min_len, window_end)
                cut = space_at + 1 if space_at != -1 else window_end

        piece = para[pos:cut].strip()
        if piece:
            parts.append(piece)
        pos = cut

    return parts or [para[:limit]]
def _wrap_html(text: str) -> str:
    """Wrap translated plain text into HTML paragraphs.

    The text is split on blank lines; every non-empty paragraph is stripped,
    HTML-escaped and wrapped in ``<p>...</p>``. Escaping keeps characters
    such as ``<`` or ``&`` in the translated text from being interpreted as
    markup when the result is rendered.

    Args:
        text: Plain text with paragraphs separated by a blank line.

    Returns:
        The paragraphs joined by newlines, or an empty string when there are none.
    """
    from html import escape

    paragraphs = (p.strip() for p in text.split("\n\n"))
    # quote=False: quotes are harmless inside element content, so leave them readable.
    return "\n".join(f"<p>{escape(p, quote=False)}</p>" for p in paragraphs if p)
# === Full run (for tests / manual triggering) ===
async def run_once() -> None:
    """Fetch every enabled source once, concurrently.

    All sources are processed through ``fetch_one_source``. A failure in one
    source does not cancel the others; each failure is logged with its
    traceback instead of being silently discarded.
    """
    async with AsyncSessionLocal() as session:
        rows = (await session.execute(select(Source).where(Source.enabled.is_(True)))).scalars()
        source_ids = [s.id for s in rows]
    logger.info("run_once: %d enabled sources", len(source_ids))

    outcomes = await asyncio.gather(
        *(fetch_one_source(sid) for sid in source_ids),
        return_exceptions=True,
    )
    # return_exceptions=True hands exceptions back as results; surface them in the log.
    for sid, outcome in zip(source_ids, outcomes):
        if isinstance(outcome, BaseException):
            logger.error("run_once: source %s failed: %r", sid, outcome, exc_info=outcome)