yiekheng a789b61e1f fix(bot): triple-fire reminder bug — force pg-boss policy + close TOCTOU dedupe
Repro: fire a reminder, message lands 2-3 times in WhatsApp (logs
showed three 'fire-reminder: done' entries within 1.5 s for the same
reminderId).

Two interlocking root causes:

  1. The queue was created at 'standard' policy (pre-dating the
     stately rollout). pg-boss's createQueue is idempotent and DOES
     NOT update the policy on an existing queue row, so re-deploying
     the code that requested policy=stately silently kept the
     standard policy. Standard accepts duplicate enqueues with the
     same singletonKey — three reminder.fire jobs for the same
     reminderId could all land at once.

  2. The handler-level recent-run dedupe had a TOCTOU race. The check
     ran OUTSIDE the per-account mutex, so three concurrent invocations
     all read 'no recent run', then queued on the mutex, and each in
     turn INSERTed a fresh run and sent the message.

Fixes:

  - registerReminderJobs now forces the queue policy via direct SQL
    (UPDATE pgboss.queue SET policy = 'stately' WHERE name = ...
    AND policy <> 'stately') on every boot. Idempotent + survives
    pre-existing standard-policy rows.
  - fireReminderInner re-checks for a recent run AFTER the mutex is
    held but BEFORE the INSERT. By that point any concurrent winner
    has already inserted, so the duplicate sees the row and bails
    cleanly.

New test in fire-reminder.test.ts (the TOCTOU repro): the outer check
returns no recent run and the inner check returns a freshly-inserted
one; the test asserts that the mutex was acquired and that the second
findFirst was hit (i.e. we got past the outer check and the inner check
stopped us).

Verified live: pgboss.queue.policy is now 'stately' for reminder.fire.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 19:43:41 +08:00

544 lines
20 KiB
TypeScript

import { and, eq, inArray } from "drizzle-orm";
import { reminderRuns, reminderRunTargets, reminders } from "@cmbot/db";
import {
generateWAMessageContent,
generateMessageID,
type AnyMessageContent,
type proto,
type WASocket,
} from "@whiskeysockets/baileys";
import pLimit from "p-limit";
import { readFile } from "node:fs/promises";
import { db } from "../db.js";
import { logger } from "../logger.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import {
absoluteMediaPath,
nextOccurrence,
resolveDeliveryKind,
windowEndAt,
} from "@cmbot/shared";
import { env } from "../env.js";
import { writeAuditLog } from "../audit.js";
import { getReminderWithDetails } from "../reminders/crud.js";
import { getBoss } from "./pgboss-client.js";
import { scheduleReminderFire } from "./reminder-jobs.js";
import { pgNotifyWeb } from "../ipc/notify.js";
import { accountMutex } from "./per-key-mutex.js";
import { accountRateLimiter } from "./rate-limiter.js";
import { MediaUploadCache } from "./media-upload-cache.js";
/** Payload accepted by fireReminder(). A fresh fire sets only
* `reminderId`; a resume also sets `runId`. Resumes skip both the
* `status === "active"` requirement and the duplicate-fire check. */
export type FireReminderPayload = {
/** ID of the reminder to load (via getReminderWithDetails) and fire. */
reminderId: string;
/** Optional resume hook. When present, fire-reminder ATTACHES to
* the existing run instead of creating a new one and only re-tries
* targets in `pending` status. Set by the resume server action. */
runId?: string;
};
/**
 * Duplicate-fire window, in milliseconds. When a fresh fire finds a run
 * for the same reminder whose `firedAt` is newer than this, it bails out
 * instead of sending again.
 *
 * It needs to be wide enough to swallow accidental double-submits (Save
 * clicked twice, a pg_notify flood hitting the command consumer, a race
 * the pg-boss policy failed to dedupe). It also needs to be narrow enough
 * that a deliberately fast recurring schedule (e.g. once a minute in dev)
 * still fires on every occurrence.
 */
const DUPLICATE_FIRE_WINDOW_MS = 30 * 1_000;
/**
 * Pause, in whole milliseconds, between consecutive message parts sent to
 * the same group: a uniformly random integer in [200, 499]. It only needs
 * to keep the parts visibly ordered in the chat at WhatsApp's natural pace.
 */
function partJitterMs(): number {
  const extra = Math.floor(Math.random() * 300);
  return extra + 200;
}
/** WASocket plus the `assertSessions` method that Baileys exposes on its
* internal socket object but not on the public type. It is declared
* optional, so callers must check `typeof … === "function"` before use
* (ensureGroupSessions does). Calling it for a group's participants before
* the first send keeps relayMessage from tripping on missing sessions. */
type SocketWithAssertSessions = WASocket & {
/** `jids`: participant JIDs to assert sessions for; `force` is passed
* through unchanged (this file always passes `true`). */
assertSessions?: (jids: string[], force: boolean) => Promise<boolean>;
};
/**
 * Best-effort session warm-up for every participant of `groupJid`.
 *
 * This does nothing when the socket has no `assertSessions`. Otherwise it
 * fetches the group's metadata and asserts sessions in batches of five, so
 * one bad participant only costs its own batch. A failed batch is logged
 * and skipped. A failure in `groupMetadata` itself propagates to the caller.
 */
async function ensureGroupSessions(socket: WASocket, groupJid: string): Promise<void> {
  const withInternals = socket as SocketWithAssertSessions;
  if (typeof withInternals.assertSessions !== "function") return;

  const { participants } = await socket.groupMetadata(groupJid);
  const jids = participants.map(({ id }) => id);

  const batchSize = 5;
  let offset = 0;
  while (offset < jids.length) {
    const batch = jids.slice(offset, offset + batchSize);
    offset += batchSize;
    try {
      await withInternals.assertSessions(batch, true);
    } catch (err) {
      logger.warn(
        { groupJid, err: (err as Error).message },
        "fire-reminder: assertSessions chunk failed",
      );
    }
  }
}
/**
 * Handler entry point for one reminder fire (fresh or resume).
 *
 * Cheap guards run before the per-account mutex:
 * - The reminder must exist.
 * - A fresh fire requires `status === "active"`; resumes may run on a
 *   paused reminder.
 * - A fresh fire bails out if a run for the reminder was created within
 *   DUPLICATE_FIRE_WINDOW_MS.
 *
 * The delivery itself then runs under `accountMutex` for the reminder's
 * account. Reminders on the same account take turns, so the wait can last
 * as long as another reminder's whole run. For that reason the reminder is
 * re-loaded once the lock is held: edits made while we were queued are
 * honoured, and a reminder deleted or deactivated in the meantime is not
 * fired from a stale snapshot. This is the same TOCTOU shape as the
 * duplicate-run check that fireReminderInner repeats under the lock.
 *
 * @param payload - `reminderId` to fire; `runId` to resume an existing run.
 */
export async function fireReminder(payload: FireReminderPayload): Promise<void> {
  const reminder = await getReminderWithDetails(payload.reminderId);
  if (!reminder) {
    logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found");
    return;
  }
  // Resumes are allowed even when the reminder's lifecycle status is
  // 'paused' — we WANT to take a paused reminder back to active mid-
  // resume. Fresh fires still require status='active'.
  if (!payload.runId && reminder.status !== "active") {
    logger.info(
      { reminderId: reminder.id, status: reminder.status },
      "fire-reminder: skipping (not active)",
    );
    return;
  }
  // Defense-in-depth dedupe: if pg-boss enqueues two reminder.fire jobs
  // for the same reminderId within microseconds (e.g. a duplicate
  // schedule call slipped past the queue's singletonKey), the second
  // worker would otherwise create a SECOND run and the same message
  // gets sent twice. Bail out if a run for this reminder already exists
  // and was created less than DUPLICATE_FIRE_WINDOW_MS ago. This is only
  // a fast path; the authoritative re-check happens under the mutex.
  if (!payload.runId) {
    const recent = await db.query.reminderRuns.findFirst({
      where: (r, { eq: dEq, and: dAnd, gt: dGt }) =>
        dAnd(
          dEq(r.reminderId, reminder.id),
          dGt(r.firedAt, new Date(Date.now() - DUPLICATE_FIRE_WINDOW_MS)),
        ),
      orderBy: (r, { desc }) => [desc(r.firedAt)],
    });
    if (recent) {
      logger.warn(
        {
          reminderId: reminder.id,
          recentRunId: recent.id,
          recentFiredAt: recent.firedAt,
        },
        "fire-reminder: duplicate fire detected (a run for this reminder was just created), skipping",
      );
      return;
    }
  }
  // Per-account mutex: two reminders on the SAME account take turns
  // (running them concurrently would double the effective send rate
  // and risk a ban). Different accounts run in parallel.
  await accountMutex.run(reminder.accountId, async () => {
    // Re-load under the lock: the snapshot above may be stale after
    // queueing behind another reminder's run on this account.
    // NOTE(review): if the reminder's accountId changed while we waited,
    // we still hold the OLD account's lock — confirm whether that case
    // needs handling.
    const current = await getReminderWithDetails(payload.reminderId);
    if (!current) {
      logger.warn(
        { reminderId: payload.reminderId },
        "fire-reminder: reminder disappeared while waiting for the account mutex",
      );
      return;
    }
    if (!payload.runId && current.status !== "active") {
      logger.info(
        { reminderId: current.id, status: current.status },
        "fire-reminder: skipping (deactivated while waiting for the account mutex)",
      );
      return;
    }
    await fireReminderInner(current, payload.runId);
  });
}
/**
 * Run one reminder delivery. fireReminder() calls this from inside
 * `accountMutex.run(reminder.accountId, …)`, so runs on the same WhatsApp
 * account take turns.
 *
 * Two modes:
 * - Fresh fire (`resumeRunId` omitted): re-checks the duplicate window,
 *   inserts a new run row and one `pending` run-target row per target,
 *   then sends to every target.
 * - Resume (`resumeRunId` set): attaches to that run, flips it back to
 *   `pending` and only sends to targets that are still `pending`.
 *
 * Each target's outcome is written as soon as it is known: `sent`,
 * `failed` or `skipped`, or left `pending` when the delivery window has
 * closed so the run can be resumed.
 *
 * After all targets are processed, this function:
 * - computes the run's final status;
 * - notifies the web app via pgNotifyWeb;
 * - updates the reminder's lifecycle (pause, deactivate, or re-arm the
 *   next recurring occurrence);
 * - writes an audit-log entry.
 *
 * @param reminder - Snapshot returned by getReminderWithDetails.
 * @param resumeRunId - Existing run to resume; omit for a fresh fire.
 */
async function fireReminderInner(
  reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
  resumeRunId?: string,
): Promise<void> {
  // Resume path attaches to the existing run row; fresh path inserts a new one.
  let runId: string;
  if (resumeRunId) {
    const existing = await db.query.reminderRuns.findFirst({
      where: (r, { eq: dEq }) => dEq(r.id, resumeRunId),
    });
    if (!existing) {
      logger.warn(
        { reminderId: reminder.id, resumeRunId },
        "fire-reminder: resume target run missing",
      );
      return;
    }
    runId = existing.id;
    // Flip the run back to in-flight so the UI stops showing it as paused.
    await db
      .update(reminderRuns)
      .set({ status: "pending", errorSummary: null })
      .where(eq(reminderRuns.id, runId));
  } else {
    // Re-check the dedupe window now that we're inside the per-account
    // mutex. The outer check in fireReminder() is a fast-path bail-out
    // but it's TOCTOU: three concurrent jobs can all read "no recent
    // run" before any of them inserts, so the message gets sent 2-3
    // times. Inside the mutex, the queue serialises us — by the time
    // duplicate #2 reaches this point, duplicate #1 has already
    // INSERTed and we'll find that row here.
    const recent = await db.query.reminderRuns.findFirst({
      where: (r, { eq: dEq, and: dAnd, gt: dGt }) =>
        dAnd(
          dEq(r.reminderId, reminder.id),
          dGt(r.firedAt, new Date(Date.now() - DUPLICATE_FIRE_WINDOW_MS)),
        ),
      orderBy: (r, { desc }) => [desc(r.firedAt)],
    });
    if (recent) {
      logger.warn(
        {
          reminderId: reminder.id,
          recentRunId: recent.id,
          recentFiredAt: recent.firedAt,
        },
        "fire-reminder: duplicate fire detected inside mutex (a run was just inserted by a concurrent job), skipping",
      );
      return;
    }
    const [run] = await db
      .insert(reminderRuns)
      .values({
        reminderId: reminder.id,
        reminderName: reminder.name,
        status: "pending",
      })
      .returning({ id: reminderRuns.id });
    // A single-row INSERT … RETURNING should always yield one row; fail
    // loudly with context instead of a bare non-null assertion.
    if (!run) {
      throw new Error(`fire-reminder: run insert returned no row for reminder ${reminder.id}`);
    }
    runId = run.id;
  }
  const session = sessionManager.getSession(reminder.accountId);
  if (!session) {
    logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected");
    // NOTE(review): on resume, the existing `pending` target rows are left
    // as they are while the run itself becomes `skipped`.
    if (!resumeRunId) {
      await markAllSkipped(runId, reminder, "account not connected");
    }
    await db
      .update(reminderRuns)
      .set({ status: "skipped", errorSummary: "account not connected" })
      .where(eq(reminderRuns.id, runId));
    await pgNotifyWeb({
      type: "reminder.fired",
      reminderId: reminder.id,
      runId,
      status: "skipped",
    });
    return;
  }
  // Up-front bulk loads. Drops ~3000 round-trips to ~3 for a 1000-group run.
  const groupIds = reminder.targets.map((t) => t.groupId);
  const groupRows = groupIds.length
    ? await db.query.whatsappGroups.findMany({ where: (g) => inArray(g.id, groupIds) })
    : [];
  const groupById = new Map(groupRows.map((g) => [g.id, g]));
  const mediaIds = Array.from(
    new Set(reminder.messages.map((m) => m.mediaId).filter((id): id is string => Boolean(id))),
  );
  const mediaRows = mediaIds.length
    ? await db.query.mediaFiles.findMany({ where: (m) => inArray(m.id, mediaIds) })
    : [];
  const mediaById = new Map(mediaRows.map((m) => [m.id, m]));
  // Pre-create run_target rows on the fresh path so the Activity tab
  // shows progress mid-run. Resume reuses the existing rows.
  if (!resumeRunId && reminder.targets.length > 0) {
    await db.insert(reminderRunTargets).values(
      reminder.targets.map((t) => ({
        runId,
        groupId: t.groupId,
        groupLabel: groupById.get(t.groupId)?.name ?? null,
        status: "pending" as const,
      })),
    );
  }
  // On resume, only the still-pending rows are processed. On a fresh
  // fire that's every row since we just inserted them all as pending.
  const pendingRows = await db.query.reminderRunTargets.findMany({
    where: (t, { eq: dEq, and: dAnd }) => dAnd(dEq(t.runId, runId), dEq(t.status, "pending")),
  });
  const pendingGroupIds = new Set(pendingRows.map((r) => r.groupId));
  const targetsToProcess = reminder.targets.filter((t) => pendingGroupIds.has(t.groupId));
  // Already-sent / already-failed counts from prior run rounds (resume
  // case). The final tally adds these to what THIS round produces.
  const priorSentCount = resumeRunId
    ? (
        await db.query.reminderRunTargets.findMany({
          where: (t, { eq: dEq, and: dAnd }) =>
            dAnd(dEq(t.runId, runId), dEq(t.status, "sent")),
        })
      ).length
    : 0;
  const priorFailedCount = resumeRunId
    ? (
        await db.query.reminderRunTargets.findMany({
          where: (t, { eq: dEq, and: dAnd }) =>
            dAnd(dEq(t.runId, runId), dEq(t.status, "failed")),
        })
      ).length
    : 0;
  // Window-end timestamp. If the reminder fires AFTER today's deadline
  // hour (cron miss-fired late, or it's already 7pm) this is in the
  // past and the FIRST gate check trips immediately, ending the run
  // as failed without sending anything.
  const windowEnd = windowEndAt(
    reminder.timezone,
    reminder.deliveryWindowEndHour,
    new Date(),
  );
  // Per-run media upload cache (one prepare call per unique mediaId).
  const uploadCache = new MediaUploadCache<proto.IMessage>(async (mediaId) => {
    const media = mediaById.get(mediaId);
    if (!media) throw new Error(`media row missing: ${mediaId}`);
    const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR);
    const buffer = await readFile(filePath);
    const head = buffer.subarray(0, 12);
    const resolved = resolveDeliveryKind(media.mimeType, head);
    const senderKind: "image" | "video" | "document" =
      resolved === "image" || resolved === "video" ? resolved : "document";
    const content: AnyMessageContent =
      senderKind === "image"
        ? { image: buffer, mimetype: media.mimeType }
        : senderKind === "video"
          ? { video: buffer, mimetype: media.mimeType }
          : {
              document: buffer,
              fileName: media.filenameOriginal,
              mimetype: media.mimeType,
            };
    return generateWAMessageContent(content, {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      upload: (session.socket as any).waUploadToServer,
    });
  });
  // Per-account rate limiter — gates each socket send.
  const rateLimiter = accountRateLimiter.get(reminder.accountId);
  let sentCount = 0;
  let failedCount = 0;
  let skippedCount = 0;
  let windowClosed = false;
  const groupConcurrency = pLimit(env.BOT_GROUP_CONCURRENCY);
  await Promise.all(
    targetsToProcess.map((target) =>
      groupConcurrency(async () => {
        // Window-end gate. CRITICAL: leave the row as `pending` (NOT
        // `skipped`) so the run can be resumed later.
        if (Date.now() >= windowEnd.getTime()) {
          windowClosed = true;
          return;
        }
        const group = groupById.get(target.groupId);
        if (!group) {
          await db
            .update(reminderRunTargets)
            .set({ status: "skipped", error: "group missing from db" })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          skippedCount++;
          return;
        }
        const start = Date.now();
        try {
          await ensureGroupSessions(session.socket, group.waGroupJid);
          let lastMessageId: string | undefined;
          const lastPartIndex = reminder.messages.length - 1;
          for (const [partIndex, part] of reminder.messages.entries()) {
            await rateLimiter.acquire();
            if (part.kind === "text" && part.textContent) {
              const r = await session.socket.sendMessage(group.waGroupJid, {
                text: part.textContent,
              });
              lastMessageId = r?.key?.id ?? undefined;
            } else if (part.mediaId) {
              // The upload cache prepares each mediaId once per run, so
              // the object it returns is shared by every group and by
              // every part that reuses this mediaId. Caption a private
              // copy: mutating the shared object leaked one part's caption
              // into later uncaptioned parts and raced across concurrent
              // groups.
              const prebuilt = await uploadCache.get(part.mediaId);
              let outgoing = prebuilt;
              if (part.textContent) {
                outgoing = copyMediaMessage(prebuilt);
                injectCaption(outgoing, part.textContent);
              }
              const messageId = generateMessageID();
              await session.socket.relayMessage(group.waGroupJid, outgoing, { messageId });
              lastMessageId = messageId;
            }
            // Jitter only BETWEEN parts: sleeping after the final part just
            // holds the concurrency slot and inflates `latencyMs`.
            if (partIndex < lastPartIndex) {
              await new Promise((r) => setTimeout(r, partJitterMs()));
            }
          }
          await db
            .update(reminderRunTargets)
            .set({
              status: "sent",
              waMessageId: lastMessageId ?? null,
              latencyMs: Date.now() - start,
            })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          sentCount++;
        } catch (err) {
          logger.error(
            { err, reminderId: reminder.id, groupId: target.groupId },
            "fire-reminder: send failed",
          );
          // `err` is unknown; a non-Error throw would otherwise record
          // `undefined` as the failure reason.
          const errorText = err instanceof Error ? err.message : String(err);
          await db
            .update(reminderRunTargets)
            .set({ status: "failed", error: errorText })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          failedCount++;
        }
      }),
    ),
  );
  // Compose the final status. Four shapes:
  // paused : window closed mid-run with at least one row still pending
  //          AND we delivered at least one in this run or a prior round.
  //          Resumable. Sent rows stay sent, pending stays pending.
  // success : every target sent.
  // partial : every target attempted; some sent, some failed/skipped.
  // failed : zero sent across all rounds, OR window closed before the
  //          first send (no progress to resume).
  const total = reminder.targets.length;
  const totalSent = priorSentCount + sentCount;
  const totalFailed = priorFailedCount + failedCount;
  const remainingPending = (
    await db.query.reminderRunTargets.findMany({
      where: (t, { eq: dEq, and: dAnd }) =>
        dAnd(dEq(t.runId, runId), dEq(t.status, "pending")),
    })
  ).length;
  let status: "success" | "partial" | "failed" | "paused";
  let errorSummary: string | null = null;
  if (windowClosed && remainingPending > 0 && totalSent > 0) {
    status = "paused";
    errorSummary = `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}). ${totalSent} of ${total} groups delivered, ${remainingPending} still pending. Resume from the Activity tab.`;
  } else if (windowClosed && totalSent === 0) {
    status = "failed";
    errorSummary = `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}) before any group could be sent. The reminder fired too late in the day.`;
  } else if (totalSent === total) {
    status = "success";
  } else if (totalSent > 0) {
    status = "partial";
    errorSummary = `${totalSent} of ${total} groups delivered (${totalFailed} failed, ${skippedCount} skipped).`;
  } else {
    status = "failed";
    errorSummary = total === 0 ? "No targets attached to reminder." : `All ${total} sends failed.`;
  }
  await db
    .update(reminderRuns)
    .set({ status, errorSummary })
    .where(eq(reminderRuns.id, runId));
  await pgNotifyWeb({
    type: "reminder.fired",
    reminderId: reminder.id,
    runId,
    status,
    sent: totalSent,
    total,
  });
  // Lifecycle bookkeeping. Skip when the run is paused — the reminder
  // shouldn't end or re-arm while a resume is still possible. We also
  // flip the reminder row itself to status='paused' so dashboards and
  // the list view can reflect it.
  if (status === "paused") {
    await db
      .update(reminders)
      .set({ status: "paused", updatedAt: new Date() })
      .where(eq(reminders.id, reminder.id));
    logger.info(
      { reminderId: reminder.id, runId, totalSent, remainingPending },
      "fire-reminder: paused — leaving lifecycle alone for resume",
    );
  } else {
    if (reminder.scheduleKind === "one_off") {
      await db
        .update(reminders)
        .set({ status: "inactive", updatedAt: new Date() })
        .where(eq(reminders.id, reminder.id));
    } else if (reminder.scheduleKind === "recurring" && reminder.rrule) {
      const next = nextOccurrence(reminder.rrule, reminder.timezone, new Date());
      await db
        .update(reminders)
        .set({
          // If we're resuming a previously-paused reminder, lift it
          // back to active so the next cron occurrence fires normally.
          status: "active",
          lastFiredAt: new Date(),
          updatedAt: new Date(),
        })
        .where(eq(reminders.id, reminder.id));
      if (next) {
        try {
          await scheduleReminderFire(getBoss(), reminder.id, next);
          logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence");
        } catch (err) {
          logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence");
        }
      } else {
        logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending");
        await db.update(reminders).set({ status: "inactive" }).where(eq(reminders.id, reminder.id));
      }
    }
  }
  await writeAuditLog(db, {
    operatorId: reminder.createdBy,
    source: "system",
    action: "reminder.fired",
    targetType: "reminder",
    targetId: reminder.id,
    payload: { runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount },
  });
  logger.info(
    {
      reminderId: reminder.id,
      runId,
      status,
      sent: sentCount,
      failed: failedCount,
      skipped: skippedCount,
      windowClosed,
    },
    "fire-reminder: done",
  );
}

/**
 * Shallow copy of a prepared message, with the image, video and document
 * sub-message objects (when present) also copied. This lets a caption be
 * written to the copy without mutating `msg`. Values nested below those
 * sub-messages stay shared by reference, so treat them as read-only.
 */
function copyMediaMessage(msg: proto.IMessage): proto.IMessage {
  const copy: proto.IMessage = { ...msg };
  if (msg.imageMessage) copy.imageMessage = { ...msg.imageMessage };
  if (msg.videoMessage) copy.videoMessage = { ...msg.videoMessage };
  if (msg.documentMessage) copy.documentMessage = { ...msg.documentMessage };
  return copy;
}
/**
 * Insert one `skipped` run-target row per reminder target, all carrying
 * the same `error` text. The group's current name is stored as the row
 * label, or null when the group row no longer exists. Returns immediately
 * when the reminder has no targets.
 *
 * @param runId - Run the rows belong to.
 * @param reminder - Reminder whose targets are being skipped.
 * @param error - Reason recorded on every row.
 */
async function markAllSkipped(
  runId: string,
  reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
  error: string,
): Promise<void> {
  const { targets } = reminder;
  if (targets.length === 0) return;

  const targetGroupIds = targets.map(({ groupId }) => groupId);
  const groups = await db.query.whatsappGroups.findMany({
    columns: { id: true, name: true },
    where: (g) => inArray(g.id, targetGroupIds),
  });
  const nameByGroupId = new Map(groups.map((g) => [g.id, g.name]));

  await db.insert(reminderRunTargets).values(
    targets.map(({ groupId }) => ({
      runId,
      groupId,
      groupLabel: nameByGroupId.get(groupId) ?? null,
      status: "skipped" as const,
      error,
    })),
  );
}
/**
 * Set `caption` on the media sub-message of a prebuilt message, in place.
 * relayMessage takes no separate caption argument, but the protobuf has a
 * caption slot on image, video and document messages. Only the first
 * sub-message present is touched, checked in that order; a message with
 * none of them is left unchanged.
 */
function injectCaption(msg: proto.IMessage, caption: string): void {
  const media = msg.imageMessage ?? msg.videoMessage ?? msg.documentMessage;
  if (media) media.caption = caption;
}