diff --git a/apps/bot/src/index.ts b/apps/bot/src/index.ts index bfde14e..79c4c56 100644 --- a/apps/bot/src/index.ts +++ b/apps/bot/src/index.ts @@ -9,6 +9,7 @@ import { registerDefaultHandlers, } from "./ipc/command-consumer.js"; import { sweepStalePendingAccounts } from "./ipc/pair-handler.js"; +import { sweepStalePendingRuns } from "./scheduler/sweep-stale-runs.js"; async function main(): Promise { logger.info("bot starting"); @@ -22,6 +23,7 @@ async function main(): Promise { const stopConsumer = await startCommandConsumer(); await sweepStalePendingAccounts(); + await sweepStalePendingRuns(); await sessionManager.resumeFromDb(); const shutdown = async (signal: string): Promise => { diff --git a/apps/bot/src/scheduler/sweep-stale-runs.test.ts b/apps/bot/src/scheduler/sweep-stale-runs.test.ts new file mode 100644 index 0000000..ae35871 --- /dev/null +++ b/apps/bot/src/scheduler/sweep-stale-runs.test.ts @@ -0,0 +1,76 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +/** + * Corner case under test: fire-reminder writes the run row with + * status='pending' UP FRONT. If the bot is killed before it flips to + * a terminal status, the row sits at 'pending' indefinitely — pg-boss + * won't retry (the job already ran). Activity surfaces, the dashboard + * counters, and the paused-banner all read the row at face value, so + * the operator sees a "stuck" run that never moves. + * + * sweepStalePendingRuns recovers from this on bot startup. + */ + +// db.execute fan-out: build a list of {sql, return} pairs the test +// can assert on, and replay them in order. Ordering matters because +// the implementation does TWO updates (runs first, then targets) and +// the second one must only run if the first returned anything. +const executeMock = vi.fn(); + +vi.mock("../db.js", () => ({ + db: { + execute: (...a: unknown[]) => executeMock(...a), + }, +})); + +import { sweepStalePendingRuns } from "./sweep-stale-runs.js"; + +beforeEach(() => { + executeMock.mockReset(); +}); + +describe("sweepStalePendingRuns", () => { + it("returns 0 when no stale rows exist (skips the second UPDATE)", async () => { + executeMock.mockResolvedValueOnce({ rows: [] }); + const r = await sweepStalePendingRuns(); + expect(r).toEqual({ runs: 0, targets: 0 }); + // Only the first UPDATE (runs) runs; no second UPDATE for targets. + expect(executeMock).toHaveBeenCalledTimes(1); + }); + + it("fires both UPDATEs when there ARE stale rows", async () => { + executeMock + .mockResolvedValueOnce({ rows: [{ id: "run-A" }, { id: "run-B" }] }) + .mockResolvedValueOnce({ rows: [{ id: "t-1" }, { id: "t-2" }, { id: "t-3" }] }); + + const r = await sweepStalePendingRuns(); + + expect(r).toEqual({ runs: 2, targets: 3 }); + expect(executeMock).toHaveBeenCalledTimes(2); + }); + + it("returns the actual swept counts so the caller can log them", async () => { + executeMock + .mockResolvedValueOnce({ + rows: [{ id: "run-A" }, { id: "run-B" }, { id: "run-C" }], + }) + .mockResolvedValueOnce({ rows: Array.from({ length: 17 }, (_, i) => ({ id: `t-${i}` })) }); + + const r = await sweepStalePendingRuns(); + + expect(r.runs).toBe(3); + expect(r.targets).toBe(17); + }); + + it("doesn't throw when the targets UPDATE returns no rows (run with no pending targets)", async () => { + // A stale run with zero pending targets is unusual but possible — + // the run row got the up-front insert but the per-target inserts + // never ran. Still a stale run, still gets cleared. + executeMock + .mockResolvedValueOnce({ rows: [{ id: "run-D" }] }) + .mockResolvedValueOnce({ rows: [] }); + + const r = await sweepStalePendingRuns(); + expect(r).toEqual({ runs: 1, targets: 0 }); + }); +}); diff --git a/apps/bot/src/scheduler/sweep-stale-runs.ts b/apps/bot/src/scheduler/sweep-stale-runs.ts new file mode 100644 index 0000000..f7484f0 --- /dev/null +++ b/apps/bot/src/scheduler/sweep-stale-runs.ts @@ -0,0 +1,64 @@ +import { sql } from "drizzle-orm"; +import { db } from "../db.js"; +import { logger } from "../logger.js"; + +/** + * Recover from "bot crashed / restarted mid-run" crashes. + * + * fire-reminder writes the run row with status='pending' UP FRONT so + * the Activity tab can show progress mid-run, then flips to a + * terminal status (success/partial/failed/paused/skipped) once it's + * done. If the bot dies between those two writes, the row sits at + * 'pending' forever — pg-boss already marked the job 'completed', so + * it won't retry. + * + * This sweep runs at bot startup. It finds any 'pending' run older + * than `maxAgeMs` (default 5 minutes — enough slack that a real + * mid-run rebalance to another worker isn't accidentally killed) and: + * + * • Flips the run to 'failed' with a clear error_summary so the UI + * stops showing it as in-flight. + * • Flips its pending run_target rows to 'skipped' with the same + * reason so per-group counts make sense. + * + * Does NOT touch the parent reminder's lifecycle status — the row was + * 'active' when the run started and stays that way; the next + * occurrence (cron) or operator action will fire a fresh run. + */ +export async function sweepStalePendingRuns( + maxAgeMs: number = 5 * 60 * 1000, +): Promise<{ runs: number; targets: number }> { + const cutoffMs = Date.now() - maxAgeMs; + const cutoff = new Date(cutoffMs); + + const runs = await db.execute(sql` + UPDATE reminder_runs + SET status = 'failed', + error_summary = 'Bot restarted before this run completed.' + WHERE status = 'pending' + AND fired_at < ${cutoff} + RETURNING id + `); + const runRows = runs.rows as Array<{ id: string }>; + if (runRows.length === 0) { + logger.info("sweep-stale-runs: no stale pending runs"); + return { runs: 0, targets: 0 }; + } + + const ids = runRows.map((r) => r.id); + const targets = await db.execute(sql` + UPDATE reminder_run_targets + SET status = 'skipped', + error = 'bot restarted before this group could be sent' + WHERE status = 'pending' + AND run_id IN (${sql.join(ids.map((id) => sql`${id}`), sql`, `)}) + RETURNING id + `); + const targetCount = (targets.rows as Array).length; + + logger.warn( + { runs: runRows.length, targets: targetCount, cutoff: cutoff.toISOString() }, + "sweep-stale-runs: cleared stale pending runs", + ); + return { runs: runRows.length, targets: targetCount }; +}