From a789b61e1f7d341cf7a50e3ab0cf09f3e0e42bbd Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sun, 10 May 2026 19:43:41 +0800 Subject: [PATCH] =?UTF-8?q?fix(bot):=20triple-fire=20reminder=20bug=20?= =?UTF-8?q?=E2=80=94=20force=20pg-boss=20policy=20+=20close=20TOCTOU=20ded?= =?UTF-8?q?upe?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Repro: fire a reminder, message lands 2-3 times in WhatsApp (logs showed three 'fire-reminder: done' entries within 1.5 s for the same reminderId). Two interlocking root causes: 1. The queue was created at 'standard' policy (pre-dating the stately rollout). pg-boss's createQueue is idempotent and DOES NOT update the policy on an existing queue row, so re-deploying the code that requested policy=stately silently kept the standard policy. Standard accepts duplicate enqueues with the same singletonKey — three reminder.fire jobs for the same reminderId could all land at once. 2. The handler-level recent-run dedupe was TOCTOU. The check ran OUTSIDE the per-account mutex, so three concurrent invocations all read 'no recent run', then queued up on the mutex one at a time and each INSERTed a fresh run + sent the message. Fixes: - registerReminderJobs now forces the queue policy via direct SQL (UPDATE pgboss.queue SET policy = 'stately' WHERE name = ... AND policy <> 'stately') on every boot. Idempotent + survives pre-existing standard-policy rows. - fireReminderInner re-checks for a recent run AFTER the mutex is held but BEFORE the INSERT. By that point any concurrent winner has already inserted, so the duplicate sees the row and bails cleanly. New test in fire-reminder.test.ts (the TOCTOU repro): outer check returns no recent run, inner check returns a freshly-inserted one, asserts the mutex was acquired but the second findFirst was hit (i.e. we got past the outer check and the inner check stopped us). Verified live: pgboss.queue.policy is now 'stately' for reminder.fire. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/bot/src/scheduler/fire-reminder.test.ts | 45 ++++++++++++++++++++ apps/bot/src/scheduler/fire-reminder.ts | 26 +++++++++++ apps/bot/src/scheduler/reminder-jobs.ts | 19 +++++++++ 3 files changed, 90 insertions(+) diff --git a/apps/bot/src/scheduler/fire-reminder.test.ts b/apps/bot/src/scheduler/fire-reminder.test.ts index ca02ff7..5046a43 100644 --- a/apps/bot/src/scheduler/fire-reminder.test.ts +++ b/apps/bot/src/scheduler/fire-reminder.test.ts @@ -108,6 +108,51 @@ describe("fireReminder", () => { expect(accountMutex.run).not.toHaveBeenCalled(); }); + it("BAILS OUT INSIDE the mutex when a concurrent job inserted a run while we were queued (TOCTOU repro)", async () => { + // Repro: three pg-boss jobs arrive in the same microsecond. All + // three pass the OUTER recent-run check (no run exists yet) and + // queue up on the per-account mutex. The first acquires, INSERTs + // a run, sends. The second acquires AFTER the first finished — + // its inner check now sees the just-inserted run and must bail, + // otherwise the message would be sent twice (or three times for + // the third job). Without the inner check this regression + // produced "qwerd msg three times" in production. + getReminderMock.mockResolvedValue({ + id: "r-1", + accountId: "acct-A", + status: "active", + targets: [], + messages: [], + createdBy: "op-1", + scheduleKind: "one_off", + rrule: null, + timezone: "Asia/Kuala_Lumpur", + deliveryWindowStartHour: 6, + deliveryWindowEndHour: 18, + name: "Test", + }); + // First call (outer check) returns no recent run → mutex acquired. + // Second call (inner check inside fireReminderInner) returns a + // freshly-inserted run from the concurrent winner, so the INSERT + // path bails. We never reach the .insert(reminderRuns) builder so + // the test passes by virtue of the inner-check log + early return. + findExistingRunMock + .mockResolvedValueOnce(undefined) + .mockResolvedValueOnce({ + id: "run-just-inserted-by-the-other-worker", + reminderId: "r-1", + firedAt: new Date(), + status: "pending", + }); + + await fireReminder({ reminderId: "r-1" }); + + // The mutex DID get acquired (we got past the outer check), but + // the inner check should have stopped us before any side effects. + expect(accountMutex.run).toHaveBeenCalledTimes(1); + expect(findExistingRunMock).toHaveBeenCalledTimes(2); + }); + it("BAILS OUT (no mutex acquired) when a fresh fire collides with a recent run", async () => { // Two pg-boss jobs landing within microseconds for the same // reminder should NOT both fire. The first creates the run; the diff --git a/apps/bot/src/scheduler/fire-reminder.ts b/apps/bot/src/scheduler/fire-reminder.ts index f658d4e..fb6fd9d 100644 --- a/apps/bot/src/scheduler/fire-reminder.ts +++ b/apps/bot/src/scheduler/fire-reminder.ts @@ -154,6 +154,32 @@ async function fireReminderInner( .set({ status: "pending", errorSummary: null }) .where(eq(reminderRuns.id, runId)); } else { + // Re-check the dedupe window now that we're inside the per-account + // mutex. The outer check in fireReminder() is a fast-path bail-out + // but it's TOCTOU: three concurrent jobs can all read "no recent + // run" before any of them inserts, so the message gets sent 2-3 + // times. Inside the mutex, the queue serialises us — by the time + // duplicate #2 reaches this point, duplicate #1 has already + // INSERTed and we'll find that row here. + const recent = await db.query.reminderRuns.findFirst({ + where: (r, { eq: dEq, and: dAnd, gt: dGt }) => + dAnd( + dEq(r.reminderId, reminder.id), + dGt(r.firedAt, new Date(Date.now() - DUPLICATE_FIRE_WINDOW_MS)), + ), + orderBy: (r, { desc }) => [desc(r.firedAt)], + }); + if (recent) { + logger.warn( + { + reminderId: reminder.id, + recentRunId: recent.id, + recentFiredAt: recent.firedAt, + }, + "fire-reminder: duplicate fire detected inside mutex (a run was just inserted by a concurrent job), skipping", + ); + return; + } const [run] = await db .insert(reminderRuns) .values({ diff --git a/apps/bot/src/scheduler/reminder-jobs.ts b/apps/bot/src/scheduler/reminder-jobs.ts index 1145c53..3a03311 100644 --- a/apps/bot/src/scheduler/reminder-jobs.ts +++ b/apps/bot/src/scheduler/reminder-jobs.ts @@ -1,6 +1,8 @@ import type { PgBoss } from "pg-boss"; +import { sql } from "drizzle-orm"; import { logger } from "../logger.js"; import { env } from "../env.js"; +import { db } from "../db.js"; import { fireReminder, type FireReminderPayload } from "./fire-reminder.js"; export const REMINDER_FIRE_QUEUE = "reminder.fire"; @@ -16,6 +18,23 @@ export async function registerReminderJobs(boss: PgBoss): Promise { // when two reminder.fire jobs landed within microseconds. // https://github.com/timgit/pg-boss/blob/master/docs/usage.md#queue-policies await boss.createQueue(REMINDER_FIRE_QUEUE, { policy: "stately" }); + // pg-boss v12's createQueue is idempotent and DOES NOT update the + // policy on an existing queue row. Earlier deployments created the + // queue with the default 'standard' policy, so the queue keeps + // accepting duplicate enqueues and the same reminder fires twice or + // three times. Force the policy via direct SQL on every boot — this + // also future-proofs the upgrade path if pg-boss ever lets us flip + // policy through its API. + try { + await db.execute( + sql`UPDATE pgboss.queue SET policy = 'stately' WHERE name = ${REMINDER_FIRE_QUEUE} AND policy <> 'stately'`, + ); + } catch (err) { + logger.warn( + { err }, + "reminder.fire: failed to force queue policy=stately (handler-level dedupe still applies)", + ); + } await boss.work( REMINDER_FIRE_QUEUE, {