From 4cb401566641f732f92543739b3911497f688108 Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sun, 10 May 2026 16:41:11 +0800 Subject: [PATCH] fix(bot): dedupe duplicate reminder.fire jobs (msg sent twice) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Observed: reminder fired twice within ~2s. The bot logs showed two distinct pg-boss jobIds for the same reminder enqueued at the same scheduledAt — both ran fire-reminder, both sent the message. Root cause: pg-boss's `singletonKey` only deduplicates on queues with a 'singleton' / 'stately' / 'short' policy. Our queue was created without specifying a policy, defaulting to 'standard', which IGNORES the singletonKey. Two sends with the same key produced two jobs. Fix lives at two layers: * Layer 1 — queue policy. createQueue(REMINDER_FIRE_QUEUE) now passes `{ policy: 'stately' }`. With this, future fresh deploys fold a duplicate send (same singletonKey) into the existing 'created' job rather than producing a second one. This doesn't retroactively change an existing queue's policy (pg-boss doesn't support that), but new queues are correct from creation. * Layer 2 — defense-in-depth check inside fireReminder. Before acquiring the per-account mutex, query reminderRuns for any row with the same reminderId fired in the last 30s. If found, log + bail. This guards against: - Existing queues stuck on policy='standard'. - Race windows even within 'stately' policy. - The operator double-clicking Save in the wizard. - A jittery pg_notify('bot.command') replay. Resume jobs (payload.runId set) skip this check — they're meant to attach to an existing run. Tests: * New "BAILS OUT when a fresh fire collides with a recent run" case in fire-reminder.test.ts. * beforeEach now resets findExistingRunMock too, since both the resume and dedupe paths share that mock. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/bot/src/scheduler/fire-reminder.test.ts | 34 ++++++++++++++++++ apps/bot/src/scheduler/fire-reminder.ts | 38 ++++++++++++++++++++ apps/bot/src/scheduler/reminder-jobs.ts | 11 +++++- 3 files changed, 82 insertions(+), 1 deletion(-) diff --git a/apps/bot/src/scheduler/fire-reminder.test.ts b/apps/bot/src/scheduler/fire-reminder.test.ts index 5b173be..ca02ff7 100644 --- a/apps/bot/src/scheduler/fire-reminder.test.ts +++ b/apps/bot/src/scheduler/fire-reminder.test.ts @@ -54,6 +54,7 @@ describe("fireReminder", () => { beforeEach(() => { vi.mocked(accountMutex.run).mockClear(); getReminderMock.mockReset(); + findExistingRunMock.mockReset(); }); it("acquires accountMutex keyed by accountId for active reminders", async () => { @@ -107,6 +108,39 @@ describe("fireReminder", () => { expect(accountMutex.run).not.toHaveBeenCalled(); }); + it("BAILS OUT (no mutex acquired) when a fresh fire collides with a recent run", async () => { + // Two pg-boss jobs landing within microseconds for the same + // reminder should NOT both fire. The first creates the run; the + // second sees that run is < DUPLICATE_FIRE_WINDOW_MS old and exits. + getReminderMock.mockResolvedValue({ + id: "r-1", + accountId: "acct-A", + status: "active", + targets: [], + messages: [], + createdBy: "op-1", + scheduleKind: "one_off", + rrule: null, + timezone: "Asia/Kuala_Lumpur", + deliveryWindowStartHour: 6, + deliveryWindowEndHour: 18, + name: "Test", + }); + // The duplicate-fire check shares the reminderRuns.findFirst mock. + // Return a fresh run (firedAt = "just now") to simulate the + // collision. + findExistingRunMock.mockResolvedValue({ + id: "run-recent", + reminderId: "r-1", + firedAt: new Date(), + status: "pending", + }); + + await fireReminder({ reminderId: "r-1" }); + + expect(accountMutex.run).not.toHaveBeenCalled(); + }); + it("DOES acquire the mutex on a resume even when the reminder is paused", async () => { // Resume path must allow status='paused' (and 'active') so the // operator can drag a paused reminder back into delivery. Fresh diff --git a/apps/bot/src/scheduler/fire-reminder.ts b/apps/bot/src/scheduler/fire-reminder.ts index 5cbf51b..f658d4e 100644 --- a/apps/bot/src/scheduler/fire-reminder.ts +++ b/apps/bot/src/scheduler/fire-reminder.ts @@ -36,6 +36,16 @@ export type FireReminderPayload = { runId?: string; }; +/** + * Window in which two fire-reminder jobs for the same reminder are + * treated as duplicates. Generous enough to absorb real-world double- + * submits (the operator clicks Save twice; pg_notify floods the + * command-consumer; pg-boss policy didn't dedupe a microsecond-apart + * race) — short enough that a deliberately rapid recurring schedule + * (e.g. every minute, in dev) still fires every occurrence. + */ +const DUPLICATE_FIRE_WINDOW_MS = 30_000; + /** Random delay between same-group message parts. Just enough for * visible ordering in the chat at WA's natural pace. */ function partJitterMs(): number { @@ -86,6 +96,34 @@ export async function fireReminder(payload: FireReminderPayload): Promise return; } + // Defense-in-depth dedupe: if pg-boss enqueues two reminder.fire jobs + // for the same reminderId within microseconds (e.g. a duplicate + // schedule call slipped past the queue's singletonKey), the second + // worker would otherwise create a SECOND run and the same message + // gets sent twice. Bail out if a run for this reminder already exists + // and was created less than DUPLICATE_FIRE_WINDOW_MS ago. + if (!payload.runId) { + const recent = await db.query.reminderRuns.findFirst({ + where: (r, { eq: dEq, and: dAnd, gt: dGt }) => + dAnd( + dEq(r.reminderId, reminder.id), + dGt(r.firedAt, new Date(Date.now() - DUPLICATE_FIRE_WINDOW_MS)), + ), + orderBy: (r, { desc }) => [desc(r.firedAt)], + }); + if (recent) { + logger.warn( + { + reminderId: reminder.id, + recentRunId: recent.id, + recentFiredAt: recent.firedAt, + }, + "fire-reminder: duplicate fire detected (a run for this reminder was just created), skipping", + ); + return; + } + } + // Per-account mutex: two reminders on the SAME account take turns // (running them concurrently would double the effective send rate // and risk a ban). Different accounts run in parallel. diff --git a/apps/bot/src/scheduler/reminder-jobs.ts b/apps/bot/src/scheduler/reminder-jobs.ts index 1ede06c..1145c53 100644 --- a/apps/bot/src/scheduler/reminder-jobs.ts +++ b/apps/bot/src/scheduler/reminder-jobs.ts @@ -6,7 +6,16 @@ import { fireReminder, type FireReminderPayload } from "./fire-reminder.js"; export const REMINDER_FIRE_QUEUE = "reminder.fire"; export async function registerReminderJobs(boss: PgBoss): Promise { - await boss.createQueue(REMINDER_FIRE_QUEUE); + // 'stately' = at most 1 job per (state, singletonKey). Combined with + // singletonKey="reminder:" on every send, that means a duplicate + // schedule call (e.g. operator double-clicked Save, or the + // pg_notify('bot.command') consumer fired twice in the same tick) + // is folded into the existing 'created' job instead of producing a + // second run. The default 'standard' policy DOES NOT dedupe by + // singletonKey — that's how we ended up firing a reminder twice + // when two reminder.fire jobs landed within microseconds. + // https://github.com/timgit/pg-boss/blob/master/docs/usage.md#queue-policies + await boss.createQueue(REMINDER_FIRE_QUEUE, { policy: "stately" }); await boss.work( REMINDER_FIRE_QUEUE, {