diff --git a/base-config.yaml b/base-config.yaml index 86e4e88..9140334 100644 --- a/base-config.yaml +++ b/base-config.yaml @@ -33,6 +33,11 @@ qbittorrent: # Map Matrix user IDs to per-service user identifiers. # Senders not in this map get an "unauthorized" reply. +# +# Optional per-user defaults: +# default_media_type: "movie" | "tv" — used by `!media request ` when no +# --tv/--movie flag is given +# result_count: int — overrides default_results above user_map: "@maddox:fails.me": seerr_user_id: 1 diff --git a/maubot.yaml b/maubot.yaml index df48f16..94997e8 100644 --- a/maubot.yaml +++ b/maubot.yaml @@ -1,6 +1,6 @@ maubot: 0.3.1 id: com.3ddbrewery.media -version: 0.1.0 +version: 0.2.0 license: MIT modules: - media_bot diff --git a/media_bot/bot.py b/media_bot/bot.py index 8923eeb..28b193f 100644 --- a/media_bot/bot.py +++ b/media_bot/bot.py @@ -82,6 +82,11 @@ class MediaBot(Plugin): nzbget: NzbgetClient qbt: QbtClient + # Per-(room, sender) cache of last search/trending results — used by + # `!media request ` to skip re-typing the query. In-memory only; a bot + # restart clears it (acceptable, results are cheap to rebuild). + _search_cache: dict[tuple[str, str], list[dict]] + async def start(self) -> None: self.config.load_and_update() timeout = aiohttp.ClientTimeout(total=self.config["http_timeout"]) @@ -98,6 +103,7 @@ class MediaBot(Plugin): self.nzbget = NzbgetClient(self.session, n["url"], n["username"], n["password"]) self.qbt = QbtClient(self.session, q["url"], q["username"], q["password"]) + self._search_cache = {} self.log.info("Media bot started — users=%d", len(self.config["user_map"] or {})) async def stop(self) -> None: @@ -123,6 +129,18 @@ class MediaBot(Plugin): return True return False + def _user_pref(self, sender: str, key: str, default: Any) -> Any: + cfg = self._resolve_user(sender) or {} + val = cfg.get(key) + return val if val is not None else default + + def _result_count(self, sender: str) -> int: + return int(self._user_pref(sender, "result_count", self.config["default_results"])) + + def _stash_search(self, evt: MessageEvent, results: list[dict]) -> None: + """Cache search/trending results so the user can `!media request `.""" + self._search_cache[(evt.room_id, evt.sender)] = list(results) + # ---------- top-level command ---------- @command.new("media", aliases=["m"], help="Media stack bot — try `!media help`", @@ -136,19 +154,28 @@ class MediaBot(Plugin): msg = ( "**Media bot commands**\n\n" "*Search & request (Seerr)*\n" - "- `!media search ` — top results\n" + "- `!media search ` — top results (numbered)\n" "- `!media request [--tv|--movie]` — request top hit\n" + "- `!media request ` — pick item N from your last search/trending\n" "- `!media requests` — your pending/processing requests\n" - "- `!media trending` — what's trending\n\n" + "- `!media trending` — what's trending (numbered)\n\n" "*Library (Emby)*\n" "- `!media nowplaying` — active sessions\n" "- `!media recent [movies|tv]` — recently added\n" - "- `!media watched` — what you recently finished\n\n" - "*Downloads (Sonarr/Radarr/NZB/qBt)*\n" - "- `!media queue` — Sonarr + Radarr queue\n" - "- `!media activity` — NZBGet + qBt active downloads\n" + "- `!media watched` — what you recently finished\n" + "- `!media find ` — search the existing library\n" + "- `!media resume` — your continue-watching list\n" + "- `!media random [movie|tv]` — random unwatched pick\n\n" + "*Sonarr / Radarr*\n" + "- `!media queue` — combined queue\n" "- `!media upcoming` — Sonarr calendar (next 7 days)\n" "- `!media missing` — Sonarr wanted/missing\n" + "- `!media health` — active warnings on either arr\n\n" + "*Downloads (NZBGet + qBt)*\n" + "- `!media activity` — current downloads\n" + "- `!media completed` — finished in the last 24h\n" + "- `!media speed` — aggregate down/up speed\n" + "- `!media pause` / `!media unpause` — global pause/resume\n" ) await self._say(evt, msg) @@ -165,40 +192,64 @@ class MediaBot(Plugin): except SeerrError as ex: await self._say(evt, f"Search failed: {ex}") return + results = [r for r in results if r.get("mediaType") in ("movie", "tv")] if not results: await self._say(evt, f"No Seerr results for **{query}**.") return - n = self.config["default_results"] - lines = [f"**Top {min(n, len(results))} for '{query}':**"] - for r in results[:n]: - lines.append(self._fmt_seerr(r)) + n = self._result_count(evt.sender) + shown = results[:n] + self._stash_search(evt, shown) + lines = [f"**Top {len(shown)} for '{query}':**"] + for i, r in enumerate(shown, 1): + lines.append(f"{i}. " + self._fmt_seerr(r, leading_dash=False)) + lines.append("") + lines.append("_Pick one with `!media request `._") await self._say(evt, "\n".join(lines)) - @media.subcommand("request", help="Request the top hit; pass --tv or --movie to force type") + @media.subcommand("request", + help="Request a movie/show. Pass `` or a number from a recent search.") @command.argument("query", pass_raw=True, required=True) async def cmd_request(self, evt: MessageEvent, query: str) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return + user_cfg = self._resolve_user(evt.sender) or {} + seerr_uid = user_cfg.get("seerr_user_id") + if not seerr_uid: + await self._say(evt, "Your Matrix user has no Seerr user_id mapped — ask maddox.") + return + + # Numbered selection: !media request 2 → 2nd item from last search/trending + stripped = query.strip() + if stripped.isdigit(): + cached = self._search_cache.get((evt.room_id, evt.sender)) or [] + idx = int(stripped) - 1 + if not cached: + await self._say(evt, "No recent search results to pick from. Run `!media search ` first.") + return + if idx < 0 or idx >= len(cached): + await self._say(evt, f"Pick a number 1-{len(cached)}.") + return + top = cached[idx] + await self._do_request(evt, seerr_uid, top) + return + + # Free-text path: optional --tv/--movie flag, else fall back to user default forced_type: Optional[str] = None - words = query.split() - cleaned = [] - for w in words: + cleaned: list[str] = [] + for w in stripped.split(): if w == "--tv": forced_type = "tv" elif w == "--movie": forced_type = "movie" else: cleaned.append(w) + if forced_type is None: + forced_type = user_cfg.get("default_media_type") q = " ".join(cleaned).strip() if not q: await self._say(evt, "Need a search query, e.g. `!media request dune --movie`.") return - user_cfg = self._resolve_user(evt.sender) - seerr_uid = user_cfg.get("seerr_user_id") if user_cfg else None - if not seerr_uid: - await self._say(evt, "Your Matrix user has no Seerr user_id mapped — ask maddox.") - return try: results = await self.seerr.search(q) except SeerrError as ex: @@ -210,12 +261,14 @@ class MediaBot(Plugin): if not candidates: await self._say(evt, f"No matching results for **{q}**.") return - top = candidates[0] - media_type = top["mediaType"] - tmdb_id = top.get("id") - title = top.get("title") or top.get("name") or q - if not tmdb_id: - await self._say(evt, f"Top result for **{q}** has no TMDB id — try `!media search`.") + await self._do_request(evt, seerr_uid, candidates[0]) + + async def _do_request(self, evt: MessageEvent, seerr_uid: int, item: dict) -> None: + media_type = item.get("mediaType") + tmdb_id = item.get("id") + title = item.get("title") or item.get("name") or "?" + if not (media_type and tmdb_id): + await self._say(evt, f"**{title}** is missing a media type or TMDB id — can't request.") return try: req = await self.seerr.request(media_type, tmdb_id, seerr_uid) @@ -263,17 +316,20 @@ class MediaBot(Plugin): except SeerrError as ex: await self._say(evt, f"Trending fetch failed: {ex}") return - n = self.config["default_results"] + n = self._result_count(evt.sender) results = [r for r in results if r.get("mediaType") in ("movie", "tv")][:n] if not results: await self._say(evt, "Nothing trending right now.") return + self._stash_search(evt, results) lines = [f"**Trending ({len(results)}):**"] - for r in results: - lines.append(self._fmt_seerr(r)) + for i, r in enumerate(results, 1): + lines.append(f"{i}. " + self._fmt_seerr(r, leading_dash=False)) + lines.append("") + lines.append("_Pick one with `!media request `._") await self._say(evt, "\n".join(lines)) - def _fmt_seerr(self, r: dict) -> str: + def _fmt_seerr(self, r: dict, leading_dash: bool = True) -> str: title = r.get("title") or r.get("name") or "?" mt = r.get("mediaType") or "?" date_s = r.get("releaseDate") or r.get("firstAirDate") or "" @@ -286,7 +342,8 @@ class MediaBot(Plugin): bits.append(f"— {mt}") if avail: bits.append(f"[{avail}]") - return "- " + " ".join(bits) + prefix = "- " if leading_dash else "" + return prefix + " ".join(bits) # ---------- Emby ---------- @@ -377,6 +434,101 @@ class MediaBot(Plugin): lines.append("- " + self._fmt_emby_item(it)) await self._say(evt, "\n".join(lines)) + @media.subcommand("find", help="Search the existing Emby library") + @command.argument("query", pass_raw=True, required=True) + async def cmd_find(self, evt: MessageEvent, query: str) -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + emby_uid = self._any_emby_uid(evt.sender) + if not emby_uid: + await self._say(evt, "No usable Emby user_id in config.") + return + try: + items = await self.emby.find(emby_uid, query, limit=10) + except EmbyError as ex: + await self._say(evt, f"Emby fetch failed: {ex}") + return + if not items: + await self._say(evt, f"Nothing in the library matching **{query}**.") + return + lines = [f"**Found in library ({len(items)}):**"] + for it in items: + lines.append("- " + self._fmt_emby_item(it)) + await self._say(evt, "\n".join(lines)) + + @media.subcommand("resume", help="Your continue-watching list") + async def cmd_resume(self, evt: MessageEvent) -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + user_cfg = self._resolve_user(evt.sender) or {} + emby_uid = user_cfg.get("emby_user_id") + if not emby_uid or emby_uid == "TBD": + await self._say(evt, "Your Matrix user has no Emby user_id mapped — ask maddox.") + return + try: + items = await self.emby.resume(emby_uid, limit=10) + except EmbyError as ex: + await self._say(evt, f"Emby fetch failed: {ex}") + return + if not items: + await self._say(evt, "Nothing to resume — you've finished everything you started.") + return + lines = [f"**Continue watching ({len(items)}):**"] + for it in items: + pct = "" + ud = it.get("UserData") or {} + played_pct = ud.get("PlayedPercentage") + if played_pct: + pct = f" · {played_pct:.0f}%" + lines.append("- " + self._fmt_emby_item(it) + pct) + await self._say(evt, "\n".join(lines)) + + @media.subcommand("random", help="Random unwatched pick — `!media random [movie|tv]`") + @command.argument("kind", required=False) + async def cmd_random(self, evt: MessageEvent, kind: str = "") -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + user_cfg = self._resolve_user(evt.sender) or {} + emby_uid = user_cfg.get("emby_user_id") + if not emby_uid or emby_uid == "TBD": + emby_uid = self._any_emby_uid(evt.sender) + if not emby_uid: + await self._say(evt, "No usable Emby user_id in config.") + return + item_type = "Movie" + if kind.lower() in ("tv", "series", "show", "shows"): + item_type = "Series" + try: + item = await self.emby.random_unplayed(emby_uid, item_type=item_type) + except EmbyError as ex: + await self._say(evt, f"Emby fetch failed: {ex}") + return + if not item: + await self._say(evt, f"No unwatched {item_type.lower()}s found — go you.") + return + line = self._fmt_emby_item(item) + overview = (item.get("Overview") or "").strip() + if len(overview) > 240: + overview = overview[:240].rsplit(" ", 1)[0] + "…" + msg = f"**Random pick:** {line}" + if overview: + msg += f"\n\n{overview}" + await self._say(evt, msg) + + def _any_emby_uid(self, sender: str) -> Optional[str]: + """Sender's emby_user_id, falling back to any other mapped user's id.""" + cfg = self._resolve_user(sender) or {} + uid = cfg.get("emby_user_id") + if uid and uid != "TBD": + return uid + for v in (self.config["user_map"] or {}).values(): + if v and v.get("emby_user_id") and v["emby_user_id"] != "TBD": + return v["emby_user_id"] + return None + def _fmt_emby_item(self, it: dict) -> str: t = it.get("Type") name = it.get("Name") or "?" @@ -482,6 +634,30 @@ class MediaBot(Plugin): lines.append(f"- *{series}* {tag} — {t} (aired {air})") await self._say(evt, "\n".join(lines)) + @media.subcommand("health", help="Sonarr + Radarr active health warnings") + async def cmd_health(self, evt: MessageEvent) -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + sonarr_h, radarr_h = await asyncio.gather( + self.sonarr.health(), self.radarr.health(), + return_exceptions=True, + ) + lines: list[str] = [] + for label, data in (("Sonarr", sonarr_h), ("Radarr", radarr_h)): + if isinstance(data, Exception): + lines.append(f"**{label}:** unreachable ({data})") + continue + if not data: + lines.append(f"**{label}:** ✅ healthy") + continue + lines.append(f"**{label} ({len(data)} warnings):**") + for h in data: + src = h.get("source") or "?" + msg = h.get("message") or "?" + lines.append(f"- [{src}] {msg}") + await self._say(evt, "\n".join(lines)) + def _arr_pct(self, q: dict) -> str: size = q.get("size") or 0 left = q.get("sizeleft") or 0 @@ -529,3 +705,100 @@ class MediaBot(Plugin): eta = _human_eta(t.get("eta")) lines.append(f"- *{name}* — {pct:.0f}% · {speed} · ETA {eta}") await self._say(evt, "\n".join(lines)) + + @media.subcommand("speed", help="Aggregate down/up speeds across NZBGet + qBt") + async def cmd_speed(self, evt: MessageEvent) -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + nzb_status, qbt_xfer = await asyncio.gather( + self.nzbget.status(), self.qbt.transfer_info(), + return_exceptions=True, + ) + lines: list[str] = [] + if isinstance(nzb_status, Exception): + lines.append(f"**NZBGet:** unreachable ({nzb_status})") + else: + dl = nzb_status.get("DownloadRate") or 0 # bytes/s + paused = nzb_status.get("DownloadPaused") or nzb_status.get("ServerPaused") + tag = " (paused)" if paused else "" + lines.append(f"**NZBGet:** ↓ {_human_bytes(dl)}/s{tag}") + if isinstance(qbt_xfer, Exception): + lines.append(f"**qBittorrent:** unreachable ({qbt_xfer})") + else: + dl = qbt_xfer.get("dl_info_speed") or 0 + ul = qbt_xfer.get("up_info_speed") or 0 + state = qbt_xfer.get("connection_status") or "?" + lines.append(f"**qBittorrent:** ↓ {_human_bytes(dl)}/s · ↑ {_human_bytes(ul)}/s [{state}]") + await self._say(evt, "\n".join(lines)) + + @media.subcommand("completed", help="Downloads finished in the last 24h") + async def cmd_completed(self, evt: MessageEvent) -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + cutoff = datetime.now(timezone.utc).timestamp() - 86400 + nzb_hist, qbt_all = await asyncio.gather( + self.nzbget.history(), self.qbt.all_torrents(), + return_exceptions=True, + ) + lines: list[str] = [] + # NZBGet history items have HistoryTime (epoch) and FileSizeMB + if isinstance(nzb_hist, Exception): + lines.append(f"**NZBGet:** unreachable ({nzb_hist})") + else: + recent = [h for h in nzb_hist if (h.get("HistoryTime") or 0) >= cutoff] + lines.append(f"**NZBGet — completed last 24h ({len(recent)}):**") + for h in recent[:10]: + name = h.get("Name") or h.get("NZBName") or "?" + size_mb = h.get("FileSizeMB") or 0 + status = h.get("Status") or "?" + lines.append(f"- *{name}* — {size_mb / 1024:.1f} GB [{status}]") + lines.append("") + # qBt: completion_on (epoch); finished torrents have it set + if isinstance(qbt_all, Exception): + lines.append(f"**qBittorrent:** unreachable ({qbt_all})") + else: + recent = [t for t in qbt_all + if (t.get("completion_on") or 0) >= cutoff + and (t.get("completion_on") or 0) > 0] + lines.append(f"**qBittorrent — completed last 24h ({len(recent)}):**") + for t in recent[:10]: + name = t.get("name") or "?" + size = t.get("size") or 0 + lines.append(f"- *{name}* — {_human_bytes(size)}") + await self._say(evt, "\n".join(lines)) + + @media.subcommand("pause", help="Pause all downloads (NZBGet + qBittorrent)") + async def cmd_pause(self, evt: MessageEvent) -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + results = await asyncio.gather( + self.nzbget.pause(), self.qbt.pause_all(), + return_exceptions=True, + ) + nzb_ok = not isinstance(results[0], Exception) + qbt_ok = not isinstance(results[1], Exception) + msg = ( + f"NZBGet: {'paused ⏸' if nzb_ok else f'failed ({results[0]})'}\n" + f"qBittorrent: {'paused ⏸' if qbt_ok else f'failed ({results[1]})'}" + ) + await self._say(evt, msg) + + @media.subcommand("unpause", help="Resume all downloads (NZBGet + qBittorrent)") + async def cmd_unpause(self, evt: MessageEvent) -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + results = await asyncio.gather( + self.nzbget.unpause(), self.qbt.resume_all(), + return_exceptions=True, + ) + nzb_ok = not isinstance(results[0], Exception) + qbt_ok = not isinstance(results[1], Exception) + msg = ( + f"NZBGet: {'resumed ▶' if nzb_ok else f'failed ({results[0]})'}\n" + f"qBittorrent: {'resumed ▶' if qbt_ok else f'failed ({results[1]})'}" + ) + await self._say(evt, msg) diff --git a/media_bot/clients/arr.py b/media_bot/clients/arr.py index 78468ff..ccc25c4 100644 --- a/media_bot/clients/arr.py +++ b/media_bot/clients/arr.py @@ -21,6 +21,11 @@ class ArrClient: raise ArrError(f"GET {path} → {r.status}: {(await r.text())[:200]}") return await r.json() + async def health(self) -> list[dict]: + """Returns active health-check warnings (empty list = healthy).""" + data = await self._get("/api/v3/health") + return data if isinstance(data, list) else (data or []) + async def queue(self, page_size: int = 50) -> list[dict]: params = { "pageSize": page_size, diff --git a/media_bot/clients/downloads.py b/media_bot/clients/downloads.py index bb92c65..71417b8 100644 --- a/media_bot/clients/downloads.py +++ b/media_bot/clients/downloads.py @@ -29,6 +29,18 @@ class NzbgetClient: async def listgroups(self) -> list[dict]: return await self._rpc("listgroups") # type: ignore[return-value] + async def status(self) -> dict: + return await self._rpc("status") # type: ignore[return-value] + + async def history(self, hidden: bool = False) -> list[dict]: + return await self._rpc("history", [hidden]) # type: ignore[return-value] + + async def pause(self) -> bool: + return bool(await self._rpc("pausedownload")) + + async def unpause(self) -> bool: + return bool(await self._rpc("resumedownload")) + class QbtClient: def __init__(self, session: aiohttp.ClientSession, base_url: str, @@ -68,3 +80,30 @@ class QbtClient: async def downloading(self) -> list[dict]: data = await self._get("/api/v2/torrents/info", params={"filter": "downloading"}) return data if isinstance(data, list) else [] + + async def all_torrents(self) -> list[dict]: + data = await self._get("/api/v2/torrents/info") + return data if isinstance(data, list) else [] + + async def transfer_info(self) -> dict: + data = await self._get("/api/v2/transfer/info") + return data if isinstance(data, dict) else {} + + async def _post(self, path: str, data: dict | None = None) -> str: + if not self._logged_in: + await self._login() + async with self.session.post(f"{self.base}{path}", data=data, + headers={"Referer": self.base}) as r: + if r.status == 403: + self._logged_in = False + await self._login() + async with self.session.post(f"{self.base}{path}", data=data, + headers={"Referer": self.base}) as r2: + return await r2.text() + return await r.text() + + async def pause_all(self) -> None: + await self._post("/api/v2/torrents/pause", {"hashes": "all"}) + + async def resume_all(self) -> None: + await self._post("/api/v2/torrents/resume", {"hashes": "all"}) diff --git a/media_bot/clients/emby.py b/media_bot/clients/emby.py index 3c01691..3346ea8 100644 --- a/media_bot/clients/emby.py +++ b/media_bot/clients/emby.py @@ -50,3 +50,38 @@ class EmbyClient: } data = await self._get(f"/Users/{user_id}/Items", params=params) return (data or {}).get("Items", []) if isinstance(data, dict) else (data or []) + + async def find(self, user_id: str, query: str, limit: int = 10) -> list[dict]: + """Search the existing library — movies, series, episodes.""" + params = { + "SearchTerm": query, + "IncludeItemTypes": "Movie,Series,Episode", + "Recursive": "true", + "Limit": limit, + "Fields": "ProductionYear,SeriesName,IndexNumber,ParentIndexNumber", + } + data = await self._get(f"/Users/{user_id}/Items", params=params) + return (data or {}).get("Items", []) if isinstance(data, dict) else (data or []) + + async def resume(self, user_id: str, limit: int = 10) -> list[dict]: + """Continue-watching list — partially watched items.""" + params = { + "Limit": limit, + "Fields": "ProductionYear,SeriesName,IndexNumber,ParentIndexNumber", + } + data = await self._get(f"/Users/{user_id}/Items/Resume", params=params) + return (data or {}).get("Items", []) if isinstance(data, dict) else (data or []) + + async def random_unplayed(self, user_id: str, item_type: str = "Movie") -> dict | None: + """Pick one random unplayed item of the requested type.""" + params = { + "IncludeItemTypes": item_type, + "Recursive": "true", + "Filters": "IsUnplayed", + "SortBy": "Random", + "Limit": 1, + "Fields": "ProductionYear,Overview", + } + data = await self._get(f"/Users/{user_id}/Items", params=params) + items = (data or {}).get("Items", []) if isinstance(data, dict) else (data or []) + return items[0] if items else None