fix(bot): dedupe duplicate reminder.fire jobs (msg sent twice)
Observed: reminder fired twice within ~2s. The bot logs showed two
distinct pg-boss jobIds for the same reminder enqueued at the same
scheduledAt — both ran fire-reminder, both sent the message.
Root cause: pg-boss's `singletonKey` only deduplicates on queues with
a 'singleton' / 'stately' / 'short' policy. Our queue was created
without specifying a policy, defaulting to 'standard', which IGNORES
the singletonKey. Two sends with the same key produced two jobs.
Fix lives at two layers:
* Layer 1 — queue policy. createQueue(REMINDER_FIRE_QUEUE) now
passes `{ policy: 'stately' }`. On a queue created with this policy,
a duplicate send (same singletonKey) is dropped while a job with
that key is still in the 'created' state, rather than producing a
second job. Calling createQueue on a queue that already exists does
not retroactively change its policy (pg-boss doesn't support that),
so only queues created after this change get the dedupe.
* Layer 2 — defense-in-depth check inside fireReminder. Before
acquiring the per-account mutex, query reminderRuns for any row
with the same reminderId fired in the last 30s. If found, log
+ bail. This guards against:
- Existing queues stuck on policy='standard'.
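- Race windows even within 'stately' policy (best-effort only: the
check is a plain read taken before the mutex, so it catches a
duplicate only once the first job's run row is visible).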
- The operator double-clicking Save in the wizard.
- A jittery pg_notify('bot.command') replay.
Resume jobs (payload.runId set) skip this check — they're meant
to attach to an existing run.
Tests:
* New "BAILS OUT when a fresh fire collides with a recent run" case
in fire-reminder.test.ts.
* beforeEach now resets findExistingRunMock too, since both the
resume and dedupe paths share that mock.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
be3f28a1e6
commit
4cb4015666
@@ -54,6 +54,7 @@ describe("fireReminder", () => {
|
|||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
vi.mocked(accountMutex.run).mockClear();
|
vi.mocked(accountMutex.run).mockClear();
|
||||||
getReminderMock.mockReset();
|
getReminderMock.mockReset();
|
||||||
|
findExistingRunMock.mockReset();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("acquires accountMutex keyed by accountId for active reminders", async () => {
|
it("acquires accountMutex keyed by accountId for active reminders", async () => {
|
||||||
@@ -107,6 +108,39 @@ describe("fireReminder", () => {
|
|||||||
expect(accountMutex.run).not.toHaveBeenCalled();
|
expect(accountMutex.run).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("BAILS OUT (no mutex acquired) when a fresh fire collides with a recent run", async () => {
|
||||||
|
// Two pg-boss jobs landing within microseconds for the same
|
||||||
|
// reminder should NOT both fire. The first creates the run; the
|
||||||
|
// second sees that run is < DUPLICATE_FIRE_WINDOW_MS old and exits.
|
||||||
|
getReminderMock.mockResolvedValue({
|
||||||
|
id: "r-1",
|
||||||
|
accountId: "acct-A",
|
||||||
|
status: "active",
|
||||||
|
targets: [],
|
||||||
|
messages: [],
|
||||||
|
createdBy: "op-1",
|
||||||
|
scheduleKind: "one_off",
|
||||||
|
rrule: null,
|
||||||
|
timezone: "Asia/Kuala_Lumpur",
|
||||||
|
deliveryWindowStartHour: 6,
|
||||||
|
deliveryWindowEndHour: 18,
|
||||||
|
name: "Test",
|
||||||
|
});
|
||||||
|
// The duplicate-fire check shares the reminderRuns.findFirst mock.
|
||||||
|
// Return a fresh run (firedAt = "just now") to simulate the
|
||||||
|
// collision.
|
||||||
|
findExistingRunMock.mockResolvedValue({
|
||||||
|
id: "run-recent",
|
||||||
|
reminderId: "r-1",
|
||||||
|
firedAt: new Date(),
|
||||||
|
status: "pending",
|
||||||
|
});
|
||||||
|
|
||||||
|
await fireReminder({ reminderId: "r-1" });
|
||||||
|
|
||||||
|
expect(accountMutex.run).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
it("DOES acquire the mutex on a resume even when the reminder is paused", async () => {
|
it("DOES acquire the mutex on a resume even when the reminder is paused", async () => {
|
||||||
// Resume path must allow status='paused' (and 'active') so the
|
// Resume path must allow status='paused' (and 'active') so the
|
||||||
// operator can drag a paused reminder back into delivery. Fresh
|
// operator can drag a paused reminder back into delivery. Fresh
|
||||||
|
|||||||
@@ -36,6 +36,16 @@ export type FireReminderPayload = {
|
|||||||
runId?: string;
|
runId?: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Window in which two fire-reminder jobs for the same reminder are
|
||||||
|
* treated as duplicates. Generous enough to absorb real-world double-
|
||||||
|
* submits (the operator clicks Save twice; pg_notify floods the
|
||||||
|
* command-consumer; pg-boss policy didn't dedupe a microsecond-apart
|
||||||
|
* race) — short enough that a deliberately rapid recurring schedule
|
||||||
|
* (e.g. every minute, in dev) still fires every occurrence.
|
||||||
|
*/
|
||||||
|
const DUPLICATE_FIRE_WINDOW_MS = 30_000;
|
||||||
|
|
||||||
/** Random delay between same-group message parts. Just enough for
|
/** Random delay between same-group message parts. Just enough for
|
||||||
* visible ordering in the chat at WA's natural pace. */
|
* visible ordering in the chat at WA's natural pace. */
|
||||||
function partJitterMs(): number {
|
function partJitterMs(): number {
|
||||||
@@ -86,6 +96,34 @@ export async function fireReminder(payload: FireReminderPayload): Promise<void>
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Defense-in-depth dedupe: if pg-boss enqueues two reminder.fire jobs
|
||||||
|
// for the same reminderId within microseconds (e.g. a duplicate
|
||||||
|
// schedule call slipped past the queue's singletonKey), the second
|
||||||
|
// worker would otherwise create a SECOND run and the same message
|
||||||
|
// gets sent twice. Bail out if a run for this reminder already exists
|
||||||
|
// and was created less than DUPLICATE_FIRE_WINDOW_MS ago.
|
||||||
|
if (!payload.runId) {
|
||||||
|
const recent = await db.query.reminderRuns.findFirst({
|
||||||
|
where: (r, { eq: dEq, and: dAnd, gt: dGt }) =>
|
||||||
|
dAnd(
|
||||||
|
dEq(r.reminderId, reminder.id),
|
||||||
|
dGt(r.firedAt, new Date(Date.now() - DUPLICATE_FIRE_WINDOW_MS)),
|
||||||
|
),
|
||||||
|
orderBy: (r, { desc }) => [desc(r.firedAt)],
|
||||||
|
});
|
||||||
|
if (recent) {
|
||||||
|
logger.warn(
|
||||||
|
{
|
||||||
|
reminderId: reminder.id,
|
||||||
|
recentRunId: recent.id,
|
||||||
|
recentFiredAt: recent.firedAt,
|
||||||
|
},
|
||||||
|
"fire-reminder: duplicate fire detected (a run for this reminder was just created), skipping",
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Per-account mutex: two reminders on the SAME account take turns
|
// Per-account mutex: two reminders on the SAME account take turns
|
||||||
// (running them concurrently would double the effective send rate
|
// (running them concurrently would double the effective send rate
|
||||||
// and risk a ban). Different accounts run in parallel.
|
// and risk a ban). Different accounts run in parallel.
|
||||||
|
|||||||
@@ -6,7 +6,16 @@ import { fireReminder, type FireReminderPayload } from "./fire-reminder.js";
|
|||||||
export const REMINDER_FIRE_QUEUE = "reminder.fire";
|
export const REMINDER_FIRE_QUEUE = "reminder.fire";
|
||||||
|
|
||||||
export async function registerReminderJobs(boss: PgBoss): Promise<void> {
|
export async function registerReminderJobs(boss: PgBoss): Promise<void> {
|
||||||
await boss.createQueue(REMINDER_FIRE_QUEUE);
|
// 'stately' = at most 1 job per (state, singletonKey). Combined with
|
||||||
|
// singletonKey="reminder:<id>" on every send, that means a duplicate
|
||||||
|
// schedule call (e.g. operator double-clicked Save, or the
|
||||||
|
// pg_notify('bot.command') consumer fired twice in the same tick)
|
||||||
|
// is folded into the existing 'created' job instead of producing a
|
||||||
|
// second run. The default 'standard' policy DOES NOT dedupe by
|
||||||
|
// singletonKey — that's how we ended up firing a reminder twice
|
||||||
|
// when two reminder.fire jobs landed within microseconds.
|
||||||
|
// https://github.com/timgit/pg-boss/blob/master/docs/usage.md#queue-policies
|
||||||
|
await boss.createQueue(REMINDER_FIRE_QUEUE, { policy: "stately" });
|
||||||
await boss.work<FireReminderPayload>(
|
await boss.work<FireReminderPayload>(
|
||||||
REMINDER_FIRE_QUEUE,
|
REMINDER_FIRE_QUEUE,
|
||||||
{
|
{
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user