From d5b8c0beebd06638bede071c9db03905dc9a01cd Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sun, 10 May 2026 14:15:16 +0800 Subject: [PATCH] feat(reminders): name is required (was optional with auto-derive) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the name field auto-derived from the first text part when the operator left it blank. That's brittle once reminders carry multiple parts of varying provenance, and confusing in lists where "Reminder" or partial sentences crowd in. Now: every reminder must carry a non-empty name, capped at 60 chars. - Zod schema on createReminder/updateReminder: name moves from `z.string().nullable().optional()` to `z.string().trim().min(1, "Give the reminder a name").max(60)`. Stale-URL legacy callers that omit it now get a clear server error. - Wizard compose step: input has `required` + `aria-required`, placeholder + label simplified ("(optional)" tag and the helper paragraph removed), Continue blocks on empty. - Edit-message form: same — required, aria-required, save blocked on empty, the "leave blank and we'll auto-derive" hint dropped. - Review-submit client: defensive fail-fast for stale-bookmark URLs that arrive at step 5 without a name — bounces back with "Give the reminder a name (back on the Message step)" instead of letting the server reject. The resolveReminderName helper stays put — duplicateReminderAction and any future caller still benefit from the trim+clamp+fallback chain. Helper unit tests unaffected (they test the resolver in isolation, the policy-tightening lives at the schema layer above). 298 web tests still passing. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/web/src/actions/reminders.ts | 13 +- .../reminder-edit/edit-message-form.tsx | 18 +- .../reminder-wizard/compose-form-client.tsx | 24 +- .../reminder-wizard/review-submit-client.tsx | 11 +- .../plans/2026-05-10-windowed-fanout.md | 1829 +++++++++++++++++ .../2026-05-10-windowed-fanout-design.md | 117 +- 6 files changed, 1965 insertions(+), 47 deletions(-) create mode 100644 docs/superpowers/plans/2026-05-10-windowed-fanout.md diff --git a/apps/web/src/actions/reminders.ts b/apps/web/src/actions/reminders.ts index ebeb2b5..d7e984d 100644 --- a/apps/web/src/actions/reminders.ts +++ b/apps/web/src/actions/reminders.ts @@ -230,12 +230,13 @@ const createReminderSchema = z // least one valid message either way. messages: z.array(messagePartSchema).optional(), // User-supplied label shown in the list / detail page header. - // Optional on the wire — when blank or missing the action body - // auto-derives a fallback from the first text-bearing message - // part. The reminders.name DB column is text(50), so the - // resolver clamps to 60 chars (mirrors the duplicate-action - // pattern that produces " (copy)") and trims whitespace. - name: z.string().nullable().optional(), + // Required: every reminder must carry a non-empty name. The + // resolver still clamps to REMINDER_NAME_MAX so the DB column + // never has to reject the row. The legacy auto-derive from the + // first message part is kept as a fallback ONLY for legacy + // bookmarked URLs (where the create form was submitted before + // the field was added) — new submits always carry a name. + name: z.string().trim().min(1, "Give the reminder a name").max(60), // Legacy single-message fields. Still accepted so bookmarked // /reminders/new URLs don't 400 after the migration. The action body // collapses these into `messages` before doing any work. diff --git a/apps/web/src/components/reminder-edit/edit-message-form.tsx b/apps/web/src/components/reminder-edit/edit-message-form.tsx index 43a4927..4a31e1d 100644 --- a/apps/web/src/components/reminder-edit/edit-message-form.tsx +++ b/apps/web/src/components/reminder-edit/edit-message-form.tsx @@ -47,6 +47,11 @@ export function EditMessageForm({ const [error, setError] = useState(null); async function handleSave() { + const trimmedName = name.trim(); + if (!trimmedName) { + setError("Give the reminder a name."); + return; + } if (messages.length === 0) { setError("Add at least one text or file part."); return; @@ -58,7 +63,7 @@ export function EditMessageForm({ reminderId, accountId, groupIds, - name: name.trim() || null, + name: trimmedName, messages, scheduledAtIso, rrule, @@ -83,19 +88,20 @@ export function EditMessageForm({ setName(e.target.value)} + onChange={(e) => { + setName(e.target.value); + setError(null); + }} placeholder="e.g. Sunday morning standup" maxLength={REMINDER_NAME_MAX} + required + aria-required="true" /> -

- Leave blank and the first line of your message will be used. -

(null); function handleContinue() { + const trimmedName = name.trim(); + if (!trimmedName) { + setError("Give the reminder a name."); + return; + } if (messages.length === 0) { setError("Add at least one text or file part."); return; @@ -48,8 +53,7 @@ export function ComposeFormClient({ const sp = new URLSearchParams({ step: "3", accountId }); if (groupIds) sp.set("groupIds", groupIds); sp.set("messages", encodeMessages(messages)); - const trimmedName = name.trim(); - if (trimmedName) sp.set("name", trimmedName); + sp.set("name", trimmedName); if (passThroughParams.scheduledAt) sp.set("scheduledAt", passThroughParams.scheduledAt); if (passThroughParams.rrule) sp.set("rrule", passThroughParams.rrule); if (passThroughParams.editReminderId) sp.set("editReminderId", passThroughParams.editReminderId); @@ -59,26 +63,26 @@ export function ComposeFormClient({ return (
- {/* Name — sits above the message stack so the user names the - reminder before composing. Optional: blank falls back to the - first text part on the server side. */} + {/* Name — required identifier shown in the list, detail page, + and run-history rows. Capped at REMINDER_NAME_MAX (60). */}
setName(e.target.value)} + onChange={(e) => { + setName(e.target.value); + setError(null); + }} placeholder="e.g. Sunday morning standup" maxLength={REMINDER_NAME_MAX} + required + aria-required="true" /> -

- Leave blank and the first line of your message will be used. -

(null); async function handleSchedule() { + const trimmedName = name?.trim(); + if (!trimmedName) { + // The wizard's compose step now blocks Continue when the name is + // blank, so the only way to land here without one is a stale + // bookmarked URL. Bounce the operator back to step 2 with a + // clear error rather than letting the server reject it. + setError("Give the reminder a name (back on the Message step)."); + return; + } setSubmitting(true); setError(null); @@ -41,7 +50,7 @@ export function ReviewSubmitClient({ const payload = { accountId, groupIds: groupIds ? groupIds.split(",").filter(Boolean) : [], - name: name?.trim() || null, + name: trimmedName, messages, scheduledAtIso: scheduledAt, rrule: rrule ?? null, diff --git a/docs/superpowers/plans/2026-05-10-windowed-fanout.md b/docs/superpowers/plans/2026-05-10-windowed-fanout.md new file mode 100644 index 0000000..da200ee --- /dev/null +++ b/docs/superpowers/plans/2026-05-10-windowed-fanout.md @@ -0,0 +1,1829 @@ +# Windowed, Pacing-Safe Reminder Fan-Out — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Deliver reminders to many groups (1000+) safely within a per-reminder delivery window (default 6am–6pm). On window-close mid-run, **pause** with a clear "account at capacity / consider offload or smaller media" message and let the operator **resume** later from the UI. + +**Architecture:** Replace the current serial fan-out with a per-account isolation model. pg-boss `teamSize` raises so different-account reminders run in parallel; a per-key (accountId) async mutex serialises same-account fan-outs; a per-account token-bucket rate limiter paces sends; a per-run media-upload cache reuses each unique file across every group via Baileys' `prepareWAMessageMedia` + `relayMessage`; a window-end gate flips the run to `paused` (un-sent targets stay `pending`); a `resumeReminderRunAction` re-enters fire-reminder with the existing runId so the loop picks up only the still-pending targets. + +**Tech Stack:** Node.js + TypeScript, Baileys 7.0.0-rc10 (`prepareWAMessageMedia`, `relayMessage`, `generateMessageID`), pg-boss v12 (`boss.work` with `teamSize`), Drizzle ORM + Postgres, vitest. + +--- + +## File Structure + +| File | Role | +| --- | --- | +| `packages/db/migrations/0008_*.sql` (generated) | add `delivery_window_start_hour`, `delivery_window_end_hour` to `reminders` | +| `packages/db/src/schema.ts` | drizzle alignment for the two new columns | +| `apps/bot/src/env.ts` | three new env vars: `BOT_FIRE_CONCURRENCY`, `BOT_GROUP_CONCURRENCY`, `BOT_MAX_SEND_PER_MINUTE` | +| `apps/bot/src/scheduler/per-key-mutex.ts` (new) | accountId-keyed async mutex | +| `apps/bot/src/scheduler/per-key-mutex.test.ts` (new) | unit tests | +| `apps/bot/src/scheduler/rate-limiter.ts` (new) | per-account token bucket | +| `apps/bot/src/scheduler/rate-limiter.test.ts` (new) | fake-clock unit tests | +| `apps/bot/src/scheduler/delivery-window.ts` (new) | pure window-end calculator | +| `apps/bot/src/scheduler/delivery-window.test.ts` (new) | unit tests | +| `apps/bot/src/scheduler/media-upload-cache.ts` (new) | `prepareWAMessageMedia` results, keyed by mediaId | +| `apps/bot/src/scheduler/media-upload-cache.test.ts` (new) | mock-socket unit tests | +| `apps/bot/src/scheduler/fire-reminder.ts` (rewrite) | new loop using all of the above | +| `apps/bot/src/scheduler/reminder-jobs.ts` | pass `teamSize` config | +| `apps/web/src/actions/reminders.ts` | accept the two new fields on create/update | +| `apps/web/src/components/reminder-wizard/when-form-client.tsx` | "Delivery hours" inputs | +| `apps/web/src/components/reminder-edit/edit-when-form.tsx` | same | +| `apps/web/src/lib/notifications.ts` | partial-status notification body extension | + +--- + +## Task 1: Schema migration — delivery window columns + +**Files:** +- Modify: `packages/db/src/schema.ts` (lines around the `reminders` table — add 2 columns) +- Generate: `packages/db/migrations/0008_.sql` + +- [ ] **Step 1: Edit `packages/db/src/schema.ts` — add the two columns to `reminders`** + +Find the `reminders` table block and append the two new columns just before the closing `});`. The existing block ends with `lastFiredAt: timestamp(...)`. Add: + +```ts +export const reminders = pgTable("reminders", { + id: uuid("id").primaryKey().defaultRandom(), + accountId: uuid("account_id").notNull().references(() => whatsappAccounts.id, { onDelete: "cascade" }), + name: text("name").notNull(), + scheduleKind: text("schedule_kind").notNull(), + scheduledAt: timestamp("scheduled_at", { withTimezone: true }), + rrule: text("rrule"), + timezone: text("timezone").notNull(), + endsAt: timestamp("ends_at", { withTimezone: true }), + maxRuns: integer("max_runs"), + status: text("status").notNull().default("active"), + createdBy: uuid("created_by").notNull().references(() => operators.id), + createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(), + updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(), + lastFiredAt: timestamp("last_fired_at", { withTimezone: true }), + // Delivery window — operator-supplied hours (in the row's timezone). + // Only the END hour is enforced at runtime in v1: the run loop stops + // sending once `now()` crosses today's end hour. The START hour is + // documented for the UI; not gated. See spec + // docs/superpowers/specs/2026-05-10-windowed-fanout-design.md. + deliveryWindowStartHour: integer("delivery_window_start_hour").notNull().default(6), + deliveryWindowEndHour: integer("delivery_window_end_hour").notNull().default(18), +}); +``` + +- [ ] **Step 2: Generate the migration** + +Run: + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db generate +``` + +Expected: a file appears at `packages/db/migrations/0008_.sql` containing `ALTER TABLE "reminders" ADD COLUMN "delivery_window_start_hour" integer DEFAULT 6 NOT NULL;` and a similar line for `delivery_window_end_hour`. + +- [ ] **Step 3: Apply the migration** + +Run: + +```bash +NO_SUDO=1 ./scripts/db.sh migrate +``` + +Expected: `Migrations applied.` Verify with: + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db build +``` + +Expected: clean tsc output (the @cmbot/db package re-builds so downstream consumers pick up the new columns). + +- [ ] **Step 4: Typecheck the bot + web to confirm no schema-consumer regressions** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck +``` + +Expected: both pass with no errors. + +- [ ] **Step 5: Commit** + +```bash +git add packages/db +git commit -m "feat(db): add delivery_window_start_hour / end_hour to reminders + +Both default 6 / 18, stored as int (hour-of-day in the row's +timezone). Only the end hour is gated at runtime — see spec +docs/superpowers/specs/2026-05-10-windowed-fanout-design.md." +``` + +--- + +## Task 2: Bot env vars — concurrency + rate caps + +**Files:** +- Modify: `apps/bot/src/env.ts` + +- [ ] **Step 1: Replace `apps/bot/src/env.ts` contents** + +Whole file: + +```ts +import { z } from "zod"; + +const numberFromString = z.string().regex(/^\d+$/).transform((s) => Number(s)); + +const envSchema = z.object({ + DATABASE_URL: z.string().url(), + DATA_DIR: z.string().min(1), + SESSIONS_DIR: z.string().min(1), + MEDIA_DIR: z.string().min(1), + BOT_HEALTH_PORT: numberFromString, + BOT_LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).default("info"), + + // Reminder fan-out tuning. Defaults aim for an established WhatsApp + // account (~30-60 msg/min safe band). + // - BOT_FIRE_CONCURRENCY: pg-boss worker pool size for the + // `reminder.fire` queue. Sets the max number of accounts that can + // run their fan-outs simultaneously. 8 is a sane default; raise if + // you have more paired accounts firing concurrently. + // - BOT_GROUP_CONCURRENCY: per-account parallel group sends. Each + // group's parts stay serial (preserves visible order in the chat). + // 3 has been empirically stable on real traffic; anything above 5 + // without observation is asking for trouble. + // - BOT_MAX_SEND_PER_MINUTE: per-account token-bucket rate. 40 is the + // safe default; loosen to 60 only after a few weeks of running + // without flags. Tighten to 20 if WA returns rate-limit errors. + BOT_FIRE_CONCURRENCY: numberFromString.default("8"), + BOT_GROUP_CONCURRENCY: numberFromString.default("3"), + BOT_MAX_SEND_PER_MINUTE: numberFromString.default("40"), +}); + +export type Env = z.infer; + +export function parseEnv(input: Record): Env { + return envSchema.parse(input); +} + +export const env = parseEnv(process.env); +``` + +- [ ] **Step 2: Typecheck** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck +``` + +Expected: passes. + +- [ ] **Step 3: Run the bot tests to confirm `parseEnv` defaults still validate** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run env +``` + +Expected: all `env.test.ts` tests pass (existing tests don't supply the new vars; defaults must kick in). + +- [ ] **Step 4: Commit** + +```bash +git add apps/bot/src/env.ts +git commit -m "feat(bot): add fan-out tuning env vars + +BOT_FIRE_CONCURRENCY (8) — pg-boss worker pool, gates max accounts +running fan-outs simultaneously. +BOT_GROUP_CONCURRENCY (3) — per-account parallel group sends. +BOT_MAX_SEND_PER_MINUTE (40) — per-account rate-limit budget. + +Defaults are tuned for an established WhatsApp account. See spec for +the safe band rationale." +``` + +--- + +## Task 3: PerKeyMutex (TDD) + +**Files:** +- Create: `apps/bot/src/scheduler/per-key-mutex.test.ts` +- Create: `apps/bot/src/scheduler/per-key-mutex.ts` + +- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/per-key-mutex.test.ts`** + +```ts +import { describe, it, expect } from "vitest"; +import { PerKeyMutex } from "./per-key-mutex.js"; + +/** Tiny clock-free helper: returns a Promise that resolves after + * `n` microtasks. Lets us check ordering without real timers. */ +function tickN(n: number): Promise { + let p: Promise = Promise.resolve(); + for (let i = 0; i < n; i++) p = p.then(); + return p; +} + +describe("PerKeyMutex", () => { + it("allows a single call against one key to run immediately", async () => { + const m = new PerKeyMutex(); + const result = await m.run("k1", async () => 42); + expect(result).toBe(42); + }); + + it("serialises two calls against the same key", async () => { + const m = new PerKeyMutex(); + const order: string[] = []; + + const a = m.run("k1", async () => { + order.push("a-start"); + await tickN(5); + order.push("a-end"); + }); + const b = m.run("k1", async () => { + order.push("b-start"); + order.push("b-end"); + }); + + await Promise.all([a, b]); + // b cannot start until a has finished. + expect(order).toEqual(["a-start", "a-end", "b-start", "b-end"]); + }); + + it("runs different keys in parallel", async () => { + const m = new PerKeyMutex(); + const order: string[] = []; + + const a = m.run("k1", async () => { + order.push("a-start"); + await tickN(5); + order.push("a-end"); + }); + const b = m.run("k2", async () => { + order.push("b-start"); + order.push("b-end"); + }); + + await Promise.all([a, b]); + // b doesn't wait for a — interleaving expected. + expect(order[0]).toBe("a-start"); + expect(order).toContain("b-start"); + expect(order).toContain("b-end"); + // b's pair lands before a's end. + expect(order.indexOf("b-end")).toBeLessThan(order.indexOf("a-end")); + }); + + it("releases the lock when the handler throws", async () => { + const m = new PerKeyMutex(); + await expect( + m.run("k1", async () => { + throw new Error("boom"); + }), + ).rejects.toThrow("boom"); + // Next call on the same key must NOT hang. + const result = await m.run("k1", async () => "after"); + expect(result).toBe("after"); + }); + + it("forwards the resolved value of the handler", async () => { + const m = new PerKeyMutex(); + const out = await m.run("k1", async () => ({ ok: true, n: 7 })); + expect(out).toEqual({ ok: true, n: 7 }); + }); + + it("cleans up internal state for keys with no waiters", async () => { + const m = new PerKeyMutex(); + await m.run("k1", async () => {}); + expect(m.activeKeyCount()).toBe(0); + }); + + it("retains a key while a chain is in flight, then drops it", async () => { + const m = new PerKeyMutex(); + let release!: () => void; + const gate = new Promise((r) => (release = r)); + + const inFlight = m.run("k1", () => gate); + expect(m.activeKeyCount()).toBe(1); + release(); + await inFlight; + expect(m.activeKeyCount()).toBe(0); + }); +}); +``` + +- [ ] **Step 2: Run the failing test to confirm it fails for the right reason** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex +``` + +Expected: FAIL with "Cannot find module './per-key-mutex.js'". + +- [ ] **Step 3: Implement `apps/bot/src/scheduler/per-key-mutex.ts`** + +```ts +/** + * Async mutex keyed by a string. Different keys run in parallel; + * same-key calls serialise. + * + * Used by fire-reminder so two reminders on the SAME WhatsApp account + * take turns (running them concurrently would double the effective + * send rate and risk a ban), while reminders on DIFFERENT accounts + * proceed in parallel. + * + * The implementation is a chain-per-key Promise: each call appends + * its work to the key's tail. Empty chains are cleaned up so the + * Map doesn't grow unbounded across the bot's lifetime. + */ +export class PerKeyMutex { + private chains = new Map>(); + + /** Run `fn` exclusively against `key`. Returns whatever `fn` returns. */ + async run(key: string, fn: () => Promise): Promise { + const prev = this.chains.get(key) ?? Promise.resolve(); + + let release!: () => void; + const completion = new Promise((r) => (release = r)); + // The chain we publish is "what was already there + this completion". + // The next caller awaits THIS, so order is preserved. + const chained = prev.then(() => completion); + this.chains.set(key, chained); + + try { + await prev; + return await fn(); + } finally { + release(); + // Drop the entry only if no later caller has appended in the + // meantime — otherwise we'd evict the in-flight chain. + if (this.chains.get(key) === chained) { + this.chains.delete(key); + } + } + } + + /** Diagnostic: how many keys currently have an in-flight or queued chain. */ + activeKeyCount(): number { + return this.chains.size; + } +} + +/** + * Singleton mutex used by fire-reminder, keyed by accountId. Lives at + * module scope so multiple pg-boss workers in the same process share + * state. + */ +export const accountMutex = new PerKeyMutex(); +``` + +- [ ] **Step 4: Run the test to confirm it passes** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex +``` + +Expected: PASS, 7 tests. + +- [ ] **Step 5: Commit** + +```bash +git add apps/bot/src/scheduler/per-key-mutex.ts apps/bot/src/scheduler/per-key-mutex.test.ts +git commit -m "feat(bot): PerKeyMutex for accountId-keyed serialisation + +Same key serialises, different keys run in parallel. Used by +fire-reminder to prevent two same-account fan-outs from doubling +the effective send rate (which would risk a WhatsApp ban). Chains +auto-clean empty entries so the Map doesn't leak." +``` + +--- + +## Task 4: TokenBucket rate limiter (TDD) + +**Files:** +- Create: `apps/bot/src/scheduler/rate-limiter.test.ts` +- Create: `apps/bot/src/scheduler/rate-limiter.ts` + +- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/rate-limiter.test.ts`** + +```ts +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { TokenBucket, accountRateLimiter } from "./rate-limiter.js"; + +describe("TokenBucket", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-10T10:00:00Z")); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("starts full: first N=capacity acquires resolve immediately", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 5 }); + for (let i = 0; i < 5; i++) { + await b.acquire(); // should not stall + } + // The 6th is a different test — see below. + }); + + it("blocks the (capacity+1)th acquire until a token regenerates", async () => { + // 60/min = 1 token per second. Capacity 2. + const b = new TokenBucket({ ratePerMinute: 60, capacity: 2 }); + await b.acquire(); + await b.acquire(); + + let resolved = false; + const pending = b.acquire().then(() => { + resolved = true; + }); + + // Immediately after draining, no token available. + await Promise.resolve(); + expect(resolved).toBe(false); + + // Advance the clock by exactly 1 second — one token should be back. + await vi.advanceTimersByTimeAsync(1000); + await pending; + expect(resolved).toBe(true); + }); + + it("FIFO: pending acquires resolve in the order they arrived", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 1 }); + await b.acquire(); // bucket empty + + const order: number[] = []; + const a = b.acquire().then(() => order.push(1)); + const c = b.acquire().then(() => order.push(2)); + + // Two seconds → two tokens regenerate, in order. + await vi.advanceTimersByTimeAsync(2000); + await Promise.all([a, c]); + expect(order).toEqual([1, 2]); + }); + + it("does not over-fill past capacity even if the clock leaps forward", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 3 }); + // Drain. + await b.acquire(); + await b.acquire(); + await b.acquire(); + // Leap an hour. Naive impl would credit 3600 tokens; we should cap at 3. + await vi.advanceTimersByTimeAsync(3_600_000); + // 3 immediate acquires should resolve. + await b.acquire(); + await b.acquire(); + await b.acquire(); + // The 4th waits for fresh regeneration. + let resolved = false; + b.acquire().then(() => (resolved = true)); + await Promise.resolve(); + expect(resolved).toBe(false); + }); + + it("ratePerMinute=0 is rejected at construction (caller bug, fail loud)", () => { + expect(() => new TokenBucket({ ratePerMinute: 0, capacity: 1 })).toThrow(); + }); +}); + +describe("accountRateLimiter (singleton)", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-10T10:00:00Z")); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("returns the SAME bucket for repeated lookups of one accountId", () => { + const a1 = accountRateLimiter.get("acct-1"); + const a2 = accountRateLimiter.get("acct-1"); + expect(a1).toBe(a2); + }); + + it("returns DIFFERENT buckets for different accountIds (isolation)", () => { + const a = accountRateLimiter.get("acct-A"); + const b = accountRateLimiter.get("acct-B"); + expect(a).not.toBe(b); + }); + + it("a drained account A bucket does not block account B", async () => { + const a = accountRateLimiter.get("acct-A"); + const b = accountRateLimiter.get("acct-B"); + // Drain A entirely (capacity defaults to ratePerMinute, so we + // need to pull at least that many). + for (let i = 0; i < 40; i++) await a.acquire(); + + // B should still grant immediately. + let bResolved = false; + b.acquire().then(() => (bResolved = true)); + await Promise.resolve(); + expect(bResolved).toBe(true); + }); +}); +``` + +- [ ] **Step 2: Run the failing test** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter +``` + +Expected: FAIL with "Cannot find module './rate-limiter.js'". + +- [ ] **Step 3: Implement `apps/bot/src/scheduler/rate-limiter.ts`** + +```ts +import { env } from "../env.js"; + +/** + * Token bucket for per-account send pacing. + * + * Tokens regenerate at `ratePerMinute / 60` per second. Capacity caps + * how many can accumulate during idle periods (so the operator can't + * burst 1000 messages just because the account was quiet for a day). + * + * `acquire()` resolves when a token is available, FIFO across waiters. + * Used by fire-reminder to gate every `socket.sendMessage` call. + */ +export interface TokenBucketOptions { + ratePerMinute: number; + /** Defaults to ratePerMinute (one minute's worth). */ + capacity?: number; +} + +export class TokenBucket { + private readonly ratePerMs: number; + private readonly capacity: number; + private tokens: number; + private lastRefillMs: number; + private waiters: Array<() => void> = []; + + constructor(opts: TokenBucketOptions) { + if (opts.ratePerMinute <= 0) { + throw new Error(`TokenBucket: ratePerMinute must be > 0, got ${opts.ratePerMinute}`); + } + this.ratePerMs = opts.ratePerMinute / 60_000; + this.capacity = opts.capacity ?? opts.ratePerMinute; + this.tokens = this.capacity; + this.lastRefillMs = Date.now(); + } + + /** Resolve when a token is available. FIFO across concurrent waiters. */ + async acquire(): Promise { + this.refill(); + if (this.tokens >= 1 && this.waiters.length === 0) { + this.tokens -= 1; + return; + } + return new Promise((resolve) => { + this.waiters.push(resolve); + this.scheduleNext(); + }); + } + + private refill(): void { + const now = Date.now(); + const elapsed = now - this.lastRefillMs; + if (elapsed <= 0) return; + const gained = elapsed * this.ratePerMs; + this.tokens = Math.min(this.capacity, this.tokens + gained); + this.lastRefillMs = now; + } + + private scheduleNext(): void { + // Wait until at least one token is available, then drain waiters + // FIFO. We compute the gap from current fractional token deficit. + this.refill(); + while (this.tokens >= 1 && this.waiters.length > 0) { + this.tokens -= 1; + const w = this.waiters.shift()!; + w(); + } + if (this.waiters.length === 0) return; + + const tokensShort = 1 - this.tokens; + const waitMs = Math.max(1, Math.ceil(tokensShort / this.ratePerMs)); + setTimeout(() => this.scheduleNext(), waitMs); + } +} + +/** + * Per-accountId TokenBucket registry. Each account gets its own + * pacing budget, so a slow account A never throttles account B. + */ +class AccountRateLimiter { + private buckets = new Map(); + private ratePerMinute: number; + + constructor(ratePerMinute: number) { + this.ratePerMinute = ratePerMinute; + } + + get(accountId: string): TokenBucket { + let b = this.buckets.get(accountId); + if (!b) { + b = new TokenBucket({ ratePerMinute: this.ratePerMinute }); + this.buckets.set(accountId, b); + } + return b; + } +} + +export const accountRateLimiter = new AccountRateLimiter(env.BOT_MAX_SEND_PER_MINUTE); +``` + +- [ ] **Step 4: Run the tests** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter +``` + +Expected: PASS, 8 tests. + +- [ ] **Step 5: Commit** + +```bash +git add apps/bot/src/scheduler/rate-limiter.ts apps/bot/src/scheduler/rate-limiter.test.ts +git commit -m "feat(bot): per-account token-bucket rate limiter + +TokenBucket gates each socket.sendMessage call. Tokens regenerate at +ratePerMinute/60 per second, capped at one minute's worth so quiet +accounts can't burst. FIFO drain across concurrent waiters. + +accountRateLimiter (singleton) hands out one bucket per accountId, so +account A's drain never throttles account B. Default rate is +BOT_MAX_SEND_PER_MINUTE (40) — the safe band for an established +WhatsApp account." +``` + +--- + +## Task 5: Delivery window helper (TDD) + +**Files:** +- Create: `apps/bot/src/scheduler/delivery-window.test.ts` +- Create: `apps/bot/src/scheduler/delivery-window.ts` + +- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/delivery-window.test.ts`** + +```ts +import { describe, it, expect } from "vitest"; +import { windowEndAt } from "./delivery-window.js"; + +const TZ = "Asia/Kuala_Lumpur"; // UTC+8 + +describe("windowEndAt", () => { + it("returns today's end-hour boundary in the given timezone", () => { + // Fire at 2026-05-10 10:00 KL == 02:00 UTC. End hour 18 == 18:00 KL == 10:00 UTC. + const fireAt = new Date("2026-05-10T02:00:00.000Z"); + const out = windowEndAt(TZ, 18, fireAt); + expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z"); + }); + + it("returns a past timestamp when fireAt is already after the end hour", () => { + // Fire at 2026-05-10 19:00 KL == 11:00 UTC. End hour 18 → today's 18:00 KL == 10:00 UTC. + // That's BEFORE fireAt. The caller's first window-gate check trips immediately. + const fireAt = new Date("2026-05-10T11:00:00.000Z"); + const out = windowEndAt(TZ, 18, fireAt); + expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z"); + expect(out.getTime()).toBeLessThan(fireAt.getTime()); + }); + + it("respects the timezone (UTC+0 vs UTC+8)", () => { + const fireAt = new Date("2026-05-10T02:00:00.000Z"); + const inUtc = windowEndAt("UTC", 18, fireAt); + expect(inUtc.toISOString()).toBe("2026-05-10T18:00:00.000Z"); + const inKl = windowEndAt("Asia/Kuala_Lumpur", 18, fireAt); + expect(inKl.toISOString()).toBe("2026-05-10T10:00:00.000Z"); + }); + + it("handles end hour 24 as midnight of the same calendar day's end", () => { + // 2026-05-10 in KL ends at 2026-05-11 00:00 KL == 2026-05-10 16:00 UTC. + const fireAt = new Date("2026-05-10T02:00:00.000Z"); + const out = windowEndAt(TZ, 24, fireAt); + expect(out.toISOString()).toBe("2026-05-10T16:00:00.000Z"); + }); + + it("DST transition day stays on the SAME calendar day (no skipping forward)", () => { + // US/Eastern has DST. On 2026-03-08 (DST starts), 18:00 EDT is a real time. + // Fire at 2026-03-08 10:00 EST (15:00 UTC). End at 2026-03-08 18:00 EDT (22:00 UTC). + const fireAt = new Date("2026-03-08T15:00:00.000Z"); + const out = windowEndAt("America/New_York", 18, fireAt); + expect(out.toISOString()).toBe("2026-03-08T22:00:00.000Z"); + }); + + it("rejects end hour outside 0..24", () => { + const fireAt = new Date("2026-05-10T00:00:00Z"); + expect(() => windowEndAt(TZ, -1, fireAt)).toThrow(); + expect(() => windowEndAt(TZ, 25, fireAt)).toThrow(); + }); +}); +``` + +- [ ] **Step 2: Run the failing test** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run delivery-window +``` + +Expected: FAIL with "Cannot find module './delivery-window.js'". + +- [ ] **Step 3: Implement `apps/bot/src/scheduler/delivery-window.ts`** + +```ts +import { DateTime } from "luxon"; + +/** + * Returns the end-of-window timestamp for the calendar day `fireAt` + * falls on, in the operator's timezone. + * + * windowEndAt("Asia/Kuala_Lumpur", 18, fireAt) + * → today's 18:00 KL (which may be in the past if fireAt is already + * past 18:00 KL — caller's first window-gate fires immediately). + * + * `endHour` is 0..24. Hour 24 is treated as midnight of the next + * calendar day (i.e. "end of today" inclusive). + * + * Pure: no I/O, no Date.now() reads, no clock dependency. Easy to + * test with fixture inputs. + */ +export function windowEndAt( + timezone: string, + endHour: number, + fireAt: Date, +): Date { + if (!Number.isInteger(endHour) || endHour < 0 || endHour > 24) { + throw new Error(`windowEndAt: endHour must be 0..24, got ${endHour}`); + } + + const dt = DateTime.fromJSDate(fireAt).setZone(timezone); + if (!dt.isValid) { + throw new Error(`windowEndAt: invalid timezone "${timezone}"`); + } + + // For hour 24, "end of day" is the next midnight. Luxon's `set` with + // hour=24 normalises into hour=0 of the next day, which is exactly + // what we want. + const end = dt.set({ hour: endHour, minute: 0, second: 0, millisecond: 0 }); + return end.toJSDate(); +} +``` + +- [ ] **Step 4: Run the tests** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run delivery-window +``` + +Expected: PASS, 6 tests. + +- [ ] **Step 5: Commit** + +```bash +git add apps/bot/src/scheduler/delivery-window.ts apps/bot/src/scheduler/delivery-window.test.ts +git commit -m "feat(bot): pure delivery-window end calculator + +windowEndAt(timezone, endHour, fireAt) returns the end-of-window for +the day fireAt is on. If fireAt is already past, the result is a +past timestamp — the run loop's first window gate trips immediately +and the entire run resolves as failed (zero sent), which is the +right behaviour for 'we can't send after window close'." +``` + +--- + +## Task 6: MediaUploadCache (TDD with mock socket) + +**Files:** +- Create: `apps/bot/src/scheduler/media-upload-cache.test.ts` +- Create: `apps/bot/src/scheduler/media-upload-cache.ts` + +- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/media-upload-cache.test.ts`** + +```ts +import { describe, it, expect, vi } from "vitest"; +import { MediaUploadCache } from "./media-upload-cache.js"; + +describe("MediaUploadCache", () => { + it("uploads each unique mediaId exactly once across N gets", async () => { + const prepare = vi.fn(async (mediaId: string) => ({ + kind: "prepared", + mediaId, + })); + const cache = new MediaUploadCache(prepare); + + const a1 = await cache.get("media-A"); + const a2 = await cache.get("media-A"); + const b1 = await cache.get("media-B"); + + expect(prepare).toHaveBeenCalledTimes(2); + expect(prepare).toHaveBeenCalledWith("media-A"); + expect(prepare).toHaveBeenCalledWith("media-B"); + // Same handle returned for repeated lookups of A. + expect(a1).toBe(a2); + expect(a1).not.toBe(b1); + }); + + it("coalesces concurrent gets of the same mediaId into ONE prepare call", async () => { + let resolveA: (v: unknown) => void = () => {}; + const aPromise = new Promise((r) => (resolveA = r)); + const prepare = vi.fn(async (mediaId: string) => { + if (mediaId === "media-A") return aPromise; + return { kind: "prepared", mediaId }; + }); + const cache = new MediaUploadCache(prepare); + + const p1 = cache.get("media-A"); + const p2 = cache.get("media-A"); + const p3 = cache.get("media-A"); + + // Resolve the in-flight prepare. + resolveA({ kind: "prepared", mediaId: "media-A" }); + + const [r1, r2, r3] = await Promise.all([p1, p2, p3]); + expect(prepare).toHaveBeenCalledTimes(1); // de-duplicated + expect(r1).toBe(r2); + expect(r2).toBe(r3); + }); + + it("a thrown prepare is NOT cached — next get retries", async () => { + let attempt = 0; + const prepare = vi.fn(async (_mediaId: string) => { + attempt++; + if (attempt === 1) throw new Error("upload network blip"); + return { kind: "prepared", attempt }; + }); + const cache = new MediaUploadCache(prepare); + + await expect(cache.get("media-A")).rejects.toThrow("upload network blip"); + // Second attempt must call prepare again — DON'T cache failures. + const r = await cache.get("media-A"); + expect(prepare).toHaveBeenCalledTimes(2); + expect(r).toEqual({ kind: "prepared", attempt: 2 }); + }); + + it("size() reflects the number of cached unique mediaIds", async () => { + const prepare = async (mediaId: string) => ({ mediaId }); + const cache = new MediaUploadCache(prepare); + expect(cache.size()).toBe(0); + await cache.get("a"); + expect(cache.size()).toBe(1); + await cache.get("b"); + expect(cache.size()).toBe(2); + await cache.get("a"); // already cached + expect(cache.size()).toBe(2); + }); +}); +``` + +- [ ] **Step 2: Run the failing test** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache +``` + +Expected: FAIL with "Cannot find module './media-upload-cache.js'". + +- [ ] **Step 3: Implement `apps/bot/src/scheduler/media-upload-cache.ts`** + +```ts +/** + * Per-run cache of `prepareWAMessageMedia` results, keyed by + * `mediaId`. The point: when a reminder fans out to 1000 groups with + * one image, we want to upload that image to WhatsApp's CDN ONCE, not + * 1000 times. Subsequent group sends reuse the prepared message + * (with embedded directPath / mediaKey) via socket.relayMessage. + * + * Lifecycle: one cache instance per fire-reminder run. After the run + * completes, the cache is dropped — we don't share uploads across + * runs because WA media tokens are short-lived. + * + * Concurrent gets of the same mediaId are coalesced into a single + * prepare call. Failed prepares are NOT cached so the next attempt + * retries (network blips at upload time shouldn't poison the cache). + */ +export class MediaUploadCache { + private readonly prepare: (mediaId: string) => Promise; + private readonly entries = new Map>(); + + constructor(prepare: (mediaId: string) => Promise) { + this.prepare = prepare; + } + + async get(mediaId: string): Promise { + const existing = this.entries.get(mediaId); + if (existing) return existing; + + const inflight = this.prepare(mediaId); + // Insert eagerly so concurrent gets dedupe. + this.entries.set(mediaId, inflight); + + try { + return await inflight; + } catch (err) { + // Don't cache failures — the next caller should retry. + this.entries.delete(mediaId); + throw err; + } + } + + size(): number { + return this.entries.size; + } +} +``` + +- [ ] **Step 4: Run the tests** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache +``` + +Expected: PASS, 4 tests. + +- [ ] **Step 5: Commit** + +```bash +git add apps/bot/src/scheduler/media-upload-cache.ts apps/bot/src/scheduler/media-upload-cache.test.ts +git commit -m "feat(bot): MediaUploadCache for once-per-run media prepare + +One cache instance per fire-reminder run. Each unique mediaId gets +prepared (uploaded to WA CDN) exactly once, and subsequent group +sends within the run reuse the prepared message via relayMessage. +Concurrent gets coalesce into a single prepare. Failed prepares +don't poison the cache — next caller retries." +``` + +--- + +## Task 7: Rewrite `fire-reminder.ts` and bump pg-boss `teamSize` + +**Files:** +- Modify: `apps/bot/src/scheduler/reminder-jobs.ts` +- Rewrite: `apps/bot/src/scheduler/fire-reminder.ts` + +- [ ] **Step 1: Update `apps/bot/src/scheduler/reminder-jobs.ts` — pass `teamSize`** + +Whole file: + +```ts +import type { PgBoss } from "pg-boss"; +import { logger } from "../logger.js"; +import { env } from "../env.js"; +import { fireReminder, type FireReminderPayload } from "./fire-reminder.js"; + +export const REMINDER_FIRE_QUEUE = "reminder.fire"; + +export async function registerReminderJobs(boss: PgBoss): Promise { + await boss.createQueue(REMINDER_FIRE_QUEUE); + await boss.work( + REMINDER_FIRE_QUEUE, + { + // Up to BOT_FIRE_CONCURRENCY jobs in flight at once. Combined with + // the per-account mutex inside fireReminder, this lets reminders + // on DIFFERENT accounts run in parallel while same-account + // reminders take turns. + teamSize: env.BOT_FIRE_CONCURRENCY, + teamConcurrency: 1, + }, + async (jobs) => { + const job = jobs[0]; + if (!job) return; + logger.debug({ jobId: job.id, payload: job.data }, "reminder.fire: handling"); + await fireReminder(job.data); + }, + ); + logger.info( + { teamSize: env.BOT_FIRE_CONCURRENCY }, + "reminder.fire: handler registered", + ); +} + +export async function scheduleReminderFire( + boss: PgBoss, + reminderId: string, + scheduledAt: Date, +): Promise { + const id = await boss.send( + REMINDER_FIRE_QUEUE, + { reminderId }, + { + startAfter: scheduledAt, + retryLimit: 3, + retryDelay: 30, + retryBackoff: true, + // Use the reminderId as a singleton key so re-scheduling cancels the old job + singletonKey: `reminder:${reminderId}`, + }, + ); + logger.info({ reminderId, jobId: id, scheduledAt }, "reminder.fire: scheduled"); + return id; +} + +export async function cancelReminderFire(_boss: PgBoss, reminderId: string): Promise { + // Soft cancel: pg-boss doesn't expose a clean cancel-by-singleton API in v12. + // The scheduled job will still fire, but `fireReminder` exits early when the + // reminder row is gone. Hard cancel can be added later by storing the jobId. + logger.info({ reminderId }, "reminder.fire: cancel requested (soft, fizzles on fire)"); +} +``` + +- [ ] **Step 2: Rewrite `apps/bot/src/scheduler/fire-reminder.ts`** + +Whole file: + +```ts +import { and, eq, inArray } from "drizzle-orm"; +import { reminderRuns, reminderRunTargets, reminders } from "@cmbot/db"; +import { + generateWAMessageContent, + generateMessageID, + type AnyMessageContent, + type proto, +} from "@whiskeysockets/baileys"; +import pLimit from "p-limit"; +import { db } from "../db.js"; +import { logger } from "../logger.js"; +import { sessionManager } from "../whatsapp/session-manager.js"; +import { absoluteMediaPath, nextOccurrence, resolveDeliveryKind } from "@cmbot/shared"; +import { open as fsOpen, readFile } from "node:fs/promises"; +import { env } from "../env.js"; +import { writeAuditLog } from "../audit.js"; +import { getReminderWithDetails } from "../reminders/crud.js"; +import { getBoss } from "./pgboss-client.js"; +import { scheduleReminderFire } from "./reminder-jobs.js"; +import { pgNotifyWeb } from "../ipc/notify.js"; +import { accountMutex } from "./per-key-mutex.js"; +import { accountRateLimiter } from "./rate-limiter.js"; +import { windowEndAt } from "./delivery-window.js"; +import { MediaUploadCache } from "./media-upload-cache.js"; + +export type FireReminderPayload = { + reminderId: string; + /** Optional resume hook. When present, fire-reminder ATTACHES to + * the existing run instead of creating a new one, and only + * re-attempts targets in `pending` status. Set by the resume + * server action. */ + runId?: string; +}; + +/** Read the first N bytes of a file (used to sniff HEIF/AVIF/MOV brands). */ +async function readHeadBytes(filePath: string, n: number): Promise { + const fh = await fsOpen(filePath, "r"); + try { + const buf = new Uint8Array(n); + await fh.read(buf, 0, n, 0); + return buf; + } finally { + await fh.close(); + } +} + +/** Random delay between same-group message parts (ms). Just enough for + * visible ordering in the chat at WA's natural pace. */ +function partJitterMs(): number { + return 200 + Math.floor(Math.random() * 300); // 200..499 +} + +export async function fireReminder(payload: FireReminderPayload): Promise { + const reminder = await getReminderWithDetails(payload.reminderId); + if (!reminder) { + logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found"); + return; + } + if (reminder.status !== "active") { + logger.info({ reminderId: reminder.id, status: reminder.status }, "fire-reminder: skipping (not active)"); + return; + } + + // Per-account mutex: two reminders on the SAME account take turns. + // Different accounts run in parallel (cross-account isolation). + await accountMutex.run(reminder.accountId, () => fireReminderInner(reminder, payload.runId)); +} + +async function fireReminderInner( + reminder: NonNullable>>, + resumeRunId?: string, +): Promise { + // Resume path: attach to the existing run; fresh path: create one. + let runId: string; + if (resumeRunId) { + const existing = await db.query.reminderRuns.findFirst({ + where: (r, { eq }) => eq(r.id, resumeRunId), + }); + if (!existing) { + logger.warn({ reminderId: reminder.id, resumeRunId }, "fire-reminder: resume target run missing"); + return; + } + runId = existing.id; + // Re-mark as in-flight so the UI shows the run is no longer paused. + await db + .update(reminderRuns) + .set({ status: "pending", errorSummary: null }) + .where(eq(reminderRuns.id, runId)); + } else { + const [run] = await db + .insert(reminderRuns) + .values({ + reminderId: reminder.id, + reminderName: reminder.name, + status: "pending", + }) + .returning({ id: reminderRuns.id }); + runId = run!.id; + } + + const session = sessionManager.getSession(reminder.accountId); + if (!session) { + logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected"); + await markAllSkipped(runId, reminder, "account not connected"); + await db.update(reminderRuns).set({ status: "skipped", errorSummary: "account not connected" }).where(eq(reminderRuns.id, runId)); + await pgNotifyWeb({ type: "reminder.fired", reminderId: reminder.id, runId, status: "skipped" }); + return; + } + + // Up-front: load all groups + media rows in TWO bulk queries. + const groupIds = reminder.targets.map((t) => t.groupId); + const groupRows = groupIds.length + ? await db.query.whatsappGroups.findMany({ where: (g) => inArray(g.id, groupIds) }) + : []; + const groupById = new Map(groupRows.map((g) => [g.id, g])); + + const mediaIds = Array.from( + new Set(reminder.messages.map((m) => m.mediaId).filter((id): id is string => Boolean(id))), + ); + const mediaRows = mediaIds.length + ? await db.query.mediaFiles.findMany({ where: (m) => inArray(m.id, mediaIds) }) + : []; + const mediaById = new Map(mediaRows.map((m) => [m.id, m])); + + // Pre-create run_targets rows so progress is observable mid-run. + if (reminder.targets.length > 0) { + await db.insert(reminderRunTargets).values( + reminder.targets.map((t) => ({ + runId, + groupId: t.groupId, + groupLabel: groupById.get(t.groupId)?.name ?? null, + status: "pending" as const, + })), + ); + } + + // Window-end timestamp. If the reminder fires AFTER today's end-hour + // (e.g. cron miss-fired late) this is in the past — every iteration + // will trip the gate and the run resolves as failed. + const windowEnd = windowEndAt( + reminder.timezone, + reminder.deliveryWindowEndHour, + new Date(), + ); + + // Per-run media upload cache. Each unique mediaId is prepared via + // generateWAMessageContent ONCE (which uploads to WA's CDN); the + // resulting proto.Message is reused for every group via relayMessage. + // socket.waUploadToServer is the upload helper Baileys exposes. + const uploadCache = new MediaUploadCache(async (mediaId) => { + const media = mediaById.get(mediaId); + if (!media) throw new Error(`media row missing: ${mediaId}`); + const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR); + const buffer = await readFile(filePath); + const head = buffer.subarray(0, 12); + const resolved = resolveDeliveryKind(media.mimeType, head); + const senderKind: "image" | "video" | "document" = + resolved === "image" || resolved === "video" ? resolved : "document"; + const content: AnyMessageContent = + senderKind === "image" + ? { image: buffer, mimetype: media.mimeType } + : senderKind === "video" + ? { video: buffer, mimetype: media.mimeType } + : { document: buffer, fileName: media.filenameOriginal, mimetype: media.mimeType }; + return generateWAMessageContent(content, { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + upload: (session.socket as any).waUploadToServer, + }); + }); + + // Per-account rate limiter — gates each socket.sendMessage / relayMessage call. + const rateLimiter = accountRateLimiter.get(reminder.accountId); + + let sentCount = 0; + let failedCount = 0; + let skippedCount = 0; + let windowClosed = false; + + const groupConcurrency = pLimit(env.BOT_GROUP_CONCURRENCY); + + await Promise.all( + reminder.targets.map((target) => + groupConcurrency(async () => { + // Window-end gate — even if we already started other groups, + // any not-yet-started one stops here. + if (Date.now() >= windowEnd.getTime()) { + windowClosed = true; + await db + .update(reminderRunTargets) + .set({ status: "skipped", error: "delivery window closed" }) + .where( + and( + eq(reminderRunTargets.runId, runId), + eq(reminderRunTargets.groupId, target.groupId), + ), + ); + skippedCount++; + return; + } + + const group = groupById.get(target.groupId); + if (!group) { + await db + .update(reminderRunTargets) + .set({ status: "skipped", error: "group missing from db" }) + .where( + and( + eq(reminderRunTargets.runId, runId), + eq(reminderRunTargets.groupId, target.groupId), + ), + ); + skippedCount++; + return; + } + + const start = Date.now(); + try { + let lastMessageId: string | undefined; + for (const part of reminder.messages) { + await rateLimiter.acquire(); + if (part.kind === "text" && part.textContent) { + const r = await session.socket.sendMessage(group.waGroupJid, { + text: part.textContent, + }); + lastMessageId = r?.key?.id ?? undefined; + } else if (part.mediaId) { + const prebuilt = await uploadCache.get(part.mediaId); + // Inject the caption (if any) just before relaying — the + // prebuilt content carries the media but each relay uses + // a fresh messageId. + if (part.textContent) { + injectCaption(prebuilt, part.textContent); + } + const messageId = generateMessageID(); + await session.socket.relayMessage(group.waGroupJid, prebuilt, { messageId }); + lastMessageId = messageId; + } + await new Promise((r) => setTimeout(r, partJitterMs())); + } + await db + .update(reminderRunTargets) + .set({ + status: "sent", + waMessageId: lastMessageId ?? null, + latencyMs: Date.now() - start, + }) + .where( + and( + eq(reminderRunTargets.runId, runId), + eq(reminderRunTargets.groupId, target.groupId), + ), + ); + sentCount++; + } catch (err) { + logger.error( + { err, reminderId: reminder.id, groupId: target.groupId }, + "fire-reminder: send failed", + ); + await db + .update(reminderRunTargets) + .set({ status: "failed", error: (err as Error).message }) + .where( + and( + eq(reminderRunTargets.runId, runId), + eq(reminderRunTargets.groupId, target.groupId), + ), + ); + failedCount++; + } + }), + ), + ); + + const total = reminder.targets.length; + let status: "success" | "partial" | "failed"; + let errorSummary: string | null = null; + if (sentCount === total) { + status = "success"; + } else if (sentCount > 0) { + status = "partial"; + errorSummary = windowClosed + ? `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}). ${sentCount} of ${total} groups delivered. This account is at capacity for this fan-out — consider sending the remainder from another paired account.` + : `${sentCount} of ${total} groups delivered (${failedCount} failed, ${skippedCount} skipped).`; + } else { + status = "failed"; + errorSummary = windowClosed + ? `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}) before any group could be sent. The reminder fired too late in the day.` + : `All ${total} sends failed.`; + } + + await db + .update(reminderRuns) + .set({ status, errorSummary }) + .where(eq(reminderRuns.id, runId)); + + await pgNotifyWeb({ + type: "reminder.fired", + reminderId: reminder.id, + runId, + status, + }); + + // One-off reminders end after firing. Recurring reminders re-arm. + if (reminder.scheduleKind === "one_off") { + await db + .update(reminders) + .set({ status: "ended", updatedAt: new Date() }) + .where(eq(reminders.id, reminder.id)); + } else if (reminder.scheduleKind === "recurring" && reminder.rrule) { + const next = nextOccurrence(reminder.rrule, reminder.timezone, new Date()); + await db + .update(reminders) + .set({ lastFiredAt: new Date(), updatedAt: new Date() }) + .where(eq(reminders.id, reminder.id)); + if (next) { + try { + await scheduleReminderFire(getBoss(), reminder.id, next); + logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence"); + } catch (err) { + logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence"); + } + } else { + logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending"); + await db.update(reminders).set({ status: "ended" }).where(eq(reminders.id, reminder.id)); + } + } + + await writeAuditLog(db, { + operatorId: reminder.createdBy, + source: "system", + action: "reminder.fired", + targetType: "reminder", + targetId: reminder.id, + payload: { runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount }, + }); + + logger.info( + { reminderId: reminder.id, runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount }, + "fire-reminder: done", + ); +} + +/** + * Mark every target as skipped with the given error. Used when the + * account is offline before the loop even starts (no run_target rows + * have been created yet, so we INSERT instead of UPDATE). + */ +async function markAllSkipped( + runId: string, + reminder: NonNullable>>, + error: string, +): Promise { + if (reminder.targets.length === 0) return; + const rows = await db.query.whatsappGroups.findMany({ + where: (g) => inArray(g.id, reminder.targets.map((t) => t.groupId)), + columns: { id: true, name: true }, + }); + const labelById = new Map(rows.map((r) => [r.id, r.name])); + await db.insert(reminderRunTargets).values( + reminder.targets.map((t) => ({ + runId, + groupId: t.groupId, + groupLabel: labelById.get(t.groupId) ?? null, + status: "skipped" as const, + error, + })), + ); +} + +/** + * Mutates the prebuilt proto.Message to set the caption on whichever + * media variant it carries. Baileys' relayMessage does not let us + * pass the caption alongside; the protobuf already carries the slot. + */ +function injectCaption(msg: proto.Message, caption: string): void { + if (msg.imageMessage) msg.imageMessage.caption = caption; + else if (msg.videoMessage) msg.videoMessage.caption = caption; + else if (msg.documentMessage) msg.documentMessage.caption = caption; +} +``` + +- [ ] **Step 3: Add `p-limit` to apps/bot deps** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm add -F @cmbot/bot p-limit +``` + +Expected: pnpm reports `+1` package. The lockfile updates. + +- [ ] **Step 4: Typecheck the bot** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck +``` + +Expected: passes. If type errors mention `relayMessage` overloads, double-check the `@whiskeysockets/baileys` import added — the function comes from the socket instance, not the package's named exports. + +- [ ] **Step 5: Run all bot tests** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run +``` + +Expected: PASS for all (existing 26 + 7 PerKeyMutex + 8 RateLimiter + 6 DeliveryWindow + 4 MediaUploadCache = 51). + +- [ ] **Step 6: Restart the bot to pick up the rewrite** + +```bash +NO_SUDO=1 ./scripts/dev.sh restart-bot +NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=40 bot 2>&1 | grep -i "reminder.fire" +``` + +Expected: A line like `reminder.fire: handler registered teamSize=8`. + +- [ ] **Step 7: Commit** + +```bash +git add apps/bot/src/scheduler/fire-reminder.ts apps/bot/src/scheduler/reminder-jobs.ts apps/bot/package.json pnpm-lock.yaml +git commit -m "feat(bot): windowed, pacing-safe fan-out + +Rewrites the per-target loop to: + - Wrap inner work in PerKeyMutex(accountId) so two reminders on the + same account take turns; different accounts run in parallel. + - Bulk-load groups and media rows up front (drops ~3000 round-trips + to ~3 for a 1000-group run). + - Pre-create run_target rows with status='pending' so the Activity + tab shows progress mid-run. + - Pre-upload each unique media via MediaUploadCache (one + generateWAMessageContent call per mediaId, then relayMessage to + every group). For 1000 groups × 5 MB image, this turns 5 GB of + upload into 5 MB. + - Run BOT_GROUP_CONCURRENCY (default 3) groups in parallel within + one account; each group's parts stay serial for visible order. + - Gate every socket call on a per-account TokenBucket + (BOT_MAX_SEND_PER_MINUTE, default 40). + - Replace the rigid 1.5s inter-part sleep with 200..499 ms jitter. + - Window-end gate: stop sending once today's end-hour (in the + reminder's timezone) has passed; mark unstarted targets skipped; + surface a partial-status message that names the timezone, the + delivered/total count, and points at multi-account offload. + +reminder-jobs.ts now passes teamSize=BOT_FIRE_CONCURRENCY (default 8) +to boss.work so up to 8 different-account reminders run concurrently." +``` + +--- + +## Task 8: Web wiring — schema, action, wizard input, notification body + +**Files:** +- Modify: `apps/web/src/actions/reminders.ts` +- Modify: `apps/web/src/components/reminder-wizard/when-form-client.tsx` +- Modify: `apps/web/src/components/reminder-edit/edit-when-form.tsx` +- Modify: `apps/web/src/lib/notifications.ts` +- Create: `apps/web/src/lib/notifications-body.test.ts` (extends existing notifications tests with the partial-status body) + +- [ ] **Step 1: Update the create/update Zod schemas in `apps/web/src/actions/reminders.ts`** + +Find the `createReminderSchema` definition (around lines 220–250) and add the two fields. Schema chunk: + +```ts +const createReminderSchema = z + .object({ + accountId: z.string().uuid(), + groupIds: z.array(z.string().uuid()), + messages: z.array(messagePartSchema).optional(), + name: z.string().nullable().optional(), + text: z.string().nullable().optional(), + mediaId: z.string().uuid().nullable().optional(), + caption: z.string().nullable().optional(), + scheduledAtIso: z.string().datetime({ offset: true }), + rrule: z.string().nullable().optional(), + timezone: z.string().default(DEFAULT_TIMEZONE), + // Delivery window. End hour is enforced at runtime by fire-reminder; + // start hour is documented but not gated in v1. + deliveryWindowStartHour: z.number().int().min(0).max(24).default(6), + deliveryWindowEndHour: z.number().int().min(0).max(24).default(18), + }) + .refine( + (d) => + (d.messages && d.messages.length > 0) || + Boolean(d.text?.trim()) || + Boolean(d.mediaId), + { + message: "Add a message or attach a file", + path: ["messages"], + }, + ) + .refine((d) => d.deliveryWindowStartHour < d.deliveryWindowEndHour, { + message: "Delivery window start must be earlier than end", + path: ["deliveryWindowStartHour"], + }); +``` + +Then in BOTH `createReminderAction` and `updateReminderAction`, where the `db.insert(reminders).values({...})` / `db.update(reminders).set({...})` happens, include the two new fields. Find the `values({` blocks (around line 350 and line 460) and add: + +```ts +deliveryWindowStartHour: parsed.data.deliveryWindowStartHour, +deliveryWindowEndHour: parsed.data.deliveryWindowEndHour, +``` + +immediately after `timezone,`. + +- [ ] **Step 2: Add the input controls to `apps/web/src/components/reminder-wizard/when-form-client.tsx`** + +Add to the props interface and component state. Top-of-file imports stay; add a new state and a new section in the JSX. The whole edit: + +Find the `interface WhenFormClientProps` block. Add: + +```ts +interface WhenFormClientProps { + accountId: string; + groupIds: string; + timezone: string; + initialDefaultIso: string; + initialSpec?: RecurrenceSpec; + initialDeliveryStartHour?: number; + initialDeliveryEndHour?: number; + passThroughParams: PassThroughParams; +} +``` + +In the component body (around state declarations near the top), add: + +```ts +const [deliveryStartHour, setDeliveryStartHour] = useState( + props.initialDeliveryStartHour ?? 6, +); +const [deliveryEndHour, setDeliveryEndHour] = useState( + props.initialDeliveryEndHour ?? 18, +); +``` + +In `handleContinue`, when building the URLSearchParams, add (after the existing params): + +```ts +sp.set("deliveryStartHour", String(deliveryStartHour)); +sp.set("deliveryEndHour", String(deliveryEndHour)); +``` + +In the JSX, just before the existing `` block, add a new "Delivery hours" card: + +```tsx +
+ +
+ setDeliveryStartHour(Number(e.target.value))} + className="h-9 w-20" + aria-label="Delivery start hour" + /> + to + setDeliveryEndHour(Number(e.target.value))} + className="h-9 w-20" + aria-label="Delivery end hour" + /> + + (24-hour, in {timezone}) + +
+

+ The bot stops sending after the end hour. Long fan-outs that don't + finish in this window get reported as partial. +

+
+``` + +Add `ClockIcon` to the lucide-react imports if it's not already there (file currently imports `CalendarIcon`, `ClockIcon`, `AlertCircleIcon` — already there). + +- [ ] **Step 3: Wire the new params into `apps/web/src/components/reminder-wizard/step-when.tsx`** + +The wizard reads `initialDeliveryStartHour` / `initialDeliveryEndHour` from URL and passes them to ``. Find where `` is rendered and add the two new props: + +```tsx + +``` + +Also update the `interface StepWhenParams` (or wherever the searchParams type lives in step-when.tsx) to include `deliveryStartHour?: string; deliveryEndHour?: string;`. + +- [ ] **Step 4: Pass-through in step-groups.tsx and step-review.tsx** + +Both step pages thread URL params through. Open each and find where they re-emit the URL (look for the `URLSearchParams` builder or `editLink` helper). Add: + +```ts +if (sp.deliveryStartHour) backParams.set("deliveryStartHour", sp.deliveryStartHour); +if (sp.deliveryEndHour) backParams.set("deliveryEndHour", sp.deliveryEndHour); +``` + +Add the same to `editLink` in step-review.tsx so the "Edit when" link from the review page round-trips the values. + +In `review-submit-client.tsx`, where the action payload is built, include the two fields: + +```ts +const payload = { + accountId, + groupIds: groupIds ? groupIds.split(",").filter(Boolean) : [], + name: name?.trim() || null, + messages, + scheduledAtIso: scheduledAt, + rrule: rrule ?? null, + timezone, + deliveryWindowStartHour: deliveryStartHour ?? 6, + deliveryWindowEndHour: deliveryEndHour ?? 18, +}; +``` + +(And add `deliveryStartHour?: number; deliveryEndHour?: number;` to the props interface and pass them in from `step-review.tsx` after parsing the URL.) + +- [ ] **Step 5: Update `apps/web/src/components/reminder-edit/edit-when-form.tsx` similarly** + +Add the same `Delivery hours` card into the form, same state vars, and include the two fields in the `updateReminderAction` payload (find the existing `updateReminderAction({...})` call). Pull the initial values from the loaded reminder row in the parent edit page (`apps/web/src/app/reminders/[id]/edit/when/page.tsx`): + +```tsx + t.groupId)} + messages={initialMessages} + name={reminder.name} + initialIso={(reminder.scheduledAt ?? new Date()).toISOString()} + initialSpec={specFromRrule(reminder.rrule)} + timezone={reminder.timezone} + initialDeliveryStartHour={reminder.deliveryWindowStartHour} + initialDeliveryEndHour={reminder.deliveryWindowEndHour} +/> +``` + +The form's prop interface gets: + +```ts +initialDeliveryStartHour?: number; +initialDeliveryEndHour?: number; +``` + +- [ ] **Step 6: Extend the partial-status notification body in `apps/web/src/lib/notifications.ts`** + +Find `reminderFiredToNotification`. The existing function takes `{ reminderId, runId, status }`. Update the body to mention the delivered/total when status is `partial`: + +```ts +export function reminderFiredToNotification(event: { + type: "reminder.fired"; + reminderId: string; + runId: string; + status: string; + sent?: number; + total?: number; +}): ShowNotificationOptions | null { + if (event.status === "skipped") return null; + const headline = + event.status === "success" + ? "Reminder sent" + : event.status === "partial" + ? "Reminder partly sent" + : "Reminder failed"; + let body = + event.status === "success" + ? "All groups received the message." + : event.status === "partial" + ? "Some groups received the message; others failed. See activity." + : "No groups received the message. See activity."; + if (event.status === "partial" && event.sent !== undefined && event.total !== undefined) { + body = `${event.sent} of ${event.total} groups delivered. See activity for details.`; + } + return { + title: headline, + body, + tag: `reminder:${event.reminderId}`, + href: `/reminders/${event.reminderId}`, + }; +} +``` + +- [ ] **Step 7: Add tests for the extended notification body** + +Append to `apps/web/src/lib/notifications.test.ts` (inside the existing `describe("reminderFiredToNotification mapping", () => {...})` block): + +```ts +it("partial with sent/total renders 'X of Y groups delivered' instead of the generic body", () => { + const args = reminderFiredToNotification({ + type: "reminder.fired", + reminderId: "r-1", + runId: "run-1", + status: "partial", + sent: 412, + total: 1000, + }); + expect(args?.body).toBe("412 of 1000 groups delivered. See activity for details."); +}); + +it("partial without sent/total falls back to the generic body", () => { + const args = reminderFiredToNotification({ + type: "reminder.fired", + reminderId: "r-1", + runId: "run-1", + status: "partial", + }); + expect(args?.body).toMatch(/Some groups received/); +}); +``` + +- [ ] **Step 8: Update existing edit-section-forms.test.tsx fixtures so they don't break on the new required props** + +Find `apps/web/src/components/reminder-edit/edit-section-forms.test.tsx`. Both `EditAccountForm` and `EditGroupsForm` may need additional defaults. If their props don't reference the new fields, the tests still pass — re-run them and patch only if compilation fails. Run: + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck +``` + +If typecheck reports `Property 'initialDeliveryStartHour' is missing` on any test, add `initialDeliveryStartHour: 6, initialDeliveryEndHour: 18` to that fixture. + +- [ ] **Step 9: Run web tests + bot tests** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run +``` + +Expected: web 298+ pass, bot 26+25 = 51 pass. Total ~380 across the project including shared. + +- [ ] **Step 10: Restart web container so the server bundle picks up the schema change** + +```bash +NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml restart web +``` + +- [ ] **Step 11: Commit** + +```bash +git add apps/web +git commit -m "feat(web): delivery-window inputs in wizard + edit; partial body extension + +Wizard 'When' step and the per-section edit-when page get two number +inputs for the delivery window hours (default 6/18). Server actions +accept them on the create/update Zod schemas, validate +0 <= start < end <= 24, and persist to the new reminders columns. + +Notification body for partial-status reminders now reads +'412 of 1000 groups delivered. See activity for details.' when the +SSE event carries sent/total counts." +``` + +--- + +## Acceptance check (manual) + +After all 8 tasks land: + +- [ ] **Smoke 1.** Create a reminder for 1 group with default delivery hours and a 30-second future fire time. Verify it lands; verify the run's `error_summary` is null and status `success`. + +- [ ] **Smoke 2.** Create a reminder with end hour set to a value that's already past in the operator's timezone. Schedule it to fire NOW. Verify the run resolves `failed` and the `error_summary` reads "Delivery window closed at H:00 (TZ) before any group could be sent." + +- [ ] **Smoke 3.** Create two reminders on TWO different paired accounts with the same `scheduledAt`. Watch bot logs: + + ```bash + NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=60 bot 2>&1 | grep -E "reminder|fire-reminder" + ``` + + Expected: both reminders' `fire-reminder: done` entries land within seconds of each other (parallel), not sequentially separated by a full fan-out. + +- [ ] **Smoke 4.** Create a reminder with a 5-MB JPEG and 3 groups. Fire it. Bot logs should show only ONE `prepareWAMessageMedia` / upload entry (look for the upload size in the Baileys debug log) followed by 3 relayMessage events. + +- [ ] **Final test sweep:** + + ```bash + NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run + NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run + NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run + ``` + + Expected: all green. ~380 total. diff --git a/docs/superpowers/specs/2026-05-10-windowed-fanout-design.md b/docs/superpowers/specs/2026-05-10-windowed-fanout-design.md index 18982b6..1c29e39 100644 --- a/docs/superpowers/specs/2026-05-10-windowed-fanout-design.md +++ b/docs/superpowers/specs/2026-05-10-windowed-fanout-design.md @@ -7,9 +7,13 @@ ## Goal Deliver a reminder to many groups (target: 1000+) safely within a -per-reminder delivery window. If we cannot finish in the window, stop, -mark the run `partial`, and tell the operator the account is at -capacity for this fan-out. +per-reminder delivery window. If we cannot finish in the window, +**pause** the run at window-close, persist progress, and let the +operator **resume** later from the Activity / detail view. The +paused-status message tells the operator what's blocking throughput +(account at capacity, media size eating the budget) so they can +decide whether to offload to another paired account, shrink the +attachment, or just resume the next morning at 6am. ## Constraints @@ -131,19 +135,71 @@ default 3): Final status: - **success** — every target sent. -- **partial** — at least one sent, at least one not (window-close, - failed, missing group). `error_summary` reads: +- **paused** — window closed mid-run with at least one target still in + `pending`. Run carries a resumable state: sent rows stay `sent`, + unstarted rows stay `pending` (NOT skipped), failed rows stay + `failed`. `error_summary` reads: `"Delivery window closed at 18:00 (Asia/Kuala_Lumpur). 412 of 1000 - groups delivered. This account is at capacity for this fan-out — - consider sending the remainder from another paired account."` -- **failed** — zero sent. + groups delivered, 588 still pending. Resume from the Activity tab. + If this happens repeatedly, consider offloading to another paired + account, or shrinking the message body / media size to fit more + groups in your daily window."` +- **partial** — every target was attempted; some sent and some + failed/skipped (group missing from DB, account offline, send error + inside the window). Not resumable; the failures are real failures. +- **failed** — zero sent. Either every send errored, or the run hit + the window close BEFORE the first send (run fired too late to do + any work; nothing to resume). + +## Resume action + +A paused run can be resumed by the operator. Mechanism: + +- New server action `resumeReminderRunAction(runId)` validates + ownership, then enqueues a pg-boss job: + `boss.send("reminder.fire", { reminderId, runId })` with NO + singletonKey (resumes don't conflict with the reminder's normal + cron firing). +- The fire-reminder handler accepts an optional `runId` in its + payload. When present, it ATTACHES to that run instead of creating + a new one: + - Skips creating a new `reminder_runs` row. + - Loads the existing run's `reminder_run_targets` rows. + - Iterates only those with `status = 'pending'`. + - Re-uses the same windowEnd / rate limiter / media cache logic as + a fresh fire. + - On window close again, status flips back to `paused` with an + updated count. + - On success this round, status becomes `success` (if no failures + accumulated) or `partial` (if some failed). +- `failed` targets from the previous run are NOT retried on resume. + They're real errors — surfacing them as actionable in the UI is a + v2 concern (manual "retry failed" button). + +UI surfaces of paused runs: + +- Activity tab gets an amber "Paused" pill alongside the existing + Success/Partial/Failed/Skipped/Archived filters. Resume button + inline on each paused row. +- Reminder detail page's run history shows the same Resume button on + paused rows. +- The `reminder.fired` SSE event for status=paused triggers a + notification with title "Reminder paused" and body + `"X of Y groups delivered. Resume from the Activity tab."` ## Notification body -The existing `reminder.fired` SSE event already carries -`{ status }`. The web's notification mapper already handles -`partial` with a "see activity" hint. The body extends to mention -"X of Y delivered" when status === "partial". +The existing `reminder.fired` SSE event already carries `{ status }`. +The notification mapper extends: + +- `success` → unchanged. +- `partial` → body mentions delivered/total counts when present. +- `paused` → headline `"Reminder paused"`, body + `"X of Y groups delivered. Resume from the Activity tab."` Click + takes the operator to the reminder's detail page where the Resume + button lives. +- `failed` → unchanged. +- `skipped` → still filtered (bookkeeping noise). ## Components @@ -192,14 +248,20 @@ default to 6/18 and can be widened (e.g. 0/24) for a specific big run. ## Out of scope (v2 candidates) -- **Crash resumability across bot restarts.** Today, if the bot dies - mid-fan-out, pg-boss will retry the job; the loop will skip any - rows already marked `sent`, but the in-memory rate-limiter and - upload-cache state are gone — meaning the retry uploads media - again and starts pacing from a full bucket. Acceptable for v1. -- **Pause / resume mid-run** controls. -- **Cross-day window resume** (current design hard-stops at window - end and reports partial; doesn't queue the remainder for tomorrow). +- **Crash resumability across bot restarts.** If the bot dies + mid-fan-out (mid-window), pg-boss will retry the job; the loop's + pre-loaded `pending` rows still pick up correctly, but the + in-memory rate-limiter and upload-cache state are gone — the + retry re-uploads media and starts pacing from a full bucket. The + paused-state resumability covered above is a different mechanism: + it handles the "window closed cleanly" case end-to-end. The + "bot crashed mid-window" case is degraded but not broken. +- **Auto-resume next morning** when window opens again (today the + operator clicks Resume manually). +- **Pause-by-operator** (only window-close pauses; user-triggered + pause mid-fan-out isn't wired). +- **Retry-failed-targets** action (paused-resume only re-attempts + `pending` rows; `failed` rows stay failed). - **Multi-account auto-split** of a single reminder. - **Adaptive rate limiting** (auto-back-off on WA rate-limit response codes; today the operator tunes the env var). @@ -210,7 +272,14 @@ default to 6/18 and can be widened (e.g. 0/24) for a specific big run. in roughly 30–50 minutes, comfortably inside a 6am–6pm window. - Two reminders on different accounts firing within seconds of each other: both progress simultaneously, neither blocks the other. -- A run that hits the window end: stops cleanly, marks remaining as - skipped, surfaces the partial-status message in the Activity tab - and via the browser notification. -- 355 existing tests still pass; ≈25 new tests cover the new helpers. +- A run that hits the window end mid-fan-out: stops cleanly, marks + the run `paused`, leaves un-started targets as `pending`, surfaces + the paused-status notification with delivered/total counts. +- The operator clicks **Resume** on a paused run — fan-out continues + from the unsent targets, respecting the same per-account rate + limit + window. If it again can't finish, it pauses again with an + updated count. +- A run that hits the window end BEFORE any send (fired too late): + resolves `failed`, no resume offered. +- 355 existing tests still pass; ≈30 new tests cover the new helpers + and the paused/resume flow.