The cleaner runs Sunday 4:20 AM and pings ntfy. The Sunday digest now pulls the last 12h of messages from the emby-cleaner topic and appends them to the daily summary. Configurable via ntfy_url + emby_cleaner_topic in base-config.yaml. Also drop the zoneinfo dependency in favour of a manual EST/EDT offset calc (the maubot container ships without tzdata).
1330 lines
55 KiB
Python
1330 lines
55 KiB
Python
"""Media bot — Matrix companion for the homelab media stack.
|
|
|
|
Wraps Seerr (search/request), Emby (library/playback), Sonarr/Radarr (queue/calendar),
|
|
NZBGet + qBittorrent (active downloads).
|
|
|
|
Each Matrix sender is mapped to per-service user IDs via plugin config; unmapped
|
|
senders are rejected. All replies are prefixed with the sender's MXID so notifications
|
|
in shared rooms stay readable (matches the Books Bot convention).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import date, datetime, timedelta, timezone
|
|
from typing import Any, Optional, Type
|
|
|
|
import aiohttp
|
|
from aiohttp.web import Request, Response, json_response
|
|
|
|
from maubot import MessageEvent, Plugin
|
|
from maubot.handlers import command, event, web
|
|
from mautrix.types import (
|
|
EventType,
|
|
Format,
|
|
ImageInfo,
|
|
MediaMessageEventContent,
|
|
MessageType,
|
|
ReactionEvent,
|
|
RoomID,
|
|
TextMessageEventContent,
|
|
)
|
|
from mautrix.util.async_db import UpgradeTable
|
|
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
|
|
|
|
from .clients.arr import ArrError, RadarrClient, SonarrClient
|
|
from .clients.downloads import DownloadError, NzbgetClient, QbtClient
|
|
from .clients.emby import EmbyClient, EmbyError
|
|
from .clients.seerr import SeerrClient, SeerrError
|
|
from .db import upgrade_table
|
|
|
|
|
|
def _is_us_dst(dt: datetime) -> bool:
|
|
"""Approximate US DST: second Sunday of March → first Sunday of November.
|
|
Used because the maubot container ships without tzdata for ZoneInfo.
|
|
"""
|
|
y = dt.year
|
|
march1 = datetime(y, 3, 1)
|
|
second_sunday = march1 + timedelta(days=((6 - march1.weekday()) % 7) + 7)
|
|
nov1 = datetime(y, 11, 1)
|
|
first_sunday = nov1 + timedelta(days=(6 - nov1.weekday()) % 7)
|
|
naive = dt.replace(tzinfo=None) if dt.tzinfo else dt
|
|
return second_sunday <= naive < first_sunday
|
|
|
|
|
|
def _local_now() -> datetime:
    """Approximate Indianapolis local time (Eastern, with DST). Naive."""
    now_utc = datetime.now(timezone.utc)
    # EDT is UTC-4, EST is UTC-5.
    if _is_us_dst(now_utc):
        shift = timedelta(hours=-4)
    else:
        shift = timedelta(hours=-5)
    shifted = now_utc + shift
    return shifted.replace(tzinfo=None)
|
|
|
|
|
|
# --- Seerr availability codes ----------------------------------------------
# https://api-docs.overseerr.dev — `Status` enum
# Codes are consecutive integers starting at 1, in the order listed below.
SEERR_AVAIL = dict(
    enumerate(
        ("unknown", "pending", "processing", "partial", "available"),
        start=1,
    )
)
|
|
|
|
|
|
def _human_bytes(n: float) -> str:
|
|
for unit in ("B", "KB", "MB", "GB", "TB"):
|
|
if abs(n) < 1024:
|
|
return f"{n:.1f} {unit}"
|
|
n /= 1024
|
|
return f"{n:.1f} PB"
|
|
|
|
|
|
def _human_eta(seconds: int | float | None) -> str:
|
|
if not seconds or seconds < 0 or seconds > 8640000:
|
|
return "?"
|
|
s = int(seconds)
|
|
h, rem = divmod(s, 3600)
|
|
m, s = divmod(rem, 60)
|
|
if h:
|
|
return f"{h}h{m}m"
|
|
if m:
|
|
return f"{m}m{s}s"
|
|
return f"{s}s"
|
|
|
|
|
|
class Config(BaseProxyConfig):
    """Plugin configuration wrapper for the media bot."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy each known top-level key through ``helper``, in a fixed order."""
        known_keys = (
            # general behaviour
            "http_timeout",
            "default_results",
            "posters_enabled",
            "admin_users",
            # notifications / webhooks / digest
            "notifications_room",
            "seerr_webhook_secret",
            "sonarr_webhook_secret",
            "digest_enabled",
            "digest_hour",
            "ntfy_url",
            "emby_cleaner_topic",
            # per-service connection sections
            "seerr",
            "sonarr",
            "radarr",
            "emby",
            "nzbget",
            "qbittorrent",
            # Matrix sender → service user mapping
            "user_map",
        )
        for key in known_keys:
            helper.copy(key)
|
|
|
|
|
|
# Reactions counted as approve/decline. Skin-tone variants pass via startswith.
|
|
APPROVE_KEYS = ("👍", "✅", "🟢")
|
|
DECLINE_KEYS = ("👎", "❌", "🟥")
|
|
|
|
|
|
def _matches(key: str, candidates: tuple[str, ...]) -> bool:
|
|
return any(key.startswith(c) for c in candidates)
|
|
|
|
|
|
class MediaBot(Plugin):
    """Maubot plugin behind the `!media` command family.

    The attributes annotated here are assigned in start().
    """

    # Plugin configuration and the aiohttp session shared by every client.
    config: Config
    session: aiohttp.ClientSession

    # One client per backing service; all are built on `session` in start().
    seerr: SeerrClient
    sonarr: SonarrClient
    radarr: RadarrClient
    emby: EmbyClient
    nzbget: NzbgetClient
    qbt: QbtClient

    # Per-(room, sender) cache of last search/trending results — used by
    # `!media request <N>` to skip re-typing the query. In-memory only; a bot
    # restart clears it (acceptable, results are cheap to rebuild).
    _search_cache: dict[tuple[str, str], list[dict]]

    # Map of bot-message event_id → Seerr request_id, for reaction-based
    # approve/decline. Capped at 200; oldest entries are evicted on overflow.
    _pending_requests: dict[str, int]

    # Background task running the daily digest scheduler.
    _digest_task: Optional[asyncio.Task]
|
|
|
|
@classmethod
def get_db_upgrade_table(cls) -> UpgradeTable:
    """Return the module-level ``upgrade_table`` imported from ``.db``."""
    return upgrade_table
|
|
|
|
async def start(self) -> None:
    """Load config, open the HTTP session, build clients and reset state.

    The digest loop is started only when ``digest_enabled`` is set and
    ``notifications_room`` is non-blank.
    """
    self.config.load_and_update()
    cfg = self.config
    self.session = aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=cfg["http_timeout"]),
    )

    # Read every service section up front, then build the clients.
    seerr_cfg = cfg["seerr"]
    sonarr_cfg = cfg["sonarr"]
    radarr_cfg = cfg["radarr"]
    emby_cfg = cfg["emby"]
    nzbget_cfg = cfg["nzbget"]
    qbt_cfg = cfg["qbittorrent"]

    http = self.session
    self.seerr = SeerrClient(http, seerr_cfg["url"], seerr_cfg["api_key"])
    self.sonarr = SonarrClient(http, sonarr_cfg["url"], sonarr_cfg["api_key"])
    self.radarr = RadarrClient(http, radarr_cfg["url"], radarr_cfg["api_key"])
    self.emby = EmbyClient(http, emby_cfg["url"], emby_cfg["api_key"])
    self.nzbget = NzbgetClient(
        http, nzbget_cfg["url"], nzbget_cfg["username"], nzbget_cfg["password"]
    )
    self.qbt = QbtClient(http, qbt_cfg["url"], qbt_cfg["username"], qbt_cfg["password"])

    self._search_cache = {}
    self._pending_requests = {}
    self._digest_task = None

    room = (cfg["notifications_room"] or "").strip()
    if cfg["digest_enabled"] and room:
        self._digest_task = asyncio.create_task(self._digest_loop())

    user_count = len(cfg["user_map"] or {})
    self.log.info("Media bot started — users=%d", user_count)
|
|
|
|
async def stop(self) -> None:
    """Cancel the digest task (if any) and close the shared HTTP session.

    Both attributes are looked up defensively so that stop() also works
    when start() failed before assigning them.
    """
    task = getattr(self, "_digest_task", None)
    if task is not None and not task.done():
        task.cancel()
        # Let the task unwind before the session it may be using is closed.
        # asyncio.wait() does not re-raise the task's exception, and the
        # timeout keeps shutdown bounded if the task ignores cancellation.
        await asyncio.wait({task}, timeout=5)
    session = getattr(self, "session", None)
    if session is not None:
        await session.close()
|
|
|
|
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
    """Return the ``Config`` class defined in this module."""
    return Config
|
|
|
|
# ---------- helpers ----------
|
|
|
|
def _resolve_user(self, sender: str) -> Optional[dict]:
|
|
return (self.config["user_map"] or {}).get(sender)
|
|
|
|
async def _say(self, evt: MessageEvent, message: str) -> None:
    """Respond to ``evt`` with ``message`` prefixed by the sender's MXID.

    The prefix and a blank line precede the message text, matching the
    reply convention described in the module docstring.
    """
    await evt.respond(f"{evt.sender}\n\n{message}")
|
|
|
|
async def _reject_unmapped(self, evt: MessageEvent) -> bool:
    """Refuse senders that have no (truthy) entry in the user map.

    Returns True when the sender was rejected — a warning is logged and a
    refusal is sent — and False when the sender may proceed.
    """
    if self._resolve_user(evt.sender):
        return False
    self.log.warning("Unauthorized sender: %s", evt.sender)
    await self._say(evt, "Sorry, your Matrix user isn't authorized to use the media bot.")
    return True
|
|
|
|
def _user_pref(self, sender: str, key: str, default: Any) -> Any:
|
|
cfg = self._resolve_user(sender) or {}
|
|
val = cfg.get(key)
|
|
return val if val is not None else default
|
|
|
|
def _result_count(self, sender: str) -> int:
|
|
return int(self._user_pref(sender, "result_count", self.config["default_results"]))
|
|
|
|
def _stash_search(self, evt: MessageEvent, results: list[dict]) -> None:
|
|
"""Cache search/trending results so the user can `!media request <N>`."""
|
|
self._search_cache[(evt.room_id, evt.sender)] = list(results)
|
|
|
|
def _track_request(self, event_id: str, request_id: int) -> None:
|
|
self._pending_requests[event_id] = request_id
|
|
if len(self._pending_requests) > 200:
|
|
oldest = next(iter(self._pending_requests))
|
|
del self._pending_requests[oldest]
|
|
|
|
async def _post_poster(self, room_id: str, image_url: str, caption: str) -> bool:
    """Download a poster URL, upload to Matrix, send as m.image. Returns True on success.

    Best-effort: returns False when posters are disabled, when the fetch
    answers with status >= 400, or when any step raises (logged as a warning).
    """
    if not self.config["posters_enabled"]:
        return False
    try:
        async with self.session.get(image_url) as resp:
            if resp.status >= 400:
                self.log.info("poster fetch %s → %s", image_url, resp.status)
                return False
            payload = await resp.read()
            # Drop any "; charset=…" parameter; assume JPEG when no header is sent.
            content_type = resp.headers.get("Content-Type", "image/jpeg")
            mime = content_type.split(";")[0].strip()
        mxc = await self.client.upload_media(payload, mime_type=mime)
        image_info = ImageInfo(mimetype=mime, size=len(payload))
        content = MediaMessageEventContent(
            msgtype=MessageType.IMAGE,
            body=caption,
            url=mxc,
            info=image_info,
        )
        await self.client.send_message(RoomID(room_id), content)
    except Exception as ex:
        self.log.warning("poster upload failed: %s", ex)
        return False
    return True
|
|
|
|
# ---------- top-level command ----------
|
|
|
|
@command.new("media", aliases=["m"], help="Media stack bot — try `!media help`",
             require_subcommand=True)
async def media(self) -> None:
    """Root of the `!media` (alias `!m`) command; intentionally a no-op.

    It is registered with require_subcommand=True and exists so the
    ``@media.subcommand(...)`` handlers can attach to it.
    """
    pass
|
|
|
|
@media.subcommand("help", help="Show available commands")
async def cmd_help(self, evt: MessageEvent) -> None:
    """Reply with the static command reference."""
    await evt.mark_read()
    # NOTE: unlike the other subcommands here, help does not call
    # _reject_unmapped, so unmapped senders can read it too.
    msg = (
        "**Media bot commands**\n\n"
        "*Search & request (Seerr)*\n"
        "- `!media search <query>` — top results (numbered)\n"
        "- `!media request <query> [--tv|--movie]` — request top hit\n"
        "- `!media request <N>` — pick item N from your last search/trending\n"
        "- `!media requests` — your pending/processing requests\n"
        "- `!media trending` — what's trending (numbered)\n\n"
        "*Library (Emby)*\n"
        "- `!media nowplaying` — active sessions\n"
        "- `!media recent [movies|tv]` — recently added\n"
        "- `!media watched` — what you recently finished\n"
        "- `!media find <query>` — search the existing library\n"
        "- `!media resume` — your continue-watching list\n"
        "- `!media random [movie|tv]` — random unwatched pick\n\n"
        "*Sonarr / Radarr*\n"
        "- `!media queue` — combined queue\n"
        "- `!media upcoming` — Sonarr calendar (next 7 days)\n"
        "- `!media missing` — Sonarr wanted/missing\n"
        "- `!media health` — active warnings on either arr\n\n"
        "*Downloads (NZBGet + qBt)*\n"
        "- `!media activity` — current downloads\n"
        "- `!media completed` — finished in the last 24h\n"
        "- `!media speed` — aggregate down/up speed\n"
        "- `!media pause` / `!media unpause` — global pause/resume\n\n"
        "*Subscriptions*\n"
        "- `!media subscribe <show>` — ping me when new episodes import\n"
        "- `!media unsubscribe <show>` — stop pings\n"
        "- `!media subscriptions` — list yours\n"
        "- `!media digest` — fire today's digest now (admin/test)\n\n"
        "_Admins can react 👍/👎 on a request to approve/decline._"
    )
    await self._say(evt, msg)
|
|
|
|
# ---------- Seerr ----------
|
|
|
|
@media.subcommand("search", help="Search Seerr for a movie or show")
@command.argument("query", pass_raw=True, required=True)
async def cmd_search(self, evt: MessageEvent, query: str) -> None:
    """Search Seerr and reply with a numbered list of movie/TV hits.

    The hits shown are cached per (room, sender) so that a follow-up
    `!media request <N>` can pick from them.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    try:
        found = await self.seerr.search(query)
    except SeerrError as ex:
        await self._say(evt, f"Search failed: {ex}")
        return

    # Keep only entries whose mediaType is movie or tv.
    hits = [hit for hit in found if hit.get("mediaType") in ("movie", "tv")]
    if not hits:
        await self._say(evt, f"No Seerr results for **{query}**.")
        return

    limit = self._result_count(evt.sender)
    shown = hits[:limit]
    self._stash_search(evt, shown)

    numbered = [
        f"{pos}. " + self._fmt_seerr(hit, leading_dash=False)
        for pos, hit in enumerate(shown, 1)
    ]
    reply = [
        f"**Top {len(shown)} for '{query}':**",
        *numbered,
        "",
        "_Pick one with `!media request <N>`._",
    ]
    await self._say(evt, "\n".join(reply))
|
|
|
|
@media.subcommand("request",
                  help="Request a movie/show. Pass `<query>` or a number from a recent search.")
@command.argument("query", pass_raw=True, required=True)
async def cmd_request(self, evt: MessageEvent, query: str) -> None:
    """Request a title in Seerr, by list number or by free-text query.

    * ``!media request <N>`` picks entry N (1-based) from the sender's cached
      search/trending results for this room.
    * ``!media request <query> [--tv|--movie]`` searches Seerr and requests
      the first matching hit; without a flag, the sender's
      ``default_media_type`` (when configured) narrows the results.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    user_cfg = self._resolve_user(evt.sender) or {}
    seerr_uid = user_cfg.get("seerr_user_id")
    if not seerr_uid:
        await self._say(evt, "Your Matrix user has no Seerr user_id mapped — ask maddox.")
        return

    # Numbered selection: !media request 2 → 2nd item from last search/trending
    stripped = query.strip()
    # isdecimal(), not isdigit(): isdigit() also accepts characters such as
    # "²" that int() cannot parse, which raised an unhandled ValueError.
    if stripped.isdecimal():
        cached = self._search_cache.get((evt.room_id, evt.sender)) or []
        idx = int(stripped) - 1
        if not cached:
            await self._say(evt, "No recent search results to pick from. Run `!media search <query>` first.")
            return
        if idx < 0 or idx >= len(cached):
            await self._say(evt, f"Pick a number 1-{len(cached)}.")
            return
        await self._do_request(evt, seerr_uid, cached[idx])
        return

    # Free-text path: optional --tv/--movie flag, else fall back to user default
    flag_types = {"--tv": "tv", "--movie": "movie"}
    forced_type: Optional[str] = None
    cleaned: list[str] = []
    for word in stripped.split():
        if word in flag_types:
            forced_type = flag_types[word]  # the last flag given wins
        else:
            cleaned.append(word)
    if forced_type is None:
        forced_type = user_cfg.get("default_media_type")

    q = " ".join(cleaned).strip()
    if not q:
        await self._say(evt, "Need a search query, e.g. `!media request dune --movie`.")
        return
    try:
        results = await self.seerr.search(q)
    except SeerrError as ex:
        await self._say(evt, f"Search failed: {ex}")
        return

    candidates = [r for r in results if r.get("mediaType") in ("movie", "tv")]
    if forced_type:
        candidates = [r for r in candidates if r.get("mediaType") == forced_type]
    if not candidates:
        await self._say(evt, f"No matching results for **{q}**.")
        return
    await self._do_request(evt, seerr_uid, candidates[0])
|
|
|
|
async def _do_request(self, evt: MessageEvent, seerr_uid: int, item: dict) -> None:
    """Submit ``item`` to Seerr for ``seerr_uid`` and report the outcome.

    When the new request has status 1 (pending approval) and an id, the
    reply's event id is recorded via _track_request so that a reaction on
    that message can be tied back to the request.
    """
    kind = item.get("mediaType")
    tmdb = item.get("id")
    title = item.get("title") or item.get("name") or "?"
    if not kind or not tmdb:
        await self._say(evt, f"**{title}** is missing a media type or TMDB id — can't request.")
        return

    try:
        created = await self.seerr.request(kind, tmdb, seerr_uid)
    except SeerrError as ex:
        await self._say(evt, f"Request failed: {ex}")
        return

    created = created or {}
    status_code = created.get("status")
    request_id = created.get("id")
    labels = {1: "pending approval", 2: "approved", 3: "declined"}
    status_text = labels.get(status_code, str(status_code))

    # Optional poster preview
    poster = SeerrClient.poster_url(item)
    if poster:
        await self._post_poster(evt.room_id, poster, f"{title} ({kind})")

    awaiting_approval = status_code == 1 and bool(request_id)
    body = f"Requested **{title}** ({kind}) — status: *{status_text}*."
    if awaiting_approval:
        body += "\n\n_Admins: react 👍 to approve or 👎 to decline._"
    # evt.respond is used directly (not _say) because its return value is needed.
    sent_id = await evt.respond(f"{evt.sender}\n\n{body}")
    if awaiting_approval and sent_id:
        self._track_request(sent_id, request_id)
|
|
|
|
@media.subcommand("requests", help="Show your pending/processing Seerr requests")
async def cmd_requests(self, evt: MessageEvent) -> None:
    """List the sender's Seerr requests (up to 10) with their availability."""
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    mapping = self._resolve_user(evt.sender) or {}
    seerr_uid = mapping.get("seerr_user_id")
    if not seerr_uid:
        await self._say(evt, "Your Matrix user has no Seerr user_id mapped — ask maddox.")
        return
    try:
        pending = await self.seerr.user_requests(seerr_uid, take=10)
    except SeerrError as ex:
        await self._say(evt, f"Lookup failed: {ex}")
        return
    if not pending:
        await self._say(evt, "No pending or processing requests.")
        return

    def describe(req: dict) -> str:
        media = req.get("media") or {}
        # Fall back to the TMDB id when the entry carries no title/name.
        label = media.get("title") or media.get("name") or f"tmdb:{media.get('tmdbId')}"
        kind = media.get("mediaType") or "?"
        availability = SEERR_AVAIL.get(media.get("status", 0), "?")
        return f"- *{label}* ({kind}) — {availability}"

    header = f"**Your requests ({len(pending)}):**"
    await self._say(evt, "\n".join([header, *map(describe, pending)]))
|
|
|
|
@media.subcommand("trending", help="Show what's trending in Seerr")
async def cmd_trending(self, evt: MessageEvent) -> None:
    """Reply with a numbered list of trending movie/TV titles from Seerr.

    The list is cached per (room, sender) for `!media request <N>`.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    try:
        fetched = await self.seerr.trending()
    except SeerrError as ex:
        await self._say(evt, f"Trending fetch failed: {ex}")
        return

    limit = self._result_count(evt.sender)
    playable = [entry for entry in fetched if entry.get("mediaType") in ("movie", "tv")]
    top = playable[:limit]
    if not top:
        await self._say(evt, "Nothing trending right now.")
        return

    self._stash_search(evt, top)
    reply = [f"**Trending ({len(top)}):**"]
    reply.extend(
        f"{pos}. " + self._fmt_seerr(entry, leading_dash=False)
        for pos, entry in enumerate(top, 1)
    )
    reply += ["", "_Pick one with `!media request <N>`._"]
    await self._say(evt, "\n".join(reply))
|
|
|
|
def _fmt_seerr(self, r: dict, leading_dash: bool = True) -> str:
|
|
title = r.get("title") or r.get("name") or "?"
|
|
mt = r.get("mediaType") or "?"
|
|
date_s = r.get("releaseDate") or r.get("firstAirDate") or ""
|
|
year = date_s[:4] if date_s else ""
|
|
info = (r.get("mediaInfo") or {}).get("status")
|
|
avail = SEERR_AVAIL.get(info, None) if info else None
|
|
bits = [f"*{title}*"]
|
|
if year:
|
|
bits.append(f"({year})")
|
|
bits.append(f"— {mt}")
|
|
if avail:
|
|
bits.append(f"[{avail}]")
|
|
prefix = "- " if leading_dash else ""
|
|
return prefix + " ".join(bits)
|
|
|
|
# ---------- Emby ----------
|
|
|
|
@media.subcommand("nowplaying", aliases=["np"], help="Active Emby sessions")
async def cmd_nowplaying(self, evt: MessageEvent) -> None:
    """List Emby sessions that have a NowPlayingItem, with playback progress."""
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    try:
        all_sessions = await self.emby.sessions()
    except EmbyError as ex:
        await self._say(evt, f"Emby fetch failed: {ex}")
        return

    playing = [sess for sess in all_sessions if sess.get("NowPlayingItem")]
    if not playing:
        await self._say(evt, "Nothing playing on Emby right now.")
        return

    def describe(sess: dict) -> str:
        item = sess.get("NowPlayingItem") or {}
        who = sess.get("UserName") or "?"
        app = sess.get("Client") or ""
        what = self._fmt_emby_item(item)
        state = sess.get("PlayState") or {}
        position = state.get("PositionTicks") or 0
        runtime = item.get("RunTimeTicks") or 0
        # Progress is position/runtime; 0 when the runtime is unknown.
        percent = (position / runtime * 100) if runtime else 0
        paused = " (paused)" if state.get("IsPaused") else ""
        return f"- **{who}** — {what} · {percent:.0f}%{paused} [{app}]"

    header = f"**Now playing on Emby ({len(playing)}):**"
    await self._say(evt, "\n".join([header, *map(describe, playing)]))
|
|
|
|
@media.subcommand("recent", help="Recently added — `!media recent [movies|tv]`")
@command.argument("kind", required=False)
async def cmd_recent(self, evt: MessageEvent, kind: str = "") -> None:
    """List recently added Emby items (limit 10), optionally filtered by kind.

    ``kind`` may be movie/movies or tv/series/shows; anything else lists all
    item types. The lookup uses the sender's Emby id, falling back to any
    other mapped user's id.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    # Shared helper instead of an inline copy: it never hands back the "TBD"
    # placeholder, which the inline fallback could pass on to Emby.
    emby_uid = self._any_emby_uid(evt.sender)
    if not emby_uid:
        await self._say(evt, "No usable Emby user_id in config.")
        return

    wanted = kind.lower()
    item_types = None
    if wanted in ("movie", "movies"):
        item_types = "Movie"
    elif wanted in ("tv", "series", "shows"):
        item_types = "Series,Episode"

    try:
        items = await self.emby.recently_added(emby_uid, limit=10, item_types=item_types)
    except EmbyError as ex:
        await self._say(evt, f"Emby fetch failed: {ex}")
        return
    if not items:
        await self._say(evt, "Nothing recently added.")
        return

    label = f" ({kind})" if kind else ""
    lines = [f"**Recently added{label} ({len(items)}):**"]
    lines.extend("- " + self._fmt_emby_item(it) for it in items)
    await self._say(evt, "\n".join(lines))
|
|
|
|
@media.subcommand("watched", help="Your recently watched items on Emby")
async def cmd_watched(self, evt: MessageEvent) -> None:
    """List the sender's recently played Emby items (limit 10).

    Requires the sender's own Emby id; there is no fallback to other users.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    mapping = self._resolve_user(evt.sender) or {}
    uid = mapping.get("emby_user_id")
    has_uid = bool(uid) and uid != "TBD"  # "TBD" is the not-yet-mapped placeholder
    if not has_uid:
        await self._say(evt, "Your Matrix user has no Emby user_id mapped — ask maddox.")
        return
    try:
        played = await self.emby.user_played(uid, limit=10)
    except EmbyError as ex:
        await self._say(evt, f"Emby fetch failed: {ex}")
        return
    if not played:
        await self._say(evt, "No recently watched items.")
        return
    rows = ["- " + self._fmt_emby_item(entry) for entry in played]
    await self._say(evt, "\n".join([f"**Recently watched ({len(played)}):**", *rows]))
|
|
|
|
@media.subcommand("find", help="Search the existing Emby library")
@command.argument("query", pass_raw=True, required=True)
async def cmd_find(self, evt: MessageEvent, query: str) -> None:
    """Search the Emby library for ``query`` and list the matches (limit 10)."""
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    uid = self._any_emby_uid(evt.sender)
    if not uid:
        await self._say(evt, "No usable Emby user_id in config.")
        return
    try:
        matches = await self.emby.find(uid, query, limit=10)
    except EmbyError as ex:
        await self._say(evt, f"Emby fetch failed: {ex}")
        return
    if not matches:
        await self._say(evt, f"Nothing in the library matching **{query}**.")
        return
    rows = ["- " + self._fmt_emby_item(entry) for entry in matches]
    await self._say(evt, "\n".join([f"**Found in library ({len(matches)}):**", *rows]))
|
|
|
|
@media.subcommand("resume", help="Your continue-watching list")
async def cmd_resume(self, evt: MessageEvent) -> None:
    """List the sender's in-progress Emby items (limit 10) with percent played.

    Requires the sender's own Emby id; there is no fallback to other users.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    mapping = self._resolve_user(evt.sender) or {}
    uid = mapping.get("emby_user_id")
    has_uid = bool(uid) and uid != "TBD"  # "TBD" is the not-yet-mapped placeholder
    if not has_uid:
        await self._say(evt, "Your Matrix user has no Emby user_id mapped — ask maddox.")
        return
    try:
        in_progress = await self.emby.resume(uid, limit=10)
    except EmbyError as ex:
        await self._say(evt, f"Emby fetch failed: {ex}")
        return
    if not in_progress:
        await self._say(evt, "Nothing to resume — you've finished everything you started.")
        return

    def describe(entry: dict) -> str:
        progress = (entry.get("UserData") or {}).get("PlayedPercentage")
        # The suffix is omitted when the percentage is missing or zero.
        suffix = f" · {progress:.0f}%" if progress else ""
        return "- " + self._fmt_emby_item(entry) + suffix

    header = f"**Continue watching ({len(in_progress)}):**"
    await self._say(evt, "\n".join([header, *map(describe, in_progress)]))
|
|
|
|
@media.subcommand("random", help="Random unwatched pick — `!media random [movie|tv]`")
@command.argument("kind", required=False)
async def cmd_random(self, evt: MessageEvent, kind: str = "") -> None:
    """Suggest a random unplayed movie (default) or series from Emby.

    ``kind`` of tv/series/show/shows selects series; anything else selects
    movies. The reply carries the formatted title, an overview trimmed to
    about 240 characters, and a poster when one is available.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    # _any_emby_uid already prefers the sender's own id before falling back.
    emby_uid = self._any_emby_uid(evt.sender)
    if not emby_uid:
        await self._say(evt, "No usable Emby user_id in config.")
        return

    wants_tv = kind.lower() in ("tv", "series", "show", "shows")
    item_type = "Series" if wants_tv else "Movie"
    try:
        item = await self.emby.random_unplayed(emby_uid, item_type=item_type)
    except EmbyError as ex:
        await self._say(evt, f"Emby fetch failed: {ex}")
        return
    if not item:
        # Explicit plural: appending "s" to "series" produced "seriess".
        plural = "series" if wants_tv else "movies"
        await self._say(evt, f"No unwatched {plural} found — go you.")
        return

    line = self._fmt_emby_item(item)
    overview = (item.get("Overview") or "").strip()
    if len(overview) > 240:
        # Cut at the last space inside the limit so no word is split.
        overview = overview[:240].rsplit(" ", 1)[0] + "…"

    poster = self.emby.poster_url(item)
    if poster:
        await self._post_poster(evt.room_id, poster, item.get("Name") or "poster")

    msg = f"**Random pick:** {line}"
    if overview:
        msg += f"\n\n{overview}"
    await self._say(evt, msg)
|
|
|
|
def _any_emby_uid(self, sender: str) -> Optional[str]:
|
|
"""Sender's emby_user_id, falling back to any other mapped user's id."""
|
|
cfg = self._resolve_user(sender) or {}
|
|
uid = cfg.get("emby_user_id")
|
|
if uid and uid != "TBD":
|
|
return uid
|
|
for v in (self.config["user_map"] or {}).values():
|
|
if v and v.get("emby_user_id") and v["emby_user_id"] != "TBD":
|
|
return v["emby_user_id"]
|
|
return None
|
|
|
|
def _fmt_emby_item(self, it: dict) -> str:
|
|
t = it.get("Type")
|
|
name = it.get("Name") or "?"
|
|
year = it.get("ProductionYear")
|
|
if t == "Episode":
|
|
series = it.get("SeriesName") or "?"
|
|
sn = it.get("ParentIndexNumber")
|
|
ep = it.get("IndexNumber")
|
|
tag = f"S{sn:02d}E{ep:02d}" if sn and ep else ""
|
|
return f"*{series}* {tag} — {name}".strip()
|
|
if year:
|
|
return f"*{name}* ({year})"
|
|
return f"*{name}*"
|
|
|
|
# ---------- Sonarr / Radarr ----------
|
|
|
|
@media.subcommand("queue", help="Combined Sonarr + Radarr queue")
async def cmd_queue(self, evt: MessageEvent) -> None:
    """Show up to ten entries from each of the Sonarr and Radarr queues.

    Both services are queried together; a failing one is reported as
    unreachable while the other is still listed.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    try:
        # With return_exceptions=True, failures come back as result values.
        tv_queue, movie_queue = await asyncio.gather(
            self.sonarr.queue(),
            self.radarr.queue(),
            return_exceptions=True,
        )
    except Exception as ex:
        await self._say(evt, f"Queue fetch failed: {ex}")
        return

    out: list[str] = []

    if isinstance(tv_queue, Exception):
        out.append(f"**Sonarr:** unreachable ({tv_queue})")
    else:
        out.append(f"**Sonarr queue ({len(tv_queue)}):**")
        for entry in tv_queue[:10]:
            show = (entry.get("series") or {}).get("title") or "?"
            episode = entry.get("episode") or {}
            season_no = episode.get("seasonNumber")
            episode_no = episode.get("episodeNumber")
            if season_no is not None and episode_no is not None:
                code = f" S{season_no:02d}E{episode_no:02d}"
            else:
                code = ""
            state = entry.get("status") or "?"
            progress = self._arr_pct(entry)
            out.append(f"- *{show}*{code} — {state} {progress}")

    out.append("")  # blank line between the two sections

    if isinstance(movie_queue, Exception):
        out.append(f"**Radarr:** unreachable ({movie_queue})")
    else:
        out.append(f"**Radarr queue ({len(movie_queue)}):**")
        for entry in movie_queue[:10]:
            film = (entry.get("movie") or {}).get("title") or entry.get("title") or "?"
            state = entry.get("status") or "?"
            progress = self._arr_pct(entry)
            out.append(f"- *{film}* — {state} {progress}")

    await self._say(evt, "\n".join(out))
|
|
|
|
@media.subcommand("upcoming", help="Sonarr calendar — episodes airing in next 7 days")
async def cmd_upcoming(self, evt: MessageEvent) -> None:
    """List up to 15 Sonarr calendar entries from today through today + 7 days.

    "Today" is ``date.today()``, i.e. the host's system date.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    window_start = date.today()
    window_end = window_start + timedelta(days=7)
    try:
        calendar = await self.sonarr.calendar(window_start.isoformat(), window_end.isoformat())
    except ArrError as ex:
        await self._say(evt, f"Calendar fetch failed: {ex}")
        return
    if not calendar:
        await self._say(evt, "Nothing airing in the next 7 days.")
        return

    def render(ep: dict) -> str:
        show = (ep.get("series") or {}).get("title") or "?"
        season_no, episode_no = ep.get("seasonNumber"), ep.get("episodeNumber")
        if season_no is None or episode_no is None:
            code = ""
        else:
            code = f"S{season_no:02d}E{episode_no:02d}"
        ep_title = ep.get("title") or ""
        # Keep only the YYYY-MM-DD prefix of the timestamp.
        airs = (ep.get("airDateUtc") or ep.get("airDate") or "")[:10]
        return f"- {airs}: *{show}* {code} — {ep_title}"

    header = f"**Upcoming ({len(calendar)}):**"
    await self._say(evt, "\n".join([header, *map(render, calendar[:15])]))
|
|
|
|
@media.subcommand("missing", help="Sonarr wanted/missing list")
async def cmd_missing(self, evt: MessageEvent) -> None:
    """List the episodes Sonarr reports as wanted/missing (page size 10)."""
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    try:
        wanted = await self.sonarr.missing(page_size=10)
    except ArrError as ex:
        await self._say(evt, f"Missing fetch failed: {ex}")
        return
    if not wanted:
        await self._say(evt, "No missing episodes — Sonarr is happy.")
        return

    def render(ep: dict) -> str:
        show = (ep.get("series") or {}).get("title") or "?"
        season_no, episode_no = ep.get("seasonNumber"), ep.get("episodeNumber")
        if season_no is None or episode_no is None:
            code = ""
        else:
            code = f"S{season_no:02d}E{episode_no:02d}"
        ep_title = ep.get("title") or ""
        # Keep only the YYYY-MM-DD prefix of the timestamp.
        aired = (ep.get("airDateUtc") or ep.get("airDate") or "")[:10]
        return f"- *{show}* {code} — {ep_title} (aired {aired})"

    header = f"**Wanted/Missing ({len(wanted)}):**"
    await self._say(evt, "\n".join([header, *map(render, wanted)]))
|
|
|
|
@media.subcommand("health", help="Sonarr + Radarr active health warnings")
async def cmd_health(self, evt: MessageEvent) -> None:
    """Report health warnings from Sonarr and Radarr, one section per service.

    A service whose call failed is shown as unreachable; one with no
    warnings is shown as healthy.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    sonarr_report, radarr_report = await asyncio.gather(
        self.sonarr.health(),
        self.radarr.health(),
        return_exceptions=True,
    )

    out: list[str] = []
    for service, report in zip(("Sonarr", "Radarr"), (sonarr_report, radarr_report)):
        if isinstance(report, Exception):
            out.append(f"**{service}:** unreachable ({report})")
        elif not report:
            out.append(f"**{service}:** ✅ healthy")
        else:
            out.append(f"**{service} ({len(report)} warnings):**")
            for warning in report:
                origin = warning.get("source") or "?"
                text = warning.get("message") or "?"
                out.append(f"- [{origin}] {text}")
    await self._say(evt, "\n".join(out))
|
|
|
|
def _arr_pct(self, q: dict) -> str:
|
|
size = q.get("size") or 0
|
|
left = q.get("sizeleft") or 0
|
|
if size <= 0:
|
|
return ""
|
|
pct = (1 - left / size) * 100
|
|
return f"{pct:.0f}%"
|
|
|
|
# ---------- Downloads ----------
|
|
|
|
@media.subcommand("activity", aliases=["dl"], help="NZBGet + qBittorrent active downloads")
async def cmd_activity(self, evt: MessageEvent) -> None:
    """Show up to eight active downloads from each of NZBGet and qBittorrent.

    Both clients are queried together; a failing one is reported as
    unreachable while the other is still listed.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    groups, torrents = await asyncio.gather(
        self.nzbget.listgroups(),
        self.qbt.downloading(),
        return_exceptions=True,
    )
    out: list[str] = []

    # NZBGet
    if isinstance(groups, Exception):
        out.append(f"**NZBGet:** unreachable ({groups})")
    else:
        # "Active" means the group still has data left to fetch.
        running = [grp for grp in groups if (grp.get("RemainingSizeMB") or 0) > 0]
        if running:
            out.append(f"**NZBGet ({len(running)} active):**")
        else:
            out.append("**NZBGet:** idle")
        for grp in running[:8]:
            label = grp.get("NZBNicename") or grp.get("NZBName") or "?"
            rate_mbps = (grp.get("DownloadRate") or 0) / 1024 / 1024  # NZBGet reports B/s
            total_mb = grp.get("FileSizeMB") or 0
            remaining_mb = grp.get("RemainingSizeMB") or 0
            done = (1 - remaining_mb / total_mb) * 100 if total_mb else 0
            out.append(f"- *{label}* — {done:.0f}% · {rate_mbps:.1f} MB/s")

    out.append("")  # blank line between the two sections

    # qBittorrent
    if isinstance(torrents, Exception):
        out.append(f"**qBittorrent:** unreachable ({torrents})")
    else:
        if torrents:
            out.append(f"**qBittorrent ({len(torrents)} active):**")
        else:
            out.append("**qBittorrent:** idle")
        for tor in torrents[:8]:
            label = tor.get("name") or "?"
            done = (tor.get("progress") or 0) * 100
            rate = _human_bytes(tor.get("dlspeed") or 0) + "/s"
            eta = _human_eta(tor.get("eta"))
            out.append(f"- *{label}* — {done:.0f}% · {rate} · ETA {eta}")

    await self._say(evt, "\n".join(out))
|
|
|
|
@media.subcommand("speed", help="Aggregate down/up speeds across NZBGet + qBt")
async def cmd_speed(self, evt: MessageEvent) -> None:
    """Report current transfer speeds, one line per download client.

    NZBGet shows download rate plus a paused marker; qBittorrent shows
    download and upload rates plus its connection status.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    nzb_state, qbt_state = await asyncio.gather(
        self.nzbget.status(),
        self.qbt.transfer_info(),
        return_exceptions=True,
    )

    if isinstance(nzb_state, Exception):
        nzb_line = f"**NZBGet:** unreachable ({nzb_state})"
    else:
        down = nzb_state.get("DownloadRate") or 0  # bytes/s
        is_paused = nzb_state.get("DownloadPaused") or nzb_state.get("ServerPaused")
        marker = " (paused)" if is_paused else ""
        nzb_line = f"**NZBGet:** ↓ {_human_bytes(down)}/s{marker}"

    if isinstance(qbt_state, Exception):
        qbt_line = f"**qBittorrent:** unreachable ({qbt_state})"
    else:
        down = qbt_state.get("dl_info_speed") or 0
        up = qbt_state.get("up_info_speed") or 0
        link = qbt_state.get("connection_status") or "?"
        qbt_line = f"**qBittorrent:** ↓ {_human_bytes(down)}/s · ↑ {_human_bytes(up)}/s [{link}]"

    await self._say(evt, "\n".join([nzb_line, qbt_line]))
|
|
|
|
@media.subcommand("completed", help="Downloads finished in the last 24h")
async def cmd_completed(self, evt: MessageEvent) -> None:
    """Reply with downloads that finished within the last 24 hours.

    NZBGet history entries are selected by ``HistoryTime`` and qBittorrent
    torrents by ``completion_on``, both compared against an epoch cutoff of
    now minus 86400 seconds. Up to ten entries are listed per client.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return

    since = datetime.now(timezone.utc).timestamp() - 86400
    history, torrents = await asyncio.gather(
        self.nzbget.history(),
        self.qbt.all_torrents(),
        return_exceptions=True,
    )
    report: list[str] = []

    # --- NZBGet: HistoryTime (epoch) and FileSizeMB ---
    if isinstance(history, Exception):
        report.append(f"**NZBGet:** unreachable ({history})")
    else:
        finished = [entry for entry in history if (entry.get("HistoryTime") or 0) >= since]
        report.append(f"**NZBGet — completed last 24h ({len(finished)}):**")
        for entry in finished[:10]:
            label = entry.get("Name") or entry.get("NZBName") or "?"
            megabytes = entry.get("FileSizeMB") or 0
            state = entry.get("Status") or "?"
            report.append(f"- *{label}* — {megabytes / 1024:.1f} GB [{state}]")
    report.append("")

    # --- qBittorrent: completion_on (epoch) is set once a torrent finishes ---
    if isinstance(torrents, Exception):
        report.append(f"**qBittorrent:** unreachable ({torrents})")
    else:
        done = []
        for tor in torrents:
            finished_at = tor.get("completion_on") or 0
            if finished_at >= since and finished_at > 0:
                done.append(tor)
        report.append(f"**qBittorrent — completed last 24h ({len(done)}):**")
        for tor in done[:10]:
            label = tor.get("name") or "?"
            nbytes = tor.get("size") or 0
            report.append(f"- *{label}* — {_human_bytes(nbytes)}")

    await self._say(evt, "\n".join(report))
|
|
|
|
@media.subcommand("pause", help="Pause all downloads (NZBGet + qBittorrent)")
async def cmd_pause(self, evt: MessageEvent) -> None:
    """Pause NZBGet and qBittorrent and report the outcome of each call.

    The two pause calls run concurrently; an exception from one is reported
    on that client's line and does not affect the other.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return

    nzb_outcome, qbt_outcome = await asyncio.gather(
        self.nzbget.pause(),
        self.qbt.pause_all(),
        return_exceptions=True,
    )

    if isinstance(nzb_outcome, Exception):
        nzb_state = f"failed ({nzb_outcome})"
    else:
        nzb_state = "paused ⏸"
    if isinstance(qbt_outcome, Exception):
        qbt_state = f"failed ({qbt_outcome})"
    else:
        qbt_state = "paused ⏸"

    await self._say(evt, f"NZBGet: {nzb_state}\nqBittorrent: {qbt_state}")
|
|
|
|
@media.subcommand("unpause", help="Resume all downloads (NZBGet + qBittorrent)")
async def cmd_unpause(self, evt: MessageEvent) -> None:
    """Resume NZBGet and qBittorrent and report the outcome of each call.

    The two resume calls run concurrently; an exception from one is reported
    on that client's line and does not affect the other.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return

    nzb_outcome, qbt_outcome = await asyncio.gather(
        self.nzbget.unpause(),
        self.qbt.resume_all(),
        return_exceptions=True,
    )

    if isinstance(nzb_outcome, Exception):
        nzb_state = f"failed ({nzb_outcome})"
    else:
        nzb_state = "resumed ▶"
    if isinstance(qbt_outcome, Exception):
        qbt_state = f"failed ({qbt_outcome})"
    else:
        qbt_state = "resumed ▶"

    await self._say(evt, f"NZBGet: {nzb_state}\nqBittorrent: {qbt_state}")
|
|
|
|
# ---------- Reactions: admin approve/decline ----------
|
|
|
|
@event.on(EventType.REACTION)
async def on_reaction(self, evt: ReactionEvent) -> None:
    """Approve or decline a pending request when an admin reacts to it.

    The reacted-to event id is looked up in ``self._pending_requests``;
    reactions on any other message are ignored, as are reactions from
    senders not listed in the ``admin_users`` config. The reaction key picks
    approve or decline; any other key is ignored. On success the pending
    entry is removed and a confirmation is posted to the room.
    """
    relation = getattr(evt.content, "relates_to", None)
    if not (relation and relation.event_id and relation.key):
        return

    target_event = relation.event_id
    request_id = self._pending_requests.get(target_event)
    if not request_id:
        # Reaction is on a message that is not one of our request messages.
        return

    if evt.sender not in (self.config["admin_users"] or []):
        self.log.info("Reaction from non-admin %s ignored", evt.sender)
        return

    reaction = relation.key
    if _matches(reaction, APPROVE_KEYS):
        verdict, action = "approved ✅", self.seerr.approve
    elif _matches(reaction, DECLINE_KEYS):
        verdict, action = "declined ❌", self.seerr.decline
    else:
        # Some unrelated emoji; nothing to do.
        return

    try:
        await action(request_id)
    except SeerrError as ex:
        # The entry stays pending so the action can be retried.
        await self.client.send_text(evt.room_id, text=f"Request {request_id} action failed: {ex}")
        return

    self._pending_requests.pop(target_event, None)
    await self.client.send_text(
        evt.room_id,
        text=f"Request {request_id} {verdict} (by {evt.sender})",
    )
|
|
|
|
# ---------- Webhook: Seerr → Matrix notifications ----------
|
|
|
|
@web.post("/seerr-webhook")
async def seerr_webhook(self, req: Request) -> Response:
    """Receive a Seerr webhook and relay it to the notifications room.

    When ``seerr_webhook_secret`` is configured, the request must carry
    ``Authorization: Bearer <secret>``; otherwise a 401 is returned. The body
    must be a JSON object (400 otherwise). The formatted event is posted as a
    notice to ``notifications_room``; if the payload has an ``image`` and
    ``posters_enabled`` is set, the poster is posted afterwards.

    Returns a JSON response with ``ok`` true on success, ``ok`` false with an
    ``error`` when no room is configured, or a 500 when posting fails.
    """
    # Function-scope import: only needed for the constant-time token check.
    import hmac

    secret = str(self.config["seerr_webhook_secret"] or "")
    if secret:
        # Case-insensitive scheme per RFC 7235 — Seerr's UI lowercases "Bearer"
        auth = (req.headers.get("Authorization") or "").strip()
        scheme, _, token = auth.partition(" ")
        # compare_digest avoids leaking the secret through comparison timing;
        # bytes are used because it rejects non-ASCII str arguments.
        token_ok = hmac.compare_digest(
            token.encode("utf-8", "surrogateescape"),
            secret.encode("utf-8"),
        )
        if scheme.lower() != "bearer" or not token_ok:
            self.log.warning("Seerr webhook bad auth from %s", req.remote)
            return Response(status=401, text="unauthorized")

    try:
        payload = await req.json()
    except Exception:
        return Response(status=400, text="invalid json")
    if not isinstance(payload, dict):
        # Valid JSON but not an object: the formatter needs key lookups.
        return Response(status=400, text="expected a json object")

    room = (self.config["notifications_room"] or "").strip()
    if not room:
        return json_response({"ok": False, "error": "notifications_room not configured"})

    text, html = self._format_seerr_event(payload)
    try:
        content = TextMessageEventContent(
            msgtype=MessageType.NOTICE,
            body=text,
            format=Format.HTML,
            formatted_body=html,
        )
        await self.client.send_message(RoomID(room), content)
    except Exception as ex:
        self.log.exception("Failed to post Seerr webhook to %s", room)
        return json_response({"ok": False, "error": str(ex)}, status=500)

    poster = payload.get("image")
    if poster and self.config["posters_enabled"]:
        await self._post_poster(room, poster, payload.get("subject") or "poster")
    return json_response({"ok": True})
|
|
|
|
def _format_seerr_event(self, p: dict) -> tuple[str, str]:
|
|
nt = (p.get("notification_type") or "").upper()
|
|
subject = p.get("subject") or "?"
|
|
message = p.get("message") or ""
|
|
media = p.get("media") or {}
|
|
request = p.get("request") or {}
|
|
media_type = media.get("media_type") or "?"
|
|
requester = request.get("requestedBy_username") or ""
|
|
|
|
emoji = {
|
|
"MEDIA_PENDING": "📥",
|
|
"MEDIA_APPROVED": "✅",
|
|
"MEDIA_AUTO_APPROVED": "✅",
|
|
"MEDIA_AVAILABLE": "🎬",
|
|
"MEDIA_DECLINED": "❌",
|
|
"MEDIA_FAILED": "⚠️",
|
|
"ISSUE_CREATED": "🐛",
|
|
"ISSUE_COMMENT": "💬",
|
|
"ISSUE_RESOLVED": "🛠",
|
|
"TEST_NOTIFICATION": "🔧",
|
|
}.get(nt, "📣")
|
|
|
|
label = nt.replace("_", " ").title() or "Notification"
|
|
text_parts = [f"{emoji} {label} — {subject}"]
|
|
if media_type and media_type != "?":
|
|
text_parts.append(f"({media_type})")
|
|
if requester:
|
|
text_parts.append(f"— requested by {requester}")
|
|
text = " ".join(text_parts)
|
|
if message:
|
|
text += f"\n\n{message}"
|
|
|
|
html = (
|
|
f"{emoji} <strong>{label}</strong> — <em>{subject}</em>"
|
|
+ (f" ({media_type})" if media_type != '?' else "")
|
|
+ (f" — requested by <strong>{requester}</strong>" if requester else "")
|
|
+ (f"<br/><br/>{message}" if message else "")
|
|
)
|
|
return text, html
|
|
|
|
# ---------- Subscriptions ----------
|
|
|
|
@media.subcommand("subscribe", aliases=["sub"],
                  help="Subscribe to a Sonarr series — pings you when new episodes land")
@command.argument("query", pass_raw=True, required=True)
async def cmd_subscribe(self, evt: MessageEvent, query: str) -> None:
    """Subscribe the sender to a Sonarr series matched by title.

    ``query`` is matched case-insensitively as a substring of each series
    title. If several series match, one whose title equals the query exactly
    wins; without that, a title that is a substring of another (e.g. "House"
    vs "House of Cards") could never be selected. Remaining ambiguity is
    reported back with up to five candidate titles.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    try:
        series = await self.sonarr.list_series()
    except ArrError as ex:
        await self._say(evt, f"Sonarr lookup failed: {ex}")
        return

    q = query.lower().strip()
    matches = [s for s in series if q in (s.get("title") or "").lower()]
    if not matches:
        await self._say(evt, f"No Sonarr series matches **{query}**. Add it via Sonarr first.")
        return
    if len(matches) > 1:
        # Prefer a single exact title match over the substring candidates.
        exact = [s for s in matches if (s.get("title") or "").lower().strip() == q]
        if len(exact) == 1:
            matches = exact
        else:
            titles = ", ".join(f"*{s['title']}*" for s in matches[:5])
            await self._say(evt, f"Multiple matches: {titles}. Be more specific.")
            return

    s = matches[0]
    try:
        await self.database.execute(
            "INSERT OR REPLACE INTO subscriptions (mxid, sonarr_series_id, title) "
            "VALUES ($1, $2, $3)",
            evt.sender, s["id"], s["title"],
        )
    except Exception as ex:
        await self._say(evt, f"Subscribe failed: {ex}")
        return
    await self._say(evt, f"Subscribed to **{s['title']}**. You'll be pinged on new episode imports.")
|
|
|
|
@media.subcommand("unsubscribe", aliases=["unsub"], help="Unsubscribe from a series")
@command.argument("query", pass_raw=True, required=True)
async def cmd_unsubscribe(self, evt: MessageEvent, query: str) -> None:
    """Remove one of the sender's subscriptions, matched by title.

    ``query`` is matched case-insensitively as a substring of the sender's
    subscribed titles. If several match, a title equal to the query wins, so
    a subscription whose title is a substring of another can still be
    removed. Remaining ambiguity is reported back with up to five titles.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return
    rows = await self.database.fetch(
        "SELECT sonarr_series_id, title FROM subscriptions WHERE mxid = $1",
        evt.sender,
    )

    q = query.lower().strip()
    matches = [r for r in rows if q in (r["title"] or "").lower()]
    if not matches:
        await self._say(evt, f"You aren't subscribed to anything matching **{query}**.")
        return
    if len(matches) > 1:
        # Prefer a single exact title match over the substring candidates.
        exact = [r for r in matches if (r["title"] or "").lower().strip() == q]
        if len(exact) == 1:
            matches = exact
        else:
            titles = ", ".join(f"*{r['title']}*" for r in matches[:5])
            await self._say(evt, f"Multiple matches: {titles}. Be more specific.")
            return

    m = matches[0]
    await self.database.execute(
        "DELETE FROM subscriptions WHERE mxid = $1 AND sonarr_series_id = $2",
        evt.sender, m["sonarr_series_id"],
    )
    await self._say(evt, f"Unsubscribed from **{m['title']}**.")
|
|
|
|
@media.subcommand("subscriptions", aliases=["subs"], help="List your subscriptions")
async def cmd_subscriptions(self, evt: MessageEvent) -> None:
    """Reply with the sender's subscriptions, ordered by title.

    Sends a hint on how to subscribe when the sender has none.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return

    subscribed = await self.database.fetch(
        "SELECT title, added_at FROM subscriptions WHERE mxid = $1 ORDER BY title",
        evt.sender,
    )
    if not subscribed:
        await self._say(evt, "No subscriptions yet — `!media subscribe <show>` to add one.")
        return

    heading = f"**Your subscriptions ({len(subscribed)}):**"
    bullets = [f"- *{entry['title']}*" for entry in subscribed]
    await self._say(evt, "\n".join([heading, *bullets]))
|
|
|
|
# ---------- Sonarr webhook ----------
|
|
|
|
@web.post("/sonarr-webhook")
async def sonarr_webhook(self, req: Request) -> Response:
    """Receive a Sonarr webhook and ping subscribers of the imported series.

    When ``sonarr_webhook_secret`` is configured, the request must carry
    ``Authorization: Bearer <secret>`` (401 otherwise). The body must be a
    JSON object (400 otherwise). Only ``download`` events produce a
    notification; ``test`` is acknowledged and every other event type is
    reported back as ignored. Subscribers are looked up by ``series.id`` and
    mentioned in one notice posted to ``notifications_room``.
    """
    # Function-scope imports: `hmac` for the constant-time token check, and
    # `escape` because `html` is used as a local variable name below.
    import hmac
    from html import escape

    secret = str(self.config["sonarr_webhook_secret"] or "")
    if secret:
        auth = (req.headers.get("Authorization") or "").strip()
        scheme, _, token = auth.partition(" ")
        # compare_digest avoids leaking the secret through comparison timing;
        # bytes are used because it rejects non-ASCII str arguments.
        token_ok = hmac.compare_digest(
            token.encode("utf-8", "surrogateescape"),
            secret.encode("utf-8"),
        )
        if scheme.lower() != "bearer" or not token_ok:
            self.log.warning("Sonarr webhook bad auth from %s", req.remote)
            return Response(status=401, text="unauthorized")
    try:
        payload = await req.json()
    except Exception:
        return Response(status=400, text="invalid json")
    if not isinstance(payload, dict):
        # Valid JSON but not an object: nothing below could read it.
        return Response(status=400, text="expected a json object")

    event_type = str(payload.get("eventType") or "").lower()
    # Only Download (import) events notify; an upgrade is the same event with
    # isUpgrade set. Test is acknowledged; Grab and the rest are ignored.
    if event_type == "test":
        return json_response({"ok": True, "test": True})
    if event_type != "download":
        return json_response({"ok": True, "ignored": event_type})

    series = payload.get("series")
    if not isinstance(series, dict):
        series = {}
    series_id = series.get("id")
    title = series.get("title") or "?"
    eps = payload.get("episodes") or []
    if not series_id:
        return json_response({"ok": False, "error": "no series.id"})

    subs = await self.database.fetch(
        "SELECT mxid FROM subscriptions WHERE sonarr_series_id = $1",
        series_id,
    )
    if not subs:
        return json_response({"ok": True, "subscribers": 0})

    room = (self.config["notifications_room"] or "").strip()
    if not room:
        return json_response({"ok": False, "error": "notifications_room not configured"})

    ep_tags = []
    for ep in eps:
        if not isinstance(ep, dict):
            continue
        sn = ep.get("seasonNumber")
        en = ep.get("episodeNumber")
        # The 02d format needs integers; skip anything else rather than 500.
        if isinstance(sn, int) and isinstance(en, int):
            ep_tags.append(f"S{sn:02d}E{en:02d}")
    ep_str = " ".join(ep_tags)
    upgrade = " (upgrade)" if payload.get("isUpgrade") else ""

    mxids = [str(r["mxid"]) for r in subs]
    # Escape everything that ends up inside markup or an attribute value.
    mention_html = ", ".join(
        f'<a href="https://matrix.to/#/{escape(m)}">{escape(m)}</a>' for m in mxids
    )
    mention_plain = ", ".join(mxids)
    text = f"📺 New: {title} {ep_str}{upgrade} — {mention_plain}"
    html = f"📺 New: <strong>{escape(str(title))}</strong> {ep_str}{upgrade} — {mention_html}"
    try:
        content = TextMessageEventContent(
            msgtype=MessageType.NOTICE,
            body=text,
            format=Format.HTML,
            formatted_body=html,
        )
        await self.client.send_message(RoomID(room), content)
    except Exception as ex:
        self.log.exception("Sonarr webhook send failed")
        return json_response({"ok": False, "error": str(ex)}, status=500)
    return json_response({"ok": True, "subscribers": len(subs)})
|
|
|
|
# ---------- Daily digest ----------
|
|
|
|
@media.subcommand("digest", help="Manually fire today's digest")
async def cmd_digest(self, evt: MessageEvent) -> None:
    """Send the digest to the notifications room on demand.

    Requires ``notifications_room`` to be configured. Any exception raised
    while sending is logged and reported back to the sender.
    """
    await evt.mark_read()
    if await self._reject_unmapped(evt):
        return

    target_room = (self.config["notifications_room"] or "").strip()
    if not target_room:
        await self._say(evt, "Set `notifications_room` in config first.")
        return

    try:
        await self._send_digest(target_room)
        await self._say(evt, "Digest sent ✅")
    except Exception as err:
        self.log.exception("digest failed")
        await self._say(evt, f"Digest failed: {err}")
|
|
|
|
async def _digest_loop(self) -> None:
    """Sleep until digest_hour each day, then send.

    Runs until cancelled. Each iteration computes the next occurrence of the
    configured ``digest_hour`` from ``_local_now()``, sleeps until then, and
    sends the digest to ``notifications_room`` unless ``digest_state``
    already records today's date. A failure while checking state or sending
    is logged and the loop carries on to the next day.

    ``digest_hour`` may be 0 (midnight). An unset value, or one that is not
    an integer in 0..23, falls back to 8.
    """
    try:
        while True:
            configured = self.config["digest_hour"]
            try:
                # Compare against None/"" explicitly: `or 8` would turn a
                # legitimate hour 0 into 8.
                hour = 8 if configured in (None, "") else int(configured)
            except (TypeError, ValueError):
                hour = -1
            if not 0 <= hour <= 23:
                # datetime.replace() raises on out-of-range hours, which
                # would end this loop for good.
                self.log.warning("Invalid digest_hour %r; falling back to 8", configured)
                hour = 8

            now = _local_now()
            target = now.replace(hour=hour, minute=0, second=0, microsecond=0)
            if target <= now:
                target += timedelta(days=1)
            wait_s = (target - now).total_seconds()
            self.log.info("Digest scheduled in %.0fs (next: %s)", wait_s, target.isoformat())
            await asyncio.sleep(wait_s)

            try:
                # Same clock as the schedule above, so the recorded run date
                # cannot disagree with the day the digest was scheduled for.
                today = _local_now().date().isoformat()
                row = await self.database.fetchrow(
                    "SELECT last_run_date FROM digest_state WHERE id = 1"
                )
                if row and row["last_run_date"] == today:
                    continue  # already ran today

                room = (self.config["notifications_room"] or "").strip()
                if room:
                    await self._send_digest(room)
                    await self.database.execute(
                        "UPDATE digest_state SET last_run_date = $1 WHERE id = 1", today
                    )
            except Exception:
                # CancelledError is not an Exception subclass on Python 3.8+,
                # so cancellation still reaches the outer handler.
                self.log.exception("digest send failed; will retry tomorrow")
    except asyncio.CancelledError:
        self.log.info("digest loop cancelled")
        raise
|
|
|
|
async def _send_digest(self, room: str) -> None:
    """Build and post the daily digest to `room`.

    Sections: recently added Emby items (up to 8 shown), downloads completed
    in the last 24h from NZBGet and qBittorrent (up to 5 shown each), and the
    Sonarr/Radarr queue sizes. On Sundays (per ``_local_now()``) the
    emby-cleaner recap is appended when it is non-empty. A failing source
    contributes an empty section instead of aborting the digest.
    """
    since = datetime.now(timezone.utc).timestamp() - 86400

    # Recently added: needs an Emby user id; an Emby error means "nothing".
    uid = self._any_emby_uid("")
    added: list[dict] = []
    if uid:
        try:
            added = await self.emby.recently_added(uid, limit=20)
        except EmbyError:
            added = []

    history, torrents, tv_queue, movie_queue = await asyncio.gather(
        self.nzbget.history(),
        self.qbt.all_torrents(),
        self.sonarr.queue(),
        self.radarr.queue(),
        return_exceptions=True,
    )

    nzb_recent = []
    if not isinstance(history, Exception):
        nzb_recent = [entry for entry in history if (entry.get("HistoryTime") or 0) >= since]

    qbt_recent = []
    if not isinstance(torrents, Exception):
        for tor in torrents:
            finished_at = tor.get("completion_on") or 0
            if finished_at >= since and finished_at > 0:
                qbt_recent.append(tor)

    s_q = [] if isinstance(tv_queue, Exception) else tv_queue
    r_q = [] if isinstance(movie_queue, Exception) else movie_queue

    local = _local_now()
    is_sunday = local.weekday() == 6  # Mon=0..Sun=6
    today_str = local.strftime("%A, %b %d")

    lines = [f"**📰 Daily digest — {today_str}**", ""]

    lines.append(f"**🆕 Recently added ({len(added)}):**")
    lines.extend("- " + self._fmt_emby_item(item) for item in added[:8])
    if not added:
        lines.append("- (nothing in the last sweep)")
    lines.append("")

    total_completed = len(nzb_recent) + len(qbt_recent)
    lines.append(
        f"**✅ Completed last 24h: {total_completed}** "
        f"(NZBGet: {len(nzb_recent)} · qBt: {len(qbt_recent)})"
    )
    for entry in nzb_recent[:5]:
        label = entry.get("Name") or entry.get("NZBName") or "?"
        megabytes = entry.get("FileSizeMB") or 0
        lines.append(f"- *{label}* — {megabytes / 1024:.1f} GB")
    for tor in qbt_recent[:5]:
        lines.append(f"- *{tor.get('name', '?')}* — {_human_bytes(tor.get('size') or 0)}")
    lines.append("")

    lines.append(f"**📥 Queued: {len(s_q)} TV · {len(r_q)} movies**")

    if is_sunday:
        recap = await self._fetch_emby_cleaner_recap()
        if recap:
            lines.append("")
            lines.append("**🧹 Weekly cleanup (emby-cleaner):**")
            lines.extend(recap)

    # NOTE(review): the body uses Markdown-style markers but is sent without
    # a formatted_body — confirm plain text is intended here.
    digest = TextMessageEventContent(msgtype=MessageType.NOTICE, body="\n".join(lines))
    await self.client.send_message(RoomID(room), digest)
|
|
|
|
async def _fetch_emby_cleaner_recap(self) -> list[str]:
|
|
"""Pull the last 12h of messages from the emby-cleaner ntfy topic."""
|
|
ntfy = (self.config["ntfy_url"] or "").rstrip("/")
|
|
topic = self.config["emby_cleaner_topic"] or "emby-cleaner"
|
|
if not ntfy:
|
|
return []
|
|
url = f"{ntfy}/{topic}/json?poll=1&since=12h"
|
|
try:
|
|
async with self.session.get(url) as r:
|
|
if r.status >= 400:
|
|
self.log.info("ntfy fetch %s → %s", url, r.status)
|
|
return []
|
|
text = await r.text()
|
|
except Exception as ex:
|
|
self.log.warning("ntfy fetch failed: %s", ex)
|
|
return []
|
|
out: list[str] = []
|
|
for raw in text.splitlines():
|
|
raw = raw.strip()
|
|
if not raw:
|
|
continue
|
|
try:
|
|
msg = __import__("json").loads(raw)
|
|
except Exception:
|
|
continue
|
|
title = msg.get("title") or ""
|
|
body = (msg.get("message") or "").strip()
|
|
if title:
|
|
out.append(f"- *{title}*")
|
|
for line in body.splitlines()[:6]:
|
|
line = line.strip()
|
|
if line:
|
|
out.append(f" {line}")
|
|
return out[:25]
|