maubot-media/media_bot/clients/downloads.py
Maddox bec9a1b8e7 v0.2.0: discovery, downloads control, numbered selection
Emby:
- !media find <q> — search the existing library
- !media resume — continue-watching list (with progress %)
- !media random [movie|tv] — random unwatched pick

Sonarr/Radarr:
- !media health — active warnings on either arr

Downloads:
- !media speed — aggregate down/up across NZBGet + qBt
- !media completed — finished in the last 24h
- !media pause / !media unpause — global pause/resume

QoL:
- Numbered selection: !media search dune then !media request 2
- Optional per-user defaults: default_media_type, result_count
2026-04-28 17:57:50 -04:00

109 lines
4.3 KiB
Python

"""NZBGet (JSON-RPC over HTTP basic) and qBittorrent (cookie session) clients."""
from __future__ import annotations
import aiohttp
class DownloadError(RuntimeError):
pass
class NzbgetClient:
def __init__(self, session: aiohttp.ClientSession, base_url: str,
username: str, password: str) -> None:
self.session = session
self.base = base_url.rstrip("/")
self.auth = aiohttp.BasicAuth(username, password)
async def _rpc(self, method: str, params: list | None = None) -> dict | list:
body = {"method": method, "params": params or [], "id": 1}
async with self.session.post(f"{self.base}/jsonrpc", json=body, auth=self.auth) as r:
if r.status >= 400:
raise DownloadError(f"NZBGet {method}{r.status}: {(await r.text())[:200]}")
data = await r.json()
if "error" in data and data["error"]:
raise DownloadError(f"NZBGet {method}: {data['error']}")
return data.get("result", [])
async def listgroups(self) -> list[dict]:
return await self._rpc("listgroups") # type: ignore[return-value]
async def status(self) -> dict:
return await self._rpc("status") # type: ignore[return-value]
async def history(self, hidden: bool = False) -> list[dict]:
return await self._rpc("history", [hidden]) # type: ignore[return-value]
async def pause(self) -> bool:
return bool(await self._rpc("pausedownload"))
async def unpause(self) -> bool:
return bool(await self._rpc("resumedownload"))
class QbtClient:
def __init__(self, session: aiohttp.ClientSession, base_url: str,
username: str, password: str) -> None:
self.session = session
self.base = base_url.rstrip("/")
self.username = username
self.password = password
self._logged_in = False
async def _login(self) -> None:
async with self.session.post(
f"{self.base}/api/v2/auth/login",
data={"username": self.username, "password": self.password},
headers={"Referer": self.base},
) as r:
text = (await r.text()).strip()
if r.status >= 400 or text != "Ok.":
raise DownloadError(f"qBt login → {r.status}: {text[:200]}")
self._logged_in = True
async def _get(self, path: str, params: dict | None = None) -> list | dict:
if not self._logged_in:
await self._login()
async with self.session.get(f"{self.base}{path}", params=params) as r:
if r.status == 403:
self._logged_in = False
await self._login()
async with self.session.get(f"{self.base}{path}", params=params) as r2:
if r2.status >= 400:
raise DownloadError(f"qBt GET {path}{r2.status}")
return await r2.json()
if r.status >= 400:
raise DownloadError(f"qBt GET {path}{r.status}: {(await r.text())[:200]}")
return await r.json()
async def downloading(self) -> list[dict]:
data = await self._get("/api/v2/torrents/info", params={"filter": "downloading"})
return data if isinstance(data, list) else []
async def all_torrents(self) -> list[dict]:
data = await self._get("/api/v2/torrents/info")
return data if isinstance(data, list) else []
async def transfer_info(self) -> dict:
data = await self._get("/api/v2/transfer/info")
return data if isinstance(data, dict) else {}
async def _post(self, path: str, data: dict | None = None) -> str:
if not self._logged_in:
await self._login()
async with self.session.post(f"{self.base}{path}", data=data,
headers={"Referer": self.base}) as r:
if r.status == 403:
self._logged_in = False
await self._login()
async with self.session.post(f"{self.base}{path}", data=data,
headers={"Referer": self.base}) as r2:
return await r2.text()
return await r.text()
async def pause_all(self) -> None:
await self._post("/api/v2/torrents/pause", {"hashes": "all"})
async def resume_all(self) -> None:
await self._post("/api/v2/torrents/resume", {"hashes": "all"})