Files
diary-news/backend/app/services/fetchers/rss.py

162 lines
5.9 KiB
Python
Raw Normal View History

"""RSS / Atom fetcher(基于 feedparser)。
Enhancement: when an item's content is too short (< BODY_MIN_LEN characters), the article
URL is fetched and trafilatura extracts the body, upgrading the RSS summary to full text.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
import feedparser
import httpx
import trafilatura
from bs4 import BeautifulSoup
from dateutil import parser as dtp
from app.services.fetchers.base import BaseFetcher, FetchedItem
logger = logging.getLogger("news.fetcher.rss")
# If the body text supplied by an RSS entry has fewer characters than this,
# the fetcher downloads the article URL and extracts the full text instead.
BODY_MIN_LEN = 500
class RSSFetcher(BaseFetcher):
    """Fetcher for RSS / Atom feeds parsed with feedparser.

    Entries whose feed-supplied body text is shorter than ``BODY_MIN_LEN``
    characters are upgraded by downloading the article URL and extracting the
    main content with trafilatura.
    """

    async def fetch(self) -> list[FetchedItem]:
        """Download the feed and turn every usable entry into a ``FetchedItem``.

        Entries without a link/id or without a title are skipped.

        Returns:
            One ``FetchedItem`` per usable entry, in feed order.

        Raises:
            RuntimeError: if feedparser flags the feed as malformed and no
                entries could be recovered from it.
        """
        # NOTE(review): _http_get is not defined in this class; presumably it
        # comes from BaseFetcher and returns bytes -- confirm.
        raw = await self._http_get()
        # feedparser is always handed a str. errors="replace" produces the
        # same text as a strict decode whenever the payload is valid UTF-8,
        # so no separate strict attempt is needed.
        text = raw.decode("utf-8", errors="replace")
        feed = feedparser.parse(text)
        if feed.bozo and not feed.entries:
            # The whole document failed to parse.
            raise RuntimeError(f"RSS parse failed: {feed.bozo_exception}")

        # Shared client for full-text requests; created lazily by
        # _fetch_fulltext the first time an entry needs it.
        self._http_client: httpx.AsyncClient | None = None
        items: list[FetchedItem] = []
        try:
            for e in feed.entries:
                url = e.get("link") or e.get("id")
                if not url:
                    continue
                title = (e.get("title") or "").strip()
                if not title:
                    continue

                body_html, body_text = self._extract_from_entry(e)
                # Body too short: fetch the article itself and keep the
                # extracted text only when it is actually longer.
                if len(body_text) < BODY_MIN_LEN:
                    full_html, full_text = await self._fetch_fulltext(url)
                    if full_text and len(full_text) > len(body_text):
                        body_text = full_text
                        body_html = full_html or body_html

                published_at = _parse_dt(
                    e.get("published") or e.get("updated") or e.get("created")
                )
                items.append(
                    FetchedItem(
                        url=url,
                        title=title,
                        body_html=body_html,
                        body_text=body_text,
                        published_at=published_at,
                        lang=e.get("language") or feed.feed.get("language"),
                        author=e.get("author"),
                        image_url=self._extract_image(e),
                        guid=e.get("id") or e.get("guid"),
                    )
                )
        finally:
            # Always release the connection pool, even when an entry raised,
            # and drop the reference so a closed client is never reused.
            client = self._http_client
            self._http_client = None
            if client is not None:
                await client.aclose()
        return items

    @staticmethod
    def _extract_from_entry(e) -> tuple[str | None, str]:
        """Return ``(body_html, body_text)`` for a feed entry.

        The longest ``content`` value is preferred, falling back to
        ``summary``. ``(None, "")`` is returned when the entry has neither.
        """
        body_html = None
        if e.get("content"):
            # max() returns the first of several equally long values, which is
            # the same choice a stable descending sort would make. A missing
            # or None value counts as empty instead of raising.
            longest = max(e["content"], key=lambda c: len(c.get("value") or ""))
            body_html = longest.get("value")
        if not body_html:
            body_html = e.get("summary")
        if not body_html:
            return None, ""

        soup = BeautifulSoup(body_html, "lxml")
        # Drop non-content elements before flattening to text.
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        text = soup.get_text(separator="\n", strip=True)
        return body_html, text

    @staticmethod
    def _extract_image(e) -> str | None:
        """Return the first image URL found on a feed entry, or ``None``.

        Sources are tried in order: ``media_content``, ``media_thumbnail``,
        then any enclosure whose MIME type starts with ``image/``. A source
        without a usable URL no longer ends the search; the next one is tried.
        """
        for key in ("media_content", "media_thumbnail"):
            try:
                for media in e.get(key) or ():
                    url = media.get("url")
                    if url:
                        return url
            except (AttributeError, TypeError):
                # Malformed media data: ignore it and try the next source.
                continue

        for enc in e.get("enclosures") or ():
            # "type" may be missing or explicitly None.
            if (enc.get("type") or "").startswith("image/"):
                href = enc.get("href") or enc.get("url")
                if href:
                    return href
        return None

    async def _fetch_fulltext(self, url: str) -> tuple[str | None, str]:
        """Download ``url`` and extract the article body with trafilatura.

        This is best-effort: any download or extraction failure is logged as
        a warning and reported as ``(None, "")`` rather than raised.

        Returns:
            ``(html, text)`` as produced by trafilatura; each is an empty
            string when trafilatura extracts nothing.
        """
        try:
            # getattr: the attribute only exists once fetch() has run, and a
            # direct call must not fail with AttributeError.
            if getattr(self, "_http_client", None) is None:
                # Use a real browser UA: many sites (e.g. NHK news.web)
                # answer crawler UAs with a plain 403.
                self._http_client = httpx.AsyncClient(
                    follow_redirects=True,
                    timeout=20,
                    headers={
                        "User-Agent": (
                            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                            "AppleWebKit/537.36 (KHTML, like Gecko) "
                            "Chrome/120.0.0.0 Safari/537.36"
                        ),
                        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                        "Accept-Language": "ja,en-US;q=0.9,en;q=0.8,zh;q=0.7",
                    },
                )
            r = await self._http_client.get(url)
            r.raise_for_status()
        except Exception as exc:
            logger.warning("fulltext fetch failed for %s: %s", url, exc)
            return None, ""

        # Same extraction options for both output formats.
        options = {
            "include_comments": False,
            "include_tables": False,
            "favor_recall": True,
        }
        try:
            page = r.text
            html = trafilatura.extract(page, output_format="html", **options) or ""
            text = trafilatura.extract(page, output_format="txt", **options) or ""
        except Exception as exc:
            logger.warning("trafilatura extract failed for %s: %s", url, exc)
            return None, ""
        return html, text
def _parse_dt(s: str | None) -> datetime | None:
    """Parse a feed date string into a timezone-aware UTC ``datetime``.

    dateutil is tried first; if it rejects the string, the RFC 2822 parser
    from ``email.utils`` is used as a fallback. A result without timezone
    information is assumed to already be in UTC.

    Args:
        s: Raw date string taken from the feed entry, or ``None``.

    Returns:
        The parsed instant converted to UTC, or ``None`` when ``s`` is empty,
        cannot be parsed, or cannot be represented after conversion to UTC.
    """
    if not s:
        return None
    try:
        dt = dtp.parse(s)
    except (ValueError, TypeError, OverflowError):
        # dateutil's ParserError subclasses ValueError, so it is covered here
        # without referencing an attribute older dateutil releases lack.
        # OverflowError is raised for absurdly large numeric fields.
        try:
            dt = parsedate_to_datetime(s)
        except Exception:
            # Deliberate best-effort: an unparseable date is not fatal.
            return None
    if dt is None:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    try:
        return dt.astimezone(timezone.utc)
    except (OverflowError, ValueError):
        # Dates at the edge of the datetime range (e.g. year 1 with a positive
        # UTC offset) fall outside the representable range once shifted.
        return None