maubot-media/media_bot/bot.py
Maddox 8c62c9fd31 Initial commit: media bot v0.1.0
Maubot plugin: Matrix companion for the homelab media stack.

Services wrapped:
- Seerr: search, request, requests, trending
- Emby: nowplaying, recent, watched
- Sonarr/Radarr: queue, upcoming, missing
- NZBGet/qBittorrent: activity

Each Matrix sender is mapped to per-service user IDs via plugin config;
unmapped senders are rejected. Replies are MXID-prefixed for shared rooms.
2026-04-28 08:22:38 -04:00

531 lines
21 KiB
Python

"""Media bot — Matrix companion for the homelab media stack.
Wraps Seerr (search/request), Emby (library/playback), Sonarr/Radarr (queue/calendar),
NZBGet + qBittorrent (active downloads).
Each Matrix sender is mapped to per-service user IDs via plugin config; unmapped
senders are rejected. All replies are prefixed with the sender's MXID so notifications
in shared rooms stay readable (matches the Books Bot convention).
"""
from __future__ import annotations
import asyncio
from datetime import date, datetime, timedelta, timezone
from typing import Any, Optional, Type
import aiohttp
from maubot import MessageEvent, Plugin
from maubot.handlers import command
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from .clients.arr import ArrError, RadarrClient, SonarrClient
from .clients.downloads import DownloadError, NzbgetClient, QbtClient
from .clients.emby import EmbyClient, EmbyError
from .clients.seerr import SeerrClient, SeerrError
# --- Seerr availability codes ----------------------------------------------
# https://api-docs.overseerr.dev — `Status` enum
SEERR_AVAIL = {
1: "unknown",
2: "pending",
3: "processing",
4: "partial",
5: "available",
}
def _human_bytes(n: float) -> str:
for unit in ("B", "KB", "MB", "GB", "TB"):
if abs(n) < 1024:
return f"{n:.1f} {unit}"
n /= 1024
return f"{n:.1f} PB"
def _human_eta(seconds: int | float | None) -> str:
if not seconds or seconds < 0 or seconds > 8640000:
return "?"
s = int(seconds)
h, rem = divmod(s, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}h{m}m"
if m:
return f"{m}m{s}s"
return f"{s}s"
class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("http_timeout")
helper.copy("default_results")
helper.copy("seerr")
helper.copy("sonarr")
helper.copy("radarr")
helper.copy("emby")
helper.copy("nzbget")
helper.copy("qbittorrent")
helper.copy("user_map")
class MediaBot(Plugin):
config: Config
session: aiohttp.ClientSession
seerr: SeerrClient
sonarr: SonarrClient
radarr: RadarrClient
emby: EmbyClient
nzbget: NzbgetClient
qbt: QbtClient
async def start(self) -> None:
self.config.load_and_update()
timeout = aiohttp.ClientTimeout(total=self.config["http_timeout"])
self.session = aiohttp.ClientSession(timeout=timeout)
s, so, r, e, n, q = (
self.config["seerr"], self.config["sonarr"], self.config["radarr"],
self.config["emby"], self.config["nzbget"], self.config["qbittorrent"],
)
self.seerr = SeerrClient(self.session, s["url"], s["api_key"])
self.sonarr = SonarrClient(self.session, so["url"], so["api_key"])
self.radarr = RadarrClient(self.session, r["url"], r["api_key"])
self.emby = EmbyClient(self.session, e["url"], e["api_key"])
self.nzbget = NzbgetClient(self.session, n["url"], n["username"], n["password"])
self.qbt = QbtClient(self.session, q["url"], q["username"], q["password"])
self.log.info("Media bot started — users=%d", len(self.config["user_map"] or {}))
async def stop(self) -> None:
if hasattr(self, "session"):
await self.session.close()
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config
# ---------- helpers ----------
def _resolve_user(self, sender: str) -> Optional[dict]:
return (self.config["user_map"] or {}).get(sender)
async def _say(self, evt: MessageEvent, message: str) -> None:
await evt.respond(f"{evt.sender}\n\n{message}")
async def _reject_unmapped(self, evt: MessageEvent) -> bool:
if not self._resolve_user(evt.sender):
self.log.warning("Unauthorized sender: %s", evt.sender)
await self._say(evt, "Sorry, your Matrix user isn't authorized to use the media bot.")
return True
return False
# ---------- top-level command ----------
@command.new("media", aliases=["m"], help="Media stack bot — try `!media help`",
require_subcommand=True)
async def media(self) -> None:
pass
@media.subcommand("help", help="Show available commands")
async def cmd_help(self, evt: MessageEvent) -> None:
await evt.mark_read()
msg = (
"**Media bot commands**\n\n"
"*Search & request (Seerr)*\n"
"- `!media search <query>` — top results\n"
"- `!media request <query> [--tv|--movie]` — request top hit\n"
"- `!media requests` — your pending/processing requests\n"
"- `!media trending` — what's trending\n\n"
"*Library (Emby)*\n"
"- `!media nowplaying` — active sessions\n"
"- `!media recent [movies|tv]` — recently added\n"
"- `!media watched` — what you recently finished\n\n"
"*Downloads (Sonarr/Radarr/NZB/qBt)*\n"
"- `!media queue` — Sonarr + Radarr queue\n"
"- `!media activity` — NZBGet + qBt active downloads\n"
"- `!media upcoming` — Sonarr calendar (next 7 days)\n"
"- `!media missing` — Sonarr wanted/missing\n"
)
await self._say(evt, msg)
# ---------- Seerr ----------
@media.subcommand("search", help="Search Seerr for a movie or show")
@command.argument("query", pass_raw=True, required=True)
async def cmd_search(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
results = await self.seerr.search(query)
except SeerrError as ex:
await self._say(evt, f"Search failed: {ex}")
return
if not results:
await self._say(evt, f"No Seerr results for **{query}**.")
return
n = self.config["default_results"]
lines = [f"**Top {min(n, len(results))} for '{query}':**"]
for r in results[:n]:
lines.append(self._fmt_seerr(r))
await self._say(evt, "\n".join(lines))
@media.subcommand("request", help="Request the top hit; pass --tv or --movie to force type")
@command.argument("query", pass_raw=True, required=True)
async def cmd_request(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
forced_type: Optional[str] = None
words = query.split()
cleaned = []
for w in words:
if w == "--tv":
forced_type = "tv"
elif w == "--movie":
forced_type = "movie"
else:
cleaned.append(w)
q = " ".join(cleaned).strip()
if not q:
await self._say(evt, "Need a search query, e.g. `!media request dune --movie`.")
return
user_cfg = self._resolve_user(evt.sender)
seerr_uid = user_cfg.get("seerr_user_id") if user_cfg else None
if not seerr_uid:
await self._say(evt, "Your Matrix user has no Seerr user_id mapped — ask maddox.")
return
try:
results = await self.seerr.search(q)
except SeerrError as ex:
await self._say(evt, f"Search failed: {ex}")
return
candidates = [r for r in results if r.get("mediaType") in ("movie", "tv")]
if forced_type:
candidates = [r for r in candidates if r.get("mediaType") == forced_type]
if not candidates:
await self._say(evt, f"No matching results for **{q}**.")
return
top = candidates[0]
media_type = top["mediaType"]
tmdb_id = top.get("id")
title = top.get("title") or top.get("name") or q
if not tmdb_id:
await self._say(evt, f"Top result for **{q}** has no TMDB id — try `!media search`.")
return
try:
req = await self.seerr.request(media_type, tmdb_id, seerr_uid)
except SeerrError as ex:
await self._say(evt, f"Request failed: {ex}")
return
status_id = (req or {}).get("status")
status = {1: "pending approval", 2: "approved", 3: "declined"}.get(status_id, str(status_id))
await self._say(evt, f"Requested **{title}** ({media_type}) — status: *{status}*.")
@media.subcommand("requests", help="Show your pending/processing Seerr requests")
async def cmd_requests(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
user_cfg = self._resolve_user(evt.sender)
seerr_uid = (user_cfg or {}).get("seerr_user_id")
if not seerr_uid:
await self._say(evt, "Your Matrix user has no Seerr user_id mapped — ask maddox.")
return
try:
reqs = await self.seerr.user_requests(seerr_uid, take=10)
except SeerrError as ex:
await self._say(evt, f"Lookup failed: {ex}")
return
if not reqs:
await self._say(evt, "No pending or processing requests.")
return
lines = [f"**Your requests ({len(reqs)}):**"]
for r in reqs:
mi = r.get("media") or {}
t = mi.get("title") or mi.get("name") or f"tmdb:{mi.get('tmdbId')}"
mt = mi.get("mediaType") or "?"
avail = SEERR_AVAIL.get(mi.get("status", 0), "?")
lines.append(f"- *{t}* ({mt}) — {avail}")
await self._say(evt, "\n".join(lines))
@media.subcommand("trending", help="Show what's trending in Seerr")
async def cmd_trending(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
results = await self.seerr.trending()
except SeerrError as ex:
await self._say(evt, f"Trending fetch failed: {ex}")
return
n = self.config["default_results"]
results = [r for r in results if r.get("mediaType") in ("movie", "tv")][:n]
if not results:
await self._say(evt, "Nothing trending right now.")
return
lines = [f"**Trending ({len(results)}):**"]
for r in results:
lines.append(self._fmt_seerr(r))
await self._say(evt, "\n".join(lines))
def _fmt_seerr(self, r: dict) -> str:
title = r.get("title") or r.get("name") or "?"
mt = r.get("mediaType") or "?"
date_s = r.get("releaseDate") or r.get("firstAirDate") or ""
year = date_s[:4] if date_s else ""
info = (r.get("mediaInfo") or {}).get("status")
avail = SEERR_AVAIL.get(info, None) if info else None
bits = [f"*{title}*"]
if year:
bits.append(f"({year})")
bits.append(f"{mt}")
if avail:
bits.append(f"[{avail}]")
return "- " + " ".join(bits)
# ---------- Emby ----------
@media.subcommand("nowplaying", aliases=["np"], help="Active Emby sessions")
async def cmd_nowplaying(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
sessions = await self.emby.sessions()
except EmbyError as ex:
await self._say(evt, f"Emby fetch failed: {ex}")
return
active = [s for s in sessions if s.get("NowPlayingItem")]
if not active:
await self._say(evt, "Nothing playing on Emby right now.")
return
lines = [f"**Now playing on Emby ({len(active)}):**"]
for s in active:
item = s.get("NowPlayingItem") or {}
user = s.get("UserName") or "?"
client = s.get("Client") or ""
t = self._fmt_emby_item(item)
ps = s.get("PlayState") or {}
pos = ps.get("PositionTicks") or 0
run = item.get("RunTimeTicks") or 0
pct = (pos / run * 100) if run else 0
paused = " (paused)" if ps.get("IsPaused") else ""
lines.append(f"- **{user}** — {t} · {pct:.0f}%{paused} [{client}]")
await self._say(evt, "\n".join(lines))
@media.subcommand("recent", help="Recently added — `!media recent [movies|tv]`")
@command.argument("kind", required=False)
async def cmd_recent(self, evt: MessageEvent, kind: str = "") -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
# Use the sender's emby_user_id if set, else fall back to the first mapped user's id
user_cfg = self._resolve_user(evt.sender) or {}
emby_uid = user_cfg.get("emby_user_id")
if not emby_uid or emby_uid == "TBD":
for v in (self.config["user_map"] or {}).values():
if v and v.get("emby_user_id") and v["emby_user_id"] != "TBD":
emby_uid = v["emby_user_id"]
break
if not emby_uid:
await self._say(evt, "No usable Emby user_id in config.")
return
item_types = None
if kind.lower() in ("movie", "movies"):
item_types = "Movie"
elif kind.lower() in ("tv", "series", "shows"):
item_types = "Series,Episode"
try:
items = await self.emby.recently_added(emby_uid, limit=10, item_types=item_types)
except EmbyError as ex:
await self._say(evt, f"Emby fetch failed: {ex}")
return
if not items:
await self._say(evt, "Nothing recently added.")
return
label = f" ({kind})" if kind else ""
lines = [f"**Recently added{label} ({len(items)}):**"]
for it in items:
lines.append("- " + self._fmt_emby_item(it))
await self._say(evt, "\n".join(lines))
@media.subcommand("watched", help="Your recently watched items on Emby")
async def cmd_watched(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
user_cfg = self._resolve_user(evt.sender) or {}
emby_uid = user_cfg.get("emby_user_id")
if not emby_uid or emby_uid == "TBD":
await self._say(evt, "Your Matrix user has no Emby user_id mapped — ask maddox.")
return
try:
items = await self.emby.user_played(emby_uid, limit=10)
except EmbyError as ex:
await self._say(evt, f"Emby fetch failed: {ex}")
return
if not items:
await self._say(evt, "No recently watched items.")
return
lines = [f"**Recently watched ({len(items)}):**"]
for it in items:
lines.append("- " + self._fmt_emby_item(it))
await self._say(evt, "\n".join(lines))
def _fmt_emby_item(self, it: dict) -> str:
t = it.get("Type")
name = it.get("Name") or "?"
year = it.get("ProductionYear")
if t == "Episode":
series = it.get("SeriesName") or "?"
sn = it.get("ParentIndexNumber")
ep = it.get("IndexNumber")
tag = f"S{sn:02d}E{ep:02d}" if sn and ep else ""
return f"*{series}* {tag}{name}".strip()
if year:
return f"*{name}* ({year})"
return f"*{name}*"
# ---------- Sonarr / Radarr ----------
@media.subcommand("queue", help="Combined Sonarr + Radarr queue")
async def cmd_queue(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
sonarr_q, radarr_q = await asyncio.gather(
self.sonarr.queue(), self.radarr.queue(),
return_exceptions=True,
)
except Exception as ex:
await self._say(evt, f"Queue fetch failed: {ex}")
return
lines: list[str] = []
if isinstance(sonarr_q, Exception):
lines.append(f"**Sonarr:** unreachable ({sonarr_q})")
else:
lines.append(f"**Sonarr queue ({len(sonarr_q)}):**")
for q in sonarr_q[:10]:
series = (q.get("series") or {}).get("title") or "?"
ep = q.get("episode") or {}
tag = ""
if ep.get("seasonNumber") is not None and ep.get("episodeNumber") is not None:
tag = f" S{ep['seasonNumber']:02d}E{ep['episodeNumber']:02d}"
status = q.get("status") or "?"
pct = self._arr_pct(q)
lines.append(f"- *{series}*{tag}{status} {pct}")
lines.append("")
if isinstance(radarr_q, Exception):
lines.append(f"**Radarr:** unreachable ({radarr_q})")
else:
lines.append(f"**Radarr queue ({len(radarr_q)}):**")
for q in radarr_q[:10]:
title = (q.get("movie") or {}).get("title") or q.get("title") or "?"
status = q.get("status") or "?"
pct = self._arr_pct(q)
lines.append(f"- *{title}* — {status} {pct}")
await self._say(evt, "\n".join(lines))
@media.subcommand("upcoming", help="Sonarr calendar — episodes airing in next 7 days")
async def cmd_upcoming(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
today = date.today()
end = today + timedelta(days=7)
try:
cal = await self.sonarr.calendar(today.isoformat(), end.isoformat())
except ArrError as ex:
await self._say(evt, f"Calendar fetch failed: {ex}")
return
if not cal:
await self._say(evt, "Nothing airing in the next 7 days.")
return
lines = [f"**Upcoming ({len(cal)}):**"]
for ep in cal[:15]:
series = (ep.get("series") or {}).get("title") or "?"
sn = ep.get("seasonNumber")
en = ep.get("episodeNumber")
tag = f"S{sn:02d}E{en:02d}" if sn is not None and en is not None else ""
t = ep.get("title") or ""
air = (ep.get("airDateUtc") or ep.get("airDate") or "")[:10]
lines.append(f"- {air}: *{series}* {tag}{t}")
await self._say(evt, "\n".join(lines))
@media.subcommand("missing", help="Sonarr wanted/missing list")
async def cmd_missing(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
items = await self.sonarr.missing(page_size=10)
except ArrError as ex:
await self._say(evt, f"Missing fetch failed: {ex}")
return
if not items:
await self._say(evt, "No missing episodes — Sonarr is happy.")
return
lines = [f"**Wanted/Missing ({len(items)}):**"]
for ep in items:
series = (ep.get("series") or {}).get("title") or "?"
sn = ep.get("seasonNumber")
en = ep.get("episodeNumber")
tag = f"S{sn:02d}E{en:02d}" if sn is not None and en is not None else ""
t = ep.get("title") or ""
air = (ep.get("airDateUtc") or ep.get("airDate") or "")[:10]
lines.append(f"- *{series}* {tag}{t} (aired {air})")
await self._say(evt, "\n".join(lines))
def _arr_pct(self, q: dict) -> str:
size = q.get("size") or 0
left = q.get("sizeleft") or 0
if size <= 0:
return ""
pct = (1 - left / size) * 100
return f"{pct:.0f}%"
# ---------- Downloads ----------
@media.subcommand("activity", aliases=["dl"], help="NZBGet + qBittorrent active downloads")
async def cmd_activity(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
nzb_res, qbt_res = await asyncio.gather(
self.nzbget.listgroups(),
self.qbt.downloading(),
return_exceptions=True,
)
lines: list[str] = []
# NZBGet
if isinstance(nzb_res, Exception):
lines.append(f"**NZBGet:** unreachable ({nzb_res})")
else:
active = [g for g in nzb_res if (g.get("RemainingSizeMB") or 0) > 0]
lines.append(f"**NZBGet ({len(active)} active):**" if active else "**NZBGet:** idle")
for g in active[:8]:
name = g.get("NZBNicename") or g.get("NZBName") or "?"
rate_mbps = (g.get("DownloadRate") or 0) / 1024 / 1024 # NZBGet reports B/s
size_mb = g.get("FileSizeMB") or 0
left_mb = g.get("RemainingSizeMB") or 0
pct = (1 - left_mb / size_mb) * 100 if size_mb else 0
lines.append(f"- *{name}* — {pct:.0f}% · {rate_mbps:.1f} MB/s")
lines.append("")
# qBittorrent
if isinstance(qbt_res, Exception):
lines.append(f"**qBittorrent:** unreachable ({qbt_res})")
else:
lines.append(f"**qBittorrent ({len(qbt_res)} active):**" if qbt_res else "**qBittorrent:** idle")
for t in qbt_res[:8]:
name = t.get("name") or "?"
pct = (t.get("progress") or 0) * 100
speed = _human_bytes(t.get("dlspeed") or 0) + "/s"
eta = _human_eta(t.get("eta"))
lines.append(f"- *{name}* — {pct:.0f}% · {speed} · ETA {eta}")
await self._say(evt, "\n".join(lines))