fix(bot): dedupe duplicate reminder.fire jobs (msg sent twice)
Observed: reminder fired twice within ~2s. The bot logs showed two
distinct pg-boss jobIds for the same reminder enqueued at the same
scheduledAt — both ran fire-reminder, both sent the message.
Root cause: pg-boss's `singletonKey` only deduplicates on queues with
a 'singleton' / 'stately' / 'short' policy. Our queue was created
without specifying a policy, defaulting to 'standard', which IGNORES
the singletonKey. Two sends with the same key produced two jobs.
Fix lives at two layers:
* Layer 1 — queue policy. createQueue(REMINDER_FIRE_QUEUE) now
passes `{ policy: 'stately' }`. With this, future fresh deploys
fold a duplicate send (same singletonKey) into the existing
'created' job rather than producing a second one. This doesn't
retroactively change an existing queue's policy (pg-boss doesn't
support that), but new queues are correct from creation.
* Layer 2 — defense-in-depth check inside fireReminder. Before
acquiring the per-account mutex, query reminderRuns for any row
with the same reminderId fired in the last 30s. If found, log
+ bail. This guards against:
- Existing queues stuck on policy='standard'.
- Race windows even within 'stately' policy.
- The operator double-clicking Save in the wizard.
- A jittery pg_notify('bot.command') replay.
Resume jobs (payload.runId set) skip this check — they're meant
to attach to an existing run.
Tests:
* New "BAILS OUT when a fresh fire collides with a recent run" case
in fire-reminder.test.ts.
* beforeEach now resets findExistingRunMock too, since both the
resume and dedupe paths share that mock.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
be3f28a1e6
commit
38a99eae74
@ -54,6 +54,7 @@ describe("fireReminder", () => {
|
||||
beforeEach(() => {
|
||||
vi.mocked(accountMutex.run).mockClear();
|
||||
getReminderMock.mockReset();
|
||||
findExistingRunMock.mockReset();
|
||||
});
|
||||
|
||||
it("acquires accountMutex keyed by accountId for active reminders", async () => {
|
||||
@ -107,6 +108,39 @@ describe("fireReminder", () => {
|
||||
expect(accountMutex.run).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("BAILS OUT (no mutex acquired) when a fresh fire collides with a recent run", async () => {
|
||||
// Two pg-boss jobs landing within microseconds for the same
|
||||
// reminder should NOT both fire. The first creates the run; the
|
||||
// second sees that run is < DUPLICATE_FIRE_WINDOW_MS old and exits.
|
||||
getReminderMock.mockResolvedValue({
|
||||
id: "r-1",
|
||||
accountId: "acct-A",
|
||||
status: "active",
|
||||
targets: [],
|
||||
messages: [],
|
||||
createdBy: "op-1",
|
||||
scheduleKind: "one_off",
|
||||
rrule: null,
|
||||
timezone: "Asia/Kuala_Lumpur",
|
||||
deliveryWindowStartHour: 6,
|
||||
deliveryWindowEndHour: 18,
|
||||
name: "Test",
|
||||
});
|
||||
// The duplicate-fire check shares the reminderRuns.findFirst mock.
|
||||
// Return a fresh run (firedAt = "just now") to simulate the
|
||||
// collision.
|
||||
findExistingRunMock.mockResolvedValue({
|
||||
id: "run-recent",
|
||||
reminderId: "r-1",
|
||||
firedAt: new Date(),
|
||||
status: "pending",
|
||||
});
|
||||
|
||||
await fireReminder({ reminderId: "r-1" });
|
||||
|
||||
expect(accountMutex.run).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("DOES acquire the mutex on a resume even when the reminder is paused", async () => {
|
||||
// Resume path must allow status='paused' (and 'active') so the
|
||||
// operator can drag a paused reminder back into delivery. Fresh
|
||||
|
||||
@ -36,6 +36,16 @@ export type FireReminderPayload = {
|
||||
runId?: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Window in which two fire-reminder jobs for the same reminder are
|
||||
* treated as duplicates. Generous enough to absorb real-world double-
|
||||
* submits (the operator clicks Save twice; pg_notify floods the
|
||||
* command-consumer; pg-boss policy didn't dedupe a microsecond-apart
|
||||
* race) — short enough that a deliberately rapid recurring schedule
|
||||
* (e.g. every minute, in dev) still fires every occurrence.
|
||||
*/
|
||||
const DUPLICATE_FIRE_WINDOW_MS = 30_000;
|
||||
|
||||
/** Random delay between same-group message parts. Just enough for
|
||||
* visible ordering in the chat at WA's natural pace. */
|
||||
function partJitterMs(): number {
|
||||
@ -86,6 +96,34 @@ export async function fireReminder(payload: FireReminderPayload): Promise<void>
|
||||
return;
|
||||
}
|
||||
|
||||
// Defense-in-depth dedupe: if pg-boss enqueues two reminder.fire jobs
|
||||
// for the same reminderId within microseconds (e.g. a duplicate
|
||||
// schedule call slipped past the queue's singletonKey), the second
|
||||
// worker would otherwise create a SECOND run and the same message
|
||||
// gets sent twice. Bail out if a run for this reminder already exists
|
||||
// and was created less than DUPLICATE_FIRE_WINDOW_MS ago.
|
||||
if (!payload.runId) {
|
||||
const recent = await db.query.reminderRuns.findFirst({
|
||||
where: (r, { eq: dEq, and: dAnd, gt: dGt }) =>
|
||||
dAnd(
|
||||
dEq(r.reminderId, reminder.id),
|
||||
dGt(r.firedAt, new Date(Date.now() - DUPLICATE_FIRE_WINDOW_MS)),
|
||||
),
|
||||
orderBy: (r, { desc }) => [desc(r.firedAt)],
|
||||
});
|
||||
if (recent) {
|
||||
logger.warn(
|
||||
{
|
||||
reminderId: reminder.id,
|
||||
recentRunId: recent.id,
|
||||
recentFiredAt: recent.firedAt,
|
||||
},
|
||||
"fire-reminder: duplicate fire detected (a run for this reminder was just created), skipping",
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Per-account mutex: two reminders on the SAME account take turns
|
||||
// (running them concurrently would double the effective send rate
|
||||
// and risk a ban). Different accounts run in parallel.
|
||||
|
||||
@ -6,7 +6,16 @@ import { fireReminder, type FireReminderPayload } from "./fire-reminder.js";
|
||||
export const REMINDER_FIRE_QUEUE = "reminder.fire";
|
||||
|
||||
export async function registerReminderJobs(boss: PgBoss): Promise<void> {
|
||||
await boss.createQueue(REMINDER_FIRE_QUEUE);
|
||||
// 'stately' = at most 1 job per (state, singletonKey). Combined with
|
||||
// singletonKey="reminder:<id>" on every send, that means a duplicate
|
||||
// schedule call (e.g. operator double-clicked Save, or the
|
||||
// pg_notify('bot.command') consumer fired twice in the same tick)
|
||||
// is folded into the existing 'created' job instead of producing a
|
||||
// second run. The default 'standard' policy DOES NOT dedupe by
|
||||
// singletonKey — that's how we ended up firing a reminder twice
|
||||
// when two reminder.fire jobs landed within microseconds.
|
||||
// https://github.com/timgit/pg-boss/blob/master/docs/usage.md#queue-policies
|
||||
await boss.createQueue(REMINDER_FIRE_QUEUE, { policy: "stately" });
|
||||
await boss.work<FireReminderPayload>(
|
||||
REMINDER_FIRE_QUEUE,
|
||||
{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user