From 376bbe595b1f2d7fab3093f5c17f030a0a40240f Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sun, 10 May 2026 15:54:21 +0800 Subject: [PATCH] feat(web,bot): resumeReminderRunAction + cancelReminderRunAction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Web actions: * resumeReminderRunAction({ runId }) → validates ownership and that the run is in 'paused' state, then publishes a reminder.resume command via pg_notify('bot.command'). The bot's command-consumer picks it up and enqueues a fresh pg-boss job at REMINDER_FIRE_QUEUE carrying { reminderId, runId }; fire-reminder's existing resume branch attaches to the row. * cancelReminderRunAction({ runId }) → flips remaining 'pending' targets to 'skipped' with error="canceled by operator", marks the run 'partial' with a clear errorSummary, and lifts the parent reminder out of 'paused' (recurring → active so the next occurrence fires; one-off → ended). Bot: * New BotCommand variant { type: "reminder.resume"; reminderId; runId } * command-consumer registers handleResumeReminder which calls enqueueReminderResume(boss, reminderId, runId) — a sibling of scheduleReminderFire that posts the job at REMINDER_FIRE_QUEUE with { reminderId, runId } and singletonKey "reminder:resume:" so the resume doesn't conflict with a future-occurrence schedule. Tests: * reminders.run-actions.test.ts (11 tests) — every guard rail (invalid uuid, missing run, missing reminder, foreign operator, wrong status) and the recurring/one-off lifecycle branches. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/bot/src/ipc/command-consumer.ts | 11 +- apps/bot/src/ipc/schedule-reminder-handler.ts | 12 +- apps/bot/src/scheduler/reminder-jobs.ts | 25 +++ .../src/actions/reminders.run-actions.test.ts | 211 ++++++++++++++++++ apps/web/src/actions/reminders.ts | 134 ++++++++++- apps/web/src/lib/notify.ts | 3 +- 6 files changed, 391 insertions(+), 5 deletions(-) create mode 100644 apps/web/src/actions/reminders.run-actions.test.ts diff --git a/apps/bot/src/ipc/command-consumer.ts b/apps/bot/src/ipc/command-consumer.ts index e03f681..bfd29e4 100644 --- a/apps/bot/src/ipc/command-consumer.ts +++ b/apps/bot/src/ipc/command-consumer.ts @@ -6,14 +6,18 @@ import { handleStartPairing } from "./pair-handler.js"; import { handleUnpair } from "./unpair-handler.js"; import { handleSyncGroups } from "./sync-groups-handler.js"; import { handleSendTest } from "./send-test-handler.js"; -import { handleScheduleReminder } from "./schedule-reminder-handler.js"; +import { + handleScheduleReminder, + handleResumeReminder, +} from "./schedule-reminder-handler.js"; export type BotCommand = | { type: "account.start_pairing"; accountId: string } | { type: "account.unpair"; accountId: string } | { type: "account.sync_groups"; accountId: string } | { type: "group.send_test"; groupId: string; text: string } - | { type: "reminder.schedule"; reminderId: string; scheduledAtIso: string }; + | { type: "reminder.schedule"; reminderId: string; scheduledAtIso: string } + | { type: "reminder.resume"; reminderId: string; runId: string }; type Handler = (cmd: BotCommand) => Promise; const handlers: { [K in BotCommand["type"]]?: (cmd: Extract) => Promise } = {}; @@ -79,4 +83,7 @@ export function registerDefaultHandlers(): void { registerHandler("reminder.schedule", async (cmd) => { await handleScheduleReminder(cmd.reminderId, cmd.scheduledAtIso); }); + registerHandler("reminder.resume", async (cmd) => { + await handleResumeReminder(cmd.reminderId, cmd.runId); + }); } diff --git a/apps/bot/src/ipc/schedule-reminder-handler.ts b/apps/bot/src/ipc/schedule-reminder-handler.ts index d49ba21..b1a9267 100644 --- a/apps/bot/src/ipc/schedule-reminder-handler.ts +++ b/apps/bot/src/ipc/schedule-reminder-handler.ts @@ -1,6 +1,16 @@ import { getBoss } from "../scheduler/pgboss-client.js"; -import { scheduleReminderFire } from "../scheduler/reminder-jobs.js"; +import { + scheduleReminderFire, + enqueueReminderResume, +} from "../scheduler/reminder-jobs.js"; export async function handleScheduleReminder(reminderId: string, scheduledAtIso: string): Promise { await scheduleReminderFire(getBoss(), reminderId, new Date(scheduledAtIso)); } + +export async function handleResumeReminder( + reminderId: string, + runId: string, +): Promise { + await enqueueReminderResume(getBoss(), reminderId, runId); +} diff --git a/apps/bot/src/scheduler/reminder-jobs.ts b/apps/bot/src/scheduler/reminder-jobs.ts index ce948dd..1ede06c 100644 --- a/apps/bot/src/scheduler/reminder-jobs.ts +++ b/apps/bot/src/scheduler/reminder-jobs.ts @@ -50,6 +50,31 @@ export async function scheduleReminderFire( return id; } +/** + * Re-enqueue a paused run so fire-reminder picks up the still-pending + * targets. Different singleton key from scheduleReminderFire so the + * resume doesn't clobber the next-occurrence scheduled job and vice + * versa. + */ +export async function enqueueReminderResume( + boss: PgBoss, + reminderId: string, + runId: string, +): Promise { + const id = await boss.send( + REMINDER_FIRE_QUEUE, + { reminderId, runId }, + { + retryLimit: 3, + retryDelay: 30, + retryBackoff: true, + singletonKey: `reminder:resume:${runId}`, + }, + ); + logger.info({ reminderId, runId, jobId: id }, "reminder.fire: resume enqueued"); + return id; +} + export async function cancelReminderFire(_boss: PgBoss, reminderId: string): Promise { // Soft cancel: pg-boss doesn't expose a clean cancel-by-singleton API in v12. // The scheduled job will still fire, but `fireReminder` exits early when the diff --git a/apps/web/src/actions/reminders.run-actions.test.ts b/apps/web/src/actions/reminders.run-actions.test.ts new file mode 100644 index 0000000..b781a71 --- /dev/null +++ b/apps/web/src/actions/reminders.run-actions.test.ts @@ -0,0 +1,211 @@ +/** + * Unit-tests the resume + cancel server actions in isolation. We mock + * the seeded operator, drizzle db, and the pgNotifyBot helper so the + * tests exercise the action's auth / status / lifecycle logic without + * a real Postgres connection. + */ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +const findRunMock = vi.fn(); +const findReminderMock = vi.fn(); +const findAccountMock = vi.fn(); +const updateMock = vi.fn(); +const transactionMock = vi.fn(); +const pgNotifyMock = vi.fn(); + +vi.mock("@/lib/db", () => ({ + db: { + query: { + reminderRuns: { findFirst: (...a: unknown[]) => findRunMock(...a) }, + reminders: { findFirst: (...a: unknown[]) => findReminderMock(...a) }, + whatsappAccounts: { + findFirst: (...a: unknown[]) => findAccountMock(...a), + }, + }, + update: () => ({ + set: () => ({ where: async (...a: unknown[]) => updateMock(...a) }), + }), + // The cancel action does its DB mutations inside a transaction. + // Run the callback against the same shape as `db` so its inner + // `tx.update(...).set(...).where(...)` calls land in updateMock. + transaction: async (fn: (tx: unknown) => Promise) => { + transactionMock(); + const tx = { + update: () => ({ + set: () => ({ + where: async (...a: unknown[]) => updateMock(...a), + }), + }), + }; + return fn(tx); + }, + }, +})); +vi.mock("@/lib/operator", () => ({ + getSeededOperator: async () => ({ id: "op-1" }), +})); +vi.mock("@/lib/notify", () => ({ + pgNotifyBot: (...a: unknown[]) => pgNotifyMock(...a), +})); +// Rate limiter doesn't fire from these actions, but stub it anyway in +// case the implementation grows it later. +vi.mock("@/lib/rate-limit", () => ({ + checkRateLimit: async () => ({ limited: false }), +})); +vi.mock("next/cache", () => ({ revalidatePath: vi.fn() })); +vi.mock("next/headers", () => ({ headers: async () => new Map() })); +vi.mock("next/navigation", () => ({ redirect: vi.fn() })); + +import { + resumeReminderRunAction, + cancelReminderRunAction, +} from "./reminders"; + +const PAUSED_RUN = { id: "11111111-1111-1111-1111-111111111111", reminderId: "r-1", status: "paused" }; +const REMINDER = { id: "r-1", accountId: "acc-1", scheduleKind: "recurring" }; +const REMINDER_ONE_OFF = { ...REMINDER, scheduleKind: "one_off" }; +const ACCOUNT = { id: "acc-1", operatorId: "op-1" }; + +beforeEach(() => { + findRunMock.mockReset(); + findReminderMock.mockReset(); + findAccountMock.mockReset(); + updateMock.mockReset(); + transactionMock.mockReset(); + pgNotifyMock.mockReset(); +}); + +describe("resumeReminderRunAction", () => { + it("rejects a non-uuid runId", async () => { + const r = await resumeReminderRunAction({ runId: "not-a-uuid" }); + expect(r.ok).toBe(false); + if (!r.ok) expect(r.error).toMatch(/Invalid/); + }); + + it("returns 'Run not found' when the run row is missing", async () => { + findRunMock.mockResolvedValue(undefined); + const r = await resumeReminderRunAction({ runId: PAUSED_RUN.id }); + expect(r).toEqual({ ok: false, error: "Run not found" }); + }); + + it("returns 'Reminder not found' when the run is orphaned", async () => { + findRunMock.mockResolvedValue(PAUSED_RUN); + findReminderMock.mockResolvedValue(undefined); + const r = await resumeReminderRunAction({ runId: PAUSED_RUN.id }); + expect(r).toEqual({ ok: false, error: "Reminder not found" }); + }); + + it("returns 'Run not yours' when another operator owns the account", async () => { + findRunMock.mockResolvedValue(PAUSED_RUN); + findReminderMock.mockResolvedValue(REMINDER); + findAccountMock.mockResolvedValue(undefined); + const r = await resumeReminderRunAction({ runId: PAUSED_RUN.id }); + expect(r).toEqual({ ok: false, error: "Run not yours" }); + }); + + it("rejects when run.status !== 'paused'", async () => { + findRunMock.mockResolvedValue({ ...PAUSED_RUN, status: "success" }); + findReminderMock.mockResolvedValue(REMINDER); + findAccountMock.mockResolvedValue(ACCOUNT); + const r = await resumeReminderRunAction({ runId: PAUSED_RUN.id }); + expect(r.ok).toBe(false); + if (!r.ok) expect(r.error).toMatch(/Cannot resume a success run/); + }); + + it("happy path: notifies the bot with reminder.resume and runId", async () => { + findRunMock.mockResolvedValue(PAUSED_RUN); + findReminderMock.mockResolvedValue(REMINDER); + findAccountMock.mockResolvedValue(ACCOUNT); + const r = await resumeReminderRunAction({ runId: PAUSED_RUN.id }); + expect(r).toEqual({ ok: true }); + expect(pgNotifyMock).toHaveBeenCalledTimes(1); + expect(pgNotifyMock).toHaveBeenCalledWith({ + type: "reminder.resume", + reminderId: REMINDER.id, + runId: PAUSED_RUN.id, + }); + }); +}); + +describe("cancelReminderRunAction", () => { + it("rejects a non-uuid runId", async () => { + const r = await cancelReminderRunAction({ runId: "nope" }); + expect(r.ok).toBe(false); + if (!r.ok) expect(r.error).toMatch(/Invalid/); + }); + + it("rejects when the run isn't paused", async () => { + findRunMock.mockResolvedValue({ ...PAUSED_RUN, status: "success" }); + findReminderMock.mockResolvedValue(REMINDER); + findAccountMock.mockResolvedValue(ACCOUNT); + const r = await cancelReminderRunAction({ runId: PAUSED_RUN.id }); + expect(r.ok).toBe(false); + if (!r.ok) expect(r.error).toMatch(/Cannot cancel/); + }); + + it("happy path: opens a transaction and runs three updates (targets / run / reminder)", async () => { + findRunMock.mockResolvedValue(PAUSED_RUN); + findReminderMock.mockResolvedValue(REMINDER); + findAccountMock.mockResolvedValue(ACCOUNT); + const r = await cancelReminderRunAction({ runId: PAUSED_RUN.id }); + expect(r).toEqual({ ok: true }); + expect(transactionMock).toHaveBeenCalledTimes(1); + // Three separate set/where calls inside the tx: update targets, + // update run, update reminder lifecycle. + expect(updateMock).toHaveBeenCalledTimes(3); + // Cancel does NOT enqueue the bot — it's purely a DB-side operation. + expect(pgNotifyMock).not.toHaveBeenCalled(); + }); + + it("recurring reminder: lifecycle goes back to active so the next occurrence fires", async () => { + // Use a tx-update spy that captures the SET payload. + const setSpy = vi.fn(); + const { db } = await import("@/lib/db"); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (db as any).transaction = async (fn: (tx: unknown) => Promise) => { + const tx = { + update: () => ({ + set: (payload: unknown) => { + setSpy(payload); + return { where: async () => undefined }; + }, + }), + }; + return fn(tx); + }; + + findRunMock.mockResolvedValue(PAUSED_RUN); + findReminderMock.mockResolvedValue(REMINDER); // recurring + findAccountMock.mockResolvedValue(ACCOUNT); + await cancelReminderRunAction({ runId: PAUSED_RUN.id }); + // Last set call is on the reminders table — status flips to active. + const calls = setSpy.mock.calls; + const lastPayload = calls[calls.length - 1]?.[0] as Record; + expect(lastPayload.status).toBe("active"); + }); + + it("one-off reminder: lifecycle ends (no future occurrence to wait for)", async () => { + const setSpy = vi.fn(); + const { db } = await import("@/lib/db"); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (db as any).transaction = async (fn: (tx: unknown) => Promise) => { + const tx = { + update: () => ({ + set: (payload: unknown) => { + setSpy(payload); + return { where: async () => undefined }; + }, + }), + }; + return fn(tx); + }; + + findRunMock.mockResolvedValue(PAUSED_RUN); + findReminderMock.mockResolvedValue(REMINDER_ONE_OFF); + findAccountMock.mockResolvedValue(ACCOUNT); + await cancelReminderRunAction({ runId: PAUSED_RUN.id }); + const calls = setSpy.mock.calls; + const lastPayload = calls[calls.length - 1]?.[0] as Record; + expect(lastPayload.status).toBe("ended"); + }); +}); diff --git a/apps/web/src/actions/reminders.ts b/apps/web/src/actions/reminders.ts index ca3530d..3910d2d 100644 --- a/apps/web/src/actions/reminders.ts +++ b/apps/web/src/actions/reminders.ts @@ -6,7 +6,13 @@ import { headers } from "next/headers"; import { eq } from "drizzle-orm"; import { z } from "zod"; import { DateTime } from "luxon"; -import { reminders, reminderTargets, reminderMessages } from "@cmbot/db"; +import { + reminders, + reminderTargets, + reminderMessages, + reminderRuns, + reminderRunTargets, +} from "@cmbot/db"; import { DEFAULT_TIMEZONE, isCronRule, nextOccurrence, validateMinInterval } from "@cmbot/shared"; import { db } from "@/lib/db"; import { getSeededOperator } from "@/lib/operator"; @@ -538,3 +544,129 @@ export async function updateReminderAction( revalidatePath(`/reminders/${reminderId}`); return { ok: true, reminderId }; } + +// --------------------------------------------------------------------------- +// Resume / cancel a paused run +// --------------------------------------------------------------------------- +const runIdSchema = z.object({ runId: z.string().uuid() }); + +export type ResumeReminderRunResult = { ok: true } | { ok: false; error: string }; + +/** + * Re-enqueue a paused reminder run. The bot picks it up, attaches to the + * existing run row, and only re-tries the rows still in `pending` state. + * + * Validates that the operator owns the underlying reminder + account + * pair and that the run is actually in 'paused' state — anything else + * is a no-op (so a stale UI button doesn't double-fire a run). + */ +export async function resumeReminderRunAction(input: { + runId: string; +}): Promise { + const op = await getSeededOperator(); + const parsed = runIdSchema.safeParse(input); + if (!parsed.success) { + return { ok: false, error: "Invalid runId" }; + } + + const run = await db.query.reminderRuns.findFirst({ + where: (r, { eq: dEq }) => dEq(r.id, parsed.data.runId), + }); + if (!run || !run.reminderId) return { ok: false, error: "Run not found" }; + + const reminder = await db.query.reminders.findFirst({ + where: (r, { eq: dEq }) => dEq(r.id, run.reminderId!), + }); + if (!reminder) return { ok: false, error: "Reminder not found" }; + + // Operator must own the account the reminder belongs to. + const owned = await db.query.whatsappAccounts.findFirst({ + where: (a, { eq: dEq, and: dAnd }) => + dAnd(dEq(a.id, reminder.accountId), dEq(a.operatorId, op.id)), + }); + if (!owned) return { ok: false, error: "Run not yours" }; + + if (run.status !== "paused") { + return { ok: false, error: `Cannot resume a ${run.status} run` }; + } + + await pgNotifyBot({ + type: "reminder.resume", + reminderId: reminder.id, + runId: run.id, + }); + + revalidatePath("/activity"); + revalidatePath(`/reminders/${reminder.id}`); + return { ok: true }; +} + +export type CancelReminderRunResult = { ok: true } | { ok: false; error: string }; + +/** + * Permanently end a paused run. Remaining `pending` targets become + * `skipped` with a clear "canceled by operator" reason; the run row + * resolves to `partial`. The reminder lifecycle is lifted out of + * 'paused' — recurring goes back to 'active' so the next occurrence + * fires; one-off ends. + */ +export async function cancelReminderRunAction(input: { + runId: string; +}): Promise { + const op = await getSeededOperator(); + const parsed = runIdSchema.safeParse(input); + if (!parsed.success) { + return { ok: false, error: "Invalid runId" }; + } + + const run = await db.query.reminderRuns.findFirst({ + where: (r, { eq: dEq }) => dEq(r.id, parsed.data.runId), + }); + if (!run || !run.reminderId) return { ok: false, error: "Run not found" }; + + const reminder = await db.query.reminders.findFirst({ + where: (r, { eq: dEq }) => dEq(r.id, run.reminderId!), + }); + if (!reminder) return { ok: false, error: "Reminder not found" }; + + const owned = await db.query.whatsappAccounts.findFirst({ + where: (a, { eq: dEq, and: dAnd }) => + dAnd(dEq(a.id, reminder.accountId), dEq(a.operatorId, op.id)), + }); + if (!owned) return { ok: false, error: "Run not yours" }; + + if (run.status !== "paused") { + return { ok: false, error: `Cannot cancel a ${run.status} run` }; + } + + await db.transaction(async (tx) => { + // Pending → skipped with a clear cause. + await tx + .update(reminderRunTargets) + .set({ status: "skipped", error: "canceled by operator" }) + .where(eq(reminderRunTargets.runId, run.id)); + + await tx + .update(reminderRuns) + .set({ + status: "partial", + errorSummary: + "Canceled by operator before all groups received the message.", + }) + .where(eq(reminderRuns.id, run.id)); + + // Lift the reminder out of 'paused'. Recurring goes back to active + // so the next occurrence can fire; one-off has no future occurrence. + await tx + .update(reminders) + .set({ + status: reminder.scheduleKind === "recurring" ? "active" : "ended", + updatedAt: new Date(), + }) + .where(eq(reminders.id, reminder.id)); + }); + + revalidatePath("/activity"); + revalidatePath(`/reminders/${reminder.id}`); + return { ok: true }; +} diff --git a/apps/web/src/lib/notify.ts b/apps/web/src/lib/notify.ts index f4e3ff8..88de38c 100644 --- a/apps/web/src/lib/notify.ts +++ b/apps/web/src/lib/notify.ts @@ -7,7 +7,8 @@ export type BotCommand = | { type: "account.unpair"; accountId: string } | { type: "account.sync_groups"; accountId: string } | { type: "group.send_test"; groupId: string; text: string } - | { type: "reminder.schedule"; reminderId: string; scheduledAtIso: string }; + | { type: "reminder.schedule"; reminderId: string; scheduledAtIso: string } + | { type: "reminder.resume"; reminderId: string; runId: string }; export async function pgNotifyBot(cmd: BotCommand): Promise { const json = JSON.stringify(cmd);