From 309020fa5dc78fa9d02d0bb47e93a2024375ac89 Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sun, 10 May 2026 16:05:18 +0800 Subject: [PATCH] feat(bot): sweep stale 'pending' runs on startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Corner case observed: fire-reminder writes the run row with status='pending' UP FRONT (so the Activity tab shows progress mid-run), then flips to a terminal status once it's done. If the bot is killed between those two writes — e.g. a redeploy or crash — the row sits at 'pending' forever. pg-boss already marked the job 'completed', so it won't retry. Activity surfaces and the dashboard counters then show a "stuck" run that never moves. sweepStalePendingRuns runs at bot startup, finds any 'pending' run older than 5 minutes, and: • Flips the run to 'failed' with a clear error_summary so the UI stops treating it as in-flight. • Flips its still-'pending' run_target rows to 'skipped' with the same reason so per-group counts remain coherent. The 5-minute floor is generous enough that an actual mid-run worker rebalance isn't accidentally killed. Tests: * 4 sweep tests covering: no-stale path skips the second UPDATE; with-stale path fires both UPDATEs; counts are forwarded; the edge case where a stale run has zero pending targets. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/bot/src/index.ts | 2 + .../src/scheduler/sweep-stale-runs.test.ts | 76 +++++++++++++++++++ apps/bot/src/scheduler/sweep-stale-runs.ts | 64 ++++++++++++++++ 3 files changed, 142 insertions(+) create mode 100644 apps/bot/src/scheduler/sweep-stale-runs.test.ts create mode 100644 apps/bot/src/scheduler/sweep-stale-runs.ts diff --git a/apps/bot/src/index.ts b/apps/bot/src/index.ts index bfde14e..79c4c56 100644 --- a/apps/bot/src/index.ts +++ b/apps/bot/src/index.ts @@ -9,6 +9,7 @@ import { registerDefaultHandlers, } from "./ipc/command-consumer.js"; import { sweepStalePendingAccounts } from "./ipc/pair-handler.js"; +import { sweepStalePendingRuns } from "./scheduler/sweep-stale-runs.js"; async function main(): Promise { logger.info("bot starting"); @@ -22,6 +23,7 @@ async function main(): Promise { const stopConsumer = await startCommandConsumer(); await sweepStalePendingAccounts(); + await sweepStalePendingRuns(); await sessionManager.resumeFromDb(); const shutdown = async (signal: string): Promise => { diff --git a/apps/bot/src/scheduler/sweep-stale-runs.test.ts b/apps/bot/src/scheduler/sweep-stale-runs.test.ts new file mode 100644 index 0000000..ae35871 --- /dev/null +++ b/apps/bot/src/scheduler/sweep-stale-runs.test.ts @@ -0,0 +1,76 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +/** + * Corner case under test: fire-reminder writes the run row with + * status='pending' UP FRONT. If the bot is killed before it flips to + * a terminal status, the row sits at 'pending' indefinitely — pg-boss + * won't retry (the job already ran). Activity surfaces, the dashboard + * counters, and the paused-banner all read the row at face value, so + * the operator sees a "stuck" run that never moves. + * + * sweepStalePendingRuns recovers from this on bot startup. + */ + +// db.execute fan-out: build a list of {sql, return} pairs the test +// can assert on, and replay them in order. Ordering matters because +// the implementation does TWO updates (runs first, then targets) and +// the second one must only run if the first returned anything. +const executeMock = vi.fn(); + +vi.mock("../db.js", () => ({ + db: { + execute: (...a: unknown[]) => executeMock(...a), + }, +})); + +import { sweepStalePendingRuns } from "./sweep-stale-runs.js"; + +beforeEach(() => { + executeMock.mockReset(); +}); + +describe("sweepStalePendingRuns", () => { + it("returns 0 when no stale rows exist (skips the second UPDATE)", async () => { + executeMock.mockResolvedValueOnce({ rows: [] }); + const r = await sweepStalePendingRuns(); + expect(r).toEqual({ runs: 0, targets: 0 }); + // Only the first UPDATE (runs) runs; no second UPDATE for targets. + expect(executeMock).toHaveBeenCalledTimes(1); + }); + + it("fires both UPDATEs when there ARE stale rows", async () => { + executeMock + .mockResolvedValueOnce({ rows: [{ id: "run-A" }, { id: "run-B" }] }) + .mockResolvedValueOnce({ rows: [{ id: "t-1" }, { id: "t-2" }, { id: "t-3" }] }); + + const r = await sweepStalePendingRuns(); + + expect(r).toEqual({ runs: 2, targets: 3 }); + expect(executeMock).toHaveBeenCalledTimes(2); + }); + + it("returns the actual swept counts so the caller can log them", async () => { + executeMock + .mockResolvedValueOnce({ + rows: [{ id: "run-A" }, { id: "run-B" }, { id: "run-C" }], + }) + .mockResolvedValueOnce({ rows: Array.from({ length: 17 }, (_, i) => ({ id: `t-${i}` })) }); + + const r = await sweepStalePendingRuns(); + + expect(r.runs).toBe(3); + expect(r.targets).toBe(17); + }); + + it("doesn't throw when the targets UPDATE returns no rows (run with no pending targets)", async () => { + // A stale run with zero pending targets is unusual but possible — + // the run row got the up-front insert but the per-target inserts + // never ran. Still a stale run, still gets cleared. + executeMock + .mockResolvedValueOnce({ rows: [{ id: "run-D" }] }) + .mockResolvedValueOnce({ rows: [] }); + + const r = await sweepStalePendingRuns(); + expect(r).toEqual({ runs: 1, targets: 0 }); + }); +}); diff --git a/apps/bot/src/scheduler/sweep-stale-runs.ts b/apps/bot/src/scheduler/sweep-stale-runs.ts new file mode 100644 index 0000000..f7484f0 --- /dev/null +++ b/apps/bot/src/scheduler/sweep-stale-runs.ts @@ -0,0 +1,64 @@ +import { sql } from "drizzle-orm"; +import { db } from "../db.js"; +import { logger } from "../logger.js"; + +/** + * Recover from "bot crashed / restarted mid-run" crashes. + * + * fire-reminder writes the run row with status='pending' UP FRONT so + * the Activity tab can show progress mid-run, then flips to a + * terminal status (success/partial/failed/paused/skipped) once it's + * done. If the bot dies between those two writes, the row sits at + * 'pending' forever — pg-boss already marked the job 'completed', so + * it won't retry. + * + * This sweep runs at bot startup. It finds any 'pending' run older + * than `maxAgeMs` (default 5 minutes — enough slack that a real + * mid-run rebalance to another worker isn't accidentally killed) and: + * + * • Flips the run to 'failed' with a clear error_summary so the UI + * stops showing it as in-flight. + * • Flips its pending run_target rows to 'skipped' with the same + * reason so per-group counts make sense. + * + * Does NOT touch the parent reminder's lifecycle status — the row was + * 'active' when the run started and stays that way; the next + * occurrence (cron) or operator action will fire a fresh run. + */ +export async function sweepStalePendingRuns( + maxAgeMs: number = 5 * 60 * 1000, +): Promise<{ runs: number; targets: number }> { + const cutoffMs = Date.now() - maxAgeMs; + const cutoff = new Date(cutoffMs); + + const runs = await db.execute(sql` + UPDATE reminder_runs + SET status = 'failed', + error_summary = 'Bot restarted before this run completed.' + WHERE status = 'pending' + AND fired_at < ${cutoff} + RETURNING id + `); + const runRows = runs.rows as Array<{ id: string }>; + if (runRows.length === 0) { + logger.info("sweep-stale-runs: no stale pending runs"); + return { runs: 0, targets: 0 }; + } + + const ids = runRows.map((r) => r.id); + const targets = await db.execute(sql` + UPDATE reminder_run_targets + SET status = 'skipped', + error = 'bot restarted before this group could be sent' + WHERE status = 'pending' + AND run_id IN (${sql.join(ids.map((id) => sql`${id}`), sql`, `)}) + RETURNING id + `); + const targetCount = (targets.rows as Array).length; + + logger.warn( + { runs: runRows.length, targets: targetCount, cutoff: cutoff.toISOString() }, + "sweep-stale-runs: cleared stale pending runs", + ); + return { runs: runRows.length, targets: targetCount }; +}