maubot-media/media_bot/bot.py
Maddox e1a7aa7b5b v0.3.0: reactions, posters, Seerr webhook
- Admins can react 👍/ or 👎/ on a !media request to approve/decline
- Posters auto-attach to request confirmations and !media random picks
  (TMDB for Seerr items, Emby /Items/{id}/Images/Primary for library items)
- New @web.post(/seerr-webhook) handler — Seerr → Matrix room directly,
  replaces the Telegram bridge path
- New config: posters_enabled, admin_users, notifications_room,
  seerr_webhook_secret
2026-04-28 18:16:54 -04:00

991 lines
41 KiB
Python

"""Media bot — Matrix companion for the homelab media stack.
Wraps Seerr (search/request), Emby (library/playback), Sonarr/Radarr (queue/calendar),
NZBGet + qBittorrent (active downloads).
Each Matrix sender is mapped to per-service user IDs via plugin config; unmapped
senders are rejected. All replies are prefixed with the sender's MXID so notifications
in shared rooms stay readable (matches the Books Bot convention).
"""
from __future__ import annotations
import asyncio
from datetime import date, datetime, timedelta, timezone
from typing import Any, Optional, Type
import aiohttp
from aiohttp.web import Request, Response, json_response
from maubot import MessageEvent, Plugin
from maubot.handlers import command, event, web
from mautrix.types import (
EventType,
Format,
ImageInfo,
MediaMessageEventContent,
MessageType,
ReactionEvent,
RoomID,
TextMessageEventContent,
)
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from .clients.arr import ArrError, RadarrClient, SonarrClient
from .clients.downloads import DownloadError, NzbgetClient, QbtClient
from .clients.emby import EmbyClient, EmbyError
from .clients.seerr import SeerrClient, SeerrError
# --- Seerr availability codes ----------------------------------------------
# https://api-docs.overseerr.dev — `Status` enum
SEERR_AVAIL = {
1: "unknown",
2: "pending",
3: "processing",
4: "partial",
5: "available",
}
def _human_bytes(n: float) -> str:
for unit in ("B", "KB", "MB", "GB", "TB"):
if abs(n) < 1024:
return f"{n:.1f} {unit}"
n /= 1024
return f"{n:.1f} PB"
def _human_eta(seconds: int | float | None) -> str:
if not seconds or seconds < 0 or seconds > 8640000:
return "?"
s = int(seconds)
h, rem = divmod(s, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}h{m}m"
if m:
return f"{m}m{s}s"
return f"{s}s"
class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("http_timeout")
helper.copy("default_results")
helper.copy("posters_enabled")
helper.copy("admin_users")
helper.copy("notifications_room")
helper.copy("seerr_webhook_secret")
helper.copy("seerr")
helper.copy("sonarr")
helper.copy("radarr")
helper.copy("emby")
helper.copy("nzbget")
helper.copy("qbittorrent")
helper.copy("user_map")
# Reactions counted as approve/decline. Skin-tone variants pass via startswith.
APPROVE_KEYS = ("👍", "", "🟢")
DECLINE_KEYS = ("👎", "", "🟥")
def _matches(key: str, candidates: tuple[str, ...]) -> bool:
return any(key.startswith(c) for c in candidates)
class MediaBot(Plugin):
config: Config
session: aiohttp.ClientSession
seerr: SeerrClient
sonarr: SonarrClient
radarr: RadarrClient
emby: EmbyClient
nzbget: NzbgetClient
qbt: QbtClient
# Per-(room, sender) cache of last search/trending results — used by
# `!media request <N>` to skip re-typing the query. In-memory only; a bot
# restart clears it (acceptable, results are cheap to rebuild).
_search_cache: dict[tuple[str, str], list[dict]]
# Map of bot-message event_id → Seerr request_id, for reaction-based
# approve/decline. Capped at 200; oldest entries are evicted on overflow.
_pending_requests: dict[str, int]
async def start(self) -> None:
self.config.load_and_update()
timeout = aiohttp.ClientTimeout(total=self.config["http_timeout"])
self.session = aiohttp.ClientSession(timeout=timeout)
s, so, r, e, n, q = (
self.config["seerr"], self.config["sonarr"], self.config["radarr"],
self.config["emby"], self.config["nzbget"], self.config["qbittorrent"],
)
self.seerr = SeerrClient(self.session, s["url"], s["api_key"])
self.sonarr = SonarrClient(self.session, so["url"], so["api_key"])
self.radarr = RadarrClient(self.session, r["url"], r["api_key"])
self.emby = EmbyClient(self.session, e["url"], e["api_key"])
self.nzbget = NzbgetClient(self.session, n["url"], n["username"], n["password"])
self.qbt = QbtClient(self.session, q["url"], q["username"], q["password"])
self._search_cache = {}
self._pending_requests = {}
self.log.info("Media bot started — users=%d", len(self.config["user_map"] or {}))
async def stop(self) -> None:
if hasattr(self, "session"):
await self.session.close()
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config
# ---------- helpers ----------
def _resolve_user(self, sender: str) -> Optional[dict]:
return (self.config["user_map"] or {}).get(sender)
async def _say(self, evt: MessageEvent, message: str) -> None:
await evt.respond(f"{evt.sender}\n\n{message}")
async def _reject_unmapped(self, evt: MessageEvent) -> bool:
if not self._resolve_user(evt.sender):
self.log.warning("Unauthorized sender: %s", evt.sender)
await self._say(evt, "Sorry, your Matrix user isn't authorized to use the media bot.")
return True
return False
def _user_pref(self, sender: str, key: str, default: Any) -> Any:
cfg = self._resolve_user(sender) or {}
val = cfg.get(key)
return val if val is not None else default
def _result_count(self, sender: str) -> int:
return int(self._user_pref(sender, "result_count", self.config["default_results"]))
def _stash_search(self, evt: MessageEvent, results: list[dict]) -> None:
"""Cache search/trending results so the user can `!media request <N>`."""
self._search_cache[(evt.room_id, evt.sender)] = list(results)
def _track_request(self, event_id: str, request_id: int) -> None:
self._pending_requests[event_id] = request_id
if len(self._pending_requests) > 200:
oldest = next(iter(self._pending_requests))
del self._pending_requests[oldest]
async def _post_poster(self, room_id: str, image_url: str, caption: str) -> bool:
"""Download a poster URL, upload to Matrix, send as m.image. Returns True on success."""
if not self.config["posters_enabled"]:
return False
try:
async with self.session.get(image_url) as r:
if r.status >= 400:
self.log.info("poster fetch %s%s", image_url, r.status)
return False
data = await r.read()
mime = r.headers.get("Content-Type", "image/jpeg").split(";")[0].strip()
mxc = await self.client.upload_media(data, mime_type=mime)
content = MediaMessageEventContent(
msgtype=MessageType.IMAGE,
body=caption,
url=mxc,
info=ImageInfo(mimetype=mime, size=len(data)),
)
await self.client.send_message(RoomID(room_id), content)
return True
except Exception as ex:
self.log.warning("poster upload failed: %s", ex)
return False
# ---------- top-level command ----------
@command.new("media", aliases=["m"], help="Media stack bot — try `!media help`",
require_subcommand=True)
async def media(self) -> None:
pass
@media.subcommand("help", help="Show available commands")
async def cmd_help(self, evt: MessageEvent) -> None:
await evt.mark_read()
msg = (
"**Media bot commands**\n\n"
"*Search & request (Seerr)*\n"
"- `!media search <query>` — top results (numbered)\n"
"- `!media request <query> [--tv|--movie]` — request top hit\n"
"- `!media request <N>` — pick item N from your last search/trending\n"
"- `!media requests` — your pending/processing requests\n"
"- `!media trending` — what's trending (numbered)\n\n"
"*Library (Emby)*\n"
"- `!media nowplaying` — active sessions\n"
"- `!media recent [movies|tv]` — recently added\n"
"- `!media watched` — what you recently finished\n"
"- `!media find <query>` — search the existing library\n"
"- `!media resume` — your continue-watching list\n"
"- `!media random [movie|tv]` — random unwatched pick\n\n"
"*Sonarr / Radarr*\n"
"- `!media queue` — combined queue\n"
"- `!media upcoming` — Sonarr calendar (next 7 days)\n"
"- `!media missing` — Sonarr wanted/missing\n"
"- `!media health` — active warnings on either arr\n\n"
"*Downloads (NZBGet + qBt)*\n"
"- `!media activity` — current downloads\n"
"- `!media completed` — finished in the last 24h\n"
"- `!media speed` — aggregate down/up speed\n"
"- `!media pause` / `!media unpause` — global pause/resume\n\n"
"_Admins can react 👍/👎 on a request to approve/decline._"
)
await self._say(evt, msg)
# ---------- Seerr ----------
@media.subcommand("search", help="Search Seerr for a movie or show")
@command.argument("query", pass_raw=True, required=True)
async def cmd_search(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
results = await self.seerr.search(query)
except SeerrError as ex:
await self._say(evt, f"Search failed: {ex}")
return
results = [r for r in results if r.get("mediaType") in ("movie", "tv")]
if not results:
await self._say(evt, f"No Seerr results for **{query}**.")
return
n = self._result_count(evt.sender)
shown = results[:n]
self._stash_search(evt, shown)
lines = [f"**Top {len(shown)} for '{query}':**"]
for i, r in enumerate(shown, 1):
lines.append(f"{i}. " + self._fmt_seerr(r, leading_dash=False))
lines.append("")
lines.append("_Pick one with `!media request <N>`._")
await self._say(evt, "\n".join(lines))
@media.subcommand("request",
help="Request a movie/show. Pass `<query>` or a number from a recent search.")
@command.argument("query", pass_raw=True, required=True)
async def cmd_request(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
user_cfg = self._resolve_user(evt.sender) or {}
seerr_uid = user_cfg.get("seerr_user_id")
if not seerr_uid:
await self._say(evt, "Your Matrix user has no Seerr user_id mapped — ask maddox.")
return
# Numbered selection: !media request 2 → 2nd item from last search/trending
stripped = query.strip()
if stripped.isdigit():
cached = self._search_cache.get((evt.room_id, evt.sender)) or []
idx = int(stripped) - 1
if not cached:
await self._say(evt, "No recent search results to pick from. Run `!media search <query>` first.")
return
if idx < 0 or idx >= len(cached):
await self._say(evt, f"Pick a number 1-{len(cached)}.")
return
top = cached[idx]
await self._do_request(evt, seerr_uid, top)
return
# Free-text path: optional --tv/--movie flag, else fall back to user default
forced_type: Optional[str] = None
cleaned: list[str] = []
for w in stripped.split():
if w == "--tv":
forced_type = "tv"
elif w == "--movie":
forced_type = "movie"
else:
cleaned.append(w)
if forced_type is None:
forced_type = user_cfg.get("default_media_type")
q = " ".join(cleaned).strip()
if not q:
await self._say(evt, "Need a search query, e.g. `!media request dune --movie`.")
return
try:
results = await self.seerr.search(q)
except SeerrError as ex:
await self._say(evt, f"Search failed: {ex}")
return
candidates = [r for r in results if r.get("mediaType") in ("movie", "tv")]
if forced_type:
candidates = [r for r in candidates if r.get("mediaType") == forced_type]
if not candidates:
await self._say(evt, f"No matching results for **{q}**.")
return
await self._do_request(evt, seerr_uid, candidates[0])
async def _do_request(self, evt: MessageEvent, seerr_uid: int, item: dict) -> None:
media_type = item.get("mediaType")
tmdb_id = item.get("id")
title = item.get("title") or item.get("name") or "?"
if not (media_type and tmdb_id):
await self._say(evt, f"**{title}** is missing a media type or TMDB id — can't request.")
return
try:
req = await self.seerr.request(media_type, tmdb_id, seerr_uid)
except SeerrError as ex:
await self._say(evt, f"Request failed: {ex}")
return
status_id = (req or {}).get("status")
status = {1: "pending approval", 2: "approved", 3: "declined"}.get(status_id, str(status_id))
request_id = (req or {}).get("id")
# Optional poster preview
poster = SeerrClient.poster_url(item)
if poster:
await self._post_poster(evt.room_id, poster, f"{title} ({media_type})")
msg = f"Requested **{title}** ({media_type}) — status: *{status}*."
if status_id == 1 and request_id:
msg += "\n\n_Admins: react 👍 to approve or 👎 to decline._"
sent_id = await evt.respond(f"{evt.sender}\n\n{msg}")
if request_id and status_id == 1 and sent_id:
self._track_request(sent_id, request_id)
@media.subcommand("requests", help="Show your pending/processing Seerr requests")
async def cmd_requests(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
user_cfg = self._resolve_user(evt.sender)
seerr_uid = (user_cfg or {}).get("seerr_user_id")
if not seerr_uid:
await self._say(evt, "Your Matrix user has no Seerr user_id mapped — ask maddox.")
return
try:
reqs = await self.seerr.user_requests(seerr_uid, take=10)
except SeerrError as ex:
await self._say(evt, f"Lookup failed: {ex}")
return
if not reqs:
await self._say(evt, "No pending or processing requests.")
return
lines = [f"**Your requests ({len(reqs)}):**"]
for r in reqs:
mi = r.get("media") or {}
t = mi.get("title") or mi.get("name") or f"tmdb:{mi.get('tmdbId')}"
mt = mi.get("mediaType") or "?"
avail = SEERR_AVAIL.get(mi.get("status", 0), "?")
lines.append(f"- *{t}* ({mt}) — {avail}")
await self._say(evt, "\n".join(lines))
@media.subcommand("trending", help="Show what's trending in Seerr")
async def cmd_trending(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
results = await self.seerr.trending()
except SeerrError as ex:
await self._say(evt, f"Trending fetch failed: {ex}")
return
n = self._result_count(evt.sender)
results = [r for r in results if r.get("mediaType") in ("movie", "tv")][:n]
if not results:
await self._say(evt, "Nothing trending right now.")
return
self._stash_search(evt, results)
lines = [f"**Trending ({len(results)}):**"]
for i, r in enumerate(results, 1):
lines.append(f"{i}. " + self._fmt_seerr(r, leading_dash=False))
lines.append("")
lines.append("_Pick one with `!media request <N>`._")
await self._say(evt, "\n".join(lines))
def _fmt_seerr(self, r: dict, leading_dash: bool = True) -> str:
title = r.get("title") or r.get("name") or "?"
mt = r.get("mediaType") or "?"
date_s = r.get("releaseDate") or r.get("firstAirDate") or ""
year = date_s[:4] if date_s else ""
info = (r.get("mediaInfo") or {}).get("status")
avail = SEERR_AVAIL.get(info, None) if info else None
bits = [f"*{title}*"]
if year:
bits.append(f"({year})")
bits.append(f"{mt}")
if avail:
bits.append(f"[{avail}]")
prefix = "- " if leading_dash else ""
return prefix + " ".join(bits)
# ---------- Emby ----------
@media.subcommand("nowplaying", aliases=["np"], help="Active Emby sessions")
async def cmd_nowplaying(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
sessions = await self.emby.sessions()
except EmbyError as ex:
await self._say(evt, f"Emby fetch failed: {ex}")
return
active = [s for s in sessions if s.get("NowPlayingItem")]
if not active:
await self._say(evt, "Nothing playing on Emby right now.")
return
lines = [f"**Now playing on Emby ({len(active)}):**"]
for s in active:
item = s.get("NowPlayingItem") or {}
user = s.get("UserName") or "?"
client = s.get("Client") or ""
t = self._fmt_emby_item(item)
ps = s.get("PlayState") or {}
pos = ps.get("PositionTicks") or 0
run = item.get("RunTimeTicks") or 0
pct = (pos / run * 100) if run else 0
paused = " (paused)" if ps.get("IsPaused") else ""
lines.append(f"- **{user}** — {t} · {pct:.0f}%{paused} [{client}]")
await self._say(evt, "\n".join(lines))
@media.subcommand("recent", help="Recently added — `!media recent [movies|tv]`")
@command.argument("kind", required=False)
async def cmd_recent(self, evt: MessageEvent, kind: str = "") -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
# Use the sender's emby_user_id if set, else fall back to the first mapped user's id
user_cfg = self._resolve_user(evt.sender) or {}
emby_uid = user_cfg.get("emby_user_id")
if not emby_uid or emby_uid == "TBD":
for v in (self.config["user_map"] or {}).values():
if v and v.get("emby_user_id") and v["emby_user_id"] != "TBD":
emby_uid = v["emby_user_id"]
break
if not emby_uid:
await self._say(evt, "No usable Emby user_id in config.")
return
item_types = None
if kind.lower() in ("movie", "movies"):
item_types = "Movie"
elif kind.lower() in ("tv", "series", "shows"):
item_types = "Series,Episode"
try:
items = await self.emby.recently_added(emby_uid, limit=10, item_types=item_types)
except EmbyError as ex:
await self._say(evt, f"Emby fetch failed: {ex}")
return
if not items:
await self._say(evt, "Nothing recently added.")
return
label = f" ({kind})" if kind else ""
lines = [f"**Recently added{label} ({len(items)}):**"]
for it in items:
lines.append("- " + self._fmt_emby_item(it))
await self._say(evt, "\n".join(lines))
@media.subcommand("watched", help="Your recently watched items on Emby")
async def cmd_watched(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
user_cfg = self._resolve_user(evt.sender) or {}
emby_uid = user_cfg.get("emby_user_id")
if not emby_uid or emby_uid == "TBD":
await self._say(evt, "Your Matrix user has no Emby user_id mapped — ask maddox.")
return
try:
items = await self.emby.user_played(emby_uid, limit=10)
except EmbyError as ex:
await self._say(evt, f"Emby fetch failed: {ex}")
return
if not items:
await self._say(evt, "No recently watched items.")
return
lines = [f"**Recently watched ({len(items)}):**"]
for it in items:
lines.append("- " + self._fmt_emby_item(it))
await self._say(evt, "\n".join(lines))
@media.subcommand("find", help="Search the existing Emby library")
@command.argument("query", pass_raw=True, required=True)
async def cmd_find(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
emby_uid = self._any_emby_uid(evt.sender)
if not emby_uid:
await self._say(evt, "No usable Emby user_id in config.")
return
try:
items = await self.emby.find(emby_uid, query, limit=10)
except EmbyError as ex:
await self._say(evt, f"Emby fetch failed: {ex}")
return
if not items:
await self._say(evt, f"Nothing in the library matching **{query}**.")
return
lines = [f"**Found in library ({len(items)}):**"]
for it in items:
lines.append("- " + self._fmt_emby_item(it))
await self._say(evt, "\n".join(lines))
@media.subcommand("resume", help="Your continue-watching list")
async def cmd_resume(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
user_cfg = self._resolve_user(evt.sender) or {}
emby_uid = user_cfg.get("emby_user_id")
if not emby_uid or emby_uid == "TBD":
await self._say(evt, "Your Matrix user has no Emby user_id mapped — ask maddox.")
return
try:
items = await self.emby.resume(emby_uid, limit=10)
except EmbyError as ex:
await self._say(evt, f"Emby fetch failed: {ex}")
return
if not items:
await self._say(evt, "Nothing to resume — you've finished everything you started.")
return
lines = [f"**Continue watching ({len(items)}):**"]
for it in items:
pct = ""
ud = it.get("UserData") or {}
played_pct = ud.get("PlayedPercentage")
if played_pct:
pct = f" · {played_pct:.0f}%"
lines.append("- " + self._fmt_emby_item(it) + pct)
await self._say(evt, "\n".join(lines))
@media.subcommand("random", help="Random unwatched pick — `!media random [movie|tv]`")
@command.argument("kind", required=False)
async def cmd_random(self, evt: MessageEvent, kind: str = "") -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
user_cfg = self._resolve_user(evt.sender) or {}
emby_uid = user_cfg.get("emby_user_id")
if not emby_uid or emby_uid == "TBD":
emby_uid = self._any_emby_uid(evt.sender)
if not emby_uid:
await self._say(evt, "No usable Emby user_id in config.")
return
item_type = "Movie"
if kind.lower() in ("tv", "series", "show", "shows"):
item_type = "Series"
try:
item = await self.emby.random_unplayed(emby_uid, item_type=item_type)
except EmbyError as ex:
await self._say(evt, f"Emby fetch failed: {ex}")
return
if not item:
await self._say(evt, f"No unwatched {item_type.lower()}s found — go you.")
return
line = self._fmt_emby_item(item)
overview = (item.get("Overview") or "").strip()
if len(overview) > 240:
overview = overview[:240].rsplit(" ", 1)[0] + ""
poster = self.emby.poster_url(item)
if poster:
await self._post_poster(evt.room_id, poster, item.get("Name") or "poster")
msg = f"**Random pick:** {line}"
if overview:
msg += f"\n\n{overview}"
await self._say(evt, msg)
def _any_emby_uid(self, sender: str) -> Optional[str]:
"""Sender's emby_user_id, falling back to any other mapped user's id."""
cfg = self._resolve_user(sender) or {}
uid = cfg.get("emby_user_id")
if uid and uid != "TBD":
return uid
for v in (self.config["user_map"] or {}).values():
if v and v.get("emby_user_id") and v["emby_user_id"] != "TBD":
return v["emby_user_id"]
return None
def _fmt_emby_item(self, it: dict) -> str:
t = it.get("Type")
name = it.get("Name") or "?"
year = it.get("ProductionYear")
if t == "Episode":
series = it.get("SeriesName") or "?"
sn = it.get("ParentIndexNumber")
ep = it.get("IndexNumber")
tag = f"S{sn:02d}E{ep:02d}" if sn and ep else ""
return f"*{series}* {tag}{name}".strip()
if year:
return f"*{name}* ({year})"
return f"*{name}*"
# ---------- Sonarr / Radarr ----------
@media.subcommand("queue", help="Combined Sonarr + Radarr queue")
async def cmd_queue(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
sonarr_q, radarr_q = await asyncio.gather(
self.sonarr.queue(), self.radarr.queue(),
return_exceptions=True,
)
except Exception as ex:
await self._say(evt, f"Queue fetch failed: {ex}")
return
lines: list[str] = []
if isinstance(sonarr_q, Exception):
lines.append(f"**Sonarr:** unreachable ({sonarr_q})")
else:
lines.append(f"**Sonarr queue ({len(sonarr_q)}):**")
for q in sonarr_q[:10]:
series = (q.get("series") or {}).get("title") or "?"
ep = q.get("episode") or {}
tag = ""
if ep.get("seasonNumber") is not None and ep.get("episodeNumber") is not None:
tag = f" S{ep['seasonNumber']:02d}E{ep['episodeNumber']:02d}"
status = q.get("status") or "?"
pct = self._arr_pct(q)
lines.append(f"- *{series}*{tag}{status} {pct}")
lines.append("")
if isinstance(radarr_q, Exception):
lines.append(f"**Radarr:** unreachable ({radarr_q})")
else:
lines.append(f"**Radarr queue ({len(radarr_q)}):**")
for q in radarr_q[:10]:
title = (q.get("movie") or {}).get("title") or q.get("title") or "?"
status = q.get("status") or "?"
pct = self._arr_pct(q)
lines.append(f"- *{title}* — {status} {pct}")
await self._say(evt, "\n".join(lines))
@media.subcommand("upcoming", help="Sonarr calendar — episodes airing in next 7 days")
async def cmd_upcoming(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
today = date.today()
end = today + timedelta(days=7)
try:
cal = await self.sonarr.calendar(today.isoformat(), end.isoformat())
except ArrError as ex:
await self._say(evt, f"Calendar fetch failed: {ex}")
return
if not cal:
await self._say(evt, "Nothing airing in the next 7 days.")
return
lines = [f"**Upcoming ({len(cal)}):**"]
for ep in cal[:15]:
series = (ep.get("series") or {}).get("title") or "?"
sn = ep.get("seasonNumber")
en = ep.get("episodeNumber")
tag = f"S{sn:02d}E{en:02d}" if sn is not None and en is not None else ""
t = ep.get("title") or ""
air = (ep.get("airDateUtc") or ep.get("airDate") or "")[:10]
lines.append(f"- {air}: *{series}* {tag}{t}")
await self._say(evt, "\n".join(lines))
@media.subcommand("missing", help="Sonarr wanted/missing list")
async def cmd_missing(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
items = await self.sonarr.missing(page_size=10)
except ArrError as ex:
await self._say(evt, f"Missing fetch failed: {ex}")
return
if not items:
await self._say(evt, "No missing episodes — Sonarr is happy.")
return
lines = [f"**Wanted/Missing ({len(items)}):**"]
for ep in items:
series = (ep.get("series") or {}).get("title") or "?"
sn = ep.get("seasonNumber")
en = ep.get("episodeNumber")
tag = f"S{sn:02d}E{en:02d}" if sn is not None and en is not None else ""
t = ep.get("title") or ""
air = (ep.get("airDateUtc") or ep.get("airDate") or "")[:10]
lines.append(f"- *{series}* {tag}{t} (aired {air})")
await self._say(evt, "\n".join(lines))
@media.subcommand("health", help="Sonarr + Radarr active health warnings")
async def cmd_health(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
sonarr_h, radarr_h = await asyncio.gather(
self.sonarr.health(), self.radarr.health(),
return_exceptions=True,
)
lines: list[str] = []
for label, data in (("Sonarr", sonarr_h), ("Radarr", radarr_h)):
if isinstance(data, Exception):
lines.append(f"**{label}:** unreachable ({data})")
continue
if not data:
lines.append(f"**{label}:** ✅ healthy")
continue
lines.append(f"**{label} ({len(data)} warnings):**")
for h in data:
src = h.get("source") or "?"
msg = h.get("message") or "?"
lines.append(f"- [{src}] {msg}")
await self._say(evt, "\n".join(lines))
def _arr_pct(self, q: dict) -> str:
size = q.get("size") or 0
left = q.get("sizeleft") or 0
if size <= 0:
return ""
pct = (1 - left / size) * 100
return f"{pct:.0f}%"
# ---------- Downloads ----------
@media.subcommand("activity", aliases=["dl"], help="NZBGet + qBittorrent active downloads")
async def cmd_activity(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
nzb_res, qbt_res = await asyncio.gather(
self.nzbget.listgroups(),
self.qbt.downloading(),
return_exceptions=True,
)
lines: list[str] = []
# NZBGet
if isinstance(nzb_res, Exception):
lines.append(f"**NZBGet:** unreachable ({nzb_res})")
else:
active = [g for g in nzb_res if (g.get("RemainingSizeMB") or 0) > 0]
lines.append(f"**NZBGet ({len(active)} active):**" if active else "**NZBGet:** idle")
for g in active[:8]:
name = g.get("NZBNicename") or g.get("NZBName") or "?"
rate_mbps = (g.get("DownloadRate") or 0) / 1024 / 1024 # NZBGet reports B/s
size_mb = g.get("FileSizeMB") or 0
left_mb = g.get("RemainingSizeMB") or 0
pct = (1 - left_mb / size_mb) * 100 if size_mb else 0
lines.append(f"- *{name}* — {pct:.0f}% · {rate_mbps:.1f} MB/s")
lines.append("")
# qBittorrent
if isinstance(qbt_res, Exception):
lines.append(f"**qBittorrent:** unreachable ({qbt_res})")
else:
lines.append(f"**qBittorrent ({len(qbt_res)} active):**" if qbt_res else "**qBittorrent:** idle")
for t in qbt_res[:8]:
name = t.get("name") or "?"
pct = (t.get("progress") or 0) * 100
speed = _human_bytes(t.get("dlspeed") or 0) + "/s"
eta = _human_eta(t.get("eta"))
lines.append(f"- *{name}* — {pct:.0f}% · {speed} · ETA {eta}")
await self._say(evt, "\n".join(lines))
@media.subcommand("speed", help="Aggregate down/up speeds across NZBGet + qBt")
async def cmd_speed(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
nzb_status, qbt_xfer = await asyncio.gather(
self.nzbget.status(), self.qbt.transfer_info(),
return_exceptions=True,
)
lines: list[str] = []
if isinstance(nzb_status, Exception):
lines.append(f"**NZBGet:** unreachable ({nzb_status})")
else:
dl = nzb_status.get("DownloadRate") or 0 # bytes/s
paused = nzb_status.get("DownloadPaused") or nzb_status.get("ServerPaused")
tag = " (paused)" if paused else ""
lines.append(f"**NZBGet:** ↓ {_human_bytes(dl)}/s{tag}")
if isinstance(qbt_xfer, Exception):
lines.append(f"**qBittorrent:** unreachable ({qbt_xfer})")
else:
dl = qbt_xfer.get("dl_info_speed") or 0
ul = qbt_xfer.get("up_info_speed") or 0
state = qbt_xfer.get("connection_status") or "?"
lines.append(f"**qBittorrent:** ↓ {_human_bytes(dl)}/s · ↑ {_human_bytes(ul)}/s [{state}]")
await self._say(evt, "\n".join(lines))
@media.subcommand("completed", help="Downloads finished in the last 24h")
async def cmd_completed(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
cutoff = datetime.now(timezone.utc).timestamp() - 86400
nzb_hist, qbt_all = await asyncio.gather(
self.nzbget.history(), self.qbt.all_torrents(),
return_exceptions=True,
)
lines: list[str] = []
# NZBGet history items have HistoryTime (epoch) and FileSizeMB
if isinstance(nzb_hist, Exception):
lines.append(f"**NZBGet:** unreachable ({nzb_hist})")
else:
recent = [h for h in nzb_hist if (h.get("HistoryTime") or 0) >= cutoff]
lines.append(f"**NZBGet — completed last 24h ({len(recent)}):**")
for h in recent[:10]:
name = h.get("Name") or h.get("NZBName") or "?"
size_mb = h.get("FileSizeMB") or 0
status = h.get("Status") or "?"
lines.append(f"- *{name}* — {size_mb / 1024:.1f} GB [{status}]")
lines.append("")
# qBt: completion_on (epoch); finished torrents have it set
if isinstance(qbt_all, Exception):
lines.append(f"**qBittorrent:** unreachable ({qbt_all})")
else:
recent = [t for t in qbt_all
if (t.get("completion_on") or 0) >= cutoff
and (t.get("completion_on") or 0) > 0]
lines.append(f"**qBittorrent — completed last 24h ({len(recent)}):**")
for t in recent[:10]:
name = t.get("name") or "?"
size = t.get("size") or 0
lines.append(f"- *{name}* — {_human_bytes(size)}")
await self._say(evt, "\n".join(lines))
@media.subcommand("pause", help="Pause all downloads (NZBGet + qBittorrent)")
async def cmd_pause(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
results = await asyncio.gather(
self.nzbget.pause(), self.qbt.pause_all(),
return_exceptions=True,
)
nzb_ok = not isinstance(results[0], Exception)
qbt_ok = not isinstance(results[1], Exception)
msg = (
f"NZBGet: {'paused ⏸' if nzb_ok else f'failed ({results[0]})'}\n"
f"qBittorrent: {'paused ⏸' if qbt_ok else f'failed ({results[1]})'}"
)
await self._say(evt, msg)
@media.subcommand("unpause", help="Resume all downloads (NZBGet + qBittorrent)")
async def cmd_unpause(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
results = await asyncio.gather(
self.nzbget.unpause(), self.qbt.resume_all(),
return_exceptions=True,
)
nzb_ok = not isinstance(results[0], Exception)
qbt_ok = not isinstance(results[1], Exception)
msg = (
f"NZBGet: {'resumed ▶' if nzb_ok else f'failed ({results[0]})'}\n"
f"qBittorrent: {'resumed ▶' if qbt_ok else f'failed ({results[1]})'}"
)
await self._say(evt, msg)
# ---------- Reactions: admin approve/decline ----------
@event.on(EventType.REACTION)
async def on_reaction(self, evt: ReactionEvent) -> None:
relates = getattr(evt.content, "relates_to", None)
if not relates or not relates.event_id or not relates.key:
return
request_id = self._pending_requests.get(relates.event_id)
if not request_id:
return # not one of our request messages
admins = self.config["admin_users"] or []
if evt.sender not in admins:
self.log.info("Reaction from non-admin %s ignored", evt.sender)
return
key = relates.key
if _matches(key, APPROVE_KEYS):
label, fn = "approved ✅", self.seerr.approve
elif _matches(key, DECLINE_KEYS):
label, fn = "declined ❌", self.seerr.decline
else:
return # unrelated reaction
try:
await fn(request_id)
except SeerrError as ex:
await self.client.send_text(evt.room_id, text=f"Request {request_id} action failed: {ex}")
return
self._pending_requests.pop(relates.event_id, None)
await self.client.send_text(
evt.room_id, text=f"Request {request_id} {label} (by {evt.sender})"
)
# ---------- Webhook: Seerr → Matrix notifications ----------
@web.post("/seerr-webhook")
async def seerr_webhook(self, req: Request) -> Response:
secret = self.config["seerr_webhook_secret"] or ""
if secret:
auth = req.headers.get("Authorization", "")
if auth != f"Bearer {secret}":
self.log.warning("Seerr webhook bad auth from %s", req.remote)
return Response(status=401, text="unauthorized")
try:
payload = await req.json()
except Exception:
return Response(status=400, text="invalid json")
room = (self.config["notifications_room"] or "").strip()
if not room:
return json_response({"ok": False, "error": "notifications_room not configured"})
text, html = self._format_seerr_event(payload)
try:
content = TextMessageEventContent(
msgtype=MessageType.NOTICE,
body=text,
format=Format.HTML,
formatted_body=html,
)
await self.client.send_message(RoomID(room), content)
except Exception as ex:
self.log.exception("Failed to post Seerr webhook to %s", room)
return json_response({"ok": False, "error": str(ex)}, status=500)
poster = payload.get("image")
if poster and self.config["posters_enabled"]:
await self._post_poster(room, poster, payload.get("subject") or "poster")
return json_response({"ok": True})
def _format_seerr_event(self, p: dict) -> tuple[str, str]:
nt = (p.get("notification_type") or "").upper()
subject = p.get("subject") or "?"
message = p.get("message") or ""
media = p.get("media") or {}
request = p.get("request") or {}
media_type = media.get("media_type") or "?"
requester = request.get("requestedBy_username") or ""
emoji = {
"MEDIA_PENDING": "📥",
"MEDIA_APPROVED": "",
"MEDIA_AUTO_APPROVED": "",
"MEDIA_AVAILABLE": "🎬",
"MEDIA_DECLINED": "",
"MEDIA_FAILED": "⚠️",
"ISSUE_CREATED": "🐛",
"ISSUE_COMMENT": "💬",
"ISSUE_RESOLVED": "🛠",
"TEST_NOTIFICATION": "🔧",
}.get(nt, "📣")
label = nt.replace("_", " ").title() or "Notification"
text_parts = [f"{emoji} {label}{subject}"]
if media_type and media_type != "?":
text_parts.append(f"({media_type})")
if requester:
text_parts.append(f"— requested by {requester}")
text = " ".join(text_parts)
if message:
text += f"\n\n{message}"
html = (
f"{emoji} <strong>{label}</strong> — <em>{subject}</em>"
+ (f" ({media_type})" if media_type != '?' else "")
+ (f" — requested by <strong>{requester}</strong>" if requester else "")
+ (f"<br/><br/>{message}" if message else "")
)
return text, html