feat(bot): sweep stale 'pending' runs on startup
Corner case observed: fire-reminder writes the run row with
status='pending' UP FRONT (so the Activity tab shows progress
mid-run), then flips to a terminal status once it's done. If the
bot is killed between those two writes — e.g. a redeploy or crash —
the row sits at 'pending' forever. pg-boss already marked the job
'completed', so it won't retry. Activity surfaces and the dashboard
counters then show a "stuck" run that never moves.
sweepStalePendingRuns runs at bot startup, finds any 'pending' run
older than 5 minutes, and:
• Flips the run to 'failed' with a clear error_summary so the UI
stops treating it as in-flight.
• Flips its still-'pending' run_target rows to 'skipped' with the
same reason so per-group counts remain coherent.
The 5-minute floor is generous enough that an actual mid-run worker
rebalance isn't accidentally killed.
Tests:
* 4 sweep tests covering: no-stale path skips the second UPDATE;
with-stale path fires both UPDATEs; counts are forwarded; the
edge case where a stale run has zero pending targets.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
bb8d28a594
commit
309020fa5d
@ -9,6 +9,7 @@ import {
|
||||
registerDefaultHandlers,
|
||||
} from "./ipc/command-consumer.js";
|
||||
import { sweepStalePendingAccounts } from "./ipc/pair-handler.js";
|
||||
import { sweepStalePendingRuns } from "./scheduler/sweep-stale-runs.js";
|
||||
|
||||
async function main(): Promise<void> {
|
||||
logger.info("bot starting");
|
||||
@ -22,6 +23,7 @@ async function main(): Promise<void> {
|
||||
const stopConsumer = await startCommandConsumer();
|
||||
|
||||
await sweepStalePendingAccounts();
|
||||
await sweepStalePendingRuns();
|
||||
await sessionManager.resumeFromDb();
|
||||
|
||||
const shutdown = async (signal: string): Promise<void> => {
|
||||
|
||||
76
apps/bot/src/scheduler/sweep-stale-runs.test.ts
Normal file
76
apps/bot/src/scheduler/sweep-stale-runs.test.ts
Normal file
@ -0,0 +1,76 @@
|
||||
import { describe, it, expect, vi, beforeEach } from "vitest";
|
||||
|
||||
/**
|
||||
* Corner case under test: fire-reminder writes the run row with
|
||||
* status='pending' UP FRONT. If the bot is killed before it flips to
|
||||
* a terminal status, the row sits at 'pending' indefinitely — pg-boss
|
||||
* won't retry (the job already ran). Activity surfaces, the dashboard
|
||||
* counters, and the paused-banner all read the row at face value, so
|
||||
* the operator sees a "stuck" run that never moves.
|
||||
*
|
||||
* sweepStalePendingRuns recovers from this on bot startup.
|
||||
*/
|
||||
|
||||
// db.execute fan-out: build a list of {sql, return} pairs the test
|
||||
// can assert on, and replay them in order. Ordering matters because
|
||||
// the implementation does TWO updates (runs first, then targets) and
|
||||
// the second one must only run if the first returned anything.
|
||||
const executeMock = vi.fn();
|
||||
|
||||
vi.mock("../db.js", () => ({
|
||||
db: {
|
||||
execute: (...a: unknown[]) => executeMock(...a),
|
||||
},
|
||||
}));
|
||||
|
||||
import { sweepStalePendingRuns } from "./sweep-stale-runs.js";
|
||||
|
||||
beforeEach(() => {
|
||||
executeMock.mockReset();
|
||||
});
|
||||
|
||||
describe("sweepStalePendingRuns", () => {
|
||||
it("returns 0 when no stale rows exist (skips the second UPDATE)", async () => {
|
||||
executeMock.mockResolvedValueOnce({ rows: [] });
|
||||
const r = await sweepStalePendingRuns();
|
||||
expect(r).toEqual({ runs: 0, targets: 0 });
|
||||
// Only the first UPDATE (runs) runs; no second UPDATE for targets.
|
||||
expect(executeMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("fires both UPDATEs when there ARE stale rows", async () => {
|
||||
executeMock
|
||||
.mockResolvedValueOnce({ rows: [{ id: "run-A" }, { id: "run-B" }] })
|
||||
.mockResolvedValueOnce({ rows: [{ id: "t-1" }, { id: "t-2" }, { id: "t-3" }] });
|
||||
|
||||
const r = await sweepStalePendingRuns();
|
||||
|
||||
expect(r).toEqual({ runs: 2, targets: 3 });
|
||||
expect(executeMock).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("returns the actual swept counts so the caller can log them", async () => {
|
||||
executeMock
|
||||
.mockResolvedValueOnce({
|
||||
rows: [{ id: "run-A" }, { id: "run-B" }, { id: "run-C" }],
|
||||
})
|
||||
.mockResolvedValueOnce({ rows: Array.from({ length: 17 }, (_, i) => ({ id: `t-${i}` })) });
|
||||
|
||||
const r = await sweepStalePendingRuns();
|
||||
|
||||
expect(r.runs).toBe(3);
|
||||
expect(r.targets).toBe(17);
|
||||
});
|
||||
|
||||
it("doesn't throw when the targets UPDATE returns no rows (run with no pending targets)", async () => {
|
||||
// A stale run with zero pending targets is unusual but possible —
|
||||
// the run row got the up-front insert but the per-target inserts
|
||||
// never ran. Still a stale run, still gets cleared.
|
||||
executeMock
|
||||
.mockResolvedValueOnce({ rows: [{ id: "run-D" }] })
|
||||
.mockResolvedValueOnce({ rows: [] });
|
||||
|
||||
const r = await sweepStalePendingRuns();
|
||||
expect(r).toEqual({ runs: 1, targets: 0 });
|
||||
});
|
||||
});
|
||||
64
apps/bot/src/scheduler/sweep-stale-runs.ts
Normal file
64
apps/bot/src/scheduler/sweep-stale-runs.ts
Normal file
@ -0,0 +1,64 @@
|
||||
import { sql } from "drizzle-orm";
|
||||
import { db } from "../db.js";
|
||||
import { logger } from "../logger.js";
|
||||
|
||||
/**
|
||||
* Recover from "bot crashed / restarted mid-run" crashes.
|
||||
*
|
||||
* fire-reminder writes the run row with status='pending' UP FRONT so
|
||||
* the Activity tab can show progress mid-run, then flips to a
|
||||
* terminal status (success/partial/failed/paused/skipped) once it's
|
||||
* done. If the bot dies between those two writes, the row sits at
|
||||
* 'pending' forever — pg-boss already marked the job 'completed', so
|
||||
* it won't retry.
|
||||
*
|
||||
* This sweep runs at bot startup. It finds any 'pending' run older
|
||||
* than `maxAgeMs` (default 5 minutes — enough slack that a real
|
||||
* mid-run rebalance to another worker isn't accidentally killed) and:
|
||||
*
|
||||
* • Flips the run to 'failed' with a clear error_summary so the UI
|
||||
* stops showing it as in-flight.
|
||||
* • Flips its pending run_target rows to 'skipped' with the same
|
||||
* reason so per-group counts make sense.
|
||||
*
|
||||
* Does NOT touch the parent reminder's lifecycle status — the row was
|
||||
* 'active' when the run started and stays that way; the next
|
||||
* occurrence (cron) or operator action will fire a fresh run.
|
||||
*/
|
||||
export async function sweepStalePendingRuns(
|
||||
maxAgeMs: number = 5 * 60 * 1000,
|
||||
): Promise<{ runs: number; targets: number }> {
|
||||
const cutoffMs = Date.now() - maxAgeMs;
|
||||
const cutoff = new Date(cutoffMs);
|
||||
|
||||
const runs = await db.execute(sql`
|
||||
UPDATE reminder_runs
|
||||
SET status = 'failed',
|
||||
error_summary = 'Bot restarted before this run completed.'
|
||||
WHERE status = 'pending'
|
||||
AND fired_at < ${cutoff}
|
||||
RETURNING id
|
||||
`);
|
||||
const runRows = runs.rows as Array<{ id: string }>;
|
||||
if (runRows.length === 0) {
|
||||
logger.info("sweep-stale-runs: no stale pending runs");
|
||||
return { runs: 0, targets: 0 };
|
||||
}
|
||||
|
||||
const ids = runRows.map((r) => r.id);
|
||||
const targets = await db.execute(sql`
|
||||
UPDATE reminder_run_targets
|
||||
SET status = 'skipped',
|
||||
error = 'bot restarted before this group could be sent'
|
||||
WHERE status = 'pending'
|
||||
AND run_id IN (${sql.join(ids.map((id) => sql`${id}`), sql`, `)})
|
||||
RETURNING id
|
||||
`);
|
||||
const targetCount = (targets.rows as Array<unknown>).length;
|
||||
|
||||
logger.warn(
|
||||
{ runs: runRows.length, targets: targetCount, cutoff: cutoff.toISOString() },
|
||||
"sweep-stale-runs: cleared stale pending runs",
|
||||
);
|
||||
return { runs: runRows.length, targets: targetCount };
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user