diff --git a/base-config.yaml b/base-config.yaml index 31a774e..3da72f2 100644 --- a/base-config.yaml +++ b/base-config.yaml @@ -18,6 +18,16 @@ notifications_room: "" # Generate with `openssl rand -hex 32`. Empty = no auth check (NOT recommended). seerr_webhook_secret: "" +# Sonarr → Matrix subscription notifications. +# Configure Sonarr → Settings → Connect → Webhook with URL +# https://matrix.example.com/_matrix/maubot/plugin/media/sonarr-webhook +# and Headers: `Authorization: Bearer `. Triggers on Download events. +sonarr_webhook_secret: "" + +# Daily digest — fires once a day in `notifications_room`. +digest_enabled: true +digest_hour: 8 # local hour (Indianapolis), 0-23 + # --- Service endpoints --- seerr: diff --git a/maubot.yaml b/maubot.yaml index cc28362..932bf74 100644 --- a/maubot.yaml +++ b/maubot.yaml @@ -1,6 +1,6 @@ maubot: 0.3.1 id: com.3ddbrewery.media -version: 0.3.2 +version: 0.4.2 license: MIT modules: - media_bot @@ -8,5 +8,5 @@ main_class: MediaBot config: true extra_files: - base-config.yaml -database: false +database: true webapp: true diff --git a/media_bot/bot.py b/media_bot/bot.py index a5b7420..4ea1048 100644 --- a/media_bot/bot.py +++ b/media_bot/bot.py @@ -29,12 +29,34 @@ from mautrix.types import ( RoomID, TextMessageEventContent, ) +from mautrix.util.async_db import UpgradeTable from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper from .clients.arr import ArrError, RadarrClient, SonarrClient from .clients.downloads import DownloadError, NzbgetClient, QbtClient from .clients.emby import EmbyClient, EmbyError from .clients.seerr import SeerrClient, SeerrError +from .db import upgrade_table + + +def _is_us_dst(dt: datetime) -> bool: + """Approximate US DST: second Sunday of March → first Sunday of November. + Used because the maubot container ships without tzdata for ZoneInfo. + """ + y = dt.year + march1 = datetime(y, 3, 1) + second_sunday = march1 + timedelta(days=((6 - march1.weekday()) % 7) + 7) + nov1 = datetime(y, 11, 1) + first_sunday = nov1 + timedelta(days=(6 - nov1.weekday()) % 7) + naive = dt.replace(tzinfo=None) if dt.tzinfo else dt + return second_sunday <= naive < first_sunday + + +def _local_now() -> datetime: + """Approximate Indianapolis local time (Eastern, with DST). Naive.""" + utc = datetime.now(timezone.utc) + offset = -4 if _is_us_dst(utc) else -5 + return (utc + timedelta(hours=offset)).replace(tzinfo=None) # --- Seerr availability codes ---------------------------------------------- @@ -77,6 +99,9 @@ class Config(BaseProxyConfig): helper.copy("admin_users") helper.copy("notifications_room") helper.copy("seerr_webhook_secret") + helper.copy("sonarr_webhook_secret") + helper.copy("digest_enabled") + helper.copy("digest_hour") helper.copy("seerr") helper.copy("sonarr") helper.copy("radarr") @@ -115,6 +140,13 @@ class MediaBot(Plugin): # approve/decline. Capped at 200; oldest entries are evicted on overflow. _pending_requests: dict[str, int] + # Background task running the daily digest scheduler. + _digest_task: Optional[asyncio.Task] + + @classmethod + def get_db_upgrade_table(cls) -> UpgradeTable: + return upgrade_table + async def start(self) -> None: self.config.load_and_update() timeout = aiohttp.ClientTimeout(total=self.config["http_timeout"]) @@ -133,9 +165,14 @@ class MediaBot(Plugin): self._search_cache = {} self._pending_requests = {} + self._digest_task = None + if self.config["digest_enabled"] and (self.config["notifications_room"] or "").strip(): + self._digest_task = asyncio.create_task(self._digest_loop()) self.log.info("Media bot started — users=%d", len(self.config["user_map"] or {})) async def stop(self) -> None: + if self._digest_task and not self._digest_task.done(): + self._digest_task.cancel() if hasattr(self, "session"): await self.session.close() @@ -235,6 +272,11 @@ class MediaBot(Plugin): "- `!media completed` — finished in the last 24h\n" "- `!media speed` — aggregate down/up speed\n" "- `!media pause` / `!media unpause` — global pause/resume\n\n" + "*Subscriptions*\n" + "- `!media subscribe ` — ping me when new episodes import\n" + "- `!media unsubscribe ` — stop pings\n" + "- `!media subscriptions` — list yours\n" + "- `!media digest` — fire today's digest now (admin/test)\n\n" "_Admins can react 👍/👎 on a request to approve/decline._" ) await self._say(evt, msg) @@ -991,3 +1033,251 @@ class MediaBot(Plugin): + (f"

{message}" if message else "") ) return text, html + + # ---------- Subscriptions ---------- + + @media.subcommand("subscribe", aliases=["sub"], + help="Subscribe to a Sonarr series — pings you when new episodes land") + @command.argument("query", pass_raw=True, required=True) + async def cmd_subscribe(self, evt: MessageEvent, query: str) -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + try: + series = await self.sonarr.list_series() + except ArrError as ex: + await self._say(evt, f"Sonarr lookup failed: {ex}") + return + q = query.lower().strip() + matches = [s for s in series if q in (s.get("title") or "").lower()] + if not matches: + await self._say(evt, f"No Sonarr series matches **{query}**. Add it via Sonarr first.") + return + if len(matches) > 1: + titles = ", ".join(f"*{s['title']}*" for s in matches[:5]) + await self._say(evt, f"Multiple matches: {titles}. Be more specific.") + return + s = matches[0] + try: + await self.database.execute( + "INSERT OR REPLACE INTO subscriptions (mxid, sonarr_series_id, title) " + "VALUES ($1, $2, $3)", + evt.sender, s["id"], s["title"], + ) + except Exception as ex: + await self._say(evt, f"Subscribe failed: {ex}") + return + await self._say(evt, f"Subscribed to **{s['title']}**. You'll be pinged on new episode imports.") + + @media.subcommand("unsubscribe", aliases=["unsub"], help="Unsubscribe from a series") + @command.argument("query", pass_raw=True, required=True) + async def cmd_unsubscribe(self, evt: MessageEvent, query: str) -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + rows = await self.database.fetch( + "SELECT sonarr_series_id, title FROM subscriptions WHERE mxid = $1", + evt.sender, + ) + q = query.lower().strip() + matches = [r for r in rows if q in (r["title"] or "").lower()] + if not matches: + await self._say(evt, f"You aren't subscribed to anything matching **{query}**.") + return + if len(matches) > 1: + titles = ", ".join(f"*{r['title']}*" for r in matches[:5]) + await self._say(evt, f"Multiple matches: {titles}. Be more specific.") + return + m = matches[0] + await self.database.execute( + "DELETE FROM subscriptions WHERE mxid = $1 AND sonarr_series_id = $2", + evt.sender, m["sonarr_series_id"], + ) + await self._say(evt, f"Unsubscribed from **{m['title']}**.") + + @media.subcommand("subscriptions", aliases=["subs"], help="List your subscriptions") + async def cmd_subscriptions(self, evt: MessageEvent) -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + rows = await self.database.fetch( + "SELECT title, added_at FROM subscriptions WHERE mxid = $1 ORDER BY title", + evt.sender, + ) + if not rows: + await self._say(evt, "No subscriptions yet — `!media subscribe ` to add one.") + return + lines = [f"**Your subscriptions ({len(rows)}):**"] + for r in rows: + lines.append(f"- *{r['title']}*") + await self._say(evt, "\n".join(lines)) + + # ---------- Sonarr webhook ---------- + + @web.post("/sonarr-webhook") + async def sonarr_webhook(self, req: Request) -> Response: + secret = self.config["sonarr_webhook_secret"] or "" + if secret: + auth = (req.headers.get("Authorization") or "").strip() + scheme, _, token = auth.partition(" ") + if scheme.lower() != "bearer" or token != secret: + self.log.warning("Sonarr webhook bad auth from %s", req.remote) + return Response(status=401, text="unauthorized") + try: + payload = await req.json() + except Exception: + return Response(status=400, text="invalid json") + + event_type = (payload.get("eventType") or "").lower() + # Fire on Download (import) and Upgrade events; ignore Grab + Test + if event_type == "test": + return json_response({"ok": True, "test": True}) + if event_type not in ("download",): + return json_response({"ok": True, "ignored": event_type}) + + series = payload.get("series") or {} + series_id = series.get("id") + title = series.get("title") or "?" + eps = payload.get("episodes") or [] + if not series_id: + return json_response({"ok": False, "error": "no series.id"}) + + subs = await self.database.fetch( + "SELECT mxid FROM subscriptions WHERE sonarr_series_id = $1", + series_id, + ) + if not subs: + return json_response({"ok": True, "subscribers": 0}) + + room = (self.config["notifications_room"] or "").strip() + if not room: + return json_response({"ok": False, "error": "notifications_room not configured"}) + + ep_tags = [] + for ep in eps: + sn = ep.get("seasonNumber") + en = ep.get("episodeNumber") + if sn is not None and en is not None: + ep_tags.append(f"S{sn:02d}E{en:02d}") + ep_str = " ".join(ep_tags) if ep_tags else "" + upgrade = " (upgrade)" if payload.get("isUpgrade") else "" + + mention_html = ", ".join( + f'{r["mxid"]}' for r in subs + ) + mention_plain = ", ".join(r["mxid"] for r in subs) + text = f"📺 New: {title} {ep_str}{upgrade} — {mention_plain}" + html = f"📺 New: {title} {ep_str}{upgrade} — {mention_html}" + try: + content = TextMessageEventContent( + msgtype=MessageType.NOTICE, + body=text, + format=Format.HTML, + formatted_body=html, + ) + await self.client.send_message(RoomID(room), content) + except Exception as ex: + self.log.exception("Sonarr webhook send failed") + return json_response({"ok": False, "error": str(ex)}, status=500) + return json_response({"ok": True, "subscribers": len(subs)}) + + # ---------- Daily digest ---------- + + @media.subcommand("digest", help="Manually fire today's digest") + async def cmd_digest(self, evt: MessageEvent) -> None: + await evt.mark_read() + if await self._reject_unmapped(evt): + return + room = (self.config["notifications_room"] or "").strip() + if not room: + await self._say(evt, "Set `notifications_room` in config first.") + return + try: + await self._send_digest(room) + await self._say(evt, "Digest sent ✅") + except Exception as ex: + self.log.exception("digest failed") + await self._say(evt, f"Digest failed: {ex}") + + async def _digest_loop(self) -> None: + """Sleep until digest_hour each day, then send.""" + try: + while True: + hour = int(self.config["digest_hour"] or 8) + now = _local_now() + target = now.replace(hour=hour, minute=0, second=0, microsecond=0) + if target <= now: + target += timedelta(days=1) + wait_s = (target - now).total_seconds() + self.log.info("Digest scheduled in %.0fs (next: %s)", wait_s, target.isoformat()) + await asyncio.sleep(wait_s) + + today = date.today().isoformat() + row = await self.database.fetchrow("SELECT last_run_date FROM digest_state WHERE id = 1") + if row and row["last_run_date"] == today: + continue # already ran today + + room = (self.config["notifications_room"] or "").strip() + if room: + try: + await self._send_digest(room) + await self.database.execute( + "UPDATE digest_state SET last_run_date = $1 WHERE id = 1", today + ) + except Exception: + self.log.exception("digest send failed; will retry tomorrow") + except asyncio.CancelledError: + self.log.info("digest loop cancelled") + raise + + async def _send_digest(self, room: str) -> None: + """Build and post the daily digest to `room`.""" + cutoff = datetime.now(timezone.utc).timestamp() - 86400 + emby_uid = self._any_emby_uid("") + added: list[dict] = [] + if emby_uid: + try: + added = await self.emby.recently_added(emby_uid, limit=20) + except EmbyError: + added = [] + nzb_hist, qbt_all, sonarr_q, radarr_q = await asyncio.gather( + self.nzbget.history(), self.qbt.all_torrents(), + self.sonarr.queue(), self.radarr.queue(), + return_exceptions=True, + ) + + nzb_recent = [] + if not isinstance(nzb_hist, Exception): + nzb_recent = [h for h in nzb_hist if (h.get("HistoryTime") or 0) >= cutoff] + qbt_recent = [] + if not isinstance(qbt_all, Exception): + qbt_recent = [t for t in qbt_all + if (t.get("completion_on") or 0) >= cutoff + and (t.get("completion_on") or 0) > 0] + + s_q = sonarr_q if not isinstance(sonarr_q, Exception) else [] + r_q = radarr_q if not isinstance(radarr_q, Exception) else [] + + today_str = _local_now().strftime("%A, %b %d") + lines = [f"**📰 Daily digest — {today_str}**", ""] + lines.append(f"**🆕 Recently added ({len(added)}):**") + for it in added[:8]: + lines.append("- " + self._fmt_emby_item(it)) + if not added: + lines.append("- (nothing in the last sweep)") + lines.append("") + total_completed = len(nzb_recent) + len(qbt_recent) + lines.append(f"**✅ Completed last 24h: {total_completed}** " + f"(NZBGet: {len(nzb_recent)} · qBt: {len(qbt_recent)})") + for h in nzb_recent[:5]: + name = h.get("Name") or h.get("NZBName") or "?" + size_mb = h.get("FileSizeMB") or 0 + lines.append(f"- *{name}* — {size_mb / 1024:.1f} GB") + for t in qbt_recent[:5]: + lines.append(f"- *{t.get('name','?')}* — {_human_bytes(t.get('size') or 0)}") + lines.append("") + lines.append(f"**📥 Queued: {len(s_q)} TV · {len(r_q)} movies**") + await self.client.send_message( + RoomID(room), + TextMessageEventContent(msgtype=MessageType.NOTICE, body="\n".join(lines)), + ) diff --git a/media_bot/clients/arr.py b/media_bot/clients/arr.py index ccc25c4..91f6132 100644 --- a/media_bot/clients/arr.py +++ b/media_bot/clients/arr.py @@ -40,6 +40,11 @@ class ArrClient: class SonarrClient(ArrClient): + async def list_series(self) -> list[dict]: + """Full list of series Sonarr knows about (used for subscription lookup).""" + data = await self._get("/api/v3/series") + return data if isinstance(data, list) else (data or []) + async def calendar(self, start: str, end: str) -> list[dict]: data = await self._get( "/api/v3/calendar", diff --git a/media_bot/db.py b/media_bot/db.py new file mode 100644 index 0000000..055f0c4 --- /dev/null +++ b/media_bot/db.py @@ -0,0 +1,29 @@ +"""SQLite schema for the media bot — subscriptions + digest state.""" + +from mautrix.util.async_db import Connection, UpgradeTable + +upgrade_table = UpgradeTable() + + +@upgrade_table.register(description="Initial schema: subscriptions + digest_state") +async def upgrade_v1(conn: Connection) -> None: + await conn.execute( + """ + CREATE TABLE subscriptions ( + mxid TEXT NOT NULL, + sonarr_series_id INTEGER NOT NULL, + title TEXT NOT NULL, + added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (mxid, sonarr_series_id) + ) + """ + ) + await conn.execute( + """ + CREATE TABLE digest_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + last_run_date TEXT + ) + """ + ) + await conn.execute("INSERT INTO digest_state (id, last_run_date) VALUES (1, NULL)")