"""Media bot โ€” Matrix companion for the homelab media stack. Wraps Seerr (search/request), Emby (library/playback), Sonarr/Radarr (queue/calendar), NZBGet + qBittorrent (active downloads). Each Matrix sender is mapped to per-service user IDs via plugin config; unmapped senders are rejected. All replies are prefixed with the sender's MXID so notifications in shared rooms stay readable (matches the Books Bot convention). """ from __future__ import annotations import asyncio from datetime import date, datetime, timedelta, timezone from typing import Any, Optional, Type import aiohttp from aiohttp.web import Request, Response, json_response from maubot import MessageEvent, Plugin from maubot.handlers import command, event, web from mautrix.types import ( EventType, Format, ImageInfo, MediaMessageEventContent, MessageType, ReactionEvent, RoomID, TextMessageEventContent, ) from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper from .clients.arr import ArrError, RadarrClient, SonarrClient from .clients.downloads import DownloadError, NzbgetClient, QbtClient from .clients.emby import EmbyClient, EmbyError from .clients.seerr import SeerrClient, SeerrError # --- Seerr availability codes ---------------------------------------------- # https://api-docs.overseerr.dev โ€” `Status` enum SEERR_AVAIL = { 1: "unknown", 2: "pending", 3: "processing", 4: "partial", 5: "available", } def _human_bytes(n: float) -> str: for unit in ("B", "KB", "MB", "GB", "TB"): if abs(n) < 1024: return f"{n:.1f} {unit}" n /= 1024 return f"{n:.1f} PB" def _human_eta(seconds: int | float | None) -> str: if not seconds or seconds < 0 or seconds > 8640000: return "?" s = int(seconds) h, rem = divmod(s, 3600) m, s = divmod(rem, 60) if h: return f"{h}h{m}m" if m: return f"{m}m{s}s" return f"{s}s" class Config(BaseProxyConfig): def do_update(self, helper: ConfigUpdateHelper) -> None: helper.copy("http_timeout") helper.copy("default_results") helper.copy("posters_enabled") helper.copy("admin_users") helper.copy("notifications_room") helper.copy("seerr_webhook_secret") helper.copy("seerr") helper.copy("sonarr") helper.copy("radarr") helper.copy("emby") helper.copy("nzbget") helper.copy("qbittorrent") helper.copy("user_map") # Reactions counted as approve/decline. Skin-tone variants pass via startswith. APPROVE_KEYS = ("๐Ÿ‘", "โœ…", "๐ŸŸข") DECLINE_KEYS = ("๐Ÿ‘Ž", "โŒ", "๐ŸŸฅ") def _matches(key: str, candidates: tuple[str, ...]) -> bool: return any(key.startswith(c) for c in candidates) class MediaBot(Plugin): config: Config session: aiohttp.ClientSession seerr: SeerrClient sonarr: SonarrClient radarr: RadarrClient emby: EmbyClient nzbget: NzbgetClient qbt: QbtClient # Per-(room, sender) cache of last search/trending results โ€” used by # `!media request ` to skip re-typing the query. In-memory only; a bot # restart clears it (acceptable, results are cheap to rebuild). _search_cache: dict[tuple[str, str], list[dict]] # Map of bot-message event_id โ†’ Seerr request_id, for reaction-based # approve/decline. Capped at 200; oldest entries are evicted on overflow. _pending_requests: dict[str, int] async def start(self) -> None: self.config.load_and_update() timeout = aiohttp.ClientTimeout(total=self.config["http_timeout"]) self.session = aiohttp.ClientSession(timeout=timeout) s, so, r, e, n, q = ( self.config["seerr"], self.config["sonarr"], self.config["radarr"], self.config["emby"], self.config["nzbget"], self.config["qbittorrent"], ) self.seerr = SeerrClient(self.session, s["url"], s["api_key"]) self.sonarr = SonarrClient(self.session, so["url"], so["api_key"]) self.radarr = RadarrClient(self.session, r["url"], r["api_key"]) self.emby = EmbyClient(self.session, e["url"], e["api_key"]) self.nzbget = NzbgetClient(self.session, n["url"], n["username"], n["password"]) self.qbt = QbtClient(self.session, q["url"], q["username"], q["password"]) self._search_cache = {} self._pending_requests = {} self.log.info("Media bot started โ€” users=%d", len(self.config["user_map"] or {})) async def stop(self) -> None: if hasattr(self, "session"): await self.session.close() @classmethod def get_config_class(cls) -> Type[BaseProxyConfig]: return Config # ---------- helpers ---------- def _resolve_user(self, sender: str) -> Optional[dict]: return (self.config["user_map"] or {}).get(sender) async def _say(self, evt: MessageEvent, message: str) -> None: await evt.respond(f"{evt.sender}\n\n{message}") async def _reject_unmapped(self, evt: MessageEvent) -> bool: if not self._resolve_user(evt.sender): self.log.warning("Unauthorized sender: %s", evt.sender) await self._say(evt, "Sorry, your Matrix user isn't authorized to use the media bot.") return True return False def _user_pref(self, sender: str, key: str, default: Any) -> Any: cfg = self._resolve_user(sender) or {} val = cfg.get(key) return val if val is not None else default def _result_count(self, sender: str) -> int: return int(self._user_pref(sender, "result_count", self.config["default_results"])) def _stash_search(self, evt: MessageEvent, results: list[dict]) -> None: """Cache search/trending results so the user can `!media request `.""" self._search_cache[(evt.room_id, evt.sender)] = list(results) def _track_request(self, event_id: str, request_id: int) -> None: self._pending_requests[event_id] = request_id if len(self._pending_requests) > 200: oldest = next(iter(self._pending_requests)) del self._pending_requests[oldest] async def _post_poster(self, room_id: str, image_url: str, caption: str) -> bool: """Download a poster URL, upload to Matrix, send as m.image. Returns True on success.""" if not self.config["posters_enabled"]: return False try: async with self.session.get(image_url) as r: if r.status >= 400: self.log.info("poster fetch %s โ†’ %s", image_url, r.status) return False data = await r.read() mime = r.headers.get("Content-Type", "image/jpeg").split(";")[0].strip() mxc = await self.client.upload_media(data, mime_type=mime) content = MediaMessageEventContent( msgtype=MessageType.IMAGE, body=caption, url=mxc, info=ImageInfo(mimetype=mime, size=len(data)), ) await self.client.send_message(RoomID(room_id), content) return True except Exception as ex: self.log.warning("poster upload failed: %s", ex) return False # ---------- top-level command ---------- @command.new("media", aliases=["m"], help="Media stack bot โ€” try `!media help`", require_subcommand=True) async def media(self) -> None: pass @media.subcommand("help", help="Show available commands") async def cmd_help(self, evt: MessageEvent) -> None: await evt.mark_read() msg = ( "**Media bot commands**\n\n" "*Search & request (Seerr)*\n" "- `!media search ` โ€” top results (numbered)\n" "- `!media request [--tv|--movie]` โ€” request top hit\n" "- `!media request ` โ€” pick item N from your last search/trending\n" "- `!media requests` โ€” your pending/processing requests\n" "- `!media trending` โ€” what's trending (numbered)\n\n" "*Library (Emby)*\n" "- `!media nowplaying` โ€” active sessions\n" "- `!media recent [movies|tv]` โ€” recently added\n" "- `!media watched` โ€” what you recently finished\n" "- `!media find ` โ€” search the existing library\n" "- `!media resume` โ€” your continue-watching list\n" "- `!media random [movie|tv]` โ€” random unwatched pick\n\n" "*Sonarr / Radarr*\n" "- `!media queue` โ€” combined queue\n" "- `!media upcoming` โ€” Sonarr calendar (next 7 days)\n" "- `!media missing` โ€” Sonarr wanted/missing\n" "- `!media health` โ€” active warnings on either arr\n\n" "*Downloads (NZBGet + qBt)*\n" "- `!media activity` โ€” current downloads\n" "- `!media completed` โ€” finished in the last 24h\n" "- `!media speed` โ€” aggregate down/up speed\n" "- `!media pause` / `!media unpause` โ€” global pause/resume\n\n" "_Admins can react ๐Ÿ‘/๐Ÿ‘Ž on a request to approve/decline._" ) await self._say(evt, msg) # ---------- Seerr ---------- @media.subcommand("search", help="Search Seerr for a movie or show") @command.argument("query", pass_raw=True, required=True) async def cmd_search(self, evt: MessageEvent, query: str) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return try: results = await self.seerr.search(query) except SeerrError as ex: await self._say(evt, f"Search failed: {ex}") return results = [r for r in results if r.get("mediaType") in ("movie", "tv")] if not results: await self._say(evt, f"No Seerr results for **{query}**.") return n = self._result_count(evt.sender) shown = results[:n] self._stash_search(evt, shown) lines = [f"**Top {len(shown)} for '{query}':**"] for i, r in enumerate(shown, 1): lines.append(f"{i}. " + self._fmt_seerr(r, leading_dash=False)) lines.append("") lines.append("_Pick one with `!media request `._") await self._say(evt, "\n".join(lines)) @media.subcommand("request", help="Request a movie/show. Pass `` or a number from a recent search.") @command.argument("query", pass_raw=True, required=True) async def cmd_request(self, evt: MessageEvent, query: str) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return user_cfg = self._resolve_user(evt.sender) or {} seerr_uid = user_cfg.get("seerr_user_id") if not seerr_uid: await self._say(evt, "Your Matrix user has no Seerr user_id mapped โ€” ask maddox.") return # Numbered selection: !media request 2 โ†’ 2nd item from last search/trending stripped = query.strip() if stripped.isdigit(): cached = self._search_cache.get((evt.room_id, evt.sender)) or [] idx = int(stripped) - 1 if not cached: await self._say(evt, "No recent search results to pick from. Run `!media search ` first.") return if idx < 0 or idx >= len(cached): await self._say(evt, f"Pick a number 1-{len(cached)}.") return top = cached[idx] await self._do_request(evt, seerr_uid, top) return # Free-text path: optional --tv/--movie flag, else fall back to user default forced_type: Optional[str] = None cleaned: list[str] = [] for w in stripped.split(): if w == "--tv": forced_type = "tv" elif w == "--movie": forced_type = "movie" else: cleaned.append(w) if forced_type is None: forced_type = user_cfg.get("default_media_type") q = " ".join(cleaned).strip() if not q: await self._say(evt, "Need a search query, e.g. `!media request dune --movie`.") return try: results = await self.seerr.search(q) except SeerrError as ex: await self._say(evt, f"Search failed: {ex}") return candidates = [r for r in results if r.get("mediaType") in ("movie", "tv")] if forced_type: candidates = [r for r in candidates if r.get("mediaType") == forced_type] if not candidates: await self._say(evt, f"No matching results for **{q}**.") return await self._do_request(evt, seerr_uid, candidates[0]) async def _do_request(self, evt: MessageEvent, seerr_uid: int, item: dict) -> None: media_type = item.get("mediaType") tmdb_id = item.get("id") title = item.get("title") or item.get("name") or "?" if not (media_type and tmdb_id): await self._say(evt, f"**{title}** is missing a media type or TMDB id โ€” can't request.") return try: req = await self.seerr.request(media_type, tmdb_id, seerr_uid) except SeerrError as ex: await self._say(evt, f"Request failed: {ex}") return status_id = (req or {}).get("status") status = {1: "pending approval", 2: "approved", 3: "declined"}.get(status_id, str(status_id)) request_id = (req or {}).get("id") # Optional poster preview poster = SeerrClient.poster_url(item) if poster: await self._post_poster(evt.room_id, poster, f"{title} ({media_type})") msg = f"Requested **{title}** ({media_type}) โ€” status: *{status}*." if status_id == 1 and request_id: msg += "\n\n_Admins: react ๐Ÿ‘ to approve or ๐Ÿ‘Ž to decline._" sent_id = await evt.respond(f"{evt.sender}\n\n{msg}") if request_id and status_id == 1 and sent_id: self._track_request(sent_id, request_id) @media.subcommand("requests", help="Show your pending/processing Seerr requests") async def cmd_requests(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return user_cfg = self._resolve_user(evt.sender) seerr_uid = (user_cfg or {}).get("seerr_user_id") if not seerr_uid: await self._say(evt, "Your Matrix user has no Seerr user_id mapped โ€” ask maddox.") return try: reqs = await self.seerr.user_requests(seerr_uid, take=10) except SeerrError as ex: await self._say(evt, f"Lookup failed: {ex}") return if not reqs: await self._say(evt, "No pending or processing requests.") return lines = [f"**Your requests ({len(reqs)}):**"] for r in reqs: mi = r.get("media") or {} t = mi.get("title") or mi.get("name") or f"tmdb:{mi.get('tmdbId')}" mt = mi.get("mediaType") or "?" avail = SEERR_AVAIL.get(mi.get("status", 0), "?") lines.append(f"- *{t}* ({mt}) โ€” {avail}") await self._say(evt, "\n".join(lines)) @media.subcommand("trending", help="Show what's trending in Seerr") async def cmd_trending(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return try: results = await self.seerr.trending() except SeerrError as ex: await self._say(evt, f"Trending fetch failed: {ex}") return n = self._result_count(evt.sender) results = [r for r in results if r.get("mediaType") in ("movie", "tv")][:n] if not results: await self._say(evt, "Nothing trending right now.") return self._stash_search(evt, results) lines = [f"**Trending ({len(results)}):**"] for i, r in enumerate(results, 1): lines.append(f"{i}. " + self._fmt_seerr(r, leading_dash=False)) lines.append("") lines.append("_Pick one with `!media request `._") await self._say(evt, "\n".join(lines)) def _fmt_seerr(self, r: dict, leading_dash: bool = True) -> str: title = r.get("title") or r.get("name") or "?" mt = r.get("mediaType") or "?" date_s = r.get("releaseDate") or r.get("firstAirDate") or "" year = date_s[:4] if date_s else "" info = (r.get("mediaInfo") or {}).get("status") avail = SEERR_AVAIL.get(info, None) if info else None bits = [f"*{title}*"] if year: bits.append(f"({year})") bits.append(f"โ€” {mt}") if avail: bits.append(f"[{avail}]") prefix = "- " if leading_dash else "" return prefix + " ".join(bits) # ---------- Emby ---------- @media.subcommand("nowplaying", aliases=["np"], help="Active Emby sessions") async def cmd_nowplaying(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return try: sessions = await self.emby.sessions() except EmbyError as ex: await self._say(evt, f"Emby fetch failed: {ex}") return active = [s for s in sessions if s.get("NowPlayingItem")] if not active: await self._say(evt, "Nothing playing on Emby right now.") return lines = [f"**Now playing on Emby ({len(active)}):**"] for s in active: item = s.get("NowPlayingItem") or {} user = s.get("UserName") or "?" client = s.get("Client") or "" t = self._fmt_emby_item(item) ps = s.get("PlayState") or {} pos = ps.get("PositionTicks") or 0 run = item.get("RunTimeTicks") or 0 pct = (pos / run * 100) if run else 0 paused = " (paused)" if ps.get("IsPaused") else "" lines.append(f"- **{user}** โ€” {t} ยท {pct:.0f}%{paused} [{client}]") await self._say(evt, "\n".join(lines)) @media.subcommand("recent", help="Recently added โ€” `!media recent [movies|tv]`") @command.argument("kind", required=False) async def cmd_recent(self, evt: MessageEvent, kind: str = "") -> None: await evt.mark_read() if await self._reject_unmapped(evt): return # Use the sender's emby_user_id if set, else fall back to the first mapped user's id user_cfg = self._resolve_user(evt.sender) or {} emby_uid = user_cfg.get("emby_user_id") if not emby_uid or emby_uid == "TBD": for v in (self.config["user_map"] or {}).values(): if v and v.get("emby_user_id") and v["emby_user_id"] != "TBD": emby_uid = v["emby_user_id"] break if not emby_uid: await self._say(evt, "No usable Emby user_id in config.") return item_types = None if kind.lower() in ("movie", "movies"): item_types = "Movie" elif kind.lower() in ("tv", "series", "shows"): item_types = "Series,Episode" try: items = await self.emby.recently_added(emby_uid, limit=10, item_types=item_types) except EmbyError as ex: await self._say(evt, f"Emby fetch failed: {ex}") return if not items: await self._say(evt, "Nothing recently added.") return label = f" ({kind})" if kind else "" lines = [f"**Recently added{label} ({len(items)}):**"] for it in items: lines.append("- " + self._fmt_emby_item(it)) await self._say(evt, "\n".join(lines)) @media.subcommand("watched", help="Your recently watched items on Emby") async def cmd_watched(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return user_cfg = self._resolve_user(evt.sender) or {} emby_uid = user_cfg.get("emby_user_id") if not emby_uid or emby_uid == "TBD": await self._say(evt, "Your Matrix user has no Emby user_id mapped โ€” ask maddox.") return try: items = await self.emby.user_played(emby_uid, limit=10) except EmbyError as ex: await self._say(evt, f"Emby fetch failed: {ex}") return if not items: await self._say(evt, "No recently watched items.") return lines = [f"**Recently watched ({len(items)}):**"] for it in items: lines.append("- " + self._fmt_emby_item(it)) await self._say(evt, "\n".join(lines)) @media.subcommand("find", help="Search the existing Emby library") @command.argument("query", pass_raw=True, required=True) async def cmd_find(self, evt: MessageEvent, query: str) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return emby_uid = self._any_emby_uid(evt.sender) if not emby_uid: await self._say(evt, "No usable Emby user_id in config.") return try: items = await self.emby.find(emby_uid, query, limit=10) except EmbyError as ex: await self._say(evt, f"Emby fetch failed: {ex}") return if not items: await self._say(evt, f"Nothing in the library matching **{query}**.") return lines = [f"**Found in library ({len(items)}):**"] for it in items: lines.append("- " + self._fmt_emby_item(it)) await self._say(evt, "\n".join(lines)) @media.subcommand("resume", help="Your continue-watching list") async def cmd_resume(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return user_cfg = self._resolve_user(evt.sender) or {} emby_uid = user_cfg.get("emby_user_id") if not emby_uid or emby_uid == "TBD": await self._say(evt, "Your Matrix user has no Emby user_id mapped โ€” ask maddox.") return try: items = await self.emby.resume(emby_uid, limit=10) except EmbyError as ex: await self._say(evt, f"Emby fetch failed: {ex}") return if not items: await self._say(evt, "Nothing to resume โ€” you've finished everything you started.") return lines = [f"**Continue watching ({len(items)}):**"] for it in items: pct = "" ud = it.get("UserData") or {} played_pct = ud.get("PlayedPercentage") if played_pct: pct = f" ยท {played_pct:.0f}%" lines.append("- " + self._fmt_emby_item(it) + pct) await self._say(evt, "\n".join(lines)) @media.subcommand("random", help="Random unwatched pick โ€” `!media random [movie|tv]`") @command.argument("kind", required=False) async def cmd_random(self, evt: MessageEvent, kind: str = "") -> None: await evt.mark_read() if await self._reject_unmapped(evt): return user_cfg = self._resolve_user(evt.sender) or {} emby_uid = user_cfg.get("emby_user_id") if not emby_uid or emby_uid == "TBD": emby_uid = self._any_emby_uid(evt.sender) if not emby_uid: await self._say(evt, "No usable Emby user_id in config.") return item_type = "Movie" if kind.lower() in ("tv", "series", "show", "shows"): item_type = "Series" try: item = await self.emby.random_unplayed(emby_uid, item_type=item_type) except EmbyError as ex: await self._say(evt, f"Emby fetch failed: {ex}") return if not item: await self._say(evt, f"No unwatched {item_type.lower()}s found โ€” go you.") return line = self._fmt_emby_item(item) overview = (item.get("Overview") or "").strip() if len(overview) > 240: overview = overview[:240].rsplit(" ", 1)[0] + "โ€ฆ" poster = self.emby.poster_url(item) if poster: await self._post_poster(evt.room_id, poster, item.get("Name") or "poster") msg = f"**Random pick:** {line}" if overview: msg += f"\n\n{overview}" await self._say(evt, msg) def _any_emby_uid(self, sender: str) -> Optional[str]: """Sender's emby_user_id, falling back to any other mapped user's id.""" cfg = self._resolve_user(sender) or {} uid = cfg.get("emby_user_id") if uid and uid != "TBD": return uid for v in (self.config["user_map"] or {}).values(): if v and v.get("emby_user_id") and v["emby_user_id"] != "TBD": return v["emby_user_id"] return None def _fmt_emby_item(self, it: dict) -> str: t = it.get("Type") name = it.get("Name") or "?" year = it.get("ProductionYear") if t == "Episode": series = it.get("SeriesName") or "?" sn = it.get("ParentIndexNumber") ep = it.get("IndexNumber") tag = f"S{sn:02d}E{ep:02d}" if sn and ep else "" return f"*{series}* {tag} โ€” {name}".strip() if year: return f"*{name}* ({year})" return f"*{name}*" # ---------- Sonarr / Radarr ---------- @media.subcommand("queue", help="Combined Sonarr + Radarr queue") async def cmd_queue(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return try: sonarr_q, radarr_q = await asyncio.gather( self.sonarr.queue(), self.radarr.queue(), return_exceptions=True, ) except Exception as ex: await self._say(evt, f"Queue fetch failed: {ex}") return lines: list[str] = [] if isinstance(sonarr_q, Exception): lines.append(f"**Sonarr:** unreachable ({sonarr_q})") else: lines.append(f"**Sonarr queue ({len(sonarr_q)}):**") for q in sonarr_q[:10]: series = (q.get("series") or {}).get("title") or "?" ep = q.get("episode") or {} tag = "" if ep.get("seasonNumber") is not None and ep.get("episodeNumber") is not None: tag = f" S{ep['seasonNumber']:02d}E{ep['episodeNumber']:02d}" status = q.get("status") or "?" pct = self._arr_pct(q) lines.append(f"- *{series}*{tag} โ€” {status} {pct}") lines.append("") if isinstance(radarr_q, Exception): lines.append(f"**Radarr:** unreachable ({radarr_q})") else: lines.append(f"**Radarr queue ({len(radarr_q)}):**") for q in radarr_q[:10]: title = (q.get("movie") or {}).get("title") or q.get("title") or "?" status = q.get("status") or "?" pct = self._arr_pct(q) lines.append(f"- *{title}* โ€” {status} {pct}") await self._say(evt, "\n".join(lines)) @media.subcommand("upcoming", help="Sonarr calendar โ€” episodes airing in next 7 days") async def cmd_upcoming(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return today = date.today() end = today + timedelta(days=7) try: cal = await self.sonarr.calendar(today.isoformat(), end.isoformat()) except ArrError as ex: await self._say(evt, f"Calendar fetch failed: {ex}") return if not cal: await self._say(evt, "Nothing airing in the next 7 days.") return lines = [f"**Upcoming ({len(cal)}):**"] for ep in cal[:15]: series = (ep.get("series") or {}).get("title") or "?" sn = ep.get("seasonNumber") en = ep.get("episodeNumber") tag = f"S{sn:02d}E{en:02d}" if sn is not None and en is not None else "" t = ep.get("title") or "" air = (ep.get("airDateUtc") or ep.get("airDate") or "")[:10] lines.append(f"- {air}: *{series}* {tag} โ€” {t}") await self._say(evt, "\n".join(lines)) @media.subcommand("missing", help="Sonarr wanted/missing list") async def cmd_missing(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return try: items = await self.sonarr.missing(page_size=10) except ArrError as ex: await self._say(evt, f"Missing fetch failed: {ex}") return if not items: await self._say(evt, "No missing episodes โ€” Sonarr is happy.") return lines = [f"**Wanted/Missing ({len(items)}):**"] for ep in items: series = (ep.get("series") or {}).get("title") or "?" sn = ep.get("seasonNumber") en = ep.get("episodeNumber") tag = f"S{sn:02d}E{en:02d}" if sn is not None and en is not None else "" t = ep.get("title") or "" air = (ep.get("airDateUtc") or ep.get("airDate") or "")[:10] lines.append(f"- *{series}* {tag} โ€” {t} (aired {air})") await self._say(evt, "\n".join(lines)) @media.subcommand("health", help="Sonarr + Radarr active health warnings") async def cmd_health(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return sonarr_h, radarr_h = await asyncio.gather( self.sonarr.health(), self.radarr.health(), return_exceptions=True, ) lines: list[str] = [] for label, data in (("Sonarr", sonarr_h), ("Radarr", radarr_h)): if isinstance(data, Exception): lines.append(f"**{label}:** unreachable ({data})") continue if not data: lines.append(f"**{label}:** โœ… healthy") continue lines.append(f"**{label} ({len(data)} warnings):**") for h in data: src = h.get("source") or "?" msg = h.get("message") or "?" lines.append(f"- [{src}] {msg}") await self._say(evt, "\n".join(lines)) def _arr_pct(self, q: dict) -> str: size = q.get("size") or 0 left = q.get("sizeleft") or 0 if size <= 0: return "" pct = (1 - left / size) * 100 return f"{pct:.0f}%" # ---------- Downloads ---------- @media.subcommand("activity", aliases=["dl"], help="NZBGet + qBittorrent active downloads") async def cmd_activity(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return nzb_res, qbt_res = await asyncio.gather( self.nzbget.listgroups(), self.qbt.downloading(), return_exceptions=True, ) lines: list[str] = [] # NZBGet if isinstance(nzb_res, Exception): lines.append(f"**NZBGet:** unreachable ({nzb_res})") else: active = [g for g in nzb_res if (g.get("RemainingSizeMB") or 0) > 0] lines.append(f"**NZBGet ({len(active)} active):**" if active else "**NZBGet:** idle") for g in active[:8]: name = g.get("NZBNicename") or g.get("NZBName") or "?" rate_mbps = (g.get("DownloadRate") or 0) / 1024 / 1024 # NZBGet reports B/s size_mb = g.get("FileSizeMB") or 0 left_mb = g.get("RemainingSizeMB") or 0 pct = (1 - left_mb / size_mb) * 100 if size_mb else 0 lines.append(f"- *{name}* โ€” {pct:.0f}% ยท {rate_mbps:.1f} MB/s") lines.append("") # qBittorrent if isinstance(qbt_res, Exception): lines.append(f"**qBittorrent:** unreachable ({qbt_res})") else: lines.append(f"**qBittorrent ({len(qbt_res)} active):**" if qbt_res else "**qBittorrent:** idle") for t in qbt_res[:8]: name = t.get("name") or "?" pct = (t.get("progress") or 0) * 100 speed = _human_bytes(t.get("dlspeed") or 0) + "/s" eta = _human_eta(t.get("eta")) lines.append(f"- *{name}* โ€” {pct:.0f}% ยท {speed} ยท ETA {eta}") await self._say(evt, "\n".join(lines)) @media.subcommand("speed", help="Aggregate down/up speeds across NZBGet + qBt") async def cmd_speed(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return nzb_status, qbt_xfer = await asyncio.gather( self.nzbget.status(), self.qbt.transfer_info(), return_exceptions=True, ) lines: list[str] = [] if isinstance(nzb_status, Exception): lines.append(f"**NZBGet:** unreachable ({nzb_status})") else: dl = nzb_status.get("DownloadRate") or 0 # bytes/s paused = nzb_status.get("DownloadPaused") or nzb_status.get("ServerPaused") tag = " (paused)" if paused else "" lines.append(f"**NZBGet:** โ†“ {_human_bytes(dl)}/s{tag}") if isinstance(qbt_xfer, Exception): lines.append(f"**qBittorrent:** unreachable ({qbt_xfer})") else: dl = qbt_xfer.get("dl_info_speed") or 0 ul = qbt_xfer.get("up_info_speed") or 0 state = qbt_xfer.get("connection_status") or "?" lines.append(f"**qBittorrent:** โ†“ {_human_bytes(dl)}/s ยท โ†‘ {_human_bytes(ul)}/s [{state}]") await self._say(evt, "\n".join(lines)) @media.subcommand("completed", help="Downloads finished in the last 24h") async def cmd_completed(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return cutoff = datetime.now(timezone.utc).timestamp() - 86400 nzb_hist, qbt_all = await asyncio.gather( self.nzbget.history(), self.qbt.all_torrents(), return_exceptions=True, ) lines: list[str] = [] # NZBGet history items have HistoryTime (epoch) and FileSizeMB if isinstance(nzb_hist, Exception): lines.append(f"**NZBGet:** unreachable ({nzb_hist})") else: recent = [h for h in nzb_hist if (h.get("HistoryTime") or 0) >= cutoff] lines.append(f"**NZBGet โ€” completed last 24h ({len(recent)}):**") for h in recent[:10]: name = h.get("Name") or h.get("NZBName") or "?" size_mb = h.get("FileSizeMB") or 0 status = h.get("Status") or "?" lines.append(f"- *{name}* โ€” {size_mb / 1024:.1f} GB [{status}]") lines.append("") # qBt: completion_on (epoch); finished torrents have it set if isinstance(qbt_all, Exception): lines.append(f"**qBittorrent:** unreachable ({qbt_all})") else: recent = [t for t in qbt_all if (t.get("completion_on") or 0) >= cutoff and (t.get("completion_on") or 0) > 0] lines.append(f"**qBittorrent โ€” completed last 24h ({len(recent)}):**") for t in recent[:10]: name = t.get("name") or "?" size = t.get("size") or 0 lines.append(f"- *{name}* โ€” {_human_bytes(size)}") await self._say(evt, "\n".join(lines)) @media.subcommand("pause", help="Pause all downloads (NZBGet + qBittorrent)") async def cmd_pause(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return results = await asyncio.gather( self.nzbget.pause(), self.qbt.pause_all(), return_exceptions=True, ) nzb_ok = not isinstance(results[0], Exception) qbt_ok = not isinstance(results[1], Exception) msg = ( f"NZBGet: {'paused โธ' if nzb_ok else f'failed ({results[0]})'}\n" f"qBittorrent: {'paused โธ' if qbt_ok else f'failed ({results[1]})'}" ) await self._say(evt, msg) @media.subcommand("unpause", help="Resume all downloads (NZBGet + qBittorrent)") async def cmd_unpause(self, evt: MessageEvent) -> None: await evt.mark_read() if await self._reject_unmapped(evt): return results = await asyncio.gather( self.nzbget.unpause(), self.qbt.resume_all(), return_exceptions=True, ) nzb_ok = not isinstance(results[0], Exception) qbt_ok = not isinstance(results[1], Exception) msg = ( f"NZBGet: {'resumed โ–ถ' if nzb_ok else f'failed ({results[0]})'}\n" f"qBittorrent: {'resumed โ–ถ' if qbt_ok else f'failed ({results[1]})'}" ) await self._say(evt, msg) # ---------- Reactions: admin approve/decline ---------- @event.on(EventType.REACTION) async def on_reaction(self, evt: ReactionEvent) -> None: relates = getattr(evt.content, "relates_to", None) if not relates or not relates.event_id or not relates.key: return request_id = self._pending_requests.get(relates.event_id) if not request_id: return # not one of our request messages admins = self.config["admin_users"] or [] if evt.sender not in admins: self.log.info("Reaction from non-admin %s ignored", evt.sender) return key = relates.key if _matches(key, APPROVE_KEYS): label, fn = "approved โœ…", self.seerr.approve elif _matches(key, DECLINE_KEYS): label, fn = "declined โŒ", self.seerr.decline else: return # unrelated reaction try: await fn(request_id) except SeerrError as ex: await self.client.send_text(evt.room_id, text=f"Request {request_id} action failed: {ex}") return self._pending_requests.pop(relates.event_id, None) await self.client.send_text( evt.room_id, text=f"Request {request_id} {label} (by {evt.sender})" ) # ---------- Webhook: Seerr โ†’ Matrix notifications ---------- @web.post("/seerr-webhook") async def seerr_webhook(self, req: Request) -> Response: secret = self.config["seerr_webhook_secret"] or "" if secret: # Case-insensitive scheme per RFC 7235 โ€” Seerr's UI lowercases "Bearer" auth = (req.headers.get("Authorization") or "").strip() scheme, _, token = auth.partition(" ") if scheme.lower() != "bearer" or token != secret: self.log.warning("Seerr webhook bad auth from %s", req.remote) return Response(status=401, text="unauthorized") try: payload = await req.json() except Exception: return Response(status=400, text="invalid json") room = (self.config["notifications_room"] or "").strip() if not room: return json_response({"ok": False, "error": "notifications_room not configured"}) text, html = self._format_seerr_event(payload) try: content = TextMessageEventContent( msgtype=MessageType.NOTICE, body=text, format=Format.HTML, formatted_body=html, ) await self.client.send_message(RoomID(room), content) except Exception as ex: self.log.exception("Failed to post Seerr webhook to %s", room) return json_response({"ok": False, "error": str(ex)}, status=500) poster = payload.get("image") if poster and self.config["posters_enabled"]: await self._post_poster(room, poster, payload.get("subject") or "poster") return json_response({"ok": True}) def _format_seerr_event(self, p: dict) -> tuple[str, str]: nt = (p.get("notification_type") or "").upper() subject = p.get("subject") or "?" message = p.get("message") or "" media = p.get("media") or {} request = p.get("request") or {} media_type = media.get("media_type") or "?" requester = request.get("requestedBy_username") or "" emoji = { "MEDIA_PENDING": "๐Ÿ“ฅ", "MEDIA_APPROVED": "โœ…", "MEDIA_AUTO_APPROVED": "โœ…", "MEDIA_AVAILABLE": "๐ŸŽฌ", "MEDIA_DECLINED": "โŒ", "MEDIA_FAILED": "โš ๏ธ", "ISSUE_CREATED": "๐Ÿ›", "ISSUE_COMMENT": "๐Ÿ’ฌ", "ISSUE_RESOLVED": "๐Ÿ› ", "TEST_NOTIFICATION": "๐Ÿ”ง", }.get(nt, "๐Ÿ“ฃ") label = nt.replace("_", " ").title() or "Notification" text_parts = [f"{emoji} {label} โ€” {subject}"] if media_type and media_type != "?": text_parts.append(f"({media_type})") if requester: text_parts.append(f"โ€” requested by {requester}") text = " ".join(text_parts) if message: text += f"\n\n{message}" html = ( f"{emoji} {label} โ€” {subject}" + (f" ({media_type})" if media_type != '?' else "") + (f" โ€” requested by {requester}" if requester else "") + (f"

{message}" if message else "") ) return text, html