maubot-media/media_bot/clients/seerr.py
Maddox 5e3cd82145 v0.5.3: !media health covers all 7 backing services
Was only Sonarr+Radarr. Now also probes Lidarr (existing /health), Seerr
(/api/v1/status), Emby (/System/Info/Public), NZBGet (status RPC) and
qBittorrent (transfer/info). Each line shows  healthy / ⚠️ warning
(with detail) / ⏸️ paused /  unreachable. NZBGet surfaces
DownloadPaused; qBt surfaces connection_status (firewalled vs disconnected
helps spot VPN issues quickly).
2026-05-03 14:40:11 -04:00

87 lines
3.5 KiB
Python

"""Seerr (Overseerr fork) HTTP client.
Docs reference (Overseerr-compatible):
GET /api/v1/search?query=...
POST /api/v1/request {"mediaType":"movie|tv","mediaId":<tmdb>,"userId":<id>}
GET /api/v1/user/{id}/requests?take=10&filter=pending,processing
GET /api/v1/discover/trending
"""
from __future__ import annotations
from typing import Optional
from urllib.parse import quote
import aiohttp
class SeerrError(RuntimeError):
pass
def _qs(params: dict) -> str:
"""RFC 3986 query string — Seerr rejects aiohttp's default '+' for spaces."""
return "&".join(f"{quote(k, safe='')}={quote(str(v), safe='')}" for k, v in params.items())
class SeerrClient:
def __init__(self, session: aiohttp.ClientSession, base_url: str, api_key: str) -> None:
self.session = session
self.base = base_url.rstrip("/")
self.headers = {"X-Api-Key": api_key, "Accept": "application/json"}
async def _get(self, path: str, params: dict | None = None) -> dict | list:
url = f"{self.base}{path}"
if params:
url += "?" + _qs(params)
async with self.session.get(url, headers=self.headers) as r:
if r.status >= 400:
raise SeerrError(f"GET {path}{r.status}: {(await r.text())[:200]}")
return await r.json()
async def _post(self, path: str, body: dict) -> dict:
async with self.session.post(f"{self.base}{path}", headers=self.headers, json=body) as r:
if r.status >= 400:
raise SeerrError(f"POST {path}{r.status}: {(await r.text())[:200]}")
return await r.json()
async def status(self) -> dict:
"""Return /api/v1/status — used as a reachability + version probe."""
data = await self._get("/api/v1/status")
return data if isinstance(data, dict) else {}
async def search(self, query: str) -> list[dict]:
data = await self._get("/api/v1/search", params={"query": query})
return (data or {}).get("results", []) if isinstance(data, dict) else (data or [])
async def request(self, media_type: str, tmdb_id: int, user_id: int, *,
seasons: str | int = "all") -> dict:
body: dict = {"mediaType": media_type, "mediaId": tmdb_id, "userId": user_id}
if media_type == "tv":
body["seasons"] = seasons
return await self._post("/api/v1/request", body)
async def user_requests(self, user_id: int, take: int = 10) -> list[dict]:
data = await self._get(
f"/api/v1/user/{user_id}/requests",
params={"take": take, "filter": "pending,processing"},
)
return (data or {}).get("results", []) if isinstance(data, dict) else (data or [])
async def trending(self) -> list[dict]:
data = await self._get("/api/v1/discover/trending")
return (data or {}).get("results", []) if isinstance(data, dict) else (data or [])
async def approve(self, request_id: int) -> dict:
return await self._post(f"/api/v1/request/{request_id}/approve", {})
async def decline(self, request_id: int) -> dict:
return await self._post(f"/api/v1/request/{request_id}/decline", {})
@staticmethod
def poster_url(item: dict, size: str = "w500") -> Optional[str]:
"""Build a TMDB poster URL from a Seerr search/result item."""
path = item.get("posterPath") or item.get("backdropPath")
if not path:
return None
return f"https://image.tmdb.org/t/p/{size}{path}"