From 57786f9d0979cb8e24e86e30e1619b86d53bdeaa Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sun, 10 May 2026 15:48:52 +0800 Subject: [PATCH] feat(bot,web): window-end gate + paused/resume run lifecycle fire-reminder.ts now: * Computes windowEnd via @cmbot/shared/windowEndAt(timezone, endHour, now). Per-target loop trips the gate before sending; pending rows are LEFT pending (not flipped to skipped) so the run is resumable. * Accepts an optional runId on the FireReminderPayload. When set, the handler ATTACHES to that existing run instead of creating a new one and only retries pending targets. Resume is allowed even when reminder.status is 'paused' (otherwise we couldn't drag it back into delivery). * Final-status logic adds a 'paused' branch (window closed mid-run with at least one row still pending AND something delivered); 'failed' when the window closed before any send; partial / success otherwise. * Lifecycle: a paused run flips the reminder row to status='paused' and skips both the one-off end and the recurring re-arm. A later non-paused completion flips a recurring reminder back to 'active'; a one-off reminder is marked 'ended' as before. * SSE event payload gains optional sent/total counts. reminderFiredToNotification picks up: * New 'paused' headline + 'X of Y groups delivered. Tap to resume or cancel.' body. * 'partial' body uses sent/total when present. WebEventMap and the bot's WebEvent union match the new shape. Tests: * fire-reminder.test.ts gains a "resume against paused reminder acquires mutex" case. * notifications.test.ts gains 3 paused/partial-sent body cases. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/bot/src/ipc/notify.ts | 12 +- apps/bot/src/scheduler/fire-reminder.test.ts | 48 +++- apps/bot/src/scheduler/fire-reminder.ts | 246 +++++++++++++++---- apps/web/src/hooks/use-events.ts | 8 +- apps/web/src/lib/notifications.test.ts | 38 +++ apps/web/src/lib/notifications.ts | 29 ++- 6 files changed, 320 insertions(+), 61 deletions(-) diff --git a/apps/bot/src/ipc/notify.ts b/apps/bot/src/ipc/notify.ts index adc869b..4f9265f 100644 --- a/apps/bot/src/ipc/notify.ts +++ b/apps/bot/src/ipc/notify.ts @@ -11,7 +11,17 @@ export type WebEvent = | { type: "session.disconnected"; accountId: string } | { type: "session.timeout"; accountId: string } | { type: "groups.synced"; accountId: string; count: number } - | { type: "reminder.fired"; reminderId: string; runId: string; status: string } + | { + type: "reminder.fired"; + reminderId: string; + runId: string; + status: string; + // Optional delivered/total counts so the web side can render + // "X of Y groups delivered" in the paused-status notification + // body. Omitted on terminal-status events that don't need them. + sent?: number; + total?: number; + } | { type: "reminder.failed"; reminderId: string; error: string } // The web action enqueues a send_test via pg_notify and shows // "Sending…" optimistically. This event closes the loop. diff --git a/apps/bot/src/scheduler/fire-reminder.test.ts b/apps/bot/src/scheduler/fire-reminder.test.ts index d59886e..de2f3a3 100644 --- a/apps/bot/src/scheduler/fire-reminder.test.ts +++ b/apps/bot/src/scheduler/fire-reminder.test.ts @@ -18,13 +18,24 @@ const getReminderMock = vi.fn(); vi.mock("../reminders/crud.js", () => ({ getReminderWithDetails: (...args: unknown[]) => getReminderMock(...args), })); +// Drizzle's chainable query builders are mocked just deeply enough to +// let fire-reminder's happy path (and the resume path) walk through. 
+const findExistingRunMock = vi.fn(); vi.mock("../db.js", () => ({ db: { - insert: () => ({ values: () => ({ returning: async () => [{ id: "run-1" }] }) }), + insert: () => ({ + values: () => ({ + returning: async () => [{ id: "run-1" }], + }), + // Targets path: no .returning() chained. + values_no_returning: async () => undefined, + }), update: () => ({ set: () => ({ where: async () => undefined }) }), query: { whatsappGroups: { findMany: async () => [] }, mediaFiles: { findMany: async () => [] }, + reminderRunTargets: { findMany: async () => [] }, + reminderRuns: { findFirst: (...args: unknown[]) => findExistingRunMock(...args) }, }, }, })); @@ -56,6 +67,8 @@ describe("fireReminder", () => { scheduleKind: "one_off", rrule: null, timezone: "Asia/Kuala_Lumpur", + deliveryWindowStartHour: 6, + deliveryWindowEndHour: 18, name: "Test", }); @@ -76,6 +89,8 @@ describe("fireReminder", () => { scheduleKind: "one_off", rrule: null, timezone: "Asia/Kuala_Lumpur", + deliveryWindowStartHour: 6, + deliveryWindowEndHour: 18, name: "Test", }); @@ -92,6 +107,33 @@ describe("fireReminder", () => { expect(accountMutex.run).not.toHaveBeenCalled(); }); + it("DOES acquire the mutex on a resume even when the reminder is paused", async () => { + // Resume path must allow status='paused' (and 'active') so the + // operator can drag a paused reminder back into delivery. Fresh + // fires still require status='active'; that's covered by the + // earlier "inactive" test. 
+ getReminderMock.mockResolvedValue({ + id: "r-1", + accountId: "acct-A", + status: "paused", + targets: [], + messages: [], + createdBy: "op-1", + scheduleKind: "one_off", + rrule: null, + timezone: "Asia/Kuala_Lumpur", + deliveryWindowStartHour: 6, + deliveryWindowEndHour: 18, + name: "Test", + }); + findExistingRunMock.mockResolvedValue({ id: "run-existing" }); + + await fireReminder({ reminderId: "r-1", runId: "run-existing" }); + + expect(accountMutex.run).toHaveBeenCalledTimes(1); + expect(accountMutex.run).toHaveBeenCalledWith("acct-A", expect.any(Function)); + }); + it("uses different mutex keys for different accounts (cross-account isolation)", async () => { getReminderMock.mockResolvedValueOnce({ id: "r-A", @@ -103,6 +145,8 @@ describe("fireReminder", () => { scheduleKind: "one_off", rrule: null, timezone: "Asia/Kuala_Lumpur", + deliveryWindowStartHour: 6, + deliveryWindowEndHour: 18, name: "A", }); getReminderMock.mockResolvedValueOnce({ @@ -115,6 +159,8 @@ describe("fireReminder", () => { scheduleKind: "one_off", rrule: null, timezone: "Asia/Kuala_Lumpur", + deliveryWindowStartHour: 6, + deliveryWindowEndHour: 18, name: "B", }); diff --git a/apps/bot/src/scheduler/fire-reminder.ts b/apps/bot/src/scheduler/fire-reminder.ts index 585ce52..8e2a2e4 100644 --- a/apps/bot/src/scheduler/fire-reminder.ts +++ b/apps/bot/src/scheduler/fire-reminder.ts @@ -12,7 +12,12 @@ import { readFile } from "node:fs/promises"; import { db } from "../db.js"; import { logger } from "../logger.js"; import { sessionManager } from "../whatsapp/session-manager.js"; -import { absoluteMediaPath, nextOccurrence, resolveDeliveryKind } from "@cmbot/shared"; +import { + absoluteMediaPath, + nextOccurrence, + resolveDeliveryKind, + windowEndAt, +} from "@cmbot/shared"; import { env } from "../env.js"; import { writeAuditLog } from "../audit.js"; import { getReminderWithDetails } from "../reminders/crud.js"; @@ -23,7 +28,13 @@ import { accountMutex } from "./per-key-mutex.js"; import { 
accountRateLimiter } from "./rate-limiter.js"; import { MediaUploadCache } from "./media-upload-cache.js"; -export type FireReminderPayload = { reminderId: string }; +export type FireReminderPayload = { + reminderId: string; + /** Optional resume hook. When present, fire-reminder ATTACHES to + * the existing run instead of creating a new one and only re-tries + * targets in `pending` status. Set by the resume server action. */ + runId?: string; +}; /** Random delay between same-group message parts. Just enough for * visible ordering in the chat at WA's natural pace. */ @@ -64,39 +75,74 @@ export async function fireReminder(payload: FireReminderPayload): Promise logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found"); return; } - if (reminder.status !== "active") { - logger.info({ reminderId: reminder.id, status: reminder.status }, "fire-reminder: skipping (not active)"); + // Resumes are allowed even when the reminder's lifecycle status is + // 'paused' — we WANT to take a paused reminder back to active mid- + // resume. Fresh fires still require status='active'. + if (!payload.runId && reminder.status !== "active") { + logger.info( + { reminderId: reminder.id, status: reminder.status }, + "fire-reminder: skipping (not active)", + ); return; } // Per-account mutex: two reminders on the SAME account take turns // (running them concurrently would double the effective send rate // and risk a ban). Different accounts run in parallel. 
- await accountMutex.run(reminder.accountId, () => fireReminderInner(reminder)); + await accountMutex.run(reminder.accountId, () => fireReminderInner(reminder, payload.runId)); } async function fireReminderInner( reminder: NonNullable>>, + resumeRunId?: string, ): Promise { - const [run] = await db - .insert(reminderRuns) - .values({ - reminderId: reminder.id, - reminderName: reminder.name, - status: "pending", - }) - .returning({ id: reminderRuns.id }); - const runId = run!.id; + // Resume path attaches to the existing run row; fresh path inserts a new one. + let runId: string; + if (resumeRunId) { + const existing = await db.query.reminderRuns.findFirst({ + where: (r, { eq: dEq }) => dEq(r.id, resumeRunId), + }); + if (!existing) { + logger.warn( + { reminderId: reminder.id, resumeRunId }, + "fire-reminder: resume target run missing", + ); + return; + } + runId = existing.id; + // Flip the run back to in-flight so the UI stops showing it as paused. + await db + .update(reminderRuns) + .set({ status: "pending", errorSummary: null }) + .where(eq(reminderRuns.id, runId)); + } else { + const [run] = await db + .insert(reminderRuns) + .values({ + reminderId: reminder.id, + reminderName: reminder.name, + status: "pending", + }) + .returning({ id: reminderRuns.id }); + runId = run!.id; + } const session = sessionManager.getSession(reminder.accountId); if (!session) { logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected"); - await markAllSkipped(runId, reminder, "account not connected"); + if (!resumeRunId) { + await markAllSkipped(runId, reminder, "account not connected"); + } await db .update(reminderRuns) .set({ status: "skipped", errorSummary: "account not connected" }) .where(eq(reminderRuns.id, runId)); - await pgNotifyWeb({ type: "reminder.fired", reminderId: reminder.id, runId, status: "skipped" }); + await pgNotifyWeb({ + type: "reminder.fired", + reminderId: reminder.id, + runId, + status: "skipped", + }); return; } @@ -115,8 +161,9 @@ 
async function fireReminderInner( : []; const mediaById = new Map(mediaRows.map((m) => [m.id, m])); - // Pre-create run_target rows so the Activity tab shows progress mid-run. - if (reminder.targets.length > 0) { + // Pre-create run_target rows on the fresh path so the Activity tab + // shows progress mid-run. Resume reuses the existing rows. + if (!resumeRunId && reminder.targets.length > 0) { await db.insert(reminderRunTargets).values( reminder.targets.map((t) => ({ runId, @@ -127,11 +174,44 @@ async function fireReminderInner( ); } - // Per-run media upload cache. Each unique mediaId is prepared via - // generateWAMessageContent ONCE (which uploads to WA's CDN through - // the socket's waUploadToServer); the resulting proto.Message is - // reused for every group via socket.relayMessage. For 1000 groups - // × 5 MB image, this turns 5 GB of upload into 5 MB. + // On resume, only the still-pending rows are processed. On a fresh + // fire that's every row since we just inserted them all as pending. + const pendingRows = await db.query.reminderRunTargets.findMany({ + where: (t, { eq: dEq, and: dAnd }) => dAnd(dEq(t.runId, runId), dEq(t.status, "pending")), + }); + const pendingGroupIds = new Set(pendingRows.map((r) => r.groupId)); + const targetsToProcess = reminder.targets.filter((t) => pendingGroupIds.has(t.groupId)); + + // Already-sent / already-failed counts from prior run rounds (resume + // case). The final tally adds these to what THIS round produces. + const priorSentCount = resumeRunId + ? ( + await db.query.reminderRunTargets.findMany({ + where: (t, { eq: dEq, and: dAnd }) => + dAnd(dEq(t.runId, runId), dEq(t.status, "sent")), + }) + ).length + : 0; + const priorFailedCount = resumeRunId + ? ( + await db.query.reminderRunTargets.findMany({ + where: (t, { eq: dEq, and: dAnd }) => + dAnd(dEq(t.runId, runId), dEq(t.status, "failed")), + }) + ).length + : 0; + + // Window-end timestamp. 
If the reminder fires AFTER today's deadline + // hour (cron miss-fired late, or it's already 7pm) this is in the + // past and the FIRST gate check trips immediately, ending the run + // as failed without sending anything. + const windowEnd = windowEndAt( + reminder.timezone, + reminder.deliveryWindowEndHour, + new Date(), + ); + + // Per-run media upload cache (one prepare call per unique mediaId). const uploadCache = new MediaUploadCache(async (mediaId) => { const media = mediaById.get(mediaId); if (!media) throw new Error(`media row missing: ${mediaId}`); @@ -157,19 +237,26 @@ async function fireReminderInner( }); }); - // Per-account rate limiter — gates each socket send to stay within - // the account's safe band (BOT_MAX_SEND_PER_MINUTE, default 40). + // Per-account rate limiter — gates each socket send. const rateLimiter = accountRateLimiter.get(reminder.accountId); let sentCount = 0; let failedCount = 0; let skippedCount = 0; + let windowClosed = false; const groupConcurrency = pLimit(env.BOT_GROUP_CONCURRENCY); await Promise.all( - reminder.targets.map((target) => + targetsToProcess.map((target) => groupConcurrency(async () => { + // Window-end gate. CRITICAL: leave the row as `pending` (NOT + // `skipped`) so the run can be resumed later. + if (Date.now() >= windowEnd.getTime()) { + windowClosed = true; + return; + } + const group = groupById.get(target.groupId); if (!group) { await db @@ -187,8 +274,6 @@ async function fireReminderInner( const start = Date.now(); try { - // Once per group, before the first send. sendMessage handles - // sessions internally; relayMessage does not. await ensureGroupSessions(session.socket, group.waGroupJid); let lastMessageId: string | undefined; @@ -242,14 +327,37 @@ async function fireReminderInner( ), ); + // Compose the final status. Four shapes: + // paused : window closed mid-run with at least one row still pending + // AND we delivered at least one in this run or a prior round. + // Resumable. 
Sent rows stay sent, pending stays pending. + // success : every target sent. + // partial : every target attempted; some sent, some failed/skipped. + // failed : zero sent across all rounds, OR window closed before the + // first send (no progress to resume). const total = reminder.targets.length; - let status: "success" | "partial" | "failed"; + const totalSent = priorSentCount + sentCount; + const totalFailed = priorFailedCount + failedCount; + const remainingPending = ( + await db.query.reminderRunTargets.findMany({ + where: (t, { eq: dEq, and: dAnd }) => + dAnd(dEq(t.runId, runId), dEq(t.status, "pending")), + }) + ).length; + + let status: "success" | "partial" | "failed" | "paused"; let errorSummary: string | null = null; - if (sentCount === total) { + if (windowClosed && remainingPending > 0 && totalSent > 0) { + status = "paused"; + errorSummary = `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}). ${totalSent} of ${total} groups delivered, ${remainingPending} still pending. Resume from the Activity tab.`; + } else if (windowClosed && totalSent === 0) { + status = "failed"; + errorSummary = `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}) before any group could be sent. The reminder fired too late in the day.`; + } else if (totalSent === total) { status = "success"; - } else if (sentCount > 0) { + } else if (totalSent > 0) { status = "partial"; - errorSummary = `${sentCount} of ${total} groups delivered (${failedCount} failed, ${skippedCount} skipped).`; + errorSummary = `${totalSent} of ${total} groups delivered (${totalFailed} failed, ${skippedCount} skipped).`; } else { status = "failed"; errorSummary = total === 0 ? "No targets attached to reminder." 
: `All ${total} sends failed.`; @@ -260,29 +368,57 @@ async function fireReminderInner( .set({ status, errorSummary }) .where(eq(reminderRuns.id, runId)); - await pgNotifyWeb({ type: "reminder.fired", reminderId: reminder.id, runId, status }); + await pgNotifyWeb({ + type: "reminder.fired", + reminderId: reminder.id, + runId, + status, + sent: totalSent, + total, + }); - if (reminder.scheduleKind === "one_off") { + // Lifecycle bookkeeping. Skip when the run is paused — the reminder + // shouldn't end or re-arm while a resume is still possible. We also + // flip the reminder row itself to status='paused' so dashboards and + // the list view can reflect it. + if (status === "paused") { await db .update(reminders) - .set({ status: "ended", updatedAt: new Date() }) + .set({ status: "paused", updatedAt: new Date() }) .where(eq(reminders.id, reminder.id)); - } else if (reminder.scheduleKind === "recurring" && reminder.rrule) { - const next = nextOccurrence(reminder.rrule, reminder.timezone, new Date()); - await db - .update(reminders) - .set({ lastFiredAt: new Date(), updatedAt: new Date() }) - .where(eq(reminders.id, reminder.id)); - if (next) { - try { - await scheduleReminderFire(getBoss(), reminder.id, next); - logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence"); - } catch (err) { - logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence"); + logger.info( + { reminderId: reminder.id, runId, totalSent, remainingPending }, + "fire-reminder: paused — leaving lifecycle alone for resume", + ); + } else { + if (reminder.scheduleKind === "one_off") { + await db + .update(reminders) + .set({ status: "ended", updatedAt: new Date() }) + .where(eq(reminders.id, reminder.id)); + } else if (reminder.scheduleKind === "recurring" && reminder.rrule) { + const next = nextOccurrence(reminder.rrule, reminder.timezone, new Date()); + await db + .update(reminders) + .set({ + // If we're resuming a 
previously-paused reminder, lift it + // back to active so the next cron occurrence fires normally. + status: "active", + lastFiredAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(reminders.id, reminder.id)); + if (next) { + try { + await scheduleReminderFire(getBoss(), reminder.id, next); + logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence"); + } catch (err) { + logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence"); + } + } else { + logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending"); + await db.update(reminders).set({ status: "ended" }).where(eq(reminders.id, reminder.id)); } - } else { - logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending"); - await db.update(reminders).set({ status: "ended" }).where(eq(reminders.id, reminder.id)); } } @@ -296,7 +432,15 @@ async function fireReminderInner( }); logger.info( - { reminderId: reminder.id, runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount }, + { + reminderId: reminder.id, + runId, + status, + sent: sentCount, + failed: failedCount, + skipped: skippedCount, + windowClosed, + }, "fire-reminder: done", ); } diff --git a/apps/web/src/hooks/use-events.ts b/apps/web/src/hooks/use-events.ts index b295970..95c3b6b 100644 --- a/apps/web/src/hooks/use-events.ts +++ b/apps/web/src/hooks/use-events.ts @@ -10,7 +10,13 @@ export type WebEventMap = { "session.disconnected": { accountId: string }; "session.timeout": { accountId: string }; "groups.synced": { accountId: string; count: number }; - "reminder.fired": { reminderId: string; runId: string; status: string }; + "reminder.fired": { + reminderId: string; + runId: string; + status: string; + sent?: number; + total?: number; + }; "reminder.failed": { reminderId: string; error: string }; "send_test.done": { groupId: string; ok: boolean; error: string | null }; }; diff --git 
a/apps/web/src/lib/notifications.test.ts b/apps/web/src/lib/notifications.test.ts index c1e85af..27e6c7f 100644 --- a/apps/web/src/lib/notifications.test.ts +++ b/apps/web/src/lib/notifications.test.ts @@ -240,6 +240,44 @@ describe("reminderFiredToNotification mapping", () => { expect(args).toBeNull(); }); + it("renders 'paused' with the resume/cancel call-to-action and sent/total", () => { + const args = reminderFiredToNotification({ + type: "reminder.fired", + reminderId: "r-p", + runId: "run-p", + status: "paused", + sent: 412, + total: 1000, + }); + expect(args?.title).toBe("Reminder paused"); + expect(args?.body).toBe("412 of 1000 groups delivered. Tap to resume or cancel."); + expect(args?.tag).toBe("reminder:r-p"); + expect(args?.href).toBe("/reminders/r-p"); + }); + + it("renders 'paused' without sent/total with a generic body", () => { + const args = reminderFiredToNotification({ + type: "reminder.fired", + reminderId: "r-p", + runId: "run-p", + status: "paused", + }); + expect(args?.title).toBe("Reminder paused"); + expect(args?.body).toMatch(/Delivery window closed/); + }); + + it("renders 'partial' with sent/total → 'X of Y groups delivered'", () => { + const args = reminderFiredToNotification({ + type: "reminder.fired", + reminderId: "r-2", + runId: "run-2", + status: "partial", + sent: 87, + total: 100, + }); + expect(args?.body).toBe("87 of 100 groups delivered. 
See activity for details."); + }); + it("uses the same tag for repeat fires of the same reminder so they coalesce", () => { const a = reminderFiredToNotification({ type: "reminder.fired", diff --git a/apps/web/src/lib/notifications.ts b/apps/web/src/lib/notifications.ts index 37cbd63..76dbbae 100644 --- a/apps/web/src/lib/notifications.ts +++ b/apps/web/src/lib/notifications.ts @@ -138,20 +138,35 @@ export function reminderFiredToNotification(event: { reminderId: string; runId: string; status: string; + sent?: number; + total?: number; }): ShowNotificationOptions | null { if (event.status === "skipped") return null; const headline = event.status === "success" ? "Reminder sent" - : event.status === "partial" - ? "Reminder partly sent" - : "Reminder failed"; - const body = + : event.status === "paused" + ? "Reminder paused" + : event.status === "partial" + ? "Reminder partly sent" + : "Reminder failed"; + let body = event.status === "success" ? "All groups received the message." - : event.status === "partial" - ? "Some groups received the message; others failed. See activity." - : "No groups received the message. See activity."; + : event.status === "paused" + ? "Delivery window closed before all groups got the message." + : event.status === "partial" + ? "Some groups received the message; others failed. See activity." + : "No groups received the message. See activity."; + if (event.status === "paused" && event.sent !== undefined && event.total !== undefined) { + body = `${event.sent} of ${event.total} groups delivered. Tap to resume or cancel.`; + } else if ( + event.status === "partial" && + event.sent !== undefined && + event.total !== undefined + ) { + body = `${event.sent} of ${event.total} groups delivered. See activity for details.`; + } return { title: headline, body,