"""RSS / Atom fetcher (built on feedparser)."""

from __future__ import annotations

import io
import re
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

import feedparser
from dateutil import parser as dtp

from app.services.fetchers.base import BaseFetcher, FetchedItem

# Captures the encoding named in an XML declaration near the start of the
# document, e.g. ``encoding="gbk"``.
_XML_DECLARED_ENCODING = re.compile(
    rb"""<\?xml[^>]*?\bencoding\s*=\s*["']([A-Za-z][A-Za-z0-9._-]*)["']"""
)

_UTF8_NAMES = frozenset({"utf-8", "utf8"})


class RSSFetcher(BaseFetcher):
    """Fetcher that parses an RSS/Atom document into ``FetchedItem`` objects."""

    async def fetch(self) -> list[FetchedItem]:
        """Download the feed and convert each usable entry into a FetchedItem.

        Entries without a link/id or without a non-blank title are skipped.

        Returns:
            One ``FetchedItem`` per usable entry, in feed order.

        Raises:
            RuntimeError: if feedparser flags the document as malformed
                (``bozo``) and no entries could be extracted at all.
        """
        raw = await self._http_get()

        # Pass a byte stream rather than a str: feedparser treats string
        # arguments as a possible URL or filename before treating them as feed
        # text, which must never happen with a body supplied by a remote
        # server.
        feed = feedparser.parse(io.BytesIO(_feed_payload(raw)))
        if feed.bozo and not feed.entries:
            # The whole document failed to parse.
            raise RuntimeError(f"RSS parse failed: {feed.bozo_exception}")

        items: list[FetchedItem] = []
        for e in feed.entries:
            url = e.get("link") or e.get("id")
            if not url:
                continue
            title = (e.get("title") or "").strip()
            if not title:
                continue

            body_html = _entry_html(e)
            body_text = _html_to_text(body_html) if body_html else ""

            items.append(
                FetchedItem(
                    url=url,
                    title=title,
                    body_html=body_html,
                    body_text=body_text,
                    published_at=_parse_dt(
                        e.get("published") or e.get("updated") or e.get("created")
                    ),
                    lang=e.get("language") or feed.feed.get("language"),
                    author=e.get("author"),
                    image_url=_entry_image_url(e),
                    guid=e.get("id") or e.get("guid"),
                )
            )
        return items


def _feed_payload(raw: bytes) -> bytes:
    """Return the bytes that should be handed to feedparser for *raw*.

    * Valid UTF-8 is passed through untouched.
    * If the bytes are not valid UTF-8 and the XML declaration names some
      other encoding, the bytes are also passed through untouched so that
      feedparser can decode them with the declared encoding.
    * Otherwise (the document claims UTF-8 or declares nothing, yet contains
      invalid sequences) the undecodable bytes are replaced with U+FFFD so a
      single stray byte does not spoil the whole feed.
    """
    try:
        raw.decode("utf-8")
    except UnicodeDecodeError:
        pass
    else:
        return raw

    match = _XML_DECLARED_ENCODING.search(raw[:256])
    declared = match.group(1).decode("ascii").lower() if match else ""
    if declared and declared not in _UTF8_NAMES:
        return raw
    return raw.decode("utf-8", errors="replace").encode("utf-8")


def _entry_html(entry) -> str | None:
    """Return the entry's longest ``content`` value, falling back to ``summary``."""
    html = None
    contents = entry.get("content")
    if contents:
        # ``value`` may be present but None, hence ``or ""``.
        longest = max(contents, key=lambda c: len(c.get("value") or ""))
        html = longest.get("value")
    return html or entry.get("summary")


def _html_to_text(html: str) -> str:
    """Strip script/style/noscript elements and return the visible text."""
    # Imported lazily, as in the original code, so bs4 is only loaded when an
    # entry actually carries a body.
    from bs4 import BeautifulSoup

    soup = BeautifulSoup(html, "lxml")
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()
    return soup.get_text(separator="\n", strip=True)


def _first_media_url(media) -> str | None:
    """Return the ``url`` of the first element of *media*, or None."""
    if not media:
        return None
    try:
        return media[0].get("url") or None
    except (IndexError, KeyError, TypeError, AttributeError):
        # Best effort: an oddly shaped media list just means "no image".
        return None


def _entry_image_url(entry) -> str | None:
    """Pick an image URL from media_content, media_thumbnail or an image enclosure."""
    for key in ("media_content", "media_thumbnail"):
        url = _first_media_url(entry.get(key))
        if url:
            return url
    for enc in entry.get("enclosures") or ():
        # ``type`` may be present but None, hence ``or ""``.
        if (enc.get("type") or "").startswith("image/"):
            return enc.get("href") or enc.get("url")
    return None


def _parse_dt(s: str | None) -> datetime | None:
    """Parse a feed date string into a timezone-aware UTC datetime.

    dateutil is tried first, then the RFC 2822 parser from ``email.utils``.
    Naive results are assumed to be UTC. Returns None when *s* is empty or
    cannot be parsed or converted.
    """
    if not s:
        return None
    try:
        dt = dtp.parse(s)
    except (ValueError, TypeError, OverflowError):
        # dateutil's ParserError is a ValueError subclass, so it is covered
        # here; OverflowError is raised for out-of-range numeric fields.
        try:
            dt = parsedate_to_datetime(s)
        except Exception:
            # Deliberately broad: date parsing is best effort and must not
            # abort the whole fetch.
            return None
    if dt is None:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    try:
        return dt.astimezone(timezone.utc)
    except (OverflowError, ValueError):
        # Dates at the edge of the supported range cannot be shifted to UTC.
        return None