v0.4.2: SQLite, subscriptions, Sonarr webhook, daily digest

- Enable maubot's bundled SQLite (database: true, webapp: true)
- Schema: subscriptions(mxid, sonarr_series_id, title, added_at) +
  digest_state for once-daily idempotency
- Commands: !media subscribe / unsubscribe / subscriptions / digest
- @web.post(/sonarr-webhook): on Download events, mention subscribers
  in notifications_room (Bearer auth via sonarr_webhook_secret)
- Daily digest loop: fires at digest_hour (Indianapolis), summarises
  Emby recently-added, NZBGet+qBt 24h completions, queue depth.
  Approximate EST/EDT calc since maubot container ships without tzdata.
This commit is contained in:
Maddox 2026-04-28 19:13:19 -04:00
parent c0417856a2
commit 7848d8e6ea
5 changed files with 336 additions and 2 deletions

View file

@ -18,6 +18,16 @@ notifications_room: ""
# Generate with `openssl rand -hex 32`. Empty = no auth check (NOT recommended). # Generate with `openssl rand -hex 32`. Empty = no auth check (NOT recommended).
seerr_webhook_secret: "" seerr_webhook_secret: ""
# Sonarr → Matrix subscription notifications.
# Configure Sonarr → Settings → Connect → Webhook with URL
# https://matrix.fails.me/_matrix/maubot/plugin/media/sonarr-webhook
# and Headers: `Authorization: Bearer <secret>`. Triggers on Download events.
sonarr_webhook_secret: ""
# Daily digest — fires once a day in `notifications_room`.
digest_enabled: true
digest_hour: 8 # local hour (Indianapolis), 0-23
# --- Service endpoints --- # --- Service endpoints ---
seerr: seerr:

View file

@ -1,6 +1,6 @@
maubot: 0.3.1 maubot: 0.3.1
id: com.3ddbrewery.media id: com.3ddbrewery.media
version: 0.3.2 version: 0.4.2
license: MIT license: MIT
modules: modules:
- media_bot - media_bot
@ -8,5 +8,5 @@ main_class: MediaBot
config: true config: true
extra_files: extra_files:
- base-config.yaml - base-config.yaml
database: false database: true
webapp: true webapp: true

View file

@ -29,12 +29,34 @@ from mautrix.types import (
RoomID, RoomID,
TextMessageEventContent, TextMessageEventContent,
) )
from mautrix.util.async_db import UpgradeTable
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from .clients.arr import ArrError, RadarrClient, SonarrClient from .clients.arr import ArrError, RadarrClient, SonarrClient
from .clients.downloads import DownloadError, NzbgetClient, QbtClient from .clients.downloads import DownloadError, NzbgetClient, QbtClient
from .clients.emby import EmbyClient, EmbyError from .clients.emby import EmbyClient, EmbyError
from .clients.seerr import SeerrClient, SeerrError from .clients.seerr import SeerrClient, SeerrError
from .db import upgrade_table
def _is_us_dst(dt: datetime) -> bool:
"""Approximate US DST: second Sunday of March → first Sunday of November.
Used because the maubot container ships without tzdata for ZoneInfo.
"""
y = dt.year
march1 = datetime(y, 3, 1)
second_sunday = march1 + timedelta(days=((6 - march1.weekday()) % 7) + 7)
nov1 = datetime(y, 11, 1)
first_sunday = nov1 + timedelta(days=(6 - nov1.weekday()) % 7)
naive = dt.replace(tzinfo=None) if dt.tzinfo else dt
return second_sunday <= naive < first_sunday
def _local_now() -> datetime:
"""Approximate Indianapolis local time (Eastern, with DST). Naive."""
utc = datetime.now(timezone.utc)
offset = -4 if _is_us_dst(utc) else -5
return (utc + timedelta(hours=offset)).replace(tzinfo=None)
# --- Seerr availability codes ---------------------------------------------- # --- Seerr availability codes ----------------------------------------------
@ -77,6 +99,9 @@ class Config(BaseProxyConfig):
helper.copy("admin_users") helper.copy("admin_users")
helper.copy("notifications_room") helper.copy("notifications_room")
helper.copy("seerr_webhook_secret") helper.copy("seerr_webhook_secret")
helper.copy("sonarr_webhook_secret")
helper.copy("digest_enabled")
helper.copy("digest_hour")
helper.copy("seerr") helper.copy("seerr")
helper.copy("sonarr") helper.copy("sonarr")
helper.copy("radarr") helper.copy("radarr")
@ -115,6 +140,13 @@ class MediaBot(Plugin):
# approve/decline. Capped at 200; oldest entries are evicted on overflow. # approve/decline. Capped at 200; oldest entries are evicted on overflow.
_pending_requests: dict[str, int] _pending_requests: dict[str, int]
# Background task running the daily digest scheduler.
_digest_task: Optional[asyncio.Task]
@classmethod
def get_db_upgrade_table(cls) -> UpgradeTable:
return upgrade_table
async def start(self) -> None: async def start(self) -> None:
self.config.load_and_update() self.config.load_and_update()
timeout = aiohttp.ClientTimeout(total=self.config["http_timeout"]) timeout = aiohttp.ClientTimeout(total=self.config["http_timeout"])
@ -133,9 +165,14 @@ class MediaBot(Plugin):
self._search_cache = {} self._search_cache = {}
self._pending_requests = {} self._pending_requests = {}
self._digest_task = None
if self.config["digest_enabled"] and (self.config["notifications_room"] or "").strip():
self._digest_task = asyncio.create_task(self._digest_loop())
self.log.info("Media bot started — users=%d", len(self.config["user_map"] or {})) self.log.info("Media bot started — users=%d", len(self.config["user_map"] or {}))
async def stop(self) -> None: async def stop(self) -> None:
if self._digest_task and not self._digest_task.done():
self._digest_task.cancel()
if hasattr(self, "session"): if hasattr(self, "session"):
await self.session.close() await self.session.close()
@ -235,6 +272,11 @@ class MediaBot(Plugin):
"- `!media completed` — finished in the last 24h\n" "- `!media completed` — finished in the last 24h\n"
"- `!media speed` — aggregate down/up speed\n" "- `!media speed` — aggregate down/up speed\n"
"- `!media pause` / `!media unpause` — global pause/resume\n\n" "- `!media pause` / `!media unpause` — global pause/resume\n\n"
"*Subscriptions*\n"
"- `!media subscribe <show>` — ping me when new episodes import\n"
"- `!media unsubscribe <show>` — stop pings\n"
"- `!media subscriptions` — list yours\n"
"- `!media digest` — fire today's digest now (admin/test)\n\n"
"_Admins can react 👍/👎 on a request to approve/decline._" "_Admins can react 👍/👎 on a request to approve/decline._"
) )
await self._say(evt, msg) await self._say(evt, msg)
@ -991,3 +1033,251 @@ class MediaBot(Plugin):
+ (f"<br/><br/>{message}" if message else "") + (f"<br/><br/>{message}" if message else "")
) )
return text, html return text, html
# ---------- Subscriptions ----------
@media.subcommand("subscribe", aliases=["sub"],
help="Subscribe to a Sonarr series — pings you when new episodes land")
@command.argument("query", pass_raw=True, required=True)
async def cmd_subscribe(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
try:
series = await self.sonarr.list_series()
except ArrError as ex:
await self._say(evt, f"Sonarr lookup failed: {ex}")
return
q = query.lower().strip()
matches = [s for s in series if q in (s.get("title") or "").lower()]
if not matches:
await self._say(evt, f"No Sonarr series matches **{query}**. Add it via Sonarr first.")
return
if len(matches) > 1:
titles = ", ".join(f"*{s['title']}*" for s in matches[:5])
await self._say(evt, f"Multiple matches: {titles}. Be more specific.")
return
s = matches[0]
try:
await self.database.execute(
"INSERT OR REPLACE INTO subscriptions (mxid, sonarr_series_id, title) "
"VALUES ($1, $2, $3)",
evt.sender, s["id"], s["title"],
)
except Exception as ex:
await self._say(evt, f"Subscribe failed: {ex}")
return
await self._say(evt, f"Subscribed to **{s['title']}**. You'll be pinged on new episode imports.")
@media.subcommand("unsubscribe", aliases=["unsub"], help="Unsubscribe from a series")
@command.argument("query", pass_raw=True, required=True)
async def cmd_unsubscribe(self, evt: MessageEvent, query: str) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
rows = await self.database.fetch(
"SELECT sonarr_series_id, title FROM subscriptions WHERE mxid = $1",
evt.sender,
)
q = query.lower().strip()
matches = [r for r in rows if q in (r["title"] or "").lower()]
if not matches:
await self._say(evt, f"You aren't subscribed to anything matching **{query}**.")
return
if len(matches) > 1:
titles = ", ".join(f"*{r['title']}*" for r in matches[:5])
await self._say(evt, f"Multiple matches: {titles}. Be more specific.")
return
m = matches[0]
await self.database.execute(
"DELETE FROM subscriptions WHERE mxid = $1 AND sonarr_series_id = $2",
evt.sender, m["sonarr_series_id"],
)
await self._say(evt, f"Unsubscribed from **{m['title']}**.")
@media.subcommand("subscriptions", aliases=["subs"], help="List your subscriptions")
async def cmd_subscriptions(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
rows = await self.database.fetch(
"SELECT title, added_at FROM subscriptions WHERE mxid = $1 ORDER BY title",
evt.sender,
)
if not rows:
await self._say(evt, "No subscriptions yet — `!media subscribe <show>` to add one.")
return
lines = [f"**Your subscriptions ({len(rows)}):**"]
for r in rows:
lines.append(f"- *{r['title']}*")
await self._say(evt, "\n".join(lines))
# ---------- Sonarr webhook ----------
@web.post("/sonarr-webhook")
async def sonarr_webhook(self, req: Request) -> Response:
secret = self.config["sonarr_webhook_secret"] or ""
if secret:
auth = (req.headers.get("Authorization") or "").strip()
scheme, _, token = auth.partition(" ")
if scheme.lower() != "bearer" or token != secret:
self.log.warning("Sonarr webhook bad auth from %s", req.remote)
return Response(status=401, text="unauthorized")
try:
payload = await req.json()
except Exception:
return Response(status=400, text="invalid json")
event_type = (payload.get("eventType") or "").lower()
# Fire on Download (import) and Upgrade events; ignore Grab + Test
if event_type == "test":
return json_response({"ok": True, "test": True})
if event_type not in ("download",):
return json_response({"ok": True, "ignored": event_type})
series = payload.get("series") or {}
series_id = series.get("id")
title = series.get("title") or "?"
eps = payload.get("episodes") or []
if not series_id:
return json_response({"ok": False, "error": "no series.id"})
subs = await self.database.fetch(
"SELECT mxid FROM subscriptions WHERE sonarr_series_id = $1",
series_id,
)
if not subs:
return json_response({"ok": True, "subscribers": 0})
room = (self.config["notifications_room"] or "").strip()
if not room:
return json_response({"ok": False, "error": "notifications_room not configured"})
ep_tags = []
for ep in eps:
sn = ep.get("seasonNumber")
en = ep.get("episodeNumber")
if sn is not None and en is not None:
ep_tags.append(f"S{sn:02d}E{en:02d}")
ep_str = " ".join(ep_tags) if ep_tags else ""
upgrade = " (upgrade)" if payload.get("isUpgrade") else ""
mention_html = ", ".join(
f'<a href="https://matrix.to/#/{r["mxid"]}">{r["mxid"]}</a>' for r in subs
)
mention_plain = ", ".join(r["mxid"] for r in subs)
text = f"📺 New: {title} {ep_str}{upgrade}{mention_plain}"
html = f"📺 New: <strong>{title}</strong> {ep_str}{upgrade}{mention_html}"
try:
content = TextMessageEventContent(
msgtype=MessageType.NOTICE,
body=text,
format=Format.HTML,
formatted_body=html,
)
await self.client.send_message(RoomID(room), content)
except Exception as ex:
self.log.exception("Sonarr webhook send failed")
return json_response({"ok": False, "error": str(ex)}, status=500)
return json_response({"ok": True, "subscribers": len(subs)})
# ---------- Daily digest ----------
@media.subcommand("digest", help="Manually fire today's digest")
async def cmd_digest(self, evt: MessageEvent) -> None:
await evt.mark_read()
if await self._reject_unmapped(evt):
return
room = (self.config["notifications_room"] or "").strip()
if not room:
await self._say(evt, "Set `notifications_room` in config first.")
return
try:
await self._send_digest(room)
await self._say(evt, "Digest sent ✅")
except Exception as ex:
self.log.exception("digest failed")
await self._say(evt, f"Digest failed: {ex}")
async def _digest_loop(self) -> None:
"""Sleep until digest_hour each day, then send."""
try:
while True:
hour = int(self.config["digest_hour"] or 8)
now = _local_now()
target = now.replace(hour=hour, minute=0, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
wait_s = (target - now).total_seconds()
self.log.info("Digest scheduled in %.0fs (next: %s)", wait_s, target.isoformat())
await asyncio.sleep(wait_s)
today = date.today().isoformat()
row = await self.database.fetchrow("SELECT last_run_date FROM digest_state WHERE id = 1")
if row and row["last_run_date"] == today:
continue # already ran today
room = (self.config["notifications_room"] or "").strip()
if room:
try:
await self._send_digest(room)
await self.database.execute(
"UPDATE digest_state SET last_run_date = $1 WHERE id = 1", today
)
except Exception:
self.log.exception("digest send failed; will retry tomorrow")
except asyncio.CancelledError:
self.log.info("digest loop cancelled")
raise
async def _send_digest(self, room: str) -> None:
"""Build and post the daily digest to `room`."""
cutoff = datetime.now(timezone.utc).timestamp() - 86400
emby_uid = self._any_emby_uid("")
added: list[dict] = []
if emby_uid:
try:
added = await self.emby.recently_added(emby_uid, limit=20)
except EmbyError:
added = []
nzb_hist, qbt_all, sonarr_q, radarr_q = await asyncio.gather(
self.nzbget.history(), self.qbt.all_torrents(),
self.sonarr.queue(), self.radarr.queue(),
return_exceptions=True,
)
nzb_recent = []
if not isinstance(nzb_hist, Exception):
nzb_recent = [h for h in nzb_hist if (h.get("HistoryTime") or 0) >= cutoff]
qbt_recent = []
if not isinstance(qbt_all, Exception):
qbt_recent = [t for t in qbt_all
if (t.get("completion_on") or 0) >= cutoff
and (t.get("completion_on") or 0) > 0]
s_q = sonarr_q if not isinstance(sonarr_q, Exception) else []
r_q = radarr_q if not isinstance(radarr_q, Exception) else []
today_str = _local_now().strftime("%A, %b %d")
lines = [f"**📰 Daily digest — {today_str}**", ""]
lines.append(f"**🆕 Recently added ({len(added)}):**")
for it in added[:8]:
lines.append("- " + self._fmt_emby_item(it))
if not added:
lines.append("- (nothing in the last sweep)")
lines.append("")
total_completed = len(nzb_recent) + len(qbt_recent)
lines.append(f"**✅ Completed last 24h: {total_completed}** "
f"(NZBGet: {len(nzb_recent)} · qBt: {len(qbt_recent)})")
for h in nzb_recent[:5]:
name = h.get("Name") or h.get("NZBName") or "?"
size_mb = h.get("FileSizeMB") or 0
lines.append(f"- *{name}* — {size_mb / 1024:.1f} GB")
for t in qbt_recent[:5]:
lines.append(f"- *{t.get('name','?')}* — {_human_bytes(t.get('size') or 0)}")
lines.append("")
lines.append(f"**📥 Queued: {len(s_q)} TV · {len(r_q)} movies**")
await self.client.send_message(
RoomID(room),
TextMessageEventContent(msgtype=MessageType.NOTICE, body="\n".join(lines)),
)

View file

@ -40,6 +40,11 @@ class ArrClient:
class SonarrClient(ArrClient): class SonarrClient(ArrClient):
async def list_series(self) -> list[dict]:
"""Full list of series Sonarr knows about (used for subscription lookup)."""
data = await self._get("/api/v3/series")
return data if isinstance(data, list) else (data or [])
async def calendar(self, start: str, end: str) -> list[dict]: async def calendar(self, start: str, end: str) -> list[dict]:
data = await self._get( data = await self._get(
"/api/v3/calendar", "/api/v3/calendar",

29
media_bot/db.py Normal file
View file

@ -0,0 +1,29 @@
"""SQLite schema for the media bot — subscriptions + digest state."""
from mautrix.util.async_db import Connection, UpgradeTable
upgrade_table = UpgradeTable()
@upgrade_table.register(description="Initial schema: subscriptions + digest_state")
async def upgrade_v1(conn: Connection) -> None:
await conn.execute(
"""
CREATE TABLE subscriptions (
mxid TEXT NOT NULL,
sonarr_series_id INTEGER NOT NULL,
title TEXT NOT NULL,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (mxid, sonarr_series_id)
)
"""
)
await conn.execute(
"""
CREATE TABLE digest_state (
id INTEGER PRIMARY KEY CHECK (id = 1),
last_run_date TEXT
)
"""
)
await conn.execute("INSERT INTO digest_state (id, last_run_date) VALUES (1, NULL)")