Previously only Sonarr and Radarr were probed. Now also probes Lidarr (existing /health), Seerr (/api/v1/status), Emby (/System/Info/Public), NZBGet (status RPC) and qBittorrent (transfer/info). Each line shows ✅ healthy / ⚠️ warning (with detail) / ⏸️ paused / ❌ unreachable. NZBGet surfaces DownloadPaused; qBittorrent surfaces connection_status (telling firewalled apart from disconnected helps spot VPN issues quickly).
104 lines
4.4 KiB
Python
104 lines
4.4 KiB
Python
"""Emby HTTP client.
|
|
|
|
API key passed as ?api_key=... (also accepts X-Emby-Token header).
|
|
Base URL must include the /emby path prefix.
|
|
"""
|
|
|
|
from __future__ import annotations

from urllib.parse import quote, urlencode

import aiohttp
|
|
|
|
|
|
class EmbyError(RuntimeError):
    """Error raised by the Emby client when a request comes back with an HTTP status >= 400."""
|
|
|
|
|
|
class EmbyClient:
    """Small async client for a handful of Emby REST endpoints.

    Every request is a GET carrying the API key as the ``api_key`` query
    parameter. The ``aiohttp`` session is supplied by the caller and is
    only used here, never closed.
    """

    # Item fields requested by the list-style endpoints (played / find / resume).
    _LIST_FIELDS = "ProductionYear,SeriesName,IndexNumber,ParentIndexNumber"

    def __init__(self, session: aiohttp.ClientSession, base_url: str, api_key: str) -> None:
        """Store the shared session, the base URL (trailing ``/`` stripped) and the key."""
        self.session = session
        self.base = base_url.rstrip("/")
        self.api_key = api_key

    async def _get(self, path: str, params: dict | None = None) -> dict | list:
        """GET ``{base}{path}`` and return the decoded JSON body.

        ``params`` are merged over the ``api_key`` parameter, so a caller
        supplied ``api_key`` entry wins.

        Raises:
            EmbyError: the response status is >= 400; the message carries
                the first 200 characters of the response body.
        """
        merged = {"api_key": self.api_key, **(params or {})}
        async with self.session.get(f"{self.base}{path}", params=merged) as r:
            if r.status >= 400:
                raise EmbyError(f"GET {path} → {r.status}: {(await r.text())[:200]}")
            return await r.json()

    @staticmethod
    def _items(data: object) -> list[dict]:
        """Normalise an Emby payload to a list of items.

        Accepts either a bare JSON array or an object wrapping the array
        under ``"Items"``. Anything else (``None``, a scalar, a missing or
        non-list ``"Items"``) yields ``[]`` so callers always get a list.
        """
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            items = data.get("Items")
            if isinstance(items, list):
                return items
        return []

    async def system_info(self) -> dict:
        """Reachability + version probe via /System/Info/Public (no auth needed)."""
        data = await self._get("/System/Info/Public")
        return data if isinstance(data, dict) else {}

    async def sessions(self) -> list[dict]:
        """Return the entries reported by ``GET /Sessions``."""
        return self._items(await self._get("/Sessions"))

    async def recently_added(self, user_id: str, limit: int = 10,
                             item_types: str | None = None) -> list[dict]:
        """Return up to ``limit`` items from ``/Users/{user_id}/Items/Latest``.

        ``item_types`` is forwarded as ``IncludeItemTypes`` when given.
        """
        params: dict = {"Limit": limit, "Fields": "PremiereDate,ProductionYear"}
        if item_types:
            params["IncludeItemTypes"] = item_types
        return self._items(await self._get(f"/Users/{user_id}/Items/Latest", params=params))

    async def user_played(self, user_id: str, limit: int = 10) -> list[dict]:
        """Return up to ``limit`` played movies/episodes, most recently played first."""
        params = {
            "IncludeItemTypes": "Movie,Episode",
            "Recursive": "true",
            "Filters": "IsPlayed",
            "SortBy": "DatePlayed",
            "SortOrder": "Descending",
            "Limit": limit,
            "Fields": self._LIST_FIELDS,
        }
        return self._items(await self._get(f"/Users/{user_id}/Items", params=params))

    async def find(self, user_id: str, query: str, limit: int = 10) -> list[dict]:
        """Search the existing library — movies, series, episodes."""
        params = {
            "SearchTerm": query,
            "IncludeItemTypes": "Movie,Series,Episode",
            "Recursive": "true",
            "Limit": limit,
            "Fields": self._LIST_FIELDS,
        }
        return self._items(await self._get(f"/Users/{user_id}/Items", params=params))

    async def resume(self, user_id: str, limit: int = 10) -> list[dict]:
        """Continue-watching list — partially watched items."""
        params = {
            "Limit": limit,
            "Fields": self._LIST_FIELDS,
        }
        return self._items(await self._get(f"/Users/{user_id}/Items/Resume", params=params))

    async def random_unplayed(self, user_id: str, item_type: str = "Movie") -> dict | None:
        """Pick one random unplayed item of the requested type.

        Returns ``None`` when the server reports no matching item.
        """
        params = {
            "IncludeItemTypes": item_type,
            "Recursive": "true",
            "Filters": "IsUnplayed",
            "SortBy": "Random",
            "Limit": 1,
            "Fields": "ProductionYear,Overview",
        }
        items = self._items(await self._get(f"/Users/{user_id}/Items", params=params))
        return items[0] if items else None

    def poster_url(self, item: dict, max_width: int = 500) -> str | None:
        """Build a primary-image URL for an Emby item.

        Returns ``None`` when the item has no ``Id``. The item id and all
        query values are percent-encoded so reserved characters in a tag or
        API key cannot corrupt the URL.
        """
        item_id = item.get("Id")
        if not item_id:
            return None
        # If the item has an ImageTags.Primary, the cached/CDN URL works without auth;
        # otherwise fall back to the api_key form.
        tag = (item.get("ImageTags") or {}).get("Primary")
        auth = {"tag": tag} if tag else {"api_key": self.api_key}
        query = urlencode({**auth, "maxWidth": max_width})
        safe_id = quote(str(item_id), safe="")
        return f"{self.base}/Items/{safe_id}/Images/Primary?{query}"
|