import { and, eq, inArray } from "drizzle-orm";
import { reminderRuns, reminderRunTargets, reminders } from "@cmbot/db";
import {
  generateWAMessageContent,
  generateMessageID,
  type AnyMessageContent,
  type proto,
  type WASocket,
} from "@whiskeysockets/baileys";
import pLimit from "p-limit";
import { readFile } from "node:fs/promises";
import { db } from "../db.js";
import { logger } from "../logger.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import {
  absoluteMediaPath,
  nextOccurrence,
  resolveDeliveryKind,
  windowEndAt,
} from "@cmbot/shared";
import { env } from "../env.js";
import { writeAuditLog } from "../audit.js";
import { getReminderWithDetails } from "../reminders/crud.js";
import { getBoss } from "./pgboss-client.js";
import { scheduleReminderFire } from "./reminder-jobs.js";
import { pgNotifyWeb } from "../ipc/notify.js";
import { accountMutex } from "./per-key-mutex.js";
import { accountRateLimiter } from "./rate-limiter.js";
import { MediaUploadCache } from "./media-upload-cache.js";

export type FireReminderPayload = {
  reminderId: string;
  /** Optional resume hook. When present, fire-reminder ATTACHES to
   *  the existing run instead of creating a new one and only re-tries
   *  targets in `pending` status. Set by the resume server action. */
  runId?: string;
};

/** Reminder row plus the relations (`targets`, `messages`) this job reads. */
type ReminderWithDetails = NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>;

/** Terminal statuses a run can end in once the send loop has finished. */
type RunStatus = "success" | "partial" | "failed" | "paused";

/**
 * Window in which two fire-reminder jobs for the same reminder are
 * treated as duplicates. Generous enough to absorb real-world double-
 * submits (the operator clicks Save twice; pg_notify floods the
 * command-consumer; pg-boss policy didn't dedupe a microsecond-apart
 * race) — short enough that a deliberately rapid recurring schedule
 * (e.g. every minute, in dev) still fires every occurrence.
 */
const DUPLICATE_FIRE_WINDOW_MS = 30_000;

/** Random delay between same-group message parts. Just enough for
 *  visible ordering in the chat at WA's natural pace. */
function partJitterMs(): number {
  return 200 + Math.floor(Math.random() * 300); // 200..499
}

/** Best-effort human-readable message for anything a `catch` can receive.
 *  Non-Error throws (strings, plain objects) are stringified instead of
 *  producing `undefined`. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/** Baileys's WASocket exposes assertSessions on its internal interface,
 *  but it isn't part of the public type. Call it once per group before
 *  the first send so relayMessage doesn't trip on missing sessions. */
type SocketWithAssertSessions = WASocket & {
  assertSessions?: (jids: string[], force: boolean) => Promise<void>;
};

/**
 * Asserts sessions for every participant of `groupJid`, in small chunks.
 * A failing chunk is logged and skipped (best effort); a failure of
 * `groupMetadata` itself propagates to the caller. No-op when the socket
 * does not expose `assertSessions`.
 */
async function ensureGroupSessions(socket: WASocket, groupJid: string): Promise<void> {
  const internal = socket as SocketWithAssertSessions;
  if (typeof internal.assertSessions !== "function") return;

  const meta = await socket.groupMetadata(groupJid);
  const participantJids = meta.participants.map((p) => p.id);

  // Chunk so a single bad participant doesn't fail the whole group.
  const CHUNK = 5;
  for (let i = 0; i < participantJids.length; i += CHUNK) {
    const chunk = participantJids.slice(i, i + CHUNK);
    try {
      await internal.assertSessions(chunk, true);
    } catch (err) {
      logger.warn(
        { groupJid, err: errorMessage(err) },
        "fire-reminder: assertSessions chunk failed",
      );
    }
  }
}

/** Most recent run of `reminderId` fired within DUPLICATE_FIRE_WINDOW_MS,
 *  or `undefined` when there is none. */
async function findRecentRun(reminderId: string) {
  return db.query.reminderRuns.findFirst({
    where: (r, { eq: dEq, and: dAnd, gt: dGt }) =>
      dAnd(
        dEq(r.reminderId, reminderId),
        dGt(r.firedAt, new Date(Date.now() - DUPLICATE_FIRE_WINDOW_MS)),
      ),
    orderBy: (r, { desc }) => [desc(r.firedAt)],
  });
}

/**
 * Job entry point: fires (or resumes) one reminder.
 *
 * Returns without sending when the reminder is missing, when a fresh fire
 * targets a non-active reminder, or when a run for the same reminder was
 * created within DUPLICATE_FIRE_WINDOW_MS. Otherwise the actual work runs
 * under the per-account mutex.
 */
export async function fireReminder(payload: FireReminderPayload): Promise<void> {
  const reminder = await getReminderWithDetails(payload.reminderId);
  if (!reminder) {
    logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found");
    return;
  }

  // Resumes are allowed even when the reminder's lifecycle status is
  // 'paused' — we WANT to take a paused reminder back to active mid-
  // resume. Fresh fires still require status='active'.
  if (!payload.runId && reminder.status !== "active") {
    logger.info(
      { reminderId: reminder.id, status: reminder.status },
      "fire-reminder: skipping (not active)",
    );
    return;
  }

  // Defense-in-depth dedupe: if pg-boss enqueues two reminder.fire jobs
  // for the same reminderId within microseconds (e.g. a duplicate
  // schedule call slipped past the queue's singletonKey), the second
  // worker would otherwise create a SECOND run and the same message
  // gets sent twice. Bail out if a run for this reminder already exists
  // and was created less than DUPLICATE_FIRE_WINDOW_MS ago.
  if (!payload.runId) {
    const recent = await findRecentRun(reminder.id);
    if (recent) {
      logger.warn(
        {
          reminderId: reminder.id,
          recentRunId: recent.id,
          recentFiredAt: recent.firedAt,
        },
        "fire-reminder: duplicate fire detected (a run for this reminder was just created), skipping",
      );
      return;
    }
  }

  // Per-account mutex: two reminders on the SAME account take turns
  // (running them concurrently would double the effective send rate
  // and risk a ban). Different accounts run in parallel.
  await accountMutex.run(reminder.accountId, () => fireReminderInner(reminder, payload.runId));
}

/** Resume path: re-attaches to run `resumeRunId` and flips it back to
 *  `pending`. Returns the run id, or `undefined` (after logging) when the
 *  run row no longer exists. */
async function reopenRun(reminderId: string, resumeRunId: string): Promise<string | undefined> {
  const existing = await db.query.reminderRuns.findFirst({
    where: (r, { eq: dEq }) => dEq(r.id, resumeRunId),
  });
  if (!existing) {
    logger.warn({ reminderId, resumeRunId }, "fire-reminder: resume target run missing");
    return undefined;
  }
  // Flip the run back to in-flight so the UI stops showing it as paused.
  await db
    .update(reminderRuns)
    .set({ status: "pending", errorSummary: null })
    .where(eq(reminderRuns.id, existing.id));
  return existing.id;
}

/** Fresh path: inserts a new `pending` run row and returns its id, or
 *  `undefined` (after logging) when a concurrent duplicate already
 *  created one inside the dedupe window. */
async function createFreshRun(reminder: ReminderWithDetails): Promise<string | undefined> {
  // Re-check the dedupe window now that we're inside the per-account
  // mutex. The outer check in fireReminder() is a fast-path bail-out
  // but it's TOCTOU: three concurrent jobs can all read "no recent
  // run" before any of them inserts, so the message gets sent 2-3
  // times. Inside the mutex, the queue serialises us — by the time
  // duplicate #2 reaches this point, duplicate #1 has already
  // INSERTed and we'll find that row here.
  const recent = await findRecentRun(reminder.id);
  if (recent) {
    logger.warn(
      {
        reminderId: reminder.id,
        recentRunId: recent.id,
        recentFiredAt: recent.firedAt,
      },
      "fire-reminder: duplicate fire detected inside mutex (a run was just inserted by a concurrent job), skipping",
    );
    return undefined;
  }

  const [run] = await db
    .insert(reminderRuns)
    .values({
      reminderId: reminder.id,
      reminderName: reminder.name,
      status: "pending",
    })
    .returning({ id: reminderRuns.id });
  if (!run) {
    throw new Error("fire-reminder: run insert returned no row");
  }
  return run.id;
}

/** Number of run_target rows of `runId` currently in `status`. */
async function countRunTargets(
  runId: string,
  status: "pending" | "sent" | "failed",
): Promise<number> {
  const rows = await db.query.reminderRunTargets.findMany({
    where: (t, { eq: dEq, and: dAnd }) => dAnd(dEq(t.runId, runId), dEq(t.status, status)),
  });
  return rows.length;
}

/**
 * Pure composition of the final run status. Four shapes:
 *   paused  : window closed mid-run with at least one row still pending
 *             AND we delivered at least one in this run or a prior round.
 *             Resumable. Sent rows stay sent, pending stays pending.
 *   success : every target sent (and there was at least one target).
 *   partial : every target attempted; some sent, some failed/skipped.
 *   failed  : zero sent across all rounds, OR window closed before the
 *             first send (no progress to resume), OR no targets at all.
 *
 * `windowLabel` is the preformatted "<hour>:00 (<timezone>)" text used in
 * the window-closed summaries.
 */
function composeRunOutcome(tally: {
  total: number;
  totalSent: number;
  totalFailed: number;
  skippedCount: number;
  remainingPending: number;
  windowClosed: boolean;
  windowLabel: string;
}): { status: RunStatus; errorSummary: string | null } {
  const { total, totalSent, totalFailed, skippedCount, remainingPending, windowClosed, windowLabel } =
    tally;

  if (windowClosed && remainingPending > 0 && totalSent > 0) {
    return {
      status: "paused",
      errorSummary: `Delivery window closed at ${windowLabel}. ${totalSent} of ${total} groups delivered, ${remainingPending} still pending. Resume from the Activity tab.`,
    };
  }
  if (windowClosed && totalSent === 0) {
    return {
      status: "failed",
      errorSummary: `Delivery window closed at ${windowLabel} before any group could be sent. The reminder fired too late in the day.`,
    };
  }
  // `total > 0` matters: without it a reminder with zero targets compares
  // 0 === 0 and is reported as a success, making the "No targets" branch
  // below unreachable.
  if (total > 0 && totalSent === total) {
    return { status: "success", errorSummary: null };
  }
  if (totalSent > 0) {
    return {
      status: "partial",
      errorSummary: `${totalSent} of ${total} groups delivered (${totalFailed} failed, ${skippedCount} skipped).`,
    };
  }
  return {
    status: "failed",
    errorSummary: total === 0 ? "No targets attached to reminder." : `All ${total} sends failed.`,
  };
}

/**
 * Does the actual work of one fire/resume. Must be called under the
 * per-account mutex (see fireReminder).
 *
 * @param reminder     Reminder with its targets and messages loaded.
 * @param resumeRunId  When set, attach to this run and only process the
 *                     targets still `pending`; otherwise create a new run.
 */
async function fireReminderInner(
  reminder: ReminderWithDetails,
  resumeRunId?: string,
): Promise<void> {
  // Resume path attaches to the existing run row; fresh path inserts a new one.
  const runId = resumeRunId
    ? await reopenRun(reminder.id, resumeRunId)
    : await createFreshRun(reminder);
  if (!runId) return;

  const session = sessionManager.getSession(reminder.accountId);
  if (!session) {
    logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected");
    if (!resumeRunId) {
      await markAllSkipped(runId, reminder, "account not connected");
    }
    // NOTE(review): on a resume this marks the run "skipped" even though
    // earlier rounds may have delivered targets and others are still
    // pending — confirm whether it should go back to "paused" instead.
    await db
      .update(reminderRuns)
      .set({ status: "skipped", errorSummary: "account not connected" })
      .where(eq(reminderRuns.id, runId));
    await pgNotifyWeb({
      type: "reminder.fired",
      reminderId: reminder.id,
      runId,
      status: "skipped",
    });
    return;
  }

  // Up-front bulk loads. Drops ~3000 round-trips to ~3 for a 1000-group run.
  const groupIds = reminder.targets.map((t) => t.groupId);
  const groupRows = groupIds.length
    ? await db.query.whatsappGroups.findMany({ where: (g) => inArray(g.id, groupIds) })
    : [];
  const groupById = new Map(groupRows.map((g) => [g.id, g]));

  const mediaIds = Array.from(
    new Set(reminder.messages.map((m) => m.mediaId).filter((id): id is string => Boolean(id))),
  );
  const mediaRows = mediaIds.length
    ? await db.query.mediaFiles.findMany({ where: (m) => inArray(m.id, mediaIds) })
    : [];
  const mediaById = new Map(mediaRows.map((m) => [m.id, m]));

  // Pre-create run_target rows on the fresh path so the Activity tab
  // shows progress mid-run. Resume reuses the existing rows.
  if (!resumeRunId && reminder.targets.length > 0) {
    await db.insert(reminderRunTargets).values(
      reminder.targets.map((t) => ({
        runId,
        groupId: t.groupId,
        groupLabel: groupById.get(t.groupId)?.name ?? null,
        status: "pending" as const,
      })),
    );
  }

  // On resume, only the still-pending rows are processed. On a fresh
  // fire that's every row since we just inserted them all as pending.
  const pendingRows = await db.query.reminderRunTargets.findMany({
    where: (t, { eq: dEq, and: dAnd }) => dAnd(dEq(t.runId, runId), dEq(t.status, "pending")),
  });
  const pendingGroupIds = new Set(pendingRows.map((r) => r.groupId));
  const targetsToProcess = reminder.targets.filter((t) => pendingGroupIds.has(t.groupId));

  // Already-sent / already-failed counts from prior run rounds (resume
  // case). The final tally adds these to what THIS round produces.
  const priorSentCount = resumeRunId ? await countRunTargets(runId, "sent") : 0;
  const priorFailedCount = resumeRunId ? await countRunTargets(runId, "failed") : 0;

  // Window-end timestamp. If the reminder fires AFTER today's deadline
  // hour (cron miss-fired late, or it's already 7pm) this is in the
  // past and the FIRST gate check trips immediately, so nothing is sent
  // in this round.
  const windowEnd = windowEndAt(
    reminder.timezone,
    reminder.deliveryWindowEndHour,
    new Date(),
  );

  // Per-run media upload cache (one prepare call per unique mediaId).
  const uploadCache = new MediaUploadCache(async (mediaId) => {
    const media = mediaById.get(mediaId);
    if (!media) throw new Error(`media row missing: ${mediaId}`);
    const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR);
    const buffer = await readFile(filePath);
    // First bytes of the file are handed to resolveDeliveryKind together
    // with the declared mime type.
    const head = buffer.subarray(0, 12);
    const resolved = resolveDeliveryKind(media.mimeType, head);
    // Anything that is not an image or a video is sent as a document.
    const senderKind: "image" | "video" | "document" =
      resolved === "image" || resolved === "video" ? resolved : "document";
    const content: AnyMessageContent =
      senderKind === "image"
        ? { image: buffer, mimetype: media.mimeType }
        : senderKind === "video"
          ? { video: buffer, mimetype: media.mimeType }
          : {
              document: buffer,
              fileName: media.filenameOriginal,
              mimetype: media.mimeType,
            };
    return generateWAMessageContent(content, {
      // waUploadToServer is not on the public WASocket type.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      upload: (session.socket as any).waUploadToServer,
    });
  });

  // Per-account rate limiter — gates each socket send.
  const rateLimiter = accountRateLimiter.get(reminder.accountId);

  // Filter matching exactly one run_target row of this run.
  const targetRow = (groupId: string) =>
    and(eq(reminderRunTargets.runId, runId), eq(reminderRunTargets.groupId, groupId));

  let sentCount = 0;
  let failedCount = 0;
  let skippedCount = 0;
  let windowClosed = false;

  const groupConcurrency = pLimit(env.BOT_GROUP_CONCURRENCY);

  await Promise.all(
    targetsToProcess.map((target) =>
      groupConcurrency(async () => {
        // Window-end gate. CRITICAL: leave the row as `pending` (NOT
        // `skipped`) so the run can be resumed later.
        if (Date.now() >= windowEnd.getTime()) {
          windowClosed = true;
          return;
        }

        const group = groupById.get(target.groupId);
        if (!group) {
          await db
            .update(reminderRunTargets)
            .set({ status: "skipped", error: "group missing from db" })
            .where(targetRow(target.groupId));
          skippedCount++;
          return;
        }

        const start = Date.now();
        try {
          await ensureGroupSessions(session.socket, group.waGroupJid);

          let lastMessageId: string | undefined;
          for (const part of reminder.messages) {
            await rateLimiter.acquire();
            if (part.kind === "text" && part.textContent) {
              const r = await session.socket.sendMessage(group.waGroupJid, {
                text: part.textContent,
              });
              lastMessageId = r?.key?.id ?? undefined;
            } else if (part.mediaId) {
              const prebuilt = await uploadCache.get(part.mediaId);
              // Never mutate the cached message: it is shared by every
              // group (and every part) that uses this mediaId, so a caption
              // written into it would leak into later sends.
              const outgoing = part.textContent
                ? withCaption(prebuilt, part.textContent)
                : prebuilt;
              const messageId = generateMessageID();
              await session.socket.relayMessage(group.waGroupJid, outgoing, { messageId });
              lastMessageId = messageId;
            }
            await new Promise<void>((resolve) => setTimeout(resolve, partJitterMs()));
          }

          await db
            .update(reminderRunTargets)
            .set({
              status: "sent",
              waMessageId: lastMessageId ?? null,
              latencyMs: Date.now() - start,
            })
            .where(targetRow(target.groupId));
          sentCount++;
        } catch (err) {
          logger.error(
            { err, reminderId: reminder.id, groupId: target.groupId },
            "fire-reminder: send failed",
          );
          await db
            .update(reminderRunTargets)
            .set({ status: "failed", error: errorMessage(err) })
            .where(targetRow(target.groupId));
          failedCount++;
        }
      }),
    ),
  );

  // Compose the final status (see composeRunOutcome for the four shapes).
  const total = reminder.targets.length;
  const totalSent = priorSentCount + sentCount;
  const totalFailed = priorFailedCount + failedCount;
  const remainingPending = await countRunTargets(runId, "pending");

  const { status, errorSummary } = composeRunOutcome({
    total,
    totalSent,
    totalFailed,
    skippedCount,
    remainingPending,
    windowClosed,
    windowLabel: `${reminder.deliveryWindowEndHour}:00 (${reminder.timezone})`,
  });

  await db
    .update(reminderRuns)
    .set({ status, errorSummary })
    .where(eq(reminderRuns.id, runId));

  await pgNotifyWeb({
    type: "reminder.fired",
    reminderId: reminder.id,
    runId,
    status,
    sent: totalSent,
    total,
  });

  // Lifecycle bookkeeping. Skip when the run is paused — the reminder
  // shouldn't end or re-arm while a resume is still possible. We also
  // flip the reminder row itself to status='paused' so dashboards and
  // the list view can reflect it.
  if (status === "paused") {
    await db
      .update(reminders)
      .set({ status: "paused", updatedAt: new Date() })
      .where(eq(reminders.id, reminder.id));
    logger.info(
      { reminderId: reminder.id, runId, totalSent, remainingPending },
      "fire-reminder: paused — leaving lifecycle alone for resume",
    );
  } else if (reminder.scheduleKind === "one_off") {
    await db
      .update(reminders)
      .set({ status: "inactive", updatedAt: new Date() })
      .where(eq(reminders.id, reminder.id));
  } else if (reminder.scheduleKind === "recurring" && reminder.rrule) {
    const next = nextOccurrence(reminder.rrule, reminder.timezone, new Date());
    await db
      .update(reminders)
      .set({
        // If we're resuming a previously-paused reminder, lift it
        // back to active so the next cron occurrence fires normally.
        status: "active",
        lastFiredAt: new Date(),
        updatedAt: new Date(),
      })
      .where(eq(reminders.id, reminder.id));
    if (next) {
      try {
        await scheduleReminderFire(getBoss(), reminder.id, next);
        logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence");
      } catch (err) {
        // Deliberately best-effort: the run itself already completed.
        logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence");
      }
    } else {
      logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending");
      await db
        .update(reminders)
        .set({ status: "inactive", updatedAt: new Date() })
        .where(eq(reminders.id, reminder.id));
    }
  }

  // Audit/log counters are for THIS round only (prior resume rounds are
  // not included), unlike the totals sent to the web notification.
  await writeAuditLog(db, {
    operatorId: reminder.createdBy,
    source: "system",
    action: "reminder.fired",
    targetType: "reminder",
    targetId: reminder.id,
    payload: { runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount },
  });

  logger.info(
    {
      reminderId: reminder.id,
      runId,
      status,
      sent: sentCount,
      failed: failedCount,
      skipped: skippedCount,
      windowClosed,
    },
    "fire-reminder: done",
  );
}

/**
 * Inserts one `skipped` run_target row per reminder target, each carrying
 * `error` as the reason. Used on the fresh path when nothing can be sent
 * at all. No-op for a reminder without targets.
 */
async function markAllSkipped(
  runId: string,
  reminder: ReminderWithDetails,
  error: string,
): Promise<void> {
  if (reminder.targets.length === 0) return;
  const rows = await db.query.whatsappGroups.findMany({
    where: (g) =>
      inArray(
        g.id,
        reminder.targets.map((t) => t.groupId),
      ),
    columns: { id: true, name: true },
  });
  const labelById = new Map(rows.map((r) => [r.id, r.name]));
  await db.insert(reminderRunTargets).values(
    reminder.targets.map((t) => ({
      runId,
      groupId: t.groupId,
      groupLabel: labelById.get(t.groupId) ?? null,
      status: "skipped" as const,
      error,
    })),
  );
}

/**
 * Returns a copy of `msg` with `caption` applied, leaving `msg` untouched.
 * Only the top level and the one media sub-message that receives the
 * caption are copied; everything else (media keys, URLs, buffers) is
 * shared by reference with the original.
 */
function withCaption(msg: proto.IMessage, caption: string): proto.IMessage {
  const copy: proto.IMessage = { ...msg };
  if (copy.imageMessage) copy.imageMessage = { ...copy.imageMessage };
  else if (copy.videoMessage) copy.videoMessage = { ...copy.videoMessage };
  else if (copy.documentMessage) copy.documentMessage = { ...copy.documentMessage };
  injectCaption(copy, caption);
  return copy;
}

/**
 * Inject the caption into the prebuilt media message. Baileys' relayMessage
 * doesn't take a caption alongside the content; the protobuf already has
 * the slot, so we mutate it just before relaying.
 *
 * Mutates `msg` in place — call it on a private copy (see withCaption),
 * never on a message held by the upload cache.
 */
function injectCaption(msg: proto.IMessage, caption: string): void {
  if (msg.imageMessage) msg.imageMessage.caption = caption;
  else if (msg.videoMessage) msg.videoMessage.caption = caption;
  else if (msg.documentMessage) msg.documentMessage.caption = caption;
}