"""NZBGet (JSON-RPC over HTTP basic) and qBittorrent (cookie session) clients.""" from __future__ import annotations import aiohttp class DownloadError(RuntimeError): pass class NzbgetClient: def __init__(self, session: aiohttp.ClientSession, base_url: str, username: str, password: str) -> None: self.session = session self.base = base_url.rstrip("/") self.auth = aiohttp.BasicAuth(username, password) async def _rpc(self, method: str, params: list | None = None) -> dict | list: body = {"method": method, "params": params or [], "id": 1} async with self.session.post(f"{self.base}/jsonrpc", json=body, auth=self.auth) as r: if r.status >= 400: raise DownloadError(f"NZBGet {method} → {r.status}: {(await r.text())[:200]}") data = await r.json() if "error" in data and data["error"]: raise DownloadError(f"NZBGet {method}: {data['error']}") return data.get("result", []) async def listgroups(self) -> list[dict]: return await self._rpc("listgroups") # type: ignore[return-value] async def status(self) -> dict: return await self._rpc("status") # type: ignore[return-value] async def history(self, hidden: bool = False) -> list[dict]: return await self._rpc("history", [hidden]) # type: ignore[return-value] async def pause(self) -> bool: return bool(await self._rpc("pausedownload")) async def unpause(self) -> bool: return bool(await self._rpc("resumedownload")) class QbtClient: def __init__(self, session: aiohttp.ClientSession, base_url: str, username: str, password: str) -> None: self.session = session self.base = base_url.rstrip("/") self.username = username self.password = password self._logged_in = False async def _login(self) -> None: async with self.session.post( f"{self.base}/api/v2/auth/login", data={"username": self.username, "password": self.password}, headers={"Referer": self.base}, ) as r: text = (await r.text()).strip() if r.status >= 400 or text != "Ok.": raise DownloadError(f"qBt login → {r.status}: {text[:200]}") self._logged_in = True async def _get(self, path: str, params: dict | None = None) -> list | dict: if not self._logged_in: await self._login() async with self.session.get(f"{self.base}{path}", params=params) as r: if r.status == 403: self._logged_in = False await self._login() async with self.session.get(f"{self.base}{path}", params=params) as r2: if r2.status >= 400: raise DownloadError(f"qBt GET {path} → {r2.status}") return await r2.json() if r.status >= 400: raise DownloadError(f"qBt GET {path} → {r.status}: {(await r.text())[:200]}") return await r.json() async def downloading(self) -> list[dict]: data = await self._get("/api/v2/torrents/info", params={"filter": "downloading"}) return data if isinstance(data, list) else [] async def all_torrents(self) -> list[dict]: data = await self._get("/api/v2/torrents/info") return data if isinstance(data, list) else [] async def transfer_info(self) -> dict: data = await self._get("/api/v2/transfer/info") return data if isinstance(data, dict) else {} async def _post(self, path: str, data: dict | None = None) -> str: if not self._logged_in: await self._login() async with self.session.post(f"{self.base}{path}", data=data, headers={"Referer": self.base}) as r: if r.status == 403: self._logged_in = False await self._login() async with self.session.post(f"{self.base}{path}", data=data, headers={"Referer": self.base}) as r2: return await r2.text() return await r.text() async def pause_all(self) -> None: await self._post("/api/v2/torrents/pause", {"hashes": "all"}) async def resume_all(self) -> None: await self._post("/api/v2/torrents/resume", {"hashes": "all"})