maubot-media/media_bot/bot.py
Maddox 87eef44d93 v0.6.2: Sunday cleanup recap as separate follow-up message
The Sunday emby-cleaner recap used to be appended to the same digest body
and shared the (N/M) byte-length splitting. Now it ships as its own
message after the main digest, keeping the daily-digest content focused
on what's new and the cleanup recap on what was removed.

Closes maubot-media#1.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 20:39:39 -04:00

1822 lines
76 KiB
Python

"""Media bot — Matrix companion for the homelab media stack.
Wraps Seerr (search/request), Emby (library/playback), Sonarr/Radarr (queue/calendar),
NZBGet + qBittorrent (active downloads).
Each Matrix sender is mapped to per-service user IDs via plugin config; unmapped
senders are rejected. All replies are prefixed with the sender's MXID so notifications
in shared rooms stay readable (matches the Books Bot convention).
"""
from __future__ import annotations
import asyncio
from datetime import date, datetime, timedelta, timezone
from typing import Any, Optional, Type
import aiohttp
from aiohttp.web import Request, Response, json_response
from maubot import MessageEvent, Plugin
from maubot.handlers import command, event, web
from mautrix.types import (
EventType,
Format,
ImageInfo,
MediaMessageEventContent,
MessageType,
ReactionEvent,
RoomID,
TextMessageEventContent,
)
from mautrix.util.async_db import UpgradeTable
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from .clients.arr import ArrError, LidarrClient, RadarrClient, SonarrClient
from .clients.downloads import DownloadError, NzbgetClient, QbtClient
from .clients.emby import EmbyClient, EmbyError
from .clients.seerr import SeerrClient, SeerrError
from .db import upgrade_table
def _is_us_dst(dt: datetime) -> bool:
"""Approximate US DST: second Sunday of March → first Sunday of November.
Used because the maubot container ships without tzdata for ZoneInfo.
"""
y = dt.year
march1 = datetime(y, 3, 1)
second_sunday = march1 + timedelta(days=((6 - march1.weekday()) % 7) + 7)
nov1 = datetime(y, 11, 1)
first_sunday = nov1 + timedelta(days=(6 - nov1.weekday()) % 7)
naive = dt.replace(tzinfo=None) if dt.tzinfo else dt
return second_sunday <= naive < first_sunday
def _local_now() -> datetime:
"""Approximate Indianapolis local time (Eastern, with DST). Naive."""
utc = datetime.now(timezone.utc)
offset = -4 if _is_us_dst(utc) else -5
return (utc + timedelta(hours=offset)).replace(tzinfo=None)
# --- Seerr availability codes ----------------------------------------------
# https://api-docs.overseerr.dev — `Status` enum
SEERR_AVAIL = {
1: "unknown",
2: "pending",
3: "processing",
4: "partial",
5: "available",
}
def _human_bytes(n: float) -> str:
for unit in ("B", "KB", "MB", "GB", "TB"):
if abs(n) < 1024:
return f"{n:.1f} {unit}"
n /= 1024
return f"{n:.1f} PB"
def _human_eta(seconds: int | float | None) -> str:
if not seconds or seconds < 0 or seconds > 8640000:
return "?"
s = int(seconds)
h, rem = divmod(s, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}h{m}m"
if m:
return f"{m}m{s}s"
return f"{s}s"
class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("http_timeout")
helper.copy("default_results")
helper.copy("posters_enabled")
helper.copy("admin_users")
helper.copy("notifications_room")
helper.copy("seerr_webhook_secret")
helper.copy("sonarr_webhook_secret")
helper.copy("digest_enabled")
helper.copy("digest_hour")
helper.copy("ntfy_url")
helper.copy("emby_cleaner_topic")
helper.copy("seerr")
helper.copy("sonarr")
helper.copy("radarr")
helper.copy("lidarr")
helper.copy("emby")
helper.copy("nzbget")
helper.copy("qbittorrent")
helper.copy("user_map")
# Reactions counted as approve/decline. Skin-tone variants pass via startswith.
APPROVE_KEYS = ("👍", "", "🟢")
DECLINE_KEYS = ("👎", "", "🟥")
def _matches(key: str, candidates: tuple[str, ...]) -> bool:
return any(key.startswith(c) for c in candidates)
class MediaBot(Plugin):
config: Config
session: aiohttp.ClientSession
seerr: SeerrClient
sonarr: SonarrClient
radarr: RadarrClient
lidarr: LidarrClient
emby: EmbyClient
nzbget: NzbgetClient
qbt: QbtClient
# Per-(room, sender) cache of last search/trending results — used by
# `!media request <N>` to skip re-typing the query. In-memory only; a bot
# restart clears it (acceptable, results are cheap to rebuild).
_search_cache: dict[tuple[str, str], list[dict]]
# Map of bot-message event_id → Seerr request_id, for reaction-based
# approve/decline. Capped at 200; oldest entries are evicted on overflow.
_pending_requests: dict[str, int]
# Background task running the daily digest scheduler.
_digest_task: Optional[asyncio.Task]
@classmethod
def get_db_upgrade_table(cls) -> UpgradeTable:
return upgrade_table
async def start(self) -> None:
self.config.load_and_update()
timeout = aiohttp.ClientTimeout(total=self.config["http_timeout"])
self.session = aiohttp.ClientSession(timeout=timeout)
s, so, r, l, e, n, q = (
self.config["seerr"], self.config["sonarr"], self.config["radarr"],
self.config["lidarr"], self.config["emby"],
self.config["nzbget"], self.config["qbittorrent"],
)
self.seerr = SeerrClient(self.session, s["url"], s["api_key"])
self.sonarr = SonarrClient(self.session, so["url"], so["api_key"])
self.radarr = RadarrClient(self.session, r["url"], r["api_key"])
self.lidarr = LidarrClient(self.session, l["url"], l["api_key"])
self.emby = EmbyClient(self.session, e["url"], e["api_key"])
self.nzbget = NzbgetClient(self.session, n["url"], n["username"], n["password"])
self.qbt = QbtClient(self.session, q["url"], q["username"], q["password"])
# Seerr is optional. When disabled, search/request fall back to direct
# Sonarr/Radarr lookup+add, and the seerr-only commands return a hint.
self.seerr_enabled = self._service_configured(s)
self._search_cache = {}
self._pending_requests = {}
await self._ensure_db_schema()
self._digest_task = None
if self.config["digest_enabled"] and (self.config["notifications_room"] or "").strip():
self._digest_task = asyncio.create_task(self._digest_loop())
self.log.info("Media bot started — users=%d", len(self.config["user_map"] or {}))
async def stop(self) -> None:
if self._digest_task and not self._digest_task.done():
self._digest_task.cancel()
if hasattr(self, "session"):
await self.session.close()
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config
# ---------- helpers ----------
@staticmethod
def _service_configured(block: dict | None) -> bool:
"""A service block is 'configured' if it has a non-empty url and a
non-placeholder api_key. Used to gate optional integrations."""
if not block:
return False
url = (block.get("url") or "").strip()
key = (block.get("api_key") or "").strip()
return bool(url) and key not in ("", "CHANGEME")
def _resolve_user(self, sender: str) -> Optional[dict]:
return (self.config["user_map"] or {}).get(sender)
def _media_label(self) -> str:
"""Display label for the configured Emby/Jellyfin server."""
kind = ((self.config["emby"] or {}).get("type") or "emby").lower()
return "Jellyfin" if kind == "jellyfin" else "Emby"
async def _say(self, evt: MessageEvent, message: str) -> None:
await evt.respond(f"{evt.sender}\n\n{message}")
async def _reject_unmapped(self, evt: MessageEvent) -> bool:
if not self._resolve_user(evt.sender):
self.log.warning("Unauthorized sender: %s", evt.sender)
await self._say(evt, "Sorry, your Matrix user isn't authorized to use the media bot.")
return True
return False
def _user_pref(self, sender: str, key: str, default: Any) -> Any:
cfg = self._resolve_user(sender) or {}
val = cfg.get(key)
return val if val is not None else default
def _result_count(self, sender: str) -> int:
return int(self._user_pref(sender, "result_count", self.config["default_results"]))
def _stash_search(self, evt: MessageEvent, results: list[dict]) -> None:
"""Cache search/trending results so the user can `!media request <N>`."""
self._search_cache[(evt.room_id, evt.sender)] = list(results)
def _track_request(self, event_id: str, request_id: int) -> None:
self._pending_requests[event_id] = request_id
if len(self._pending_requests) > 200:
oldest = next(iter(self._pending_requests))
del self._pending_requests[oldest]
async def _post_poster(self, room_id: str, image_url: str, caption: str) -> bool:
"""Download a poster URL, upload to Matrix, send as m.image. Returns True on success."""
if not self.config["posters_enabled"]:
return False
try:
async with self.session.get(image_url) as r:
if r.status >= 400:
self.log.info("poster fetch %s%s", image_url, r.status)
return False
data = await r.read()
mime = r.headers.get("Content-Type", "image/jpeg").split(";")[0].strip()
mxc = await self.client.upload_media(data, mime_type=mime)
content = MediaMessageEventContent(
msgtype=MessageType.IMAGE,
body=caption,
url=mxc,
info=ImageInfo(mimetype=mime, size=len(data)),
)
await self.client.send_message(RoomID(room_id), content)
return True
except Exception as ex:
self.log.warning("poster upload failed: %s", ex)
return False
# ---------- top-level command ----------
@command.new("media", aliases=["m"], help="Media stack bot — try `!media help`",
require_subcommand=True)
async def media(self) -> None:
pass
@media.subcommand("help", help="Show available commands")
async def cmd_help(self, evt: MessageEvent) -> None:
await evt.mark_read()
msg = (
"**Media bot commands**\n\n"
"*Search & request*\n"
"- `!media search <query>` — top results (numbered)\n"
"- `!media request <query> [--tv|--movie]` — request top hit\n"
"- `!media request <N>` — pick item N from your last search/trending\n"
+ ("- `!media requests` — your pending/processing requests\n"
"- `!media trending` — what's trending (numbered)\n\n"
if self.seerr_enabled else
"_(Seerr not configured — search/request go directly to Sonarr+Radarr; "
"no approval workflow.)_\n\n")
+
f"*Library ({self._media_label()})*\n"
"- `!media nowplaying` — active sessions\n"
"- `!media recent [movies|tv]` — recently added\n"
"- `!media watched` — what you recently finished\n"
"- `!media find <query>` — search the existing library\n"
"- `!media resume` — your continue-watching list\n"
"- `!media random [movie|tv]` — random unwatched pick\n\n"
"*Sonarr / Radarr*\n"
"- `!media queue` — combined queue\n"
"- `!media upcoming` — Sonarr calendar (next 7 days)\n"
"- `!media missing` — Sonarr wanted/missing\n"
f"- `!media health` — Sonarr/Radarr/Lidarr/Seerr/{self._media_label()}/NZBGet/qBt status\n\n"
"*Lidarr (music)*\n"
"- `!media music <q>` — search MusicBrainz (numbered)\n"
"- `!media music add <q|N>` — add the artist to Lidarr\n\n"
"*Downloads (NZBGet + qBt)*\n"
"- `!media activity` — current downloads\n"
"- `!media completed` — finished in the last 24h\n"
"- `!media speed` — aggregate down/up speed\n"
"- `!media pause` / `!media unpause` — global pause/resume\n\n"
"*Subscriptions*\n"
"- `!media subscribe <show>` — ping me when new episodes import\n"
"- `!media unsubscribe <show>` — stop pings\n"
"- `!media subscriptions` — list yours\n"
"- `!media digest` — fire today's digest now (admin/test)\n\n"
"_Admins can react 👍/👎 on a request to approve/decline._"
)
await self._say(evt, msg)
# ---------- Seerr ----------
@media.subcommand("search", help="Search Seerr for a movie or show")
@command.argument("query", pass_raw=True, required=True)
async def cmd_search(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
if self.seerr_enabled:
try:
results = await self.seerr.search(query)
except SeerrError as ex:
await self._say(evt, f"Search failed: {ex}")
return
results = [r for r in results if r.get("mediaType") in ("movie", "tv")]
for r in results:
r["_source"] = "seerr"
else:
results = await self._arr_search(query)
if not results:
await self._say(evt, f"No results for **{query}**.")
return
n = self._result_count(evt.sender)
shown = results[:n]
self._stash_search(evt, shown)
lines = [f"**Top {len(shown)} for '{query}':**"]
for i, r in enumerate(shown, 1):
lines.append(f"{i}. " + self._fmt_seerr(r, leading_dash=False))
lines.append("")
lines.append("_Pick one with `!media request <N>`._")
await self._say(evt, "\n".join(lines))
@media.subcommand("request",
help="Request a movie/show. Pass `<query>` or a number from a recent search.")
@command.argument("query", pass_raw=True, required=True)
async def cmd_request(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
user_cfg = self._resolve_user(evt.sender) or {}
seerr_uid = user_cfg.get("seerr_user_id")
if self.seerr_enabled and not seerr_uid:
await self._say(evt, "Your Matrix user has no `seerr_user_id` mapped in `user_map`.")
return
# Numbered selection: !media request 2 → 2nd item from last search/trending
stripped = query.strip()
if stripped.isdigit():
cached = self._search_cache.get((evt.room_id, evt.sender)) or []
idx = int(stripped) - 1
if not cached:
await self._say(evt, "No recent search results to pick from. Run `!media search <query>` first.")
return
if idx < 0 or idx >= len(cached):
await self._say(evt, f"Pick a number 1-{len(cached)}.")
return
await self._do_request(evt, seerr_uid, cached[idx])
return
# Free-text path: optional --tv/--movie flag, else fall back to user default
forced_type: Optional[str] = None
cleaned: list[str] = []
for w in stripped.split():
if w == "--tv":
forced_type = "tv"
elif w == "--movie":
forced_type = "movie"
else:
cleaned.append(w)
if forced_type is None:
forced_type = user_cfg.get("default_media_type")
q = " ".join(cleaned).strip()
if not q:
await self._say(evt, "Need a search query, e.g. `!media request dune --movie`.")
return
if self.seerr_enabled:
try:
results = await self.seerr.search(q)
except SeerrError as ex:
await self._say(evt, f"Search failed: {ex}")
return
candidates = [r for r in results if r.get("mediaType") in ("movie", "tv")]
for r in candidates:
r["_source"] = "seerr"
else:
candidates = await self._arr_search(q, media_type=forced_type)
if forced_type:
candidates = [r for r in candidates if r.get("mediaType") == forced_type]
if not candidates:
await self._say(evt, f"No matching results for **{q}**.")
return
await self._do_request(evt, seerr_uid, candidates[0])
async def _do_request(self, evt: MessageEvent, seerr_uid: Optional[int], item: dict) -> None:
source = item.get("_source") or ("seerr" if self.seerr_enabled else "arr")
if source == "seerr":
await self._do_request_seerr(evt, seerr_uid, item)
else:
await self._do_request_arr(evt, item)
async def _do_request_seerr(self, evt: MessageEvent, seerr_uid: Optional[int], item: dict) -> None:
media_type = item.get("mediaType")
tmdb_id = item.get("id")
title = item.get("title") or item.get("name") or "?"
if not (media_type and tmdb_id):
await self._say(evt, f"**{title}** is missing a media type or TMDB id — can't request.")
return
try:
req = await self.seerr.request(media_type, tmdb_id, seerr_uid)
except SeerrError as ex:
await self._say(evt, f"Request failed: {ex}")
return
status_id = (req or {}).get("status")
status = {1: "pending approval", 2: "approved", 3: "declined"}.get(status_id, str(status_id))
request_id = (req or {}).get("id")
# Optional poster preview
poster = SeerrClient.poster_url(item)
if poster:
await self._post_poster(evt.room_id, poster, f"{title} ({media_type})")
msg = f"Requested **{title}** ({media_type}) — status: *{status}*."
if status_id == 1 and request_id:
msg += "\n\n_Admins: react 👍 to approve or 👎 to decline._"
sent_id = await evt.respond(f"{evt.sender}\n\n{msg}")
if request_id and status_id == 1 and sent_id:
self._track_request(sent_id, request_id)
async def _do_request_arr(self, evt: MessageEvent, item: dict) -> None:
"""Add a movie/series directly to Radarr/Sonarr (no Seerr in the loop)."""
media_type = item.get("mediaType")
title = item.get("title") or item.get("name") or "?"
raw = item.get("_raw") or {}
if media_type == "movie":
ok, msg = await self._add_radarr(raw)
elif media_type == "tv":
ok, msg = await self._add_sonarr(raw)
else:
await self._say(evt, f"**{title}** is neither movie nor tv — can't add.")
return
# Poster preview if Radarr/Sonarr returned one in the lookup payload
poster = next(
(img.get("remoteUrl") or img.get("url")
for img in (raw.get("images") or [])
if img.get("coverType") == "poster"),
None,
)
if poster:
await self._post_poster(evt.room_id, poster, f"{title} ({media_type})")
prefix = "Added" if ok else "Failed"
await self._say(evt, f"{prefix} **{title}** ({media_type}) — {msg}")
async def _add_sonarr(self, series: dict) -> tuple[bool, str]:
cfg = self.config["sonarr"] or {}
try:
qpid = cfg.get("quality_profile_id")
root = cfg.get("root_folder_path")
if not qpid:
qps = await self.sonarr.quality_profiles()
qpid = qps[0]["id"] if qps else None
if not root:
rfs = await self.sonarr.root_folders()
root = rfs[0]["path"] if rfs else None
lpid = cfg.get("language_profile_id")
if lpid is None:
lps = await self.sonarr.language_profiles()
lpid = lps[0]["id"] if lps else None
except ArrError as ex:
return False, f"Sonarr defaults lookup failed: {ex}"
if not (qpid and root):
return False, "Sonarr is missing a quality profile or root folder."
payload = dict(series)
payload.update({
"qualityProfileId": qpid,
"rootFolderPath": root,
"monitored": True,
"seasonFolder": True,
"addOptions": {
"monitor": cfg.get("monitor", "all"),
"searchForMissingEpisodes": bool(cfg.get("search_on_add", True)),
"searchForCutoffUnmetEpisodes": False,
},
})
if lpid: # Sonarr v3 only
payload["languageProfileId"] = lpid
try:
await self.sonarr.add_series(payload)
except ArrError as ex:
return False, f"Sonarr add failed: {ex}"
searched = "search kicked off" if cfg.get("search_on_add", True) else "no search triggered"
return True, f"Sonarr profile {qpid}, root `{root}`, {searched}."
async def _add_radarr(self, movie: dict) -> tuple[bool, str]:
cfg = self.config["radarr"] or {}
try:
qpid = cfg.get("quality_profile_id")
root = cfg.get("root_folder_path")
if not qpid:
qps = await self.radarr.quality_profiles()
qpid = qps[0]["id"] if qps else None
if not root:
rfs = await self.radarr.root_folders()
root = rfs[0]["path"] if rfs else None
except ArrError as ex:
return False, f"Radarr defaults lookup failed: {ex}"
if not (qpid and root):
return False, "Radarr is missing a quality profile or root folder."
payload = dict(movie)
payload.update({
"qualityProfileId": qpid,
"rootFolderPath": root,
"monitored": True,
"minimumAvailability": cfg.get("minimum_availability", "released"),
"addOptions": {
"monitor": cfg.get("monitor", "movieOnly"),
"searchForMovie": bool(cfg.get("search_on_add", True)),
},
})
try:
await self.radarr.add_movie(payload)
except ArrError as ex:
return False, f"Radarr add failed: {ex}"
searched = "search kicked off" if cfg.get("search_on_add", True) else "no search triggered"
return True, f"Radarr profile {qpid}, root `{root}`, {searched}."
async def _arr_search(self, query: str, media_type: Optional[str] = None) -> list[dict]:
"""Direct Sonarr+Radarr lookup, normalized to the Seerr search-item shape
so the rest of the bot (formatting, caching, request flow) doesn't care
which source produced the result."""
tasks = []
if media_type != "movie":
tasks.append(("tv", self.sonarr.lookup(query)))
if media_type != "tv":
tasks.append(("movie", self.radarr.lookup(query)))
results: list[dict] = []
gathered = await asyncio.gather(
*(t[1] for t in tasks), return_exceptions=True,
)
for (mt, _), data in zip(tasks, gathered):
if isinstance(data, Exception):
self.log.warning("arr lookup failed for %s: %s", mt, data)
continue
for item in data:
results.append(self._normalize_arr_item(item, mt))
return results
@staticmethod
def _normalize_arr_item(item: dict, media_type: str) -> dict:
"""Reshape a Sonarr/Radarr lookup result so it slots into the same code
paths as a Seerr search result."""
year = item.get("year")
date_field = "releaseDate" if media_type == "movie" else "firstAirDate"
return {
"title": item.get("title") or item.get("name") or "?",
"mediaType": media_type,
date_field: f"{year}-01-01" if year else "",
"id": item.get("tmdbId") if media_type == "movie" else item.get("tvdbId"),
"tmdbId": item.get("tmdbId"),
"tvdbId": item.get("tvdbId"),
"_source": "radarr" if media_type == "movie" else "sonarr",
"_raw": item,
}
@media.subcommand("requests", help="Show your pending/processing Seerr requests")
async def cmd_requests(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
if not self.seerr_enabled:
await self._say(evt, "Seerr isn't configured — `!media requests` is unavailable.")
return
user_cfg = self._resolve_user(evt.sender)
seerr_uid = (user_cfg or {}).get("seerr_user_id")
if not seerr_uid:
await self._say(evt, "Your Matrix user has no `seerr_user_id` mapped in `user_map`.")
return
try:
reqs = await self.seerr.user_requests(seerr_uid, take=10)
except SeerrError as ex:
await self._say(evt, f"Lookup failed: {ex}")
return
if not reqs:
await self._say(evt, "No pending or processing requests.")
return
lines = [f"**Your requests ({len(reqs)}):**"]
for r in reqs:
mi = r.get("media") or {}
t = mi.get("title") or mi.get("name") or f"tmdb:{mi.get('tmdbId')}"
mt = mi.get("mediaType") or "?"
avail = SEERR_AVAIL.get(mi.get("status", 0), "?")
lines.append(f"- *{t}* ({mt}) — {avail}")
await self._say(evt, "\n".join(lines))
@media.subcommand("trending", help="Show what's trending in Seerr")
async def cmd_trending(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
if not self.seerr_enabled:
await self._say(evt, "Seerr isn't configured — `!media trending` is unavailable.")
return
try:
results = await self.seerr.trending()
except SeerrError as ex:
await self._say(evt, f"Trending fetch failed: {ex}")
return
n = self._result_count(evt.sender)
results = [r for r in results if r.get("mediaType") in ("movie", "tv")][:n]
if not results:
await self._say(evt, "Nothing trending right now.")
return
for r in results:
r["_source"] = "seerr"
self._stash_search(evt, results)
lines = [f"**Trending ({len(results)}):**"]
for i, r in enumerate(results, 1):
lines.append(f"{i}. " + self._fmt_seerr(r, leading_dash=False))
lines.append("")
lines.append("_Pick one with `!media request <N>`._")
await self._say(evt, "\n".join(lines))
def _fmt_seerr(self, r: dict, leading_dash: bool = True) -> str:
title = r.get("title") or r.get("name") or "?"
mt = r.get("mediaType") or "?"
date_s = r.get("releaseDate") or r.get("firstAirDate") or ""
year = date_s[:4] if date_s else ""
info = (r.get("mediaInfo") or {}).get("status")
avail = SEERR_AVAIL.get(info, None) if info else None
bits = [f"*{title}*"]
if year:
bits.append(f"({year})")
bits.append(f"{mt}")
if avail:
bits.append(f"[{avail}]")
prefix = "- " if leading_dash else ""
return prefix + " ".join(bits)
# ---------- Emby ----------
@media.subcommand("nowplaying", aliases=["np"], help="Active Emby sessions")
async def cmd_nowplaying(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
sessions = await self.emby.sessions()
except EmbyError as ex:
await self._say(evt, f"{self._media_label()} fetch failed: {ex}")
return
active = [s for s in sessions if s.get("NowPlayingItem")]
if not active:
await self._say(evt, f"Nothing playing on {self._media_label()} right now.")
return
lines = [f"**Now playing on {self._media_label()} ({len(active)}):**"]
for s in active:
item = s.get("NowPlayingItem") or {}
user = s.get("UserName") or "?"
client = s.get("Client") or ""
t = self._fmt_emby_item(item)
ps = s.get("PlayState") or {}
pos = ps.get("PositionTicks") or 0
run = item.get("RunTimeTicks") or 0
pct = (pos / run * 100) if run else 0
paused = " (paused)" if ps.get("IsPaused") else ""
lines.append(f"- **{user}** — {t} · {pct:.0f}%{paused} [{client}]")
await self._say(evt, "\n".join(lines))
@media.subcommand("recent", help="Recently added — `!media recent [movies|tv]`")
@command.argument("kind", required=False)
async def cmd_recent(self, evt: MessageEvent, kind: str = "") -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
# Use the sender's emby_user_id if set, else fall back to the first mapped user's id
user_cfg = self._resolve_user(evt.sender) or {}
emby_uid = user_cfg.get("emby_user_id")
if not emby_uid or emby_uid == "TBD":
for v in (self.config["user_map"] or {}).values():
if v and v.get("emby_user_id") and v["emby_user_id"] != "TBD":
emby_uid = v["emby_user_id"]
break
if not emby_uid:
await self._say(evt, "No usable Emby user_id in config.")
return
item_types = None
if kind.lower() in ("movie", "movies"):
item_types = "Movie"
elif kind.lower() in ("tv", "series", "shows"):
item_types = "Series,Episode"
try:
items = await self.emby.recently_added(emby_uid, limit=10, item_types=item_types)
except EmbyError as ex:
await self._say(evt, f"{self._media_label()} fetch failed: {ex}")
return
if not items:
await self._say(evt, "Nothing recently added.")
return
label = f" ({kind})" if kind else ""
lines = [f"**Recently added{label} ({len(items)}):**"]
for it in items:
lines.append("- " + self._fmt_emby_item(it))
await self._say(evt, "\n".join(lines))
@media.subcommand("watched", help="Your recently watched items on Emby")
async def cmd_watched(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
user_cfg = self._resolve_user(evt.sender) or {}
emby_uid = user_cfg.get("emby_user_id")
if not emby_uid or emby_uid == "TBD":
await self._say(evt, "Your Matrix user has no `emby_user_id` mapped in `user_map`.")
return
try:
items = await self.emby.user_played(emby_uid, limit=10)
except EmbyError as ex:
await self._say(evt, f"{self._media_label()} fetch failed: {ex}")
return
if not items:
await self._say(evt, "No recently watched items.")
return
lines = [f"**Recently watched ({len(items)}):**"]
for it in items:
lines.append("- " + self._fmt_emby_item(it))
await self._say(evt, "\n".join(lines))
@media.subcommand("find", help="Search the existing Emby library")
@command.argument("query", pass_raw=True, required=True)
async def cmd_find(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
emby_uid = self._any_emby_uid(evt.sender)
if not emby_uid:
await self._say(evt, "No usable Emby user_id in config.")
return
try:
items = await self.emby.find(emby_uid, query, limit=10)
except EmbyError as ex:
await self._say(evt, f"{self._media_label()} fetch failed: {ex}")
return
if not items:
await self._say(evt, f"Nothing in the library matching **{query}**.")
return
lines = [f"**Found in library ({len(items)}):**"]
for it in items:
lines.append("- " + self._fmt_emby_item(it))
await self._say(evt, "\n".join(lines))
@media.subcommand("resume", help="Your continue-watching list")
async def cmd_resume(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
user_cfg = self._resolve_user(evt.sender) or {}
emby_uid = user_cfg.get("emby_user_id")
if not emby_uid or emby_uid == "TBD":
await self._say(evt, "Your Matrix user has no `emby_user_id` mapped in `user_map`.")
return
try:
items = await self.emby.resume(emby_uid, limit=10)
except EmbyError as ex:
await self._say(evt, f"{self._media_label()} fetch failed: {ex}")
return
if not items:
await self._say(evt, "Nothing to resume — you've finished everything you started.")
return
lines = [f"**Continue watching ({len(items)}):**"]
for it in items:
pct = ""
ud = it.get("UserData") or {}
played_pct = ud.get("PlayedPercentage")
if played_pct:
pct = f" · {played_pct:.0f}%"
lines.append("- " + self._fmt_emby_item(it) + pct)
await self._say(evt, "\n".join(lines))
@media.subcommand("random", help="Random unwatched pick — `!media random [movie|tv]`")
@command.argument("kind", required=False)
async def cmd_random(self, evt: MessageEvent, kind: str = "") -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
user_cfg = self._resolve_user(evt.sender) or {}
emby_uid = user_cfg.get("emby_user_id")
if not emby_uid or emby_uid == "TBD":
emby_uid = self._any_emby_uid(evt.sender)
if not emby_uid:
await self._say(evt, "No usable Emby user_id in config.")
return
item_type = "Movie"
if kind.lower() in ("tv", "series", "show", "shows"):
item_type = "Series"
try:
item = await self.emby.random_unplayed(emby_uid, item_type=item_type)
except EmbyError as ex:
await self._say(evt, f"{self._media_label()} fetch failed: {ex}")
return
if not item:
await self._say(evt, f"No unwatched {item_type.lower()}s found — go you.")
return
line = self._fmt_emby_item(item)
overview = (item.get("Overview") or "").strip()
if len(overview) > 240:
overview = overview[:240].rsplit(" ", 1)[0] + ""
poster = self.emby.poster_url(item)
if poster:
await self._post_poster(evt.room_id, poster, item.get("Name") or "poster")
msg = f"**Random pick:** {line}"
if overview:
msg += f"\n\n{overview}"
await self._say(evt, msg)
def _any_emby_uid(self, sender: str) -> Optional[str]:
"""Sender's emby_user_id, falling back to any other mapped user's id."""
cfg = self._resolve_user(sender) or {}
uid = cfg.get("emby_user_id")
if uid and uid != "TBD":
return uid
for v in (self.config["user_map"] or {}).values():
if v and v.get("emby_user_id") and v["emby_user_id"] != "TBD":
return v["emby_user_id"]
return None
def _fmt_emby_item(self, it: dict) -> str:
t = it.get("Type")
name = it.get("Name") or "?"
year = it.get("ProductionYear")
if t == "Episode":
series = it.get("SeriesName") or "?"
sn = it.get("ParentIndexNumber")
ep = it.get("IndexNumber")
tag = f"S{sn:02d}E{ep:02d}" if sn and ep else ""
return f"*{series}* {tag}{name}".strip()
if year:
return f"*{name}* ({year})"
return f"*{name}*"
# ---------- Sonarr / Radarr ----------
@media.subcommand("queue", help="Combined Sonarr + Radarr queue")
async def cmd_queue(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
sonarr_q, radarr_q = await asyncio.gather(
self.sonarr.queue(), self.radarr.queue(),
return_exceptions=True,
)
except Exception as ex:
await self._say(evt, f"Queue fetch failed: {ex}")
return
lines: list[str] = []
if isinstance(sonarr_q, Exception):
lines.append(f"**Sonarr:** unreachable ({sonarr_q})")
else:
lines.append(f"**Sonarr queue ({len(sonarr_q)}):**")
for q in sonarr_q[:10]:
series = (q.get("series") or {}).get("title") or "?"
ep = q.get("episode") or {}
tag = ""
if ep.get("seasonNumber") is not None and ep.get("episodeNumber") is not None:
tag = f" S{ep['seasonNumber']:02d}E{ep['episodeNumber']:02d}"
status = q.get("status") or "?"
pct = self._arr_pct(q)
lines.append(f"- *{series}*{tag}{status} {pct}")
lines.append("")
if isinstance(radarr_q, Exception):
lines.append(f"**Radarr:** unreachable ({radarr_q})")
else:
lines.append(f"**Radarr queue ({len(radarr_q)}):**")
for q in radarr_q[:10]:
title = (q.get("movie") or {}).get("title") or q.get("title") or "?"
status = q.get("status") or "?"
pct = self._arr_pct(q)
lines.append(f"- *{title}* — {status} {pct}")
await self._say(evt, "\n".join(lines))
@media.subcommand("upcoming", help="Sonarr calendar — episodes airing in next 7 days")
async def cmd_upcoming(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
today = date.today()
end = today + timedelta(days=7)
try:
cal = await self.sonarr.calendar(today.isoformat(), end.isoformat())
except ArrError as ex:
await self._say(evt, f"Calendar fetch failed: {ex}")
return
if not cal:
await self._say(evt, "Nothing airing in the next 7 days.")
return
lines = [f"**Upcoming ({len(cal)}):**"]
for ep in cal[:15]:
series = (ep.get("series") or {}).get("title") or "?"
sn = ep.get("seasonNumber")
en = ep.get("episodeNumber")
tag = f"S{sn:02d}E{en:02d}" if sn is not None and en is not None else ""
t = ep.get("title") or ""
air = (ep.get("airDateUtc") or ep.get("airDate") or "")[:10]
lines.append(f"- {air}: *{series}* {tag}{t}")
await self._say(evt, "\n".join(lines))
@media.subcommand("missing", help="Sonarr wanted/missing list")
async def cmd_missing(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
items = await self.sonarr.missing(page_size=10)
except ArrError as ex:
await self._say(evt, f"Missing fetch failed: {ex}")
return
if not items:
await self._say(evt, "No missing episodes — Sonarr is happy.")
return
lines = [f"**Wanted/Missing ({len(items)}):**"]
for ep in items:
series = (ep.get("series") or {}).get("title") or "?"
sn = ep.get("seasonNumber")
en = ep.get("episodeNumber")
tag = f"S{sn:02d}E{en:02d}" if sn is not None and en is not None else ""
t = ep.get("title") or ""
air = (ep.get("airDateUtc") or ep.get("airDate") or "")[:10]
lines.append(f"- *{series}* {tag}{t} (aired {air})")
await self._say(evt, "\n".join(lines))
@media.subcommand("health", help="Status of all backing services")
async def cmd_health(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
(sonarr_h, radarr_h, lidarr_h,
seerr_s, emby_s, nzb_s, qbt_s) = await asyncio.gather(
self.sonarr.health(),
self.radarr.health(),
self.lidarr.health(),
self.seerr.status(),
self.emby.system_info(),
self.nzbget.status(),
self.qbt.transfer_info(),
return_exceptions=True,
)
blocks: list[list[str]] = []
def arr_block(label: str, data) -> list[str]:
if isinstance(data, Exception):
return [f"**{label}:** ❌ unreachable ({data})"]
if not data:
return [f"**{label}:** ✅ healthy"]
out = [f"**{label}:** ⚠️ {len(data)} warning(s)"]
for h in data:
src = h.get("source") or "?"
msg = h.get("message") or "?"
out.append(f" - [{src}] {msg}")
return out
blocks.append(arr_block("Sonarr", sonarr_h))
blocks.append(arr_block("Radarr", radarr_h))
blocks.append(arr_block("Lidarr", lidarr_h))
if isinstance(seerr_s, Exception):
blocks.append([f"**Seerr:** ❌ unreachable ({seerr_s})"])
else:
ver = seerr_s.get("version") or "?"
update = seerr_s.get("commitsBehind")
tag = f" (update available, {update} behind)" if update else ""
blocks.append([f"**Seerr:** ✅ v{ver}{tag}"])
media_label = self._media_label()
if isinstance(emby_s, Exception):
blocks.append([f"**{media_label}:** ❌ unreachable ({emby_s})"])
else:
ver = emby_s.get("Version") or "?"
name = emby_s.get("ServerName") or media_label
blocks.append([f"**{media_label}:** ✅ {name} v{ver}"])
if isinstance(nzb_s, Exception):
blocks.append([f"**NZBGet:** ❌ unreachable ({nzb_s})"])
else:
paused = nzb_s.get("DownloadPaused") or nzb_s.get("ServerStandBy")
ver = nzb_s.get("Version") or "?"
blocks.append([
f"**NZBGet:** ⏸️ paused (v{ver})" if paused else f"**NZBGet:** ✅ v{ver}"
])
if isinstance(qbt_s, Exception):
blocks.append([f"**qBittorrent:** ❌ unreachable ({qbt_s})"])
else:
conn = qbt_s.get("connection_status") or "?"
# 'connected' = healthy; 'firewalled' = reachable but inbound blocked;
# 'disconnected' = no peers reachable (often VPN issue).
icon = {"connected": "", "firewalled": "⚠️"}.get(conn, "")
blocks.append([f"**qBittorrent:** {icon} {conn}"])
await self._say(evt, "\n\n".join("\n".join(b) for b in blocks))
def _arr_pct(self, q: dict) -> str:
size = q.get("size") or 0
left = q.get("sizeleft") or 0
if size <= 0:
return ""
pct = (1 - left / size) * 100
return f"{pct:.0f}%"
# ---------- Downloads ----------
@media.subcommand("activity", aliases=["dl"], help="NZBGet + qBittorrent active downloads")
async def cmd_activity(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
nzb_res, qbt_res = await asyncio.gather(
self.nzbget.listgroups(),
self.qbt.downloading(),
return_exceptions=True,
)
lines: list[str] = []
# NZBGet
if isinstance(nzb_res, Exception):
lines.append(f"**NZBGet:** unreachable ({nzb_res})")
else:
active = [g for g in nzb_res if (g.get("RemainingSizeMB") or 0) > 0]
lines.append(f"**NZBGet ({len(active)} active):**" if active else "**NZBGet:** idle")
for g in active[:8]:
name = g.get("NZBNicename") or g.get("NZBName") or "?"
rate_mbps = (g.get("DownloadRate") or 0) / 1024 / 1024 # NZBGet reports B/s
size_mb = g.get("FileSizeMB") or 0
left_mb = g.get("RemainingSizeMB") or 0
pct = (1 - left_mb / size_mb) * 100 if size_mb else 0
lines.append(f"- *{name}* — {pct:.0f}% · {rate_mbps:.1f} MB/s")
lines.append("")
# qBittorrent
if isinstance(qbt_res, Exception):
lines.append(f"**qBittorrent:** unreachable ({qbt_res})")
else:
lines.append(f"**qBittorrent ({len(qbt_res)} active):**" if qbt_res else "**qBittorrent:** idle")
for t in qbt_res[:8]:
name = t.get("name") or "?"
pct = (t.get("progress") or 0) * 100
speed = _human_bytes(t.get("dlspeed") or 0) + "/s"
eta = _human_eta(t.get("eta"))
lines.append(f"- *{name}* — {pct:.0f}% · {speed} · ETA {eta}")
await self._say(evt, "\n".join(lines))
@media.subcommand("speed", help="Aggregate down/up speeds across NZBGet + qBt")
async def cmd_speed(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
nzb_status, qbt_xfer = await asyncio.gather(
self.nzbget.status(), self.qbt.transfer_info(),
return_exceptions=True,
)
lines: list[str] = []
if isinstance(nzb_status, Exception):
lines.append(f"**NZBGet:** unreachable ({nzb_status})")
else:
dl = nzb_status.get("DownloadRate") or 0 # bytes/s
paused = nzb_status.get("DownloadPaused") or nzb_status.get("ServerPaused")
tag = " (paused)" if paused else ""
lines.append(f"**NZBGet:** ↓ {_human_bytes(dl)}/s{tag}")
if isinstance(qbt_xfer, Exception):
lines.append(f"**qBittorrent:** unreachable ({qbt_xfer})")
else:
dl = qbt_xfer.get("dl_info_speed") or 0
ul = qbt_xfer.get("up_info_speed") or 0
state = qbt_xfer.get("connection_status") or "?"
lines.append(f"**qBittorrent:** ↓ {_human_bytes(dl)}/s · ↑ {_human_bytes(ul)}/s [{state}]")
await self._say(evt, "\n".join(lines))
@media.subcommand("completed", help="Downloads finished in the last 24h")
async def cmd_completed(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
cutoff = datetime.now(timezone.utc).timestamp() - 86400
nzb_hist, qbt_all = await asyncio.gather(
self.nzbget.history(), self.qbt.all_torrents(),
return_exceptions=True,
)
lines: list[str] = []
# NZBGet history items have HistoryTime (epoch) and FileSizeMB
if isinstance(nzb_hist, Exception):
lines.append(f"**NZBGet:** unreachable ({nzb_hist})")
else:
recent = [h for h in nzb_hist if (h.get("HistoryTime") or 0) >= cutoff]
lines.append(f"**NZBGet — completed last 24h ({len(recent)}):**")
for h in recent[:10]:
name = h.get("Name") or h.get("NZBName") or "?"
size_mb = h.get("FileSizeMB") or 0
status = h.get("Status") or "?"
lines.append(f"- *{name}* — {size_mb / 1024:.1f} GB [{status}]")
lines.append("")
# qBt: completion_on (epoch); finished torrents have it set
if isinstance(qbt_all, Exception):
lines.append(f"**qBittorrent:** unreachable ({qbt_all})")
else:
recent = [t for t in qbt_all
if (t.get("completion_on") or 0) >= cutoff
and (t.get("completion_on") or 0) > 0]
lines.append(f"**qBittorrent — completed last 24h ({len(recent)}):**")
for t in recent[:10]:
name = t.get("name") or "?"
size = t.get("size") or 0
lines.append(f"- *{name}* — {_human_bytes(size)}")
await self._say(evt, "\n".join(lines))
@media.subcommand("pause", help="Pause all downloads (NZBGet + qBittorrent)")
async def cmd_pause(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
results = await asyncio.gather(
self.nzbget.pause(), self.qbt.pause_all(),
return_exceptions=True,
)
nzb_ok = not isinstance(results[0], Exception)
qbt_ok = not isinstance(results[1], Exception)
msg = (
f"NZBGet: {'paused ⏸' if nzb_ok else f'failed ({results[0]})'}\n"
f"qBittorrent: {'paused ⏸' if qbt_ok else f'failed ({results[1]})'}"
)
await self._say(evt, msg)
@media.subcommand("unpause", help="Resume all downloads (NZBGet + qBittorrent)")
async def cmd_unpause(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
results = await asyncio.gather(
self.nzbget.unpause(), self.qbt.resume_all(),
return_exceptions=True,
)
nzb_ok = not isinstance(results[0], Exception)
qbt_ok = not isinstance(results[1], Exception)
msg = (
f"NZBGet: {'resumed ▶' if nzb_ok else f'failed ({results[0]})'}\n"
f"qBittorrent: {'resumed ▶' if qbt_ok else f'failed ({results[1]})'}"
)
await self._say(evt, msg)
# ---------- Reactions: admin approve/decline ----------
@event.on(EventType.REACTION)
async def on_reaction(self, evt: ReactionEvent) -> None:
relates = getattr(evt.content, "relates_to", None)
if not relates or not relates.event_id or not relates.key:
return
request_id = self._pending_requests.get(relates.event_id)
if not request_id:
return # not one of our request messages
admins = self.config["admin_users"] or []
if evt.sender not in admins:
self.log.info("Reaction from non-admin %s ignored", evt.sender)
return
key = relates.key
if _matches(key, APPROVE_KEYS):
label, fn = "approved ✅", self.seerr.approve
elif _matches(key, DECLINE_KEYS):
label, fn = "declined ❌", self.seerr.decline
else:
return # unrelated reaction
try:
await fn(request_id)
except SeerrError as ex:
await self.client.send_text(evt.room_id, text=f"Request {request_id} action failed: {ex}")
return
self._pending_requests.pop(relates.event_id, None)
await self.client.send_text(
evt.room_id, text=f"Request {request_id} {label} (by {evt.sender})"
)
# ---------- Webhook: Seerr → Matrix notifications ----------
@web.post("/seerr-webhook")
async def seerr_webhook(self, req: Request) -> Response:
secret = self.config["seerr_webhook_secret"] or ""
if secret:
# Case-insensitive scheme per RFC 7235 — Seerr's UI lowercases "Bearer"
auth = (req.headers.get("Authorization") or "").strip()
scheme, _, token = auth.partition(" ")
if scheme.lower() != "bearer" or token != secret:
self.log.warning("Seerr webhook bad auth from %s", req.remote)
return Response(status=401, text="unauthorized")
try:
payload = await req.json()
except Exception:
return Response(status=400, text="invalid json")
room = (self.config["notifications_room"] or "").strip()
if not room:
return json_response({"ok": False, "error": "notifications_room not configured"})
text, html = self._format_seerr_event(payload)
try:
content = TextMessageEventContent(
msgtype=MessageType.NOTICE,
body=text,
format=Format.HTML,
formatted_body=html,
)
await self.client.send_message(RoomID(room), content)
except Exception as ex:
self.log.exception("Failed to post Seerr webhook to %s", room)
return json_response({"ok": False, "error": str(ex)}, status=500)
poster = payload.get("image")
if poster and self.config["posters_enabled"]:
await self._post_poster(room, poster, payload.get("subject") or "poster")
return json_response({"ok": True})
def _format_seerr_event(self, p: dict) -> tuple[str, str]:
nt = (p.get("notification_type") or "").upper()
subject = p.get("subject") or "?"
message = p.get("message") or ""
media = p.get("media") or {}
request = p.get("request") or {}
media_type = media.get("media_type") or "?"
requester = request.get("requestedBy_username") or ""
emoji = {
"MEDIA_PENDING": "📥",
"MEDIA_APPROVED": "",
"MEDIA_AUTO_APPROVED": "",
"MEDIA_AVAILABLE": "🎬",
"MEDIA_DECLINED": "",
"MEDIA_FAILED": "⚠️",
"ISSUE_CREATED": "🐛",
"ISSUE_COMMENT": "💬",
"ISSUE_RESOLVED": "🛠",
"TEST_NOTIFICATION": "🔧",
}.get(nt, "📣")
label = nt.replace("_", " ").title() or "Notification"
text_parts = [f"{emoji} {label}{subject}"]
if media_type and media_type != "?":
text_parts.append(f"({media_type})")
if requester:
text_parts.append(f"— requested by {requester}")
text = " ".join(text_parts)
if message:
text += f"\n\n{message}"
html = (
f"{emoji} <strong>{label}</strong> — <em>{subject}</em>"
+ (f" ({media_type})" if media_type != '?' else "")
+ (f" — requested by <strong>{requester}</strong>" if requester else "")
+ (f"<br/><br/>{message}" if message else "")
)
return text, html
# ---------- Lidarr music ----------
# Cache of last music lookup per (room, sender) so `!media music add <N>` works.
_music_cache: dict[tuple[str, str], list[dict]] = {}
@media.subcommand("music",
help="Search MusicBrainz via Lidarr. `add <q>` or `add <N>` to add.")
@command.argument("query", pass_raw=True, required=True)
async def cmd_music(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
parts = query.strip().split(None, 1)
if parts and parts[0].lower() == "add":
arg = parts[1] if len(parts) > 1 else ""
await self._music_add(evt, arg)
return
await self._music_search(evt, query.strip())
async def _music_search(self, evt: MessageEvent, term: str) -> None:
if not term:
await self._say(evt, "Need a search query, e.g. `!media music radiohead`.")
return
try:
results = await self.lidarr.lookup(term)
except ArrError as ex:
await self._say(evt, f"Lidarr lookup failed: {ex}")
return
if not results:
await self._say(evt, f"No artists found for **{term}**.")
return
n = self._result_count(evt.sender)
shown = results[:n]
self._music_cache[(evt.room_id, evt.sender)] = list(shown)
if len(self._music_cache) > 200:
del self._music_cache[next(iter(self._music_cache))]
lines = [f"**Top {len(shown)} for '{term}':**"]
for i, a in enumerate(shown, 1):
name = a.get("artistName") or "?"
disambig = a.get("disambiguation") or ""
albums = a.get("statistics", {}).get("albumCount") or len(a.get("remoteAlbums") or [])
extra = []
if disambig:
extra.append(disambig)
if albums:
extra.append(f"{albums} albums")
tail = f" ({', '.join(extra)})" if extra else ""
lines.append(f"{i}. *{name}*{tail}")
lines.append("")
lines.append("_Add with `!media music add <N>`._")
await self._say(evt, "\n".join(lines))
async def _music_add(self, evt: MessageEvent, arg: str) -> None:
arg = arg.strip()
if not arg:
await self._say(evt, "`!media music add <query>` or `!media music add <N>`.")
return
# Numbered selection
if arg.isdigit():
cached = self._music_cache.get((evt.room_id, evt.sender)) or []
idx = int(arg) - 1
if not cached:
await self._say(evt, "No recent music search to pick from. Run `!media music <query>` first.")
return
if idx < 0 or idx >= len(cached):
await self._say(evt, f"Pick a number 1-{len(cached)}.")
return
artist = cached[idx]
else:
try:
results = await self.lidarr.lookup(arg)
except ArrError as ex:
await self._say(evt, f"Lidarr lookup failed: {ex}")
return
if not results:
await self._say(evt, f"No artists found for **{arg}**.")
return
artist = results[0]
# Resolve profile/folder defaults if not set in config
cfg = self.config["lidarr"] or {}
try:
qpid = cfg.get("quality_profile_id")
mpid = cfg.get("metadata_profile_id")
root = cfg.get("root_folder_path")
if not qpid:
qps = await self.lidarr.quality_profiles()
qpid = qps[0]["id"] if qps else None
if not mpid:
mps = await self.lidarr.metadata_profiles()
mpid = mps[0]["id"] if mps else None
if not root:
rfs = await self.lidarr.root_folders()
root = rfs[0]["path"] if rfs else None
except ArrError as ex:
await self._say(evt, f"Lidarr defaults lookup failed: {ex}")
return
if not (qpid and mpid and root):
await self._say(evt, "Lidarr is missing a quality/metadata profile or root folder — set up in Lidarr first.")
return
payload = {
"artistName": artist.get("artistName"),
"foreignArtistId": artist.get("foreignArtistId"),
"qualityProfileId": qpid,
"metadataProfileId": mpid,
"rootFolderPath": root,
"monitored": True,
"monitorNewItems": "all",
"addOptions": {
"monitor": cfg.get("monitor", "all"),
"searchForMissingAlbums": bool(cfg.get("search_on_add", True)),
},
}
# Lidarr's add endpoint also expects images + a few other passthroughs from lookup
for k in ("images", "links", "genres", "tags", "ratings", "overview",
"disambiguation", "artistType", "status", "remotePoster"):
if k in artist:
payload[k] = artist[k]
try:
added = await self.lidarr.add_artist(payload)
except ArrError as ex:
await self._say(evt, f"Add failed: {ex}")
return
name = added.get("artistName") or payload["artistName"]
await self._say(
evt,
f"Added **{name}** to Lidarr. Quality: profile {qpid}, root: `{root}`. "
f"{'Search kicked off.' if cfg.get('search_on_add', True) else 'No search triggered.'}"
)
# ---------- Subscriptions ----------
@media.subcommand("subscribe", aliases=["sub"],
help="Subscribe to a Sonarr series — pings you when new episodes land")
@command.argument("query", pass_raw=True, required=True)
async def cmd_subscribe(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
series = await self.sonarr.list_series()
except ArrError as ex:
await self._say(evt, f"Sonarr lookup failed: {ex}")
return
q = query.lower().strip()
matches = [s for s in series if q in (s.get("title") or "").lower()]
if not matches:
await self._say(evt, f"No Sonarr series matches **{query}**. Add it via Sonarr first.")
return
if len(matches) > 1:
titles = ", ".join(f"*{s['title']}*" for s in matches[:5])
await self._say(evt, f"Multiple matches: {titles}. Be more specific.")
return
s = matches[0]
try:
await self.database.execute(
"INSERT OR REPLACE INTO subscriptions (mxid, sonarr_series_id, title) "
"VALUES ($1, $2, $3)",
evt.sender, s["id"], s["title"],
)
except Exception as ex:
await self._say(evt, f"Subscribe failed: {ex}")
return
await self._say(evt, f"Subscribed to **{s['title']}**. You'll be pinged on new episode imports.")
@media.subcommand("unsubscribe", aliases=["unsub"], help="Unsubscribe from a series")
@command.argument("query", pass_raw=True, required=True)
async def cmd_unsubscribe(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
rows = await self.database.fetch(
"SELECT sonarr_series_id, title FROM subscriptions WHERE mxid = $1",
evt.sender,
)
q = query.lower().strip()
matches = [r for r in rows if q in (r["title"] or "").lower()]
if not matches:
await self._say(evt, f"You aren't subscribed to anything matching **{query}**.")
return
if len(matches) > 1:
titles = ", ".join(f"*{r['title']}*" for r in matches[:5])
await self._say(evt, f"Multiple matches: {titles}. Be more specific.")
return
m = matches[0]
await self.database.execute(
"DELETE FROM subscriptions WHERE mxid = $1 AND sonarr_series_id = $2",
evt.sender, m["sonarr_series_id"],
)
await self._say(evt, f"Unsubscribed from **{m['title']}**.")
@media.subcommand("subscriptions", aliases=["subs"], help="List your subscriptions")
async def cmd_subscriptions(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
rows = await self.database.fetch(
"SELECT title, added_at FROM subscriptions WHERE mxid = $1 ORDER BY title",
evt.sender,
)
if not rows:
await self._say(evt, "No subscriptions yet — `!media subscribe <show>` to add one.")
return
lines = [f"**Your subscriptions ({len(rows)}):**"]
for r in rows:
lines.append(f"- *{r['title']}*")
await self._say(evt, "\n".join(lines))
# ---------- Sonarr webhook ----------
@web.post("/sonarr-webhook")
async def sonarr_webhook(self, req: Request) -> Response:
secret = self.config["sonarr_webhook_secret"] or ""
if secret:
auth = (req.headers.get("Authorization") or "").strip()
scheme, _, token = auth.partition(" ")
if scheme.lower() != "bearer" or token != secret:
self.log.warning("Sonarr webhook bad auth from %s", req.remote)
return Response(status=401, text="unauthorized")
try:
payload = await req.json()
except Exception:
return Response(status=400, text="invalid json")
event_type = (payload.get("eventType") or "").lower()
# Fire on Download (import) and Upgrade events; ignore Grab + Test
if event_type == "test":
return json_response({"ok": True, "test": True})
if event_type not in ("download",):
return json_response({"ok": True, "ignored": event_type})
series = payload.get("series") or {}
series_id = series.get("id")
title = series.get("title") or "?"
eps = payload.get("episodes") or []
if not series_id:
return json_response({"ok": False, "error": "no series.id"})
subs = await self.database.fetch(
"SELECT mxid FROM subscriptions WHERE sonarr_series_id = $1",
series_id,
)
if not subs:
return json_response({"ok": True, "subscribers": 0})
room = (self.config["notifications_room"] or "").strip()
if not room:
return json_response({"ok": False, "error": "notifications_room not configured"})
ep_tags = []
for ep in eps:
sn = ep.get("seasonNumber")
en = ep.get("episodeNumber")
if sn is not None and en is not None:
ep_tags.append(f"S{sn:02d}E{en:02d}")
ep_str = " ".join(ep_tags) if ep_tags else ""
upgrade = " (upgrade)" if payload.get("isUpgrade") else ""
mention_html = ", ".join(
f'<a href="https://matrix.to/#/{r["mxid"]}">{r["mxid"]}</a>' for r in subs
)
mention_plain = ", ".join(r["mxid"] for r in subs)
text = f"📺 New: {title} {ep_str}{upgrade}{mention_plain}"
html = f"📺 New: <strong>{title}</strong> {ep_str}{upgrade}{mention_html}"
try:
content = TextMessageEventContent(
msgtype=MessageType.NOTICE,
body=text,
format=Format.HTML,
formatted_body=html,
)
await self.client.send_message(RoomID(room), content)
except Exception as ex:
self.log.exception("Sonarr webhook send failed")
return json_response({"ok": False, "error": str(ex)}, status=500)
return json_response({"ok": True, "subscribers": len(subs)})
# ---------- Daily digest ----------
@media.subcommand("digest", help="Manually fire today's digest")
async def cmd_digest(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
room = (self.config["notifications_room"] or "").strip()
if not room:
await self._say(evt, "Set `notifications_room` in config first.")
return
try:
await self._send_digest(room)
await self._say(evt, "Digest sent ✅")
except Exception as ex:
self.log.exception("digest failed")
await self._say(evt, f"Digest failed: {ex}")
async def _digest_loop(self) -> None:
"""Sleep until digest_hour each day, then send. Survives any error
except CancelledError so a single failure doesn't kill the schedule."""
try:
while True:
try:
hour = int(self.config["digest_hour"] or 8)
now = _local_now()
target = now.replace(hour=hour, minute=0, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
wait_s = (target - now).total_seconds()
self.log.info("Digest scheduled in %.0fs (next: %s)", wait_s, target.isoformat())
await asyncio.sleep(wait_s)
today = date.today().isoformat()
try:
row = await self.database.fetchrow(
"SELECT last_run_date FROM digest_state WHERE id = 1"
)
except Exception:
self.log.exception("digest_state read failed; firing anyway")
row = None
if row and row["last_run_date"] == today:
continue
room = (self.config["notifications_room"] or "").strip()
if not room:
continue
await self._send_digest(room)
try:
await self.database.execute(
"UPDATE digest_state SET last_run_date = $1 WHERE id = 1", today
)
except Exception:
self.log.exception("digest_state write failed (idempotency lost)")
except asyncio.CancelledError:
raise
except Exception:
self.log.exception("digest loop iteration failed; sleeping 1h then retrying")
await asyncio.sleep(3600)
except asyncio.CancelledError:
self.log.info("digest loop cancelled")
raise
async def _ensure_db_schema(self) -> None:
"""Defensive: confirm subscriptions + digest_state exist on startup.
If maubot's UpgradeTable migration didn't run for some reason (it
silently failed to create tables once — see v0.4 history), this
creates them with IF NOT EXISTS so the bot self-heals."""
if not getattr(self, "database", None):
self.log.warning("self.database is None — !media subscribe/digest won't persist")
return
try:
await self.database.execute(
"""
CREATE TABLE IF NOT EXISTS subscriptions (
mxid TEXT NOT NULL,
sonarr_series_id INTEGER NOT NULL,
title TEXT NOT NULL,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (mxid, sonarr_series_id)
)
"""
)
await self.database.execute(
"""
CREATE TABLE IF NOT EXISTS digest_state (
id INTEGER PRIMARY KEY CHECK (id = 1),
last_run_date TEXT
)
"""
)
await self.database.execute(
"INSERT INTO digest_state (id, last_run_date) VALUES (1, NULL) "
"ON CONFLICT (id) DO NOTHING"
)
self.log.info("DB schema verified (subscriptions + digest_state)")
except Exception:
self.log.exception("DB schema bootstrap failed")
async def _send_digest(self, room: str) -> None:
"""Build and post the daily digest to `room`.
Skips the Completed section entirely when every finished release is
already represented in Recently Added; skips Queued when both arrs are
empty. On Sundays, the emby-cleaner recap goes out as a separate
follow-up message so the main digest stays focused on what's new.
"""
cutoff = datetime.now(timezone.utc).timestamp() - 86400
emby_uid = self._any_emby_uid("")
added: list[dict] = []
if emby_uid:
try:
added = await self.emby.recently_added(emby_uid, limit=20)
except EmbyError:
added = []
nzb_hist, qbt_all, sonarr_q, radarr_q = await asyncio.gather(
self.nzbget.history(), self.qbt.all_torrents(),
self.sonarr.queue(), self.radarr.queue(),
return_exceptions=True,
)
nzb_recent = []
if not isinstance(nzb_hist, Exception):
nzb_recent = [h for h in nzb_hist if (h.get("HistoryTime") or 0) >= cutoff]
qbt_recent = []
if not isinstance(qbt_all, Exception):
qbt_recent = [t for t in qbt_all
if (t.get("completion_on") or 0) >= cutoff
and (t.get("completion_on") or 0) > 0]
s_q = sonarr_q if not isinstance(sonarr_q, Exception) else []
r_q = radarr_q if not isinstance(radarr_q, Exception) else []
added_titles = {self._title_slug(self._emby_match_title(it)) for it in added}
added_titles.discard("")
def _is_unique(release_name: str) -> bool:
slug = self._title_slug(release_name)
if not slug:
return True
return not any(t and t in slug for t in added_titles)
nzb_unique = [h for h in nzb_recent
if _is_unique(h.get("Name") or h.get("NZBName") or "")]
qbt_unique = [t for t in qbt_recent if _is_unique(t.get("name") or "")]
local = _local_now()
is_sunday = local.weekday() == 6 # Mon=0..Sun=6
today_str = local.strftime("%A, %b %d")
digest_lines = [f"**📰 Daily digest — {today_str}**", ""]
digest_lines.append(f"**🆕 Recently added ({len(added)}):**")
for it in added[:8]:
digest_lines.append("- " + self._fmt_emby_item(it))
if not added:
digest_lines.append("- (nothing in the last sweep)")
if nzb_unique or qbt_unique:
digest_lines.append("")
digest_lines.append(
f"**✅ Also completed (not yet in {self._media_label()}): "
f"{len(nzb_unique) + len(qbt_unique)}** "
f"(NZBGet: {len(nzb_unique)} · qBt: {len(qbt_unique)})"
)
for h in nzb_unique[:5]:
name = h.get("Name") or h.get("NZBName") or "?"
size_mb = h.get("FileSizeMB") or 0
digest_lines.append(f"- *{name}* — {size_mb / 1024:.1f} GB")
for t in qbt_unique[:5]:
digest_lines.append(f"- *{t.get('name','?')}* — {_human_bytes(t.get('size') or 0)}")
if s_q or r_q:
digest_lines.append("")
digest_lines.append(f"**📥 Queued: {len(s_q)} TV · {len(r_q)} movies**")
await self._send_chunked(room, digest_lines)
if is_sunday:
cleaner_lines = await self._fetch_emby_cleaner_recap()
if cleaner_lines:
cleanup_lines = ["**🧹 Weekly cleanup (emby-cleaner):**"]
cleanup_lines.extend(cleaner_lines)
await self._send_chunked(room, cleanup_lines)
async def _send_chunked(self, room: str, lines: list[str]) -> None:
"""Pack `lines` into chunks ≤ 3000 chars and send them as separate
messages, prefixing with `(N/M)` only when more than one chunk."""
chunks = self._chunk_lines(lines)
total = len(chunks)
for idx, chunk in enumerate(chunks, 1):
body = chunk if total == 1 else f"**({idx}/{total})**\n\n{chunk}"
await self.client.send_message(
RoomID(room),
TextMessageEventContent(msgtype=MessageType.NOTICE, body=body),
)
@staticmethod
def _emby_match_title(it: dict) -> str:
"""Pick the title that should match a release name — series name for
episodes, item name otherwise."""
if it.get("Type") == "Episode":
return it.get("SeriesName") or it.get("Name") or ""
return it.get("Name") or ""
@staticmethod
def _title_slug(s: str) -> str:
"""Normalize a title or release name to lowercase alphanumerics so
'The.Bear.S03E10.1080p...' and 'The Bear' compare equal-ish."""
return "".join(c for c in s.lower() if c.isalnum())
@staticmethod
def _chunk_lines(lines: list[str], max_len: int = 3000) -> list[str]:
"""Pack lines into chunks <= max_len chars on line boundaries."""
body = "\n".join(lines)
if len(body) <= max_len:
return [body]
chunks: list[str] = []
cur: list[str] = []
cur_len = 0
for line in lines:
extra = len(line) + (1 if cur else 0)
if cur and cur_len + extra > max_len:
chunks.append("\n".join(cur))
cur = [line]
cur_len = len(line)
else:
cur.append(line)
cur_len += extra
if cur:
chunks.append("\n".join(cur))
return chunks
async def _fetch_emby_cleaner_recap(self) -> list[str]:
"""Pull the last 12h of messages from the emby-cleaner ntfy topic."""
ntfy = (self.config["ntfy_url"] or "").rstrip("/")
topic = self.config["emby_cleaner_topic"] or "emby-cleaner"
if not ntfy:
return []
url = f"{ntfy}/{topic}/json?poll=1&since=12h"
try:
async with self.session.get(url) as r:
if r.status >= 400:
self.log.info("ntfy fetch %s%s", url, r.status)
return []
text = await r.text()
except Exception as ex:
self.log.warning("ntfy fetch failed: %s", ex)
return []
out: list[str] = []
for raw in text.splitlines():
raw = raw.strip()
if not raw:
continue
try:
msg = __import__("json").loads(raw)
except Exception:
continue
title = msg.get("title") or ""
body = (msg.get("message") or "").strip()
if title:
out.append(f"- *{title}*")
for line in body.splitlines()[:6]:
line = line.strip()
if line:
out.append(f" {line}")
return out[:25]