feat(web,bot): resumeReminderRunAction + cancelReminderRunAction

Web actions:

* resumeReminderRunAction({ runId }) → validates ownership and that
  the run is in 'paused' state, then publishes a reminder.resume
  command via pg_notify('bot.command'). The bot's command-consumer
  picks it up and enqueues a fresh pg-boss job at REMINDER_FIRE_QUEUE
  carrying { reminderId, runId }; fire-reminder's existing resume
  branch attaches to the row.
* cancelReminderRunAction({ runId }) → flips remaining 'pending'
  targets to 'skipped' with error="canceled by operator", marks the
  run 'partial' with a clear errorSummary, and lifts the parent
  reminder out of 'paused' (recurring → active so the next
  occurrence fires; one-off → ended).

Bot:

* New BotCommand variant { type: "reminder.resume"; reminderId; runId }
* command-consumer registers handleResumeReminder which calls
  enqueueReminderResume(boss, reminderId, runId) — a sibling of
  scheduleReminderFire that posts the job at REMINDER_FIRE_QUEUE
  with { reminderId, runId } and singletonKey "reminder:resume:<runId>"
  so the resume doesn't conflict with a future-occurrence schedule.

Tests:
* reminders.run-actions.test.ts (11 tests) — every guard rail
  (invalid uuid, missing run, missing reminder, foreign operator,
  wrong status) and the recurring/one-off lifecycle branches.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
yiekheng 2026-05-10 15:54:21 +08:00
parent 57786f9d09
commit 376bbe595b
6 changed files with 391 additions and 5 deletions

View File

@ -6,14 +6,18 @@ import { handleStartPairing } from "./pair-handler.js";
import { handleUnpair } from "./unpair-handler.js";
import { handleSyncGroups } from "./sync-groups-handler.js";
import { handleSendTest } from "./send-test-handler.js";
import { handleScheduleReminder } from "./schedule-reminder-handler.js";
import {
handleScheduleReminder,
handleResumeReminder,
} from "./schedule-reminder-handler.js";
export type BotCommand =
| { type: "account.start_pairing"; accountId: string }
| { type: "account.unpair"; accountId: string }
| { type: "account.sync_groups"; accountId: string }
| { type: "group.send_test"; groupId: string; text: string }
| { type: "reminder.schedule"; reminderId: string; scheduledAtIso: string };
| { type: "reminder.schedule"; reminderId: string; scheduledAtIso: string }
| { type: "reminder.resume"; reminderId: string; runId: string };
type Handler = (cmd: BotCommand) => Promise<void>;
const handlers: { [K in BotCommand["type"]]?: (cmd: Extract<BotCommand, { type: K }>) => Promise<void> } = {};
@ -79,4 +83,7 @@ export function registerDefaultHandlers(): void {
registerHandler("reminder.schedule", async (cmd) => {
await handleScheduleReminder(cmd.reminderId, cmd.scheduledAtIso);
});
registerHandler("reminder.resume", async (cmd) => {
await handleResumeReminder(cmd.reminderId, cmd.runId);
});
}

View File

@ -1,6 +1,16 @@
import { getBoss } from "../scheduler/pgboss-client.js";
import { scheduleReminderFire } from "../scheduler/reminder-jobs.js";
import {
scheduleReminderFire,
enqueueReminderResume,
} from "../scheduler/reminder-jobs.js";
export async function handleScheduleReminder(reminderId: string, scheduledAtIso: string): Promise<void> {
await scheduleReminderFire(getBoss(), reminderId, new Date(scheduledAtIso));
}
export async function handleResumeReminder(
reminderId: string,
runId: string,
): Promise<void> {
await enqueueReminderResume(getBoss(), reminderId, runId);
}

View File

@ -50,6 +50,31 @@ export async function scheduleReminderFire(
return id;
}
/**
* Re-enqueue a paused run so fire-reminder picks up the still-pending
* targets. Different singleton key from scheduleReminderFire so the
* resume doesn't clobber the next-occurrence scheduled job and vice
* versa.
*/
export async function enqueueReminderResume(
boss: PgBoss,
reminderId: string,
runId: string,
): Promise<string | null> {
const id = await boss.send(
REMINDER_FIRE_QUEUE,
{ reminderId, runId },
{
retryLimit: 3,
retryDelay: 30,
retryBackoff: true,
singletonKey: `reminder:resume:${runId}`,
},
);
logger.info({ reminderId, runId, jobId: id }, "reminder.fire: resume enqueued");
return id;
}
export async function cancelReminderFire(_boss: PgBoss, reminderId: string): Promise<void> {
// Soft cancel: pg-boss doesn't expose a clean cancel-by-singleton API in v12.
// The scheduled job will still fire, but `fireReminder` exits early when the

View File

@ -0,0 +1,211 @@
/**
* Unit-tests the resume + cancel server actions in isolation. We mock
* the seeded operator, drizzle db, and the pgNotifyBot helper so the
* tests exercise the action's auth / status / lifecycle logic without
* a real Postgres connection.
*/
import { describe, it, expect, vi, beforeEach } from "vitest";
const findRunMock = vi.fn();
const findReminderMock = vi.fn();
const findAccountMock = vi.fn();
const updateMock = vi.fn();
const transactionMock = vi.fn();
const pgNotifyMock = vi.fn();
vi.mock("@/lib/db", () => ({
db: {
query: {
reminderRuns: { findFirst: (...a: unknown[]) => findRunMock(...a) },
reminders: { findFirst: (...a: unknown[]) => findReminderMock(...a) },
whatsappAccounts: {
findFirst: (...a: unknown[]) => findAccountMock(...a),
},
},
update: () => ({
set: () => ({ where: async (...a: unknown[]) => updateMock(...a) }),
}),
// The cancel action does its DB mutations inside a transaction.
// Run the callback against the same shape as `db` so its inner
// `tx.update(...).set(...).where(...)` calls land in updateMock.
transaction: async (fn: (tx: unknown) => Promise<unknown>) => {
transactionMock();
const tx = {
update: () => ({
set: () => ({
where: async (...a: unknown[]) => updateMock(...a),
}),
}),
};
return fn(tx);
},
},
}));
vi.mock("@/lib/operator", () => ({
getSeededOperator: async () => ({ id: "op-1" }),
}));
vi.mock("@/lib/notify", () => ({
pgNotifyBot: (...a: unknown[]) => pgNotifyMock(...a),
}));
// Rate limiter doesn't fire from these actions, but stub it anyway in
// case the implementation grows it later.
vi.mock("@/lib/rate-limit", () => ({
checkRateLimit: async () => ({ limited: false }),
}));
vi.mock("next/cache", () => ({ revalidatePath: vi.fn() }));
vi.mock("next/headers", () => ({ headers: async () => new Map() }));
vi.mock("next/navigation", () => ({ redirect: vi.fn() }));
import {
resumeReminderRunAction,
cancelReminderRunAction,
} from "./reminders";
const PAUSED_RUN = { id: "11111111-1111-1111-1111-111111111111", reminderId: "r-1", status: "paused" };
const REMINDER = { id: "r-1", accountId: "acc-1", scheduleKind: "recurring" };
const REMINDER_ONE_OFF = { ...REMINDER, scheduleKind: "one_off" };
const ACCOUNT = { id: "acc-1", operatorId: "op-1" };
beforeEach(() => {
findRunMock.mockReset();
findReminderMock.mockReset();
findAccountMock.mockReset();
updateMock.mockReset();
transactionMock.mockReset();
pgNotifyMock.mockReset();
});
describe("resumeReminderRunAction", () => {
it("rejects a non-uuid runId", async () => {
const r = await resumeReminderRunAction({ runId: "not-a-uuid" });
expect(r.ok).toBe(false);
if (!r.ok) expect(r.error).toMatch(/Invalid/);
});
it("returns 'Run not found' when the run row is missing", async () => {
findRunMock.mockResolvedValue(undefined);
const r = await resumeReminderRunAction({ runId: PAUSED_RUN.id });
expect(r).toEqual({ ok: false, error: "Run not found" });
});
it("returns 'Reminder not found' when the run is orphaned", async () => {
findRunMock.mockResolvedValue(PAUSED_RUN);
findReminderMock.mockResolvedValue(undefined);
const r = await resumeReminderRunAction({ runId: PAUSED_RUN.id });
expect(r).toEqual({ ok: false, error: "Reminder not found" });
});
it("returns 'Run not yours' when another operator owns the account", async () => {
findRunMock.mockResolvedValue(PAUSED_RUN);
findReminderMock.mockResolvedValue(REMINDER);
findAccountMock.mockResolvedValue(undefined);
const r = await resumeReminderRunAction({ runId: PAUSED_RUN.id });
expect(r).toEqual({ ok: false, error: "Run not yours" });
});
it("rejects when run.status !== 'paused'", async () => {
findRunMock.mockResolvedValue({ ...PAUSED_RUN, status: "success" });
findReminderMock.mockResolvedValue(REMINDER);
findAccountMock.mockResolvedValue(ACCOUNT);
const r = await resumeReminderRunAction({ runId: PAUSED_RUN.id });
expect(r.ok).toBe(false);
if (!r.ok) expect(r.error).toMatch(/Cannot resume a success run/);
});
it("happy path: notifies the bot with reminder.resume and runId", async () => {
findRunMock.mockResolvedValue(PAUSED_RUN);
findReminderMock.mockResolvedValue(REMINDER);
findAccountMock.mockResolvedValue(ACCOUNT);
const r = await resumeReminderRunAction({ runId: PAUSED_RUN.id });
expect(r).toEqual({ ok: true });
expect(pgNotifyMock).toHaveBeenCalledTimes(1);
expect(pgNotifyMock).toHaveBeenCalledWith({
type: "reminder.resume",
reminderId: REMINDER.id,
runId: PAUSED_RUN.id,
});
});
});
describe("cancelReminderRunAction", () => {
it("rejects a non-uuid runId", async () => {
const r = await cancelReminderRunAction({ runId: "nope" });
expect(r.ok).toBe(false);
if (!r.ok) expect(r.error).toMatch(/Invalid/);
});
it("rejects when the run isn't paused", async () => {
findRunMock.mockResolvedValue({ ...PAUSED_RUN, status: "success" });
findReminderMock.mockResolvedValue(REMINDER);
findAccountMock.mockResolvedValue(ACCOUNT);
const r = await cancelReminderRunAction({ runId: PAUSED_RUN.id });
expect(r.ok).toBe(false);
if (!r.ok) expect(r.error).toMatch(/Cannot cancel/);
});
it("happy path: opens a transaction and runs three updates (targets / run / reminder)", async () => {
findRunMock.mockResolvedValue(PAUSED_RUN);
findReminderMock.mockResolvedValue(REMINDER);
findAccountMock.mockResolvedValue(ACCOUNT);
const r = await cancelReminderRunAction({ runId: PAUSED_RUN.id });
expect(r).toEqual({ ok: true });
expect(transactionMock).toHaveBeenCalledTimes(1);
// Three separate set/where calls inside the tx: update targets,
// update run, update reminder lifecycle.
expect(updateMock).toHaveBeenCalledTimes(3);
// Cancel does NOT enqueue the bot — it's purely a DB-side operation.
expect(pgNotifyMock).not.toHaveBeenCalled();
});
it("recurring reminder: lifecycle goes back to active so the next occurrence fires", async () => {
// Use a tx-update spy that captures the SET payload.
const setSpy = vi.fn();
const { db } = await import("@/lib/db");
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(db as any).transaction = async (fn: (tx: unknown) => Promise<unknown>) => {
const tx = {
update: () => ({
set: (payload: unknown) => {
setSpy(payload);
return { where: async () => undefined };
},
}),
};
return fn(tx);
};
findRunMock.mockResolvedValue(PAUSED_RUN);
findReminderMock.mockResolvedValue(REMINDER); // recurring
findAccountMock.mockResolvedValue(ACCOUNT);
await cancelReminderRunAction({ runId: PAUSED_RUN.id });
// Last set call is on the reminders table — status flips to active.
const calls = setSpy.mock.calls;
const lastPayload = calls[calls.length - 1]?.[0] as Record<string, unknown>;
expect(lastPayload.status).toBe("active");
});
it("one-off reminder: lifecycle ends (no future occurrence to wait for)", async () => {
const setSpy = vi.fn();
const { db } = await import("@/lib/db");
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(db as any).transaction = async (fn: (tx: unknown) => Promise<unknown>) => {
const tx = {
update: () => ({
set: (payload: unknown) => {
setSpy(payload);
return { where: async () => undefined };
},
}),
};
return fn(tx);
};
findRunMock.mockResolvedValue(PAUSED_RUN);
findReminderMock.mockResolvedValue(REMINDER_ONE_OFF);
findAccountMock.mockResolvedValue(ACCOUNT);
await cancelReminderRunAction({ runId: PAUSED_RUN.id });
const calls = setSpy.mock.calls;
const lastPayload = calls[calls.length - 1]?.[0] as Record<string, unknown>;
expect(lastPayload.status).toBe("ended");
});
});

View File

@ -6,7 +6,13 @@ import { headers } from "next/headers";
import { eq } from "drizzle-orm";
import { z } from "zod";
import { DateTime } from "luxon";
import { reminders, reminderTargets, reminderMessages } from "@cmbot/db";
import {
reminders,
reminderTargets,
reminderMessages,
reminderRuns,
reminderRunTargets,
} from "@cmbot/db";
import { DEFAULT_TIMEZONE, isCronRule, nextOccurrence, validateMinInterval } from "@cmbot/shared";
import { db } from "@/lib/db";
import { getSeededOperator } from "@/lib/operator";
@ -538,3 +544,129 @@ export async function updateReminderAction(
revalidatePath(`/reminders/${reminderId}`);
return { ok: true, reminderId };
}
// ---------------------------------------------------------------------------
// Resume / cancel a paused run
// ---------------------------------------------------------------------------
const runIdSchema = z.object({ runId: z.string().uuid() });
export type ResumeReminderRunResult = { ok: true } | { ok: false; error: string };
/**
* Re-enqueue a paused reminder run. The bot picks it up, attaches to the
* existing run row, and only re-tries the rows still in `pending` state.
*
* Validates that the operator owns the underlying reminder + account
* pair and that the run is actually in 'paused' state anything else
* is a no-op (so a stale UI button doesn't double-fire a run).
*/
export async function resumeReminderRunAction(input: {
runId: string;
}): Promise<ResumeReminderRunResult> {
const op = await getSeededOperator();
const parsed = runIdSchema.safeParse(input);
if (!parsed.success) {
return { ok: false, error: "Invalid runId" };
}
const run = await db.query.reminderRuns.findFirst({
where: (r, { eq: dEq }) => dEq(r.id, parsed.data.runId),
});
if (!run || !run.reminderId) return { ok: false, error: "Run not found" };
const reminder = await db.query.reminders.findFirst({
where: (r, { eq: dEq }) => dEq(r.id, run.reminderId!),
});
if (!reminder) return { ok: false, error: "Reminder not found" };
// Operator must own the account the reminder belongs to.
const owned = await db.query.whatsappAccounts.findFirst({
where: (a, { eq: dEq, and: dAnd }) =>
dAnd(dEq(a.id, reminder.accountId), dEq(a.operatorId, op.id)),
});
if (!owned) return { ok: false, error: "Run not yours" };
if (run.status !== "paused") {
return { ok: false, error: `Cannot resume a ${run.status} run` };
}
await pgNotifyBot({
type: "reminder.resume",
reminderId: reminder.id,
runId: run.id,
});
revalidatePath("/activity");
revalidatePath(`/reminders/${reminder.id}`);
return { ok: true };
}
export type CancelReminderRunResult = { ok: true } | { ok: false; error: string };
/**
* Permanently end a paused run. Remaining `pending` targets become
* `skipped` with a clear "canceled by operator" reason; the run row
* resolves to `partial`. The reminder lifecycle is lifted out of
* 'paused' recurring goes back to 'active' so the next occurrence
* fires; one-off ends.
*/
export async function cancelReminderRunAction(input: {
runId: string;
}): Promise<CancelReminderRunResult> {
const op = await getSeededOperator();
const parsed = runIdSchema.safeParse(input);
if (!parsed.success) {
return { ok: false, error: "Invalid runId" };
}
const run = await db.query.reminderRuns.findFirst({
where: (r, { eq: dEq }) => dEq(r.id, parsed.data.runId),
});
if (!run || !run.reminderId) return { ok: false, error: "Run not found" };
const reminder = await db.query.reminders.findFirst({
where: (r, { eq: dEq }) => dEq(r.id, run.reminderId!),
});
if (!reminder) return { ok: false, error: "Reminder not found" };
const owned = await db.query.whatsappAccounts.findFirst({
where: (a, { eq: dEq, and: dAnd }) =>
dAnd(dEq(a.id, reminder.accountId), dEq(a.operatorId, op.id)),
});
if (!owned) return { ok: false, error: "Run not yours" };
if (run.status !== "paused") {
return { ok: false, error: `Cannot cancel a ${run.status} run` };
}
await db.transaction(async (tx) => {
// Pending → skipped with a clear cause.
await tx
.update(reminderRunTargets)
.set({ status: "skipped", error: "canceled by operator" })
.where(eq(reminderRunTargets.runId, run.id));
await tx
.update(reminderRuns)
.set({
status: "partial",
errorSummary:
"Canceled by operator before all groups received the message.",
})
.where(eq(reminderRuns.id, run.id));
// Lift the reminder out of 'paused'. Recurring goes back to active
// so the next occurrence can fire; one-off has no future occurrence.
await tx
.update(reminders)
.set({
status: reminder.scheduleKind === "recurring" ? "active" : "ended",
updatedAt: new Date(),
})
.where(eq(reminders.id, reminder.id));
});
revalidatePath("/activity");
revalidatePath(`/reminders/${reminder.id}`);
return { ok: true };
}

View File

@ -7,7 +7,8 @@ export type BotCommand =
| { type: "account.unpair"; accountId: string }
| { type: "account.sync_groups"; accountId: string }
| { type: "group.send_test"; groupId: string; text: string }
| { type: "reminder.schedule"; reminderId: string; scheduledAtIso: string };
| { type: "reminder.schedule"; reminderId: string; scheduledAtIso: string }
| { type: "reminder.resume"; reminderId: string; runId: string };
export async function pgNotifyBot(cmd: BotCommand): Promise<void> {
const json = JSON.stringify(cmd);