diff --git a/apps/bot/src/scheduler/fire-reminder.test.ts b/apps/bot/src/scheduler/fire-reminder.test.ts index ca02ff7..5046a43 100644 --- a/apps/bot/src/scheduler/fire-reminder.test.ts +++ b/apps/bot/src/scheduler/fire-reminder.test.ts @@ -108,6 +108,51 @@ describe("fireReminder", () => { expect(accountMutex.run).not.toHaveBeenCalled(); }); + it("BAILS OUT INSIDE the mutex when a concurrent job inserted a run while we were queued (TOCTOU repro)", async () => { + // Repro: three pg-boss jobs arrive in the same microsecond. All + // three pass the OUTER recent-run check (no run exists yet) and + // queue up on the per-account mutex. The first acquires, INSERTs + // a run, sends. The second acquires AFTER the first finished — + // its inner check now sees the just-inserted run and must bail, + // otherwise the message would be sent twice (or three times for + // the third job). Without the inner check this regression + // produced "qwerd msg three times" in production. + getReminderMock.mockResolvedValue({ + id: "r-1", + accountId: "acct-A", + status: "active", + targets: [], + messages: [], + createdBy: "op-1", + scheduleKind: "one_off", + rrule: null, + timezone: "Asia/Kuala_Lumpur", + deliveryWindowStartHour: 6, + deliveryWindowEndHour: 18, + name: "Test", + }); + // First call (outer check) returns no recent run → mutex acquired. + // Second call (inner check inside fireReminderInner) returns a + // freshly-inserted run from the concurrent winner, so the INSERT + // path bails. We never reach the .insert(reminderRuns) builder so + // the test passes by virtue of the inner-check log + early return. + findExistingRunMock + .mockResolvedValueOnce(undefined) + .mockResolvedValueOnce({ + id: "run-just-inserted-by-the-other-worker", + reminderId: "r-1", + firedAt: new Date(), + status: "pending", + }); + + await fireReminder({ reminderId: "r-1" }); + + // The mutex DID get acquired (we got past the outer check), but + // the inner check should have stopped us before any side effects. + expect(accountMutex.run).toHaveBeenCalledTimes(1); + expect(findExistingRunMock).toHaveBeenCalledTimes(2); + }); + it("BAILS OUT (no mutex acquired) when a fresh fire collides with a recent run", async () => { // Two pg-boss jobs landing within microseconds for the same // reminder should NOT both fire. The first creates the run; the diff --git a/apps/bot/src/scheduler/fire-reminder.ts b/apps/bot/src/scheduler/fire-reminder.ts index f658d4e..fb6fd9d 100644 --- a/apps/bot/src/scheduler/fire-reminder.ts +++ b/apps/bot/src/scheduler/fire-reminder.ts @@ -154,6 +154,32 @@ async function fireReminderInner( .set({ status: "pending", errorSummary: null }) .where(eq(reminderRuns.id, runId)); } else { + // Re-check the dedupe window now that we're inside the per-account + // mutex. The outer check in fireReminder() is a fast-path bail-out + // but it's TOCTOU: three concurrent jobs can all read "no recent + // run" before any of them inserts, so the message gets sent 2-3 + // times. Inside the mutex, the queue serialises us — by the time + // duplicate #2 reaches this point, duplicate #1 has already + // INSERTed and we'll find that row here. + const recent = await db.query.reminderRuns.findFirst({ + where: (r, { eq: dEq, and: dAnd, gt: dGt }) => + dAnd( + dEq(r.reminderId, reminder.id), + dGt(r.firedAt, new Date(Date.now() - DUPLICATE_FIRE_WINDOW_MS)), + ), + orderBy: (r, { desc }) => [desc(r.firedAt)], + }); + if (recent) { + logger.warn( + { + reminderId: reminder.id, + recentRunId: recent.id, + recentFiredAt: recent.firedAt, + }, + "fire-reminder: duplicate fire detected inside mutex (a run was just inserted by a concurrent job), skipping", + ); + return; + } const [run] = await db .insert(reminderRuns) .values({ diff --git a/apps/bot/src/scheduler/reminder-jobs.ts b/apps/bot/src/scheduler/reminder-jobs.ts index 1145c53..3a03311 100644 --- a/apps/bot/src/scheduler/reminder-jobs.ts +++ b/apps/bot/src/scheduler/reminder-jobs.ts @@ -1,6 +1,8 @@ import type { PgBoss } from "pg-boss"; +import { sql } from "drizzle-orm"; import { logger } from "../logger.js"; import { env } from "../env.js"; +import { db } from "../db.js"; import { fireReminder, type FireReminderPayload } from "./fire-reminder.js"; export const REMINDER_FIRE_QUEUE = "reminder.fire"; @@ -16,6 +18,23 @@ export async function registerReminderJobs(boss: PgBoss): Promise { // when two reminder.fire jobs landed within microseconds. // https://github.com/timgit/pg-boss/blob/master/docs/usage.md#queue-policies await boss.createQueue(REMINDER_FIRE_QUEUE, { policy: "stately" }); + // pg-boss v12's createQueue is idempotent and DOES NOT update the + // policy on an existing queue row. Earlier deployments created the + // queue with the default 'standard' policy, so the queue keeps + // accepting duplicate enqueues and the same reminder fires twice or + // three times. Force the policy via direct SQL on every boot — this + // also future-proofs the upgrade path if pg-boss ever lets us flip + // policy through its API. + try { + await db.execute( + sql`UPDATE pgboss.queue SET policy = 'stately' WHERE name = ${REMINDER_FIRE_QUEUE} AND policy <> 'stately'`, + ); + } catch (err) { + logger.warn( + { err }, + "reminder.fire: failed to force queue policy=stately (handler-level dedupe still applies)", + ); + } await boss.work( REMINDER_FIRE_QUEUE, {