maubot-media/media_bot/clients/seerr.py
Maddox 8c62c9fd31 Initial commit: media bot v0.1.0
Maubot plugin: Matrix companion for the homelab media stack.

Services wrapped:
- Seerr: search, request, requests, trending
- Emby: nowplaying, recent, watched
- Sonarr/Radarr: queue, upcoming, missing
- NZBGet/qBittorrent: activity

Each Matrix sender is mapped to per-service user IDs via plugin config;
unmapped senders are rejected. Replies are MXID-prefixed for shared rooms.
2026-04-28 08:22:38 -04:00

57 lines
2.3 KiB
Python

"""Seerr (Overseerr fork) HTTP client.
Docs reference (Overseerr-compatible):
GET /api/v1/search?query=...
POST /api/v1/request {"mediaType":"movie|tv","mediaId":<tmdb>,"userId":<id>}
GET /api/v1/user/{id}/requests?take=10&filter=pending,processing
GET /api/v1/discover/trending
"""
from __future__ import annotations
import aiohttp
class SeerrError(RuntimeError):
    """Error type for failed Seerr API calls.

    Carries a human-readable message describing the failed request.
    """
class SeerrClient:
    """Thin async wrapper around the Seerr (Overseerr-compatible) REST API.

    Requests are issued through the ``aiohttp`` session supplied by the
    caller; this class never closes that session. Every request carries the
    API key in the ``X-Api-Key`` header, and any response whose status is
    400 or above is raised as ``SeerrError``.
    """

    # Maximum number of characters of an error response body that is echoed
    # into the exception message.
    _ERROR_BODY_LIMIT = 200

    def __init__(self, session: aiohttp.ClientSession, base_url: str, api_key: str) -> None:
        """Store the session and precompute the base URL and request headers.

        Args:
            session: Session used for every request.
            base_url: Root URL of the Seerr instance. Trailing slashes are
                stripped so API paths can be appended verbatim.
            api_key: Value sent in the ``X-Api-Key`` header.
        """
        self.session = session
        self.base = base_url.rstrip("/")
        self.headers = {"X-Api-Key": api_key, "Accept": "application/json"}

    @classmethod
    def _error_message(cls, verb: str, path: str, status: int, text: str) -> str:
        """Build the ``SeerrError`` message for a failed request.

        The response body is truncated so a large error page cannot flood
        the message.
        """
        return f"{verb} {path} -> {status}: {text[:cls._ERROR_BODY_LIMIT]}"

    @staticmethod
    def _results(data: dict | list | None) -> list[dict]:
        """Extract the result list from a decoded response payload.

        Accepts either an object carrying a ``results`` key or a bare list.
        A missing or null ``results`` value, or an empty/null payload, yields
        an empty list.
        """
        if isinstance(data, dict):
            # ``or []`` also covers an explicit ``"results": null``.
            return data.get("results") or []
        return data or []

    async def _decode(self, verb: str, path: str, response) -> dict | list:
        """Return the JSON body of *response*, or raise on an error status.

        Raises:
            SeerrError: If the response status is 400 or above.
        """
        if response.status >= 400:
            body_text = await response.text()
            raise SeerrError(self._error_message(verb, path, response.status, body_text))
        return await response.json()

    async def _get(self, path: str, params: dict | None = None) -> dict | list:
        """Issue a GET to ``base + path`` and return the decoded JSON body.

        Raises:
            SeerrError: If the response status is 400 or above.
        """
        url = f"{self.base}{path}"
        async with self.session.get(url, headers=self.headers, params=params) as response:
            return await self._decode("GET", path, response)

    async def _post(self, path: str, body: dict) -> dict:
        """Issue a POST with a JSON *body* and return the decoded JSON reply.

        Raises:
            SeerrError: If the response status is 400 or above.
        """
        url = f"{self.base}{path}"
        async with self.session.post(url, headers=self.headers, json=body) as response:
            return await self._decode("POST", path, response)

    async def search(self, query: str) -> list[dict]:
        """Search Seerr for *query* and return the list of results."""
        data = await self._get("/api/v1/search", params={"query": query})
        return self._results(data)

    async def request(self, media_type: str, tmdb_id: int, user_id: int, *,
                      seasons: str | int = "all") -> dict:
        """Create a media request on behalf of *user_id*.

        Args:
            media_type: Sent as ``mediaType``; ``"tv"`` additionally sends
                the ``seasons`` field.
            tmdb_id: Sent as ``mediaId``.
            user_id: Sent as ``userId``.
            seasons: Only included in the payload when *media_type* is
                ``"tv"``.

        Returns:
            The decoded JSON reply from the request endpoint.
        """
        body: dict = {"mediaType": media_type, "mediaId": tmdb_id, "userId": user_id}
        if media_type == "tv":
            body["seasons"] = seasons
        return await self._post("/api/v1/request", body)

    async def user_requests(self, user_id: int, take: int = 10) -> list[dict]:
        """Return up to *take* requests of *user_id*.

        The query is sent with ``filter=pending,processing``.
        """
        data = await self._get(
            f"/api/v1/user/{user_id}/requests",
            params={"take": take, "filter": "pending,processing"},
        )
        return self._results(data)

    async def trending(self) -> list[dict]:
        """Return the results of the trending discovery endpoint."""
        data = await self._get("/api/v1/discover/trending")
        return self._results(data)