maubot-media/media_bot/clients/emby.py
Maddox ae624744e3 v0.2.0: discovery, downloads control, numbered selection
Emby:
- !media find <q> — search the existing library
- !media resume — continue-watching list (with progress %)
- !media random [movie|tv] — random unwatched pick

Sonarr/Radarr:
- !media health — active warnings on either arr

Downloads:
- !media speed — aggregate down/up across NZBGet + qBt
- !media completed — finished in the last 24h
- !media pause / !media unpause — global pause/resume

QoL:
- Numbered selection: !media search dune then !media request 2
- Optional per-user defaults: default_media_type, result_count
2026-04-28 17:57:50 -04:00

87 lines
3.6 KiB
Python

"""Emby HTTP client.
API key passed as ?api_key=... (also accepts X-Emby-Token header).
Base URL must include the /emby path prefix.
"""
from __future__ import annotations
import aiohttp
class EmbyError(RuntimeError):
pass
class EmbyClient:
def __init__(self, session: aiohttp.ClientSession, base_url: str, api_key: str) -> None:
self.session = session
self.base = base_url.rstrip("/")
self.api_key = api_key
async def _get(self, path: str, params: dict | None = None) -> dict | list:
merged = {"api_key": self.api_key, **(params or {})}
async with self.session.get(f"{self.base}{path}", params=merged) as r:
if r.status >= 400:
raise EmbyError(f"GET {path}{r.status}: {(await r.text())[:200]}")
return await r.json()
async def sessions(self) -> list[dict]:
data = await self._get("/Sessions")
return data if isinstance(data, list) else (data.get("Items") or [])
async def recently_added(self, user_id: str, limit: int = 10,
item_types: str | None = None) -> list[dict]:
params: dict = {"Limit": limit, "Fields": "PremiereDate,ProductionYear"}
if item_types:
params["IncludeItemTypes"] = item_types
data = await self._get(f"/Users/{user_id}/Items/Latest", params=params)
return data if isinstance(data, list) else (data.get("Items") or [])
async def user_played(self, user_id: str, limit: int = 10) -> list[dict]:
params = {
"IncludeItemTypes": "Movie,Episode",
"Recursive": "true",
"Filters": "IsPlayed",
"SortBy": "DatePlayed",
"SortOrder": "Descending",
"Limit": limit,
"Fields": "ProductionYear,SeriesName,IndexNumber,ParentIndexNumber",
}
data = await self._get(f"/Users/{user_id}/Items", params=params)
return (data or {}).get("Items", []) if isinstance(data, dict) else (data or [])
async def find(self, user_id: str, query: str, limit: int = 10) -> list[dict]:
"""Search the existing library — movies, series, episodes."""
params = {
"SearchTerm": query,
"IncludeItemTypes": "Movie,Series,Episode",
"Recursive": "true",
"Limit": limit,
"Fields": "ProductionYear,SeriesName,IndexNumber,ParentIndexNumber",
}
data = await self._get(f"/Users/{user_id}/Items", params=params)
return (data or {}).get("Items", []) if isinstance(data, dict) else (data or [])
async def resume(self, user_id: str, limit: int = 10) -> list[dict]:
"""Continue-watching list — partially watched items."""
params = {
"Limit": limit,
"Fields": "ProductionYear,SeriesName,IndexNumber,ParentIndexNumber",
}
data = await self._get(f"/Users/{user_id}/Items/Resume", params=params)
return (data or {}).get("Items", []) if isinstance(data, dict) else (data or [])
async def random_unplayed(self, user_id: str, item_type: str = "Movie") -> dict | None:
"""Pick one random unplayed item of the requested type."""
params = {
"IncludeItemTypes": item_type,
"Recursive": "true",
"Filters": "IsUnplayed",
"SortBy": "Random",
"Limit": 1,
"Fields": "ProductionYear,Overview",
}
data = await self._get(f"/Users/{user_id}/Items", params=params)
items = (data or {}).get("Items", []) if isinstance(data, dict) else (data or [])
return items[0] if items else None