Files
diary-news/backend/app/models/source.py

69 lines
2.4 KiB
Python
Raw Normal View History

"""采集源模型。"""
from __future__ import annotations
import enum
from datetime import datetime
from sqlalchemy import (
JSON,
Boolean,
DateTime,
Enum,
Integer,
String,
Text,
func,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class SourceKind(str, enum.Enum):
RSS = "rss"
HTML_LIST = "html_list"
TG_CHANNEL = "tg_channel"
class Source(Base):
__tablename__ = "sources"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(128), nullable=False)
slug: Mapped[str] = mapped_column(String(128), unique=True, index=True, nullable=False)
kind: Mapped[SourceKind] = mapped_column(
Enum(
SourceKind,
name="source_kind",
values_callable=lambda x: [e.value for e in x],
),
default=SourceKind.RSS,
nullable=False,
)
url: Mapped[str] = mapped_column(Text, nullable=False)
detail_selector: Mapped[dict | None] = mapped_column(JSON)
fetch_interval_min: Mapped[int] = mapped_column(Integer, default=60, nullable=False)
fetch_cron: Mapped[str | None] = mapped_column(String(64)) # 5 段 cron
translate_to: Mapped[str] = mapped_column(String(8), default="zh", nullable=False)
enabled: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
region: Mapped[str | None] = mapped_column(String(32), index=True)
language_src: Mapped[str | None] = mapped_column(String(8))
priority: Mapped[int] = mapped_column(Integer, default=50, nullable=False, index=True)
headers_json: Mapped[dict | None] = mapped_column(JSON)
last_fetched_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_status: Mapped[str | None] = mapped_column(String(64))
consecutive_failures: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
)
articles: Mapped[list["Article"]] = relationship( # noqa: F821
back_populates="source", cascade="all, delete-orphan", lazy="noload"
)
def __repr__(self) -> str:
return f"<Source id={self.id} slug={self.slug} kind={self.kind.value}>"