v0.5.1: defensive DB bootstrap + error-tolerant digest loop
The v0.4 schema migration silently failed on first deploy — Maubot's UpgradeTable hook didn't create the subscriptions / digest_state tables in the shared Postgres instance. When the digest fired at 8 AM, the SELECT against digest_state raised, the loop's only except was CancelledError, and the task crashed silently. No digest, no logs. Two fixes: - _ensure_db_schema() runs CREATE TABLE IF NOT EXISTS on start() so the bot self-heals if migrations don't run for any reason - digest loop's inner work is now wrapped — any exception sleeps an hour and retries instead of killing the whole schedule
This commit is contained in:
parent
b2c6b86546
commit
eb9fc07021
2 changed files with 70 additions and 19 deletions
|
|
@ -1,6 +1,6 @@
|
||||||
maubot: 0.3.1
|
maubot: 0.3.1
|
||||||
id: com.3ddbrewery.media
|
id: com.3ddbrewery.media
|
||||||
version: 0.5.0
|
version: 0.5.1
|
||||||
license: MIT
|
license: MIT
|
||||||
modules:
|
modules:
|
||||||
- media_bot
|
- media_bot
|
||||||
|
|
|
||||||
|
|
@ -171,6 +171,7 @@ class MediaBot(Plugin):
|
||||||
|
|
||||||
self._search_cache = {}
|
self._search_cache = {}
|
||||||
self._pending_requests = {}
|
self._pending_requests = {}
|
||||||
|
await self._ensure_db_schema()
|
||||||
self._digest_task = None
|
self._digest_task = None
|
||||||
if self.config["digest_enabled"] and (self.config["notifications_room"] or "").strip():
|
if self.config["digest_enabled"] and (self.config["notifications_room"] or "").strip():
|
||||||
self._digest_task = asyncio.create_task(self._digest_loop())
|
self._digest_task = asyncio.create_task(self._digest_loop())
|
||||||
|
|
@ -1340,36 +1341,86 @@ class MediaBot(Plugin):
|
||||||
await self._say(evt, f"Digest failed: {ex}")
|
await self._say(evt, f"Digest failed: {ex}")
|
||||||
|
|
||||||
async def _digest_loop(self) -> None:
|
async def _digest_loop(self) -> None:
|
||||||
"""Sleep until digest_hour each day, then send."""
|
"""Sleep until digest_hour each day, then send. Survives any error
|
||||||
|
except CancelledError so a single failure doesn't kill the schedule."""
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
hour = int(self.config["digest_hour"] or 8)
|
try:
|
||||||
now = _local_now()
|
hour = int(self.config["digest_hour"] or 8)
|
||||||
target = now.replace(hour=hour, minute=0, second=0, microsecond=0)
|
now = _local_now()
|
||||||
if target <= now:
|
target = now.replace(hour=hour, minute=0, second=0, microsecond=0)
|
||||||
target += timedelta(days=1)
|
if target <= now:
|
||||||
wait_s = (target - now).total_seconds()
|
target += timedelta(days=1)
|
||||||
self.log.info("Digest scheduled in %.0fs (next: %s)", wait_s, target.isoformat())
|
wait_s = (target - now).total_seconds()
|
||||||
await asyncio.sleep(wait_s)
|
self.log.info("Digest scheduled in %.0fs (next: %s)", wait_s, target.isoformat())
|
||||||
|
await asyncio.sleep(wait_s)
|
||||||
|
|
||||||
today = date.today().isoformat()
|
today = date.today().isoformat()
|
||||||
row = await self.database.fetchrow("SELECT last_run_date FROM digest_state WHERE id = 1")
|
try:
|
||||||
if row and row["last_run_date"] == today:
|
row = await self.database.fetchrow(
|
||||||
continue # already ran today
|
"SELECT last_run_date FROM digest_state WHERE id = 1"
|
||||||
|
)
|
||||||
room = (self.config["notifications_room"] or "").strip()
|
except Exception:
|
||||||
if room:
|
self.log.exception("digest_state read failed; firing anyway")
|
||||||
|
row = None
|
||||||
|
if row and row["last_run_date"] == today:
|
||||||
|
continue
|
||||||
|
|
||||||
|
room = (self.config["notifications_room"] or "").strip()
|
||||||
|
if not room:
|
||||||
|
continue
|
||||||
|
await self._send_digest(room)
|
||||||
try:
|
try:
|
||||||
await self._send_digest(room)
|
|
||||||
await self.database.execute(
|
await self.database.execute(
|
||||||
"UPDATE digest_state SET last_run_date = $1 WHERE id = 1", today
|
"UPDATE digest_state SET last_run_date = $1 WHERE id = 1", today
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("digest send failed; will retry tomorrow")
|
self.log.exception("digest_state write failed (idempotency lost)")
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
self.log.exception("digest loop iteration failed; sleeping 1h then retrying")
|
||||||
|
await asyncio.sleep(3600)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
self.log.info("digest loop cancelled")
|
self.log.info("digest loop cancelled")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
async def _ensure_db_schema(self) -> None:
|
||||||
|
"""Defensive: confirm subscriptions + digest_state exist on startup.
|
||||||
|
If maubot's UpgradeTable migration didn't run for some reason (it
|
||||||
|
silently failed to create tables once — see v0.4 history), this
|
||||||
|
creates them with IF NOT EXISTS so the bot self-heals."""
|
||||||
|
if not getattr(self, "database", None):
|
||||||
|
self.log.warning("self.database is None — !media subscribe/digest won't persist")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
await self.database.execute(
|
||||||
|
"""
|
||||||
|
CREATE TABLE IF NOT EXISTS subscriptions (
|
||||||
|
mxid TEXT NOT NULL,
|
||||||
|
sonarr_series_id INTEGER NOT NULL,
|
||||||
|
title TEXT NOT NULL,
|
||||||
|
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
PRIMARY KEY (mxid, sonarr_series_id)
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
await self.database.execute(
|
||||||
|
"""
|
||||||
|
CREATE TABLE IF NOT EXISTS digest_state (
|
||||||
|
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||||
|
last_run_date TEXT
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
await self.database.execute(
|
||||||
|
"INSERT INTO digest_state (id, last_run_date) VALUES (1, NULL) "
|
||||||
|
"ON CONFLICT (id) DO NOTHING"
|
||||||
|
)
|
||||||
|
self.log.info("DB schema verified (subscriptions + digest_state)")
|
||||||
|
except Exception:
|
||||||
|
self.log.exception("DB schema bootstrap failed")
|
||||||
|
|
||||||
async def _send_digest(self, room: str) -> None:
|
async def _send_digest(self, room: str) -> None:
|
||||||
"""Build and post the daily digest to `room`."""
|
"""Build and post the daily digest to `room`."""
|
||||||
cutoff = datetime.now(timezone.utc).timestamp() - 86400
|
cutoff = datetime.now(timezone.utc).timestamp() - 86400
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue