diff --git a/apps/web/src/actions/reminders.ts b/apps/web/src/actions/reminders.ts index ebeb2b5..d7e984d 100644 --- a/apps/web/src/actions/reminders.ts +++ b/apps/web/src/actions/reminders.ts @@ -230,12 +230,13 @@ const createReminderSchema = z // least one valid message either way. messages: z.array(messagePartSchema).optional(), // User-supplied label shown in the list / detail page header. - // Optional on the wire — when blank or missing the action body - // auto-derives a fallback from the first text-bearing message - // part. The reminders.name DB column is text(50), so the - // resolver clamps to 60 chars (mirrors the duplicate-action - // pattern that produces " (copy)") and trims whitespace. - name: z.string().nullable().optional(), + // Required: every reminder must carry a non-empty name. The + // resolver still clamps to REMINDER_NAME_MAX so the DB column + // never has to reject the row. The legacy auto-derive from the + // first message part is kept as a fallback ONLY for legacy + // bookmarked URLs (where the create form was submitted before + // the field was added) — new submits always carry a name. + name: z.string().trim().min(1, "Give the reminder a name").max(60), // Legacy single-message fields. Still accepted so bookmarked // /reminders/new URLs don't 400 after the migration. The action body // collapses these into `messages` before doing any work. 
diff --git a/apps/web/src/components/reminder-edit/edit-message-form.tsx b/apps/web/src/components/reminder-edit/edit-message-form.tsx index 43a4927..4a31e1d 100644 --- a/apps/web/src/components/reminder-edit/edit-message-form.tsx +++ b/apps/web/src/components/reminder-edit/edit-message-form.tsx @@ -47,6 +47,11 @@ export function EditMessageForm({ const [error, setError] = useState(null); async function handleSave() { + const trimmedName = name.trim(); + if (!trimmedName) { + setError("Give the reminder a name."); + return; + } if (messages.length === 0) { setError("Add at least one text or file part."); return; @@ -58,7 +63,7 @@ export function EditMessageForm({ reminderId, accountId, groupIds, - name: name.trim() || null, + name: trimmedName, messages, scheduledAtIso, rrule, @@ -83,19 +88,20 @@ export function EditMessageForm({ setName(e.target.value)} + onChange={(e) => { + setName(e.target.value); + setError(null); + }} placeholder="e.g. Sunday morning standup" maxLength={REMINDER_NAME_MAX} + required + aria-required="true" /> -

- Leave blank and the first line of your message will be used. -

(null); function handleContinue() { + const trimmedName = name.trim(); + if (!trimmedName) { + setError("Give the reminder a name."); + return; + } if (messages.length === 0) { setError("Add at least one text or file part."); return; @@ -48,8 +53,7 @@ export function ComposeFormClient({ const sp = new URLSearchParams({ step: "3", accountId }); if (groupIds) sp.set("groupIds", groupIds); sp.set("messages", encodeMessages(messages)); - const trimmedName = name.trim(); - if (trimmedName) sp.set("name", trimmedName); + sp.set("name", trimmedName); if (passThroughParams.scheduledAt) sp.set("scheduledAt", passThroughParams.scheduledAt); if (passThroughParams.rrule) sp.set("rrule", passThroughParams.rrule); if (passThroughParams.editReminderId) sp.set("editReminderId", passThroughParams.editReminderId); @@ -59,26 +63,26 @@ export function ComposeFormClient({ return (
- {/* Name — sits above the message stack so the user names the - reminder before composing. Optional: blank falls back to the - first text part on the server side. */} + {/* Name — required identifier shown in the list, detail page, + and run-history rows. Capped at REMINDER_NAME_MAX (60). */}
setName(e.target.value)} + onChange={(e) => { + setName(e.target.value); + setError(null); + }} placeholder="e.g. Sunday morning standup" maxLength={REMINDER_NAME_MAX} + required + aria-required="true" /> -

- Leave blank and the first line of your message will be used. -

(null); async function handleSchedule() { + const trimmedName = name?.trim(); + if (!trimmedName) { + // The wizard's compose step now blocks Continue when the name is + // blank, so the only way to land here without one is a stale + // bookmarked URL. Bounce the operator back to step 2 with a + // clear error rather than letting the server reject it. + setError("Give the reminder a name (back on the Message step)."); + return; + } setSubmitting(true); setError(null); @@ -41,7 +50,7 @@ export function ReviewSubmitClient({ const payload = { accountId, groupIds: groupIds ? groupIds.split(",").filter(Boolean) : [], - name: name?.trim() || null, + name: trimmedName, messages, scheduledAtIso: scheduledAt, rrule: rrule ?? null, diff --git a/docs/superpowers/plans/2026-05-10-windowed-fanout.md b/docs/superpowers/plans/2026-05-10-windowed-fanout.md new file mode 100644 index 0000000..da200ee --- /dev/null +++ b/docs/superpowers/plans/2026-05-10-windowed-fanout.md @@ -0,0 +1,1829 @@ +# Windowed, Pacing-Safe Reminder Fan-Out — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Deliver reminders to many groups (1000+) safely within a per-reminder delivery window (default 6am–6pm). On window-close mid-run, **pause** with a clear "account at capacity / consider offload or smaller media" message and let the operator **resume** later from the UI. + +**Architecture:** Replace the current serial fan-out with a per-account isolation model. 
pg-boss `teamSize` raises so different-account reminders run in parallel; a per-key (accountId) async mutex serialises same-account fan-outs; a per-account token-bucket rate limiter paces sends; a per-run media-upload cache reuses each unique file across every group via Baileys' `prepareWAMessageMedia` + `relayMessage`; a window-end gate flips the run to `paused` (un-sent targets stay `pending`); a `resumeReminderRunAction` re-enters fire-reminder with the existing runId so the loop picks up only the still-pending targets. + +**Tech Stack:** Node.js + TypeScript, Baileys 7.0.0-rc10 (`prepareWAMessageMedia`, `relayMessage`, `generateMessageID`), pg-boss v12 (`boss.work` with `teamSize`), Drizzle ORM + Postgres, vitest. + +--- + +## File Structure + +| File | Role | +| --- | --- | +| `packages/db/migrations/0008_*.sql` (generated) | add `delivery_window_start_hour`, `delivery_window_end_hour` to `reminders` | +| `packages/db/src/schema.ts` | drizzle alignment for the two new columns | +| `apps/bot/src/env.ts` | three new env vars: `BOT_FIRE_CONCURRENCY`, `BOT_GROUP_CONCURRENCY`, `BOT_MAX_SEND_PER_MINUTE` | +| `apps/bot/src/scheduler/per-key-mutex.ts` (new) | accountId-keyed async mutex | +| `apps/bot/src/scheduler/per-key-mutex.test.ts` (new) | unit tests | +| `apps/bot/src/scheduler/rate-limiter.ts` (new) | per-account token bucket | +| `apps/bot/src/scheduler/rate-limiter.test.ts` (new) | fake-clock unit tests | +| `apps/bot/src/scheduler/delivery-window.ts` (new) | pure window-end calculator | +| `apps/bot/src/scheduler/delivery-window.test.ts` (new) | unit tests | +| `apps/bot/src/scheduler/media-upload-cache.ts` (new) | `prepareWAMessageMedia` results, keyed by mediaId | +| `apps/bot/src/scheduler/media-upload-cache.test.ts` (new) | mock-socket unit tests | +| `apps/bot/src/scheduler/fire-reminder.ts` (rewrite) | new loop using all of the above | +| `apps/bot/src/scheduler/reminder-jobs.ts` | pass `teamSize` config | +| `apps/web/src/actions/reminders.ts` | 
accept the two new fields on create/update | +| `apps/web/src/components/reminder-wizard/when-form-client.tsx` | "Delivery hours" inputs | +| `apps/web/src/components/reminder-edit/edit-when-form.tsx` | same | +| `apps/web/src/lib/notifications.ts` | partial-status notification body extension | + +--- + +## Task 1: Schema migration — delivery window columns + +**Files:** +- Modify: `packages/db/src/schema.ts` (lines around the `reminders` table — add 2 columns) +- Generate: `packages/db/migrations/0008_.sql` + +- [ ] **Step 1: Edit `packages/db/src/schema.ts` — add the two columns to `reminders`** + +Find the `reminders` table block and append the two new columns just before the closing `});`. The existing block ends with `lastFiredAt: timestamp(...)`. Add: + +```ts +export const reminders = pgTable("reminders", { + id: uuid("id").primaryKey().defaultRandom(), + accountId: uuid("account_id").notNull().references(() => whatsappAccounts.id, { onDelete: "cascade" }), + name: text("name").notNull(), + scheduleKind: text("schedule_kind").notNull(), + scheduledAt: timestamp("scheduled_at", { withTimezone: true }), + rrule: text("rrule"), + timezone: text("timezone").notNull(), + endsAt: timestamp("ends_at", { withTimezone: true }), + maxRuns: integer("max_runs"), + status: text("status").notNull().default("active"), + createdBy: uuid("created_by").notNull().references(() => operators.id), + createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(), + updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(), + lastFiredAt: timestamp("last_fired_at", { withTimezone: true }), + // Delivery window — operator-supplied hours (in the row's timezone). + // Only the END hour is enforced at runtime in v1: the run loop stops + // sending once `now()` crosses today's end hour. The START hour is + // documented for the UI; not gated. See spec + // docs/superpowers/specs/2026-05-10-windowed-fanout-design.md. 
+ deliveryWindowStartHour: integer("delivery_window_start_hour").notNull().default(6), + deliveryWindowEndHour: integer("delivery_window_end_hour").notNull().default(18), +}); +``` + +- [ ] **Step 2: Generate the migration** + +Run: + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db generate +``` + +Expected: a file appears at `packages/db/migrations/0008_*.sql` containing `ALTER TABLE "reminders" ADD COLUMN "delivery_window_start_hour" integer DEFAULT 6 NOT NULL;` and a similar line for `delivery_window_end_hour`. + +- [ ] **Step 3: Apply the migration** + +Run: + +```bash +NO_SUDO=1 ./scripts/db.sh migrate +``` + +Expected: `Migrations applied.` Verify with: + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db build +``` + +Expected: clean tsc output (the @cmbot/db package re-builds so downstream consumers pick up the new columns). + +- [ ] **Step 4: Typecheck the bot + web to confirm no schema-consumer regressions** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck +``` + +Expected: both pass with no errors. + +- [ ] **Step 5: Commit** + +```bash +git add packages/db +git commit -m "feat(db): add delivery_window_start_hour / end_hour to reminders + +Both default 6 / 18, stored as int (hour-of-day in the row's +timezone). Only the end hour is gated at runtime — see spec +docs/superpowers/specs/2026-05-10-windowed-fanout-design.md." 
+``` + +--- + +## Task 2: Bot env vars — concurrency + rate caps + +**Files:** +- Modify: `apps/bot/src/env.ts` + +- [ ] **Step 1: Replace `apps/bot/src/env.ts` contents** + +Whole file: + +```ts +import { z } from "zod"; + +const numberFromString = z.string().regex(/^\d+$/).transform((s) => Number(s)); + +const envSchema = z.object({ + DATABASE_URL: z.string().url(), + DATA_DIR: z.string().min(1), + SESSIONS_DIR: z.string().min(1), + MEDIA_DIR: z.string().min(1), + BOT_HEALTH_PORT: numberFromString, + BOT_LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).default("info"), + + // Reminder fan-out tuning. Defaults aim for an established WhatsApp + // account (~30-60 msg/min safe band). + // - BOT_FIRE_CONCURRENCY: pg-boss worker pool size for the + // `reminder.fire` queue. Sets the max number of accounts that can + // run their fan-outs simultaneously. 8 is a sane default; raise if + // you have more paired accounts firing concurrently. + // - BOT_GROUP_CONCURRENCY: per-account parallel group sends. Each + // group's parts stay serial (preserves visible order in the chat). + // 3 has been empirically stable on real traffic; anything above 5 + // without observation is asking for trouble. + // - BOT_MAX_SEND_PER_MINUTE: per-account token-bucket rate. 40 is the + // safe default; loosen to 60 only after a few weeks of running + // without flags. Tighten to 20 if WA returns rate-limit errors. + BOT_FIRE_CONCURRENCY: numberFromString.default("8"), + BOT_GROUP_CONCURRENCY: numberFromString.default("3"), + BOT_MAX_SEND_PER_MINUTE: numberFromString.default("40"), +}); + +export type Env = z.infer<typeof envSchema>; + +export function parseEnv(input: Record<string, string | undefined>): Env { + return envSchema.parse(input); +} + +export const env = parseEnv(process.env); +``` + +- [ ] **Step 2: Typecheck** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck +``` + +Expected: passes. 
+ +- [ ] **Step 3: Run the bot tests to confirm `parseEnv` defaults still validate** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run env +``` + +Expected: all `env.test.ts` tests pass (existing tests don't supply the new vars; defaults must kick in). + +- [ ] **Step 4: Commit** + +```bash +git add apps/bot/src/env.ts +git commit -m "feat(bot): add fan-out tuning env vars + +BOT_FIRE_CONCURRENCY (8) — pg-boss worker pool, gates max accounts +running fan-outs simultaneously. +BOT_GROUP_CONCURRENCY (3) — per-account parallel group sends. +BOT_MAX_SEND_PER_MINUTE (40) — per-account rate-limit budget. + +Defaults are tuned for an established WhatsApp account. See spec for +the safe band rationale." +``` + +--- + +## Task 3: PerKeyMutex (TDD) + +**Files:** +- Create: `apps/bot/src/scheduler/per-key-mutex.test.ts` +- Create: `apps/bot/src/scheduler/per-key-mutex.ts` + +- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/per-key-mutex.test.ts`** + +```ts +import { describe, it, expect } from "vitest"; +import { PerKeyMutex } from "./per-key-mutex.js"; + +/** Tiny clock-free helper: returns a Promise that resolves after + * `n` microtasks. Lets us check ordering without real timers. 
*/ +function tickN(n: number): Promise { + let p: Promise = Promise.resolve(); + for (let i = 0; i < n; i++) p = p.then(); + return p; +} + +describe("PerKeyMutex", () => { + it("allows a single call against one key to run immediately", async () => { + const m = new PerKeyMutex(); + const result = await m.run("k1", async () => 42); + expect(result).toBe(42); + }); + + it("serialises two calls against the same key", async () => { + const m = new PerKeyMutex(); + const order: string[] = []; + + const a = m.run("k1", async () => { + order.push("a-start"); + await tickN(5); + order.push("a-end"); + }); + const b = m.run("k1", async () => { + order.push("b-start"); + order.push("b-end"); + }); + + await Promise.all([a, b]); + // b cannot start until a has finished. + expect(order).toEqual(["a-start", "a-end", "b-start", "b-end"]); + }); + + it("runs different keys in parallel", async () => { + const m = new PerKeyMutex(); + const order: string[] = []; + + const a = m.run("k1", async () => { + order.push("a-start"); + await tickN(5); + order.push("a-end"); + }); + const b = m.run("k2", async () => { + order.push("b-start"); + order.push("b-end"); + }); + + await Promise.all([a, b]); + // b doesn't wait for a — interleaving expected. + expect(order[0]).toBe("a-start"); + expect(order).toContain("b-start"); + expect(order).toContain("b-end"); + // b's pair lands before a's end. + expect(order.indexOf("b-end")).toBeLessThan(order.indexOf("a-end")); + }); + + it("releases the lock when the handler throws", async () => { + const m = new PerKeyMutex(); + await expect( + m.run("k1", async () => { + throw new Error("boom"); + }), + ).rejects.toThrow("boom"); + // Next call on the same key must NOT hang. 
+ const result = await m.run("k1", async () => "after"); + expect(result).toBe("after"); + }); + + it("forwards the resolved value of the handler", async () => { + const m = new PerKeyMutex(); + const out = await m.run("k1", async () => ({ ok: true, n: 7 })); + expect(out).toEqual({ ok: true, n: 7 }); + }); + + it("cleans up internal state for keys with no waiters", async () => { + const m = new PerKeyMutex(); + await m.run("k1", async () => {}); + expect(m.activeKeyCount()).toBe(0); + }); + + it("retains a key while a chain is in flight, then drops it", async () => { + const m = new PerKeyMutex(); + let release!: () => void; + const gate = new Promise((r) => (release = r)); + + const inFlight = m.run("k1", () => gate); + expect(m.activeKeyCount()).toBe(1); + release(); + await inFlight; + expect(m.activeKeyCount()).toBe(0); + }); +}); +``` + +- [ ] **Step 2: Run the failing test to confirm it fails for the right reason** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex +``` + +Expected: FAIL with "Cannot find module './per-key-mutex.js'". + +- [ ] **Step 3: Implement `apps/bot/src/scheduler/per-key-mutex.ts`** + +```ts +/** + * Async mutex keyed by a string. Different keys run in parallel; + * same-key calls serialise. + * + * Used by fire-reminder so two reminders on the SAME WhatsApp account + * take turns (running them concurrently would double the effective + * send rate and risk a ban), while reminders on DIFFERENT accounts + * proceed in parallel. + * + * The implementation is a chain-per-key Promise: each call appends + * its work to the key's tail. Empty chains are cleaned up so the + * Map doesn't grow unbounded across the bot's lifetime. + */ +export class PerKeyMutex { + private chains = new Map>(); + + /** Run `fn` exclusively against `key`. Returns whatever `fn` returns. */ + async run(key: string, fn: () => Promise): Promise { + const prev = this.chains.get(key) ?? 
Promise.resolve(); + + let release!: () => void; + const completion = new Promise((r) => (release = r)); + // The chain we publish is "what was already there + this completion". + // The next caller awaits THIS, so order is preserved. + const chained = prev.then(() => completion); + this.chains.set(key, chained); + + try { + await prev; + return await fn(); + } finally { + release(); + // Drop the entry only if no later caller has appended in the + // meantime — otherwise we'd evict the in-flight chain. + if (this.chains.get(key) === chained) { + this.chains.delete(key); + } + } + } + + /** Diagnostic: how many keys currently have an in-flight or queued chain. */ + activeKeyCount(): number { + return this.chains.size; + } +} + +/** + * Singleton mutex used by fire-reminder, keyed by accountId. Lives at + * module scope so multiple pg-boss workers in the same process share + * state. + */ +export const accountMutex = new PerKeyMutex(); +``` + +- [ ] **Step 4: Run the test to confirm it passes** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex +``` + +Expected: PASS, 7 tests. + +- [ ] **Step 5: Commit** + +```bash +git add apps/bot/src/scheduler/per-key-mutex.ts apps/bot/src/scheduler/per-key-mutex.test.ts +git commit -m "feat(bot): PerKeyMutex for accountId-keyed serialisation + +Same key serialises, different keys run in parallel. Used by +fire-reminder to prevent two same-account fan-outs from doubling +the effective send rate (which would risk a WhatsApp ban). Chains +auto-clean empty entries so the Map doesn't leak." 
+``` + +--- + +## Task 4: TokenBucket rate limiter (TDD) + +**Files:** +- Create: `apps/bot/src/scheduler/rate-limiter.test.ts` +- Create: `apps/bot/src/scheduler/rate-limiter.ts` + +- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/rate-limiter.test.ts`** + +```ts +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { TokenBucket, accountRateLimiter } from "./rate-limiter.js"; + +describe("TokenBucket", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-10T10:00:00Z")); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("starts full: first N=capacity acquires resolve immediately", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 5 }); + for (let i = 0; i < 5; i++) { + await b.acquire(); // should not stall + } + // The 6th is a different test — see below. + }); + + it("blocks the (capacity+1)th acquire until a token regenerates", async () => { + // 60/min = 1 token per second. Capacity 2. + const b = new TokenBucket({ ratePerMinute: 60, capacity: 2 }); + await b.acquire(); + await b.acquire(); + + let resolved = false; + const pending = b.acquire().then(() => { + resolved = true; + }); + + // Immediately after draining, no token available. + await Promise.resolve(); + expect(resolved).toBe(false); + + // Advance the clock by exactly 1 second — one token should be back. + await vi.advanceTimersByTimeAsync(1000); + await pending; + expect(resolved).toBe(true); + }); + + it("FIFO: pending acquires resolve in the order they arrived", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 1 }); + await b.acquire(); // bucket empty + + const order: number[] = []; + const a = b.acquire().then(() => order.push(1)); + const c = b.acquire().then(() => order.push(2)); + + // Two seconds → two tokens regenerate, in order. 
+ await vi.advanceTimersByTimeAsync(2000); + await Promise.all([a, c]); + expect(order).toEqual([1, 2]); + }); + + it("does not over-fill past capacity even if the clock leaps forward", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 3 }); + // Drain. + await b.acquire(); + await b.acquire(); + await b.acquire(); + // Leap an hour. Naive impl would credit 3600 tokens; we should cap at 3. + await vi.advanceTimersByTimeAsync(3_600_000); + // 3 immediate acquires should resolve. + await b.acquire(); + await b.acquire(); + await b.acquire(); + // The 4th waits for fresh regeneration. + let resolved = false; + b.acquire().then(() => (resolved = true)); + await Promise.resolve(); + expect(resolved).toBe(false); + }); + + it("ratePerMinute=0 is rejected at construction (caller bug, fail loud)", () => { + expect(() => new TokenBucket({ ratePerMinute: 0, capacity: 1 })).toThrow(); + }); +}); + +describe("accountRateLimiter (singleton)", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-10T10:00:00Z")); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("returns the SAME bucket for repeated lookups of one accountId", () => { + const a1 = accountRateLimiter.get("acct-1"); + const a2 = accountRateLimiter.get("acct-1"); + expect(a1).toBe(a2); + }); + + it("returns DIFFERENT buckets for different accountIds (isolation)", () => { + const a = accountRateLimiter.get("acct-A"); + const b = accountRateLimiter.get("acct-B"); + expect(a).not.toBe(b); + }); + + it("a drained account A bucket does not block account B", async () => { + const a = accountRateLimiter.get("acct-A"); + const b = accountRateLimiter.get("acct-B"); + // Drain A entirely (capacity defaults to ratePerMinute, so we + // need to pull at least that many). + for (let i = 0; i < 40; i++) await a.acquire(); + + // B should still grant immediately. 
+ let bResolved = false; + b.acquire().then(() => (bResolved = true)); + await Promise.resolve(); + expect(bResolved).toBe(true); + }); +}); +``` + +- [ ] **Step 2: Run the failing test** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter +``` + +Expected: FAIL with "Cannot find module './rate-limiter.js'". + +- [ ] **Step 3: Implement `apps/bot/src/scheduler/rate-limiter.ts`** + +```ts +import { env } from "../env.js"; + +/** + * Token bucket for per-account send pacing. + * + * Tokens regenerate at `ratePerMinute / 60` per second. Capacity caps + * how many can accumulate during idle periods (so the operator can't + * burst 1000 messages just because the account was quiet for a day). + * + * `acquire()` resolves when a token is available, FIFO across waiters. + * Used by fire-reminder to gate every `socket.sendMessage` call. + */ +export interface TokenBucketOptions { + ratePerMinute: number; + /** Defaults to ratePerMinute (one minute's worth). */ + capacity?: number; +} + +export class TokenBucket { + private readonly ratePerMs: number; + private readonly capacity: number; + private tokens: number; + private lastRefillMs: number; + private waiters: Array<() => void> = []; + + constructor(opts: TokenBucketOptions) { + if (opts.ratePerMinute <= 0) { + throw new Error(`TokenBucket: ratePerMinute must be > 0, got ${opts.ratePerMinute}`); + } + this.ratePerMs = opts.ratePerMinute / 60_000; + this.capacity = opts.capacity ?? opts.ratePerMinute; + this.tokens = this.capacity; + this.lastRefillMs = Date.now(); + } + + /** Resolve when a token is available. FIFO across concurrent waiters. 
*/ + async acquire(): Promise { + this.refill(); + if (this.tokens >= 1 && this.waiters.length === 0) { + this.tokens -= 1; + return; + } + return new Promise((resolve) => { + this.waiters.push(resolve); + this.scheduleNext(); + }); + } + + private refill(): void { + const now = Date.now(); + const elapsed = now - this.lastRefillMs; + if (elapsed <= 0) return; + const gained = elapsed * this.ratePerMs; + this.tokens = Math.min(this.capacity, this.tokens + gained); + this.lastRefillMs = now; + } + + private scheduleNext(): void { + // Wait until at least one token is available, then drain waiters + // FIFO. We compute the gap from current fractional token deficit. + this.refill(); + while (this.tokens >= 1 && this.waiters.length > 0) { + this.tokens -= 1; + const w = this.waiters.shift()!; + w(); + } + if (this.waiters.length === 0) return; + + const tokensShort = 1 - this.tokens; + const waitMs = Math.max(1, Math.ceil(tokensShort / this.ratePerMs)); + setTimeout(() => this.scheduleNext(), waitMs); + } +} + +/** + * Per-accountId TokenBucket registry. Each account gets its own + * pacing budget, so a slow account A never throttles account B. + */ +class AccountRateLimiter { + private buckets = new Map(); + private ratePerMinute: number; + + constructor(ratePerMinute: number) { + this.ratePerMinute = ratePerMinute; + } + + get(accountId: string): TokenBucket { + let b = this.buckets.get(accountId); + if (!b) { + b = new TokenBucket({ ratePerMinute: this.ratePerMinute }); + this.buckets.set(accountId, b); + } + return b; + } +} + +export const accountRateLimiter = new AccountRateLimiter(env.BOT_MAX_SEND_PER_MINUTE); +``` + +- [ ] **Step 4: Run the tests** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter +``` + +Expected: PASS, 8 tests. 
+ +- [ ] **Step 5: Commit** + +```bash +git add apps/bot/src/scheduler/rate-limiter.ts apps/bot/src/scheduler/rate-limiter.test.ts +git commit -m "feat(bot): per-account token-bucket rate limiter + +TokenBucket gates each socket.sendMessage call. Tokens regenerate at +ratePerMinute/60 per second, capped at one minute's worth so quiet +accounts can't burst. FIFO drain across concurrent waiters. + +accountRateLimiter (singleton) hands out one bucket per accountId, so +account A's drain never throttles account B. Default rate is +BOT_MAX_SEND_PER_MINUTE (40) — the safe band for an established +WhatsApp account." +``` + +--- + +## Task 5: Delivery window helper (TDD) + +**Files:** +- Create: `apps/bot/src/scheduler/delivery-window.test.ts` +- Create: `apps/bot/src/scheduler/delivery-window.ts` + +- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/delivery-window.test.ts`** + +```ts +import { describe, it, expect } from "vitest"; +import { windowEndAt } from "./delivery-window.js"; + +const TZ = "Asia/Kuala_Lumpur"; // UTC+8 + +describe("windowEndAt", () => { + it("returns today's end-hour boundary in the given timezone", () => { + // Fire at 2026-05-10 10:00 KL == 02:00 UTC. End hour 18 == 18:00 KL == 10:00 UTC. + const fireAt = new Date("2026-05-10T02:00:00.000Z"); + const out = windowEndAt(TZ, 18, fireAt); + expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z"); + }); + + it("returns a past timestamp when fireAt is already after the end hour", () => { + // Fire at 2026-05-10 19:00 KL == 11:00 UTC. End hour 18 → today's 18:00 KL == 10:00 UTC. + // That's BEFORE fireAt. The caller's first window-gate check trips immediately. 
+ const fireAt = new Date("2026-05-10T11:00:00.000Z"); + const out = windowEndAt(TZ, 18, fireAt); + expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z"); + expect(out.getTime()).toBeLessThan(fireAt.getTime()); + }); + + it("respects the timezone (UTC+0 vs UTC+8)", () => { + const fireAt = new Date("2026-05-10T02:00:00.000Z"); + const inUtc = windowEndAt("UTC", 18, fireAt); + expect(inUtc.toISOString()).toBe("2026-05-10T18:00:00.000Z"); + const inKl = windowEndAt("Asia/Kuala_Lumpur", 18, fireAt); + expect(inKl.toISOString()).toBe("2026-05-10T10:00:00.000Z"); + }); + + it("handles end hour 24 as midnight of the same calendar day's end", () => { + // 2026-05-10 in KL ends at 2026-05-11 00:00 KL == 2026-05-10 16:00 UTC. + const fireAt = new Date("2026-05-10T02:00:00.000Z"); + const out = windowEndAt(TZ, 24, fireAt); + expect(out.toISOString()).toBe("2026-05-10T16:00:00.000Z"); + }); + + it("DST transition day stays on the SAME calendar day (no skipping forward)", () => { + // US/Eastern has DST. On 2026-03-08 (DST starts), 18:00 EDT is a real time. + // Fire at 2026-03-08 11:00 EDT (15:00 UTC). End at 2026-03-08 18:00 EDT (22:00 UTC). + const fireAt = new Date("2026-03-08T15:00:00.000Z"); + const out = windowEndAt("America/New_York", 18, fireAt); + expect(out.toISOString()).toBe("2026-03-08T22:00:00.000Z"); + }); + + it("rejects end hour outside 0..24", () => { + const fireAt = new Date("2026-05-10T00:00:00Z"); + expect(() => windowEndAt(TZ, -1, fireAt)).toThrow(); + expect(() => windowEndAt(TZ, 25, fireAt)).toThrow(); + }); +}); +``` + +- [ ] **Step 2: Run the failing test** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run delivery-window +``` + +Expected: FAIL with "Cannot find module './delivery-window.js'". 
+ +- [ ] **Step 3: Implement `apps/bot/src/scheduler/delivery-window.ts`** + +```ts +import { DateTime } from "luxon"; + +/** + * Returns the end-of-window timestamp for the calendar day `fireAt` + * falls on, in the operator's timezone. + * + * windowEndAt("Asia/Kuala_Lumpur", 18, fireAt) + * → today's 18:00 KL (which may be in the past if fireAt is already + * past 18:00 KL — caller's first window-gate fires immediately). + * + * `endHour` is 0..24. Hour 24 is treated as midnight of the next + * calendar day (i.e. "end of today" inclusive). + * + * Pure: no I/O, no Date.now() reads, no clock dependency. Easy to + * test with fixture inputs. + */ +export function windowEndAt( + timezone: string, + endHour: number, + fireAt: Date, +): Date { + if (!Number.isInteger(endHour) || endHour < 0 || endHour > 24) { + throw new Error(`windowEndAt: endHour must be 0..24, got ${endHour}`); + } + + const dt = DateTime.fromJSDate(fireAt).setZone(timezone); + if (!dt.isValid) { + throw new Error(`windowEndAt: invalid timezone "${timezone}"`); + } + + // For hour 24, "end of day" is the next midnight. Luxon's `set` with + // hour=24 normalises into hour=0 of the next day, which is exactly + // what we want. + const end = dt.set({ hour: endHour, minute: 0, second: 0, millisecond: 0 }); + return end.toJSDate(); +} +``` + +- [ ] **Step 4: Run the tests** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run delivery-window +``` + +Expected: PASS, 6 tests. + +- [ ] **Step 5: Commit** + +```bash +git add apps/bot/src/scheduler/delivery-window.ts apps/bot/src/scheduler/delivery-window.test.ts +git commit -m "feat(bot): pure delivery-window end calculator + +windowEndAt(timezone, endHour, fireAt) returns the end-of-window for +the day fireAt is on. 
If fireAt is already past, the result is a +past timestamp — the run loop's first window gate trips immediately +and the entire run resolves as failed (zero sent), which is the +right behaviour for 'we can't send after window close'." +``` + +--- + +## Task 6: MediaUploadCache (TDD with mock socket) + +**Files:** +- Create: `apps/bot/src/scheduler/media-upload-cache.test.ts` +- Create: `apps/bot/src/scheduler/media-upload-cache.ts` + +- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/media-upload-cache.test.ts`** + +```ts +import { describe, it, expect, vi } from "vitest"; +import { MediaUploadCache } from "./media-upload-cache.js"; + +describe("MediaUploadCache", () => { + it("uploads each unique mediaId exactly once across N gets", async () => { + const prepare = vi.fn(async (mediaId: string) => ({ + kind: "prepared", + mediaId, + })); + const cache = new MediaUploadCache(prepare); + + const a1 = await cache.get("media-A"); + const a2 = await cache.get("media-A"); + const b1 = await cache.get("media-B"); + + expect(prepare).toHaveBeenCalledTimes(2); + expect(prepare).toHaveBeenCalledWith("media-A"); + expect(prepare).toHaveBeenCalledWith("media-B"); + // Same handle returned for repeated lookups of A. + expect(a1).toBe(a2); + expect(a1).not.toBe(b1); + }); + + it("coalesces concurrent gets of the same mediaId into ONE prepare call", async () => { + let resolveA: (v: unknown) => void = () => {}; + const aPromise = new Promise((r) => (resolveA = r)); + const prepare = vi.fn(async (mediaId: string) => { + if (mediaId === "media-A") return aPromise; + return { kind: "prepared", mediaId }; + }); + const cache = new MediaUploadCache(prepare); + + const p1 = cache.get("media-A"); + const p2 = cache.get("media-A"); + const p3 = cache.get("media-A"); + + // Resolve the in-flight prepare. 
+ resolveA({ kind: "prepared", mediaId: "media-A" }); + + const [r1, r2, r3] = await Promise.all([p1, p2, p3]); + expect(prepare).toHaveBeenCalledTimes(1); // de-duplicated + expect(r1).toBe(r2); + expect(r2).toBe(r3); + }); + + it("a thrown prepare is NOT cached — next get retries", async () => { + let attempt = 0; + const prepare = vi.fn(async (_mediaId: string) => { + attempt++; + if (attempt === 1) throw new Error("upload network blip"); + return { kind: "prepared", attempt }; + }); + const cache = new MediaUploadCache(prepare); + + await expect(cache.get("media-A")).rejects.toThrow("upload network blip"); + // Second attempt must call prepare again — DON'T cache failures. + const r = await cache.get("media-A"); + expect(prepare).toHaveBeenCalledTimes(2); + expect(r).toEqual({ kind: "prepared", attempt: 2 }); + }); + + it("size() reflects the number of cached unique mediaIds", async () => { + const prepare = async (mediaId: string) => ({ mediaId }); + const cache = new MediaUploadCache(prepare); + expect(cache.size()).toBe(0); + await cache.get("a"); + expect(cache.size()).toBe(1); + await cache.get("b"); + expect(cache.size()).toBe(2); + await cache.get("a"); // already cached + expect(cache.size()).toBe(2); + }); +}); +``` + +- [ ] **Step 2: Run the failing test** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache +``` + +Expected: FAIL with "Cannot find module './media-upload-cache.js'". + +- [ ] **Step 3: Implement `apps/bot/src/scheduler/media-upload-cache.ts`** + +```ts +/** + * Per-run cache of `prepareWAMessageMedia` results, keyed by + * `mediaId`. The point: when a reminder fans out to 1000 groups with + * one image, we want to upload that image to WhatsApp's CDN ONCE, not + * 1000 times. Subsequent group sends reuse the prepared message + * (with embedded directPath / mediaKey) via socket.relayMessage. + * + * Lifecycle: one cache instance per fire-reminder run. 
After the run + * completes, the cache is dropped — we don't share uploads across + * runs because WA media tokens are short-lived. + * + * Concurrent gets of the same mediaId are coalesced into a single + * prepare call. Failed prepares are NOT cached so the next attempt + * retries (network blips at upload time shouldn't poison the cache). + */ +export class MediaUploadCache<T> { + private readonly prepare: (mediaId: string) => Promise<T>; + private readonly entries = new Map<string, Promise<T>>(); + + constructor(prepare: (mediaId: string) => Promise<T>) { + this.prepare = prepare; + } + + async get(mediaId: string): Promise<T> { + const existing = this.entries.get(mediaId); + if (existing) return existing; + + const inflight = this.prepare(mediaId); + // Insert eagerly so concurrent gets dedupe. + this.entries.set(mediaId, inflight); + + try { + return await inflight; + } catch (err) { + // Don't cache failures — the next caller should retry. + this.entries.delete(mediaId); + throw err; + } + } + + size(): number { + return this.entries.size; + } +} +``` + +- [ ] **Step 4: Run the tests** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache +``` + +Expected: PASS, 4 tests. + +- [ ] **Step 5: Commit** + +```bash +git add apps/bot/src/scheduler/media-upload-cache.ts apps/bot/src/scheduler/media-upload-cache.test.ts +git commit -m "feat(bot): MediaUploadCache for once-per-run media prepare + +One cache instance per fire-reminder run. Each unique mediaId gets +prepared (uploaded to WA CDN) exactly once, and subsequent group +sends within the run reuse the prepared message via relayMessage. +Concurrent gets coalesce into a single prepare. Failed prepares +don't poison the cache — next caller retries."
+``` + +--- + +## Task 7: Rewrite `fire-reminder.ts` and bump pg-boss `teamSize` + +**Files:** +- Modify: `apps/bot/src/scheduler/reminder-jobs.ts` +- Rewrite: `apps/bot/src/scheduler/fire-reminder.ts` + +- [ ] **Step 1: Update `apps/bot/src/scheduler/reminder-jobs.ts` — pass `teamSize`** + +Whole file: + +```ts +import type { PgBoss } from "pg-boss"; +import { logger } from "../logger.js"; +import { env } from "../env.js"; +import { fireReminder, type FireReminderPayload } from "./fire-reminder.js"; + +export const REMINDER_FIRE_QUEUE = "reminder.fire"; + +export async function registerReminderJobs(boss: PgBoss): Promise<void> { + await boss.createQueue(REMINDER_FIRE_QUEUE); + await boss.work<FireReminderPayload>( + REMINDER_FIRE_QUEUE, + { + // Up to BOT_FIRE_CONCURRENCY jobs in flight at once. Combined with + // the per-account mutex inside fireReminder, this lets reminders + // on DIFFERENT accounts run in parallel while same-account + // reminders take turns. + teamSize: env.BOT_FIRE_CONCURRENCY, + teamConcurrency: 1, + }, + async (jobs) => { + const job = jobs[0]; + if (!job) return; + logger.debug({ jobId: job.id, payload: job.data }, "reminder.fire: handling"); + await fireReminder(job.data); + }, + ); + logger.info( + { teamSize: env.BOT_FIRE_CONCURRENCY }, + "reminder.fire: handler registered", + ); +} + +export async function scheduleReminderFire( + boss: PgBoss, + reminderId: string, + scheduledAt: Date, +): Promise<string | null> { + const id = await boss.send( + REMINDER_FIRE_QUEUE, + { reminderId }, + { + startAfter: scheduledAt, + retryLimit: 3, + retryDelay: 30, + retryBackoff: true, + // Use the reminderId as a singleton key so re-scheduling cancels the old job + singletonKey: `reminder:${reminderId}`, + }, + ); + logger.info({ reminderId, jobId: id, scheduledAt }, "reminder.fire: scheduled"); + return id; +} + +export async function cancelReminderFire(_boss: PgBoss, reminderId: string): Promise<void> { + // Soft cancel: pg-boss doesn't expose a clean cancel-by-singleton API in v12. 
+ // The scheduled job will still fire, but `fireReminder` exits early when the + // reminder row is gone. Hard cancel can be added later by storing the jobId. + logger.info({ reminderId }, "reminder.fire: cancel requested (soft, fizzles on fire)"); +} +``` + +- [ ] **Step 2: Rewrite `apps/bot/src/scheduler/fire-reminder.ts`** + +Whole file: + +```ts +import { and, eq, inArray } from "drizzle-orm"; +import { reminderRuns, reminderRunTargets, reminders } from "@cmbot/db"; +import { + generateWAMessageContent, + generateMessageID, + type AnyMessageContent, + type proto, +} from "@whiskeysockets/baileys"; +import pLimit from "p-limit"; +import { db } from "../db.js"; +import { logger } from "../logger.js"; +import { sessionManager } from "../whatsapp/session-manager.js"; +import { absoluteMediaPath, nextOccurrence, resolveDeliveryKind } from "@cmbot/shared"; +import { open as fsOpen, readFile } from "node:fs/promises"; +import { env } from "../env.js"; +import { writeAuditLog } from "../audit.js"; +import { getReminderWithDetails } from "../reminders/crud.js"; +import { getBoss } from "./pgboss-client.js"; +import { scheduleReminderFire } from "./reminder-jobs.js"; +import { pgNotifyWeb } from "../ipc/notify.js"; +import { accountMutex } from "./per-key-mutex.js"; +import { accountRateLimiter } from "./rate-limiter.js"; +import { windowEndAt } from "./delivery-window.js"; +import { MediaUploadCache } from "./media-upload-cache.js"; + +export type FireReminderPayload = { + reminderId: string; + /** Optional resume hook. When present, fire-reminder ATTACHES to + * the existing run instead of creating a new one, and only + * re-attempts targets in `pending` status. Set by the resume + * server action. */ + runId?: string; +}; + +/** Read the first N bytes of a file (used to sniff HEIF/AVIF/MOV brands). 
*/ +async function readHeadBytes(filePath: string, n: number): Promise<Uint8Array> { + const fh = await fsOpen(filePath, "r"); + try { + const buf = new Uint8Array(n); + await fh.read(buf, 0, n, 0); + return buf; + } finally { + await fh.close(); + } +} + +/** Random delay between same-group message parts (ms). Just enough for + * visible ordering in the chat at WA's natural pace. */ +function partJitterMs(): number { + return 200 + Math.floor(Math.random() * 300); // 200..499 +} + +export async function fireReminder(payload: FireReminderPayload): Promise<void> { + const reminder = await getReminderWithDetails(payload.reminderId); + if (!reminder) { + logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found"); + return; + } + if (reminder.status !== "active") { + logger.info({ reminderId: reminder.id, status: reminder.status }, "fire-reminder: skipping (not active)"); + return; + } + + // Per-account mutex: two reminders on the SAME account take turns. + // Different accounts run in parallel (cross-account isolation). + await accountMutex.run(reminder.accountId, () => fireReminderInner(reminder, payload.runId)); +} + +async function fireReminderInner( + reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>, + resumeRunId?: string, +): Promise<void> { + // Resume path: attach to the existing run; fresh path: create one. + let runId: string; + if (resumeRunId) { + const existing = await db.query.reminderRuns.findFirst({ + where: (r, { eq }) => eq(r.id, resumeRunId), + }); + if (!existing) { + logger.warn({ reminderId: reminder.id, resumeRunId }, "fire-reminder: resume target run missing"); + return; + } + runId = existing.id; + // Re-mark as in-flight so the UI shows the run is no longer paused. 
+ await db + .update(reminderRuns) + .set({ status: "pending", errorSummary: null }) + .where(eq(reminderRuns.id, runId)); + } else { + const [run] = await db + .insert(reminderRuns) + .values({ + reminderId: reminder.id, + reminderName: reminder.name, + status: "pending", + }) + .returning({ id: reminderRuns.id }); + runId = run!.id; + } + + const session = sessionManager.getSession(reminder.accountId); + if (!session) { + logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected"); + await markAllSkipped(runId, reminder, "account not connected"); + await db.update(reminderRuns).set({ status: "skipped", errorSummary: "account not connected" }).where(eq(reminderRuns.id, runId)); + await pgNotifyWeb({ type: "reminder.fired", reminderId: reminder.id, runId, status: "skipped" }); + return; + } + + // Up-front: load all groups + media rows in TWO bulk queries. + const groupIds = reminder.targets.map((t) => t.groupId); + const groupRows = groupIds.length + ? await db.query.whatsappGroups.findMany({ where: (g) => inArray(g.id, groupIds) }) + : []; + const groupById = new Map(groupRows.map((g) => [g.id, g])); + + const mediaIds = Array.from( + new Set(reminder.messages.map((m) => m.mediaId).filter((id): id is string => Boolean(id))), + ); + const mediaRows = mediaIds.length + ? await db.query.mediaFiles.findMany({ where: (m) => inArray(m.id, mediaIds) }) + : []; + const mediaById = new Map(mediaRows.map((m) => [m.id, m])); + + // Pre-create run_targets rows so progress is observable mid-run. + if (reminder.targets.length > 0) { + await db.insert(reminderRunTargets).values( + reminder.targets.map((t) => ({ + runId, + groupId: t.groupId, + groupLabel: groupById.get(t.groupId)?.name ?? null, + status: "pending" as const, + })), + ); + } + + // Window-end timestamp. If the reminder fires AFTER today's end-hour + // (e.g. cron miss-fired late) this is in the past — every iteration + // will trip the gate and the run resolves as failed. 
+ const windowEnd = windowEndAt( + reminder.timezone, + reminder.deliveryWindowEndHour, + new Date(), + ); + + // Per-run media upload cache. Each unique mediaId is prepared via + // generateWAMessageContent ONCE (which uploads to WA's CDN); the + // resulting proto.Message is reused for every group via relayMessage. + // socket.waUploadToServer is the upload helper Baileys exposes. + const uploadCache = new MediaUploadCache(async (mediaId) => { + const media = mediaById.get(mediaId); + if (!media) throw new Error(`media row missing: ${mediaId}`); + const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR); + const buffer = await readFile(filePath); + const head = buffer.subarray(0, 12); + const resolved = resolveDeliveryKind(media.mimeType, head); + const senderKind: "image" | "video" | "document" = + resolved === "image" || resolved === "video" ? resolved : "document"; + const content: AnyMessageContent = + senderKind === "image" + ? { image: buffer, mimetype: media.mimeType } + : senderKind === "video" + ? { video: buffer, mimetype: media.mimeType } + : { document: buffer, fileName: media.filenameOriginal, mimetype: media.mimeType }; + return generateWAMessageContent(content, { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + upload: (session.socket as any).waUploadToServer, + }); + }); + + // Per-account rate limiter — gates each socket.sendMessage / relayMessage call. + const rateLimiter = accountRateLimiter.get(reminder.accountId); + + let sentCount = 0; + let failedCount = 0; + let skippedCount = 0; + let windowClosed = false; + + const groupConcurrency = pLimit(env.BOT_GROUP_CONCURRENCY); + + await Promise.all( + reminder.targets.map((target) => + groupConcurrency(async () => { + // Window-end gate — even if we already started other groups, + // any not-yet-started one stops here. 
+ if (Date.now() >= windowEnd.getTime()) { + windowClosed = true; + await db + .update(reminderRunTargets) + .set({ status: "skipped", error: "delivery window closed" }) + .where( + and( + eq(reminderRunTargets.runId, runId), + eq(reminderRunTargets.groupId, target.groupId), + ), + ); + skippedCount++; + return; + } + + const group = groupById.get(target.groupId); + if (!group) { + await db + .update(reminderRunTargets) + .set({ status: "skipped", error: "group missing from db" }) + .where( + and( + eq(reminderRunTargets.runId, runId), + eq(reminderRunTargets.groupId, target.groupId), + ), + ); + skippedCount++; + return; + } + + const start = Date.now(); + try { + let lastMessageId: string | undefined; + for (const part of reminder.messages) { + await rateLimiter.acquire(); + if (part.kind === "text" && part.textContent) { + const r = await session.socket.sendMessage(group.waGroupJid, { + text: part.textContent, + }); + lastMessageId = r?.key?.id ?? undefined; + } else if (part.mediaId) { + const prebuilt = await uploadCache.get(part.mediaId); + // Inject the caption (if any) just before relaying — the + // prebuilt content carries the media but each relay uses + // a fresh messageId. + if (part.textContent) { + injectCaption(prebuilt, part.textContent); + } + const messageId = generateMessageID(); + await session.socket.relayMessage(group.waGroupJid, prebuilt, { messageId }); + lastMessageId = messageId; + } + await new Promise((r) => setTimeout(r, partJitterMs())); + } + await db + .update(reminderRunTargets) + .set({ + status: "sent", + waMessageId: lastMessageId ?? 
null, + latencyMs: Date.now() - start, + }) + .where( + and( + eq(reminderRunTargets.runId, runId), + eq(reminderRunTargets.groupId, target.groupId), + ), + ); + sentCount++; + } catch (err) { + logger.error( + { err, reminderId: reminder.id, groupId: target.groupId }, + "fire-reminder: send failed", + ); + await db + .update(reminderRunTargets) + .set({ status: "failed", error: (err as Error).message }) + .where( + and( + eq(reminderRunTargets.runId, runId), + eq(reminderRunTargets.groupId, target.groupId), + ), + ); + failedCount++; + } + }), + ), + ); + + const total = reminder.targets.length; + let status: "success" | "partial" | "failed"; + let errorSummary: string | null = null; + if (sentCount === total) { + status = "success"; + } else if (sentCount > 0) { + status = "partial"; + errorSummary = windowClosed + ? `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}). ${sentCount} of ${total} groups delivered. This account is at capacity for this fan-out — consider sending the remainder from another paired account.` + : `${sentCount} of ${total} groups delivered (${failedCount} failed, ${skippedCount} skipped).`; + } else { + status = "failed"; + errorSummary = windowClosed + ? `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}) before any group could be sent. The reminder fired too late in the day.` + : `All ${total} sends failed.`; + } + + await db + .update(reminderRuns) + .set({ status, errorSummary }) + .where(eq(reminderRuns.id, runId)); + + await pgNotifyWeb({ + type: "reminder.fired", + reminderId: reminder.id, + runId, + status, + }); + + // One-off reminders end after firing. Recurring reminders re-arm. 
+ if (reminder.scheduleKind === "one_off") { + await db + .update(reminders) + .set({ status: "ended", updatedAt: new Date() }) + .where(eq(reminders.id, reminder.id)); + } else if (reminder.scheduleKind === "recurring" && reminder.rrule) { + const next = nextOccurrence(reminder.rrule, reminder.timezone, new Date()); + await db + .update(reminders) + .set({ lastFiredAt: new Date(), updatedAt: new Date() }) + .where(eq(reminders.id, reminder.id)); + if (next) { + try { + await scheduleReminderFire(getBoss(), reminder.id, next); + logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence"); + } catch (err) { + logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence"); + } + } else { + logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending"); + await db.update(reminders).set({ status: "ended" }).where(eq(reminders.id, reminder.id)); + } + } + + await writeAuditLog(db, { + operatorId: reminder.createdBy, + source: "system", + action: "reminder.fired", + targetType: "reminder", + targetId: reminder.id, + payload: { runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount }, + }); + + logger.info( + { reminderId: reminder.id, runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount }, + "fire-reminder: done", + ); +} + +/** + * Mark every target as skipped with the given error. Used when the + * account is offline before the loop even starts (no run_target rows + * have been created yet, so we INSERT instead of UPDATE). 
+ */ +async function markAllSkipped( + runId: string, + reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>, + error: string, +): Promise<void> { + if (reminder.targets.length === 0) return; + const rows = await db.query.whatsappGroups.findMany({ + where: (g) => inArray(g.id, reminder.targets.map((t) => t.groupId)), + columns: { id: true, name: true }, + }); + const labelById = new Map(rows.map((r) => [r.id, r.name])); + await db.insert(reminderRunTargets).values( + reminder.targets.map((t) => ({ + runId, + groupId: t.groupId, + groupLabel: labelById.get(t.groupId) ?? null, + status: "skipped" as const, + error, + })), + ); +} + +/** + * Mutates the prebuilt proto.Message to set the caption on whichever + * media variant it carries. Baileys' relayMessage does not let us + * pass the caption alongside; the protobuf already carries the slot. + */ +function injectCaption(msg: proto.Message, caption: string): void { + if (msg.imageMessage) msg.imageMessage.caption = caption; + else if (msg.videoMessage) msg.videoMessage.caption = caption; + else if (msg.documentMessage) msg.documentMessage.caption = caption; +} +``` + +- [ ] **Step 3: Add `p-limit` to apps/bot deps** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm add -F @cmbot/bot p-limit +``` + +Expected: pnpm reports `+1` package. The lockfile updates. + +- [ ] **Step 4: Typecheck the bot** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck +``` + +Expected: passes. If type errors mention `relayMessage` overloads, double-check the `@whiskeysockets/baileys` import added — the function comes from the socket instance, not the package's named exports. + +- [ ] **Step 5: Run all bot tests** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run +``` + +Expected: PASS for all (existing 26 + 7 PerKeyMutex + 8 RateLimiter + 6 DeliveryWindow + 4 MediaUploadCache = 51). 
+ +- [ ] **Step 6: Restart the bot to pick up the rewrite** + +```bash +NO_SUDO=1 ./scripts/dev.sh restart-bot +NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=40 bot 2>&1 | grep -i "reminder.fire" +``` + +Expected: A line like `reminder.fire: handler registered teamSize=8`. + +- [ ] **Step 7: Commit** + +```bash +git add apps/bot/src/scheduler/fire-reminder.ts apps/bot/src/scheduler/reminder-jobs.ts apps/bot/package.json pnpm-lock.yaml +git commit -m "feat(bot): windowed, pacing-safe fan-out + +Rewrites the per-target loop to: + - Wrap inner work in PerKeyMutex(accountId) so two reminders on the + same account take turns; different accounts run in parallel. + - Bulk-load groups and media rows up front (drops ~3000 round-trips + to ~3 for a 1000-group run). + - Pre-create run_target rows with status='pending' so the Activity + tab shows progress mid-run. + - Pre-upload each unique media via MediaUploadCache (one + generateWAMessageContent call per mediaId, then relayMessage to + every group). For 1000 groups × 5 MB image, this turns 5 GB of + upload into 5 MB. + - Run BOT_GROUP_CONCURRENCY (default 3) groups in parallel within + one account; each group's parts stay serial for visible order. + - Gate every socket call on a per-account TokenBucket + (BOT_MAX_SEND_PER_MINUTE, default 40). + - Replace the rigid 1.5s inter-part sleep with 200..499 ms jitter. + - Window-end gate: stop sending once today's end-hour (in the + reminder's timezone) has passed; mark unstarted targets skipped; + surface a partial-status message that names the timezone, the + delivered/total count, and points at multi-account offload. + +reminder-jobs.ts now passes teamSize=BOT_FIRE_CONCURRENCY (default 8) +to boss.work so up to 8 different-account reminders run concurrently." 
+``` + +--- + +## Task 8: Web wiring — schema, action, wizard input, notification body + +**Files:** +- Modify: `apps/web/src/actions/reminders.ts` +- Modify: `apps/web/src/components/reminder-wizard/when-form-client.tsx` +- Modify: `apps/web/src/components/reminder-edit/edit-when-form.tsx` +- Modify: `apps/web/src/lib/notifications.ts` +- Create: `apps/web/src/lib/notifications-body.test.ts` (extends existing notifications tests with the partial-status body) + +- [ ] **Step 1: Update the create/update Zod schemas in `apps/web/src/actions/reminders.ts`** + +Find the `createReminderSchema` definition (around lines 220–250) and add the two fields. Schema chunk: + +```ts +const createReminderSchema = z + .object({ + accountId: z.string().uuid(), + groupIds: z.array(z.string().uuid()), + messages: z.array(messagePartSchema).optional(), + name: z.string().nullable().optional(), + text: z.string().nullable().optional(), + mediaId: z.string().uuid().nullable().optional(), + caption: z.string().nullable().optional(), + scheduledAtIso: z.string().datetime({ offset: true }), + rrule: z.string().nullable().optional(), + timezone: z.string().default(DEFAULT_TIMEZONE), + // Delivery window. End hour is enforced at runtime by fire-reminder; + // start hour is documented but not gated in v1. 
+ deliveryWindowStartHour: z.number().int().min(0).max(24).default(6), + deliveryWindowEndHour: z.number().int().min(0).max(24).default(18), + }) + .refine( + (d) => + (d.messages && d.messages.length > 0) || + Boolean(d.text?.trim()) || + Boolean(d.mediaId), + { + message: "Add a message or attach a file", + path: ["messages"], + }, + ) + .refine((d) => d.deliveryWindowStartHour < d.deliveryWindowEndHour, { + message: "Delivery window start must be earlier than end", + path: ["deliveryWindowStartHour"], + }); +``` + +Then in BOTH `createReminderAction` and `updateReminderAction`, where the `db.insert(reminders).values({...})` / `db.update(reminders).set({...})` happens, include the two new fields. Find the `values({` blocks (around line 350 and line 460) and add: + +```ts +deliveryWindowStartHour: parsed.data.deliveryWindowStartHour, +deliveryWindowEndHour: parsed.data.deliveryWindowEndHour, +``` + +immediately after `timezone,`. + +- [ ] **Step 2: Add the input controls to `apps/web/src/components/reminder-wizard/when-form-client.tsx`** + +Add to the props interface and component state. Top-of-file imports stay; add a new state and a new section in the JSX. The whole edit: + +Find the `interface WhenFormClientProps` block. Add: + +```ts +interface WhenFormClientProps { + accountId: string; + groupIds: string; + timezone: string; + initialDefaultIso: string; + initialSpec?: RecurrenceSpec; + initialDeliveryStartHour?: number; + initialDeliveryEndHour?: number; + passThroughParams: PassThroughParams; +} +``` + +In the component body (around state declarations near the top), add: + +```ts +const [deliveryStartHour, setDeliveryStartHour] = useState( + props.initialDeliveryStartHour ?? 6, +); +const [deliveryEndHour, setDeliveryEndHour] = useState( + props.initialDeliveryEndHour ?? 
18, +); +``` + +In `handleContinue`, when building the URLSearchParams, add (after the existing params): + +```ts +sp.set("deliveryStartHour", String(deliveryStartHour)); +sp.set("deliveryEndHour", String(deliveryEndHour)); +``` + +In the JSX, just before the existing `` block, add a new "Delivery hours" card: + +```tsx +
+ +
+ setDeliveryStartHour(Number(e.target.value))} + className="h-9 w-20" + aria-label="Delivery start hour" + /> + to + setDeliveryEndHour(Number(e.target.value))} + className="h-9 w-20" + aria-label="Delivery end hour" + /> + + (24-hour, in {timezone}) + +
+

+ The bot stops sending after the end hour. Long fan-outs that don't + finish in this window get reported as partial. +

+
+``` + +No lucide-react import change is needed for `ClockIcon`: the file already imports `CalendarIcon`, `ClockIcon`, and `AlertCircleIcon`. + +- [ ] **Step 3: Wire the new params into `apps/web/src/components/reminder-wizard/step-when.tsx`** + +The wizard reads `deliveryStartHour` / `deliveryEndHour` from the URL and passes them to `<WhenFormClient>` as `initialDeliveryStartHour` / `initialDeliveryEndHour`. Find where `<WhenFormClient>` is rendered and add the two new props: + +```tsx + +``` + +Also update the `interface StepWhenParams` (or wherever the searchParams type lives in step-when.tsx) to include `deliveryStartHour?: string; deliveryEndHour?: string;`. + +- [ ] **Step 4: Pass-through in step-groups.tsx and step-review.tsx** + +Both step pages thread URL params through. Open each and find where they re-emit the URL (look for the `URLSearchParams` builder or `editLink` helper). Add: + +```ts +if (sp.deliveryStartHour) backParams.set("deliveryStartHour", sp.deliveryStartHour); +if (sp.deliveryEndHour) backParams.set("deliveryEndHour", sp.deliveryEndHour); +``` + +Add the same to `editLink` in step-review.tsx so the "Edit when" link from the review page round-trips the values. + +In `review-submit-client.tsx`, where the action payload is built, include the two fields: + +```ts +const payload = { + accountId, + groupIds: groupIds ? groupIds.split(",").filter(Boolean) : [], + name: name?.trim() || null, + messages, + scheduledAtIso: scheduledAt, + rrule: rrule ?? null, + timezone, + deliveryWindowStartHour: deliveryStartHour ?? 6, + deliveryWindowEndHour: deliveryEndHour ?? 18, +}; +``` + +(And add `deliveryStartHour?: number; deliveryEndHour?: number;` to the props interface and pass them in from `step-review.tsx` after parsing the URL.) 
+ +- [ ] **Step 5: Update `apps/web/src/components/reminder-edit/edit-when-form.tsx` similarly** + +Add the same `Delivery hours` card into the form, same state vars, and include the two fields in the `updateReminderAction` payload (find the existing `updateReminderAction({...})` call). Pull the initial values from the loaded reminder row in the parent edit page (`apps/web/src/app/reminders/[id]/edit/when/page.tsx`): + +```tsx + t.groupId)} + messages={initialMessages} + name={reminder.name} + initialIso={(reminder.scheduledAt ?? new Date()).toISOString()} + initialSpec={specFromRrule(reminder.rrule)} + timezone={reminder.timezone} + initialDeliveryStartHour={reminder.deliveryWindowStartHour} + initialDeliveryEndHour={reminder.deliveryWindowEndHour} +/> +``` + +The form's prop interface gets: + +```ts +initialDeliveryStartHour?: number; +initialDeliveryEndHour?: number; +``` + +- [ ] **Step 6: Extend the partial-status notification body in `apps/web/src/lib/notifications.ts`** + +Find `reminderFiredToNotification`. The existing function takes `{ reminderId, runId, status }`. Update the body to mention the delivered/total when status is `partial`: + +```ts +export function reminderFiredToNotification(event: { + type: "reminder.fired"; + reminderId: string; + runId: string; + status: string; + sent?: number; + total?: number; +}): ShowNotificationOptions | null { + if (event.status === "skipped") return null; + const headline = + event.status === "success" + ? "Reminder sent" + : event.status === "partial" + ? "Reminder partly sent" + : "Reminder failed"; + let body = + event.status === "success" + ? "All groups received the message." + : event.status === "partial" + ? "Some groups received the message; others failed. See activity." + : "No groups received the message. See activity."; + if (event.status === "partial" && event.sent !== undefined && event.total !== undefined) { + body = `${event.sent} of ${event.total} groups delivered. 
See activity for details.`; + } + return { + title: headline, + body, + tag: `reminder:${event.reminderId}`, + href: `/reminders/${event.reminderId}`, + }; +} +``` + +- [ ] **Step 7: Add tests for the extended notification body** + +Append to `apps/web/src/lib/notifications.test.ts` (inside the existing `describe("reminderFiredToNotification mapping", () => {...})` block): + +```ts +it("partial with sent/total renders 'X of Y groups delivered' instead of the generic body", () => { + const args = reminderFiredToNotification({ + type: "reminder.fired", + reminderId: "r-1", + runId: "run-1", + status: "partial", + sent: 412, + total: 1000, + }); + expect(args?.body).toBe("412 of 1000 groups delivered. See activity for details."); +}); + +it("partial without sent/total falls back to the generic body", () => { + const args = reminderFiredToNotification({ + type: "reminder.fired", + reminderId: "r-1", + runId: "run-1", + status: "partial", + }); + expect(args?.body).toMatch(/Some groups received/); +}); +``` + +- [ ] **Step 8: Update existing edit-section-forms.test.tsx fixtures so they don't break on the new required props** + +Find `apps/web/src/components/reminder-edit/edit-section-forms.test.tsx`. Both `EditAccountForm` and `EditGroupsForm` may need additional defaults. If their props don't reference the new fields, the tests still pass — re-run them and patch only if compilation fails. Run: + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck +``` + +If typecheck reports `Property 'initialDeliveryStartHour' is missing` on any test, add `initialDeliveryStartHour: 6, initialDeliveryEndHour: 18` to that fixture. + +- [ ] **Step 9: Run web tests + bot tests** + +```bash +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run +NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run +``` + +Expected: web 298+ pass, bot 26+25 = 51 pass. Total ~380 across the project including shared. 
+ +- [ ] **Step 10: Restart web container so the server bundle picks up the schema change** + +```bash +NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml restart web +``` + +- [ ] **Step 11: Commit** + +```bash +git add apps/web +git commit -m "feat(web): delivery-window inputs in wizard + edit; partial body extension + +Wizard 'When' step and the per-section edit-when page get two number +inputs for the delivery window hours (default 6/18). Server actions +accept them on the create/update Zod schemas, validate +0 <= start < end <= 24, and persist to the new reminders columns. + +Notification body for partial-status reminders now reads +'412 of 1000 groups delivered. See activity for details.' when the +SSE event carries sent/total counts." +``` + +--- + +## Acceptance check (manual) + +After all 8 tasks land: + +- [ ] **Smoke 1.** Create a reminder for 1 group with default delivery hours and a 30-second future fire time. Verify it lands; verify the run's `error_summary` is null and status `success`. + +- [ ] **Smoke 2.** Create a reminder with end hour set to a value that's already past in the operator's timezone. Schedule it to fire NOW. Verify the run resolves `failed` and the `error_summary` reads "Delivery window closed at H:00 (TZ) before any group could be sent." + +- [ ] **Smoke 3.** Create two reminders on TWO different paired accounts with the same `scheduledAt`. Watch bot logs: + + ```bash + NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=60 bot 2>&1 | grep -E "reminder|fire-reminder" + ``` + + Expected: both reminders' `fire-reminder: done` entries land within seconds of each other (parallel), not sequentially separated by a full fan-out. + +- [ ] **Smoke 4.** Create a reminder with a 5-MB JPEG and 3 groups. Fire it. 
Bot logs should show only ONE `prepareWAMessageMedia` / upload entry (look for the upload size in the Baileys debug log) followed by 3 relayMessage events. + +- [ ] **Final test sweep:** + + ```bash + NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run + NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run + NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run + ``` + + Expected: all green. ~380 total. diff --git a/docs/superpowers/specs/2026-05-10-windowed-fanout-design.md b/docs/superpowers/specs/2026-05-10-windowed-fanout-design.md index 18982b6..1c29e39 100644 --- a/docs/superpowers/specs/2026-05-10-windowed-fanout-design.md +++ b/docs/superpowers/specs/2026-05-10-windowed-fanout-design.md @@ -7,9 +7,13 @@ ## Goal Deliver a reminder to many groups (target: 1000+) safely within a -per-reminder delivery window. If we cannot finish in the window, stop, -mark the run `partial`, and tell the operator the account is at -capacity for this fan-out. +per-reminder delivery window. If we cannot finish in the window, +**pause** the run at window-close, persist progress, and let the +operator **resume** later from the Activity / detail view. The +paused-status message tells the operator what's blocking throughput +(account at capacity, media size eating the budget) so they can +decide whether to offload to another paired account, shrink the +attachment, or just resume the next morning at 6am. ## Constraints @@ -131,19 +135,71 @@ default 3): Final status: - **success** — every target sent. -- **partial** — at least one sent, at least one not (window-close, - failed, missing group). `error_summary` reads: +- **paused** — window closed mid-run with at least one target still in + `pending`. Run carries a resumable state: sent rows stay `sent`, + unstarted rows stay `pending` (NOT skipped), failed rows stay + `failed`. `error_summary` reads: `"Delivery window closed at 18:00 (Asia/Kuala_Lumpur). 
412 of 1000 - groups delivered. This account is at capacity for this fan-out — - consider sending the remainder from another paired account."` -- **failed** — zero sent. + groups delivered, 588 still pending. Resume from the Activity tab. + If this happens repeatedly, consider offloading to another paired + account, or shrinking the message body / media size to fit more + groups in your daily window."` +- **partial** — every target was attempted; some sent and some + failed/skipped (group missing from DB, account offline, send error + inside the window). Not resumable; the failures are real failures. +- **failed** — zero sent. Either every send errored, or the run hit + the window close BEFORE the first send (run fired too late to do + any work; nothing to resume). + +## Resume action + +A paused run can be resumed by the operator. Mechanism: + +- New server action `resumeReminderRunAction(runId)` validates + ownership, then enqueues a pg-boss job: + `boss.send("reminder.fire", { reminderId, runId })` with NO + singletonKey (resumes don't conflict with the reminder's normal + cron firing). +- The fire-reminder handler accepts an optional `runId` in its + payload. When present, it ATTACHES to that run instead of creating + a new one: + - Skips creating a new `reminder_runs` row. + - Loads the existing run's `reminder_run_targets` rows. + - Iterates only those with `status = 'pending'`. + - Re-uses the same windowEnd / rate limiter / media cache logic as + a fresh fire. + - On window close again, status flips back to `paused` with an + updated count. + - Once every pending target has been attempted before the window + closes, status becomes `success` (if no failures accumulated) or `partial` (if some failed). +- `failed` targets from the previous run are NOT retried on resume. + They're real errors — surfacing them as actionable in the UI is a + v2 concern (manual "retry failed" button). 
+ +UI surfaces for paused runs: + +- Activity tab gets an amber "Paused" pill alongside the existing + Success/Partial/Failed/Skipped/Archived filters. A Resume button + appears inline on each paused row. +- Reminder detail page's run history shows the same Resume button on + paused rows. +- The `reminder.fired` SSE event for status=paused triggers a + notification with title "Reminder paused" and body + `"X of Y groups delivered. Resume from the Activity tab."` ## Notification body -The existing `reminder.fired` SSE event already carries -`{ status }`. The web's notification mapper already handles -`partial` with a "see activity" hint. The body extends to mention -"X of Y delivered" when status === "partial". +The existing `reminder.fired` SSE event already carries `{ status }`. +The notification mapper is extended per status as follows: + +- `success` → unchanged. +- `partial` → body mentions delivered/total counts when present. +- `paused` → headline `"Reminder paused"`, body + `"X of Y groups delivered. Resume from the Activity tab."` Clicking the notification + takes the operator to the reminder's detail page where the Resume + button lives. +- `failed` → unchanged. +- `skipped` → still filtered (bookkeeping noise). ## Components @@ -192,14 +248,20 @@ default to 6/18 and can be widened (e.g. 0/24) for a specific big run. ## Out of scope (v2 candidates) -- **Crash resumability across bot restarts.** Today, if the bot dies - mid-fan-out, pg-boss will retry the job; the loop will skip any - rows already marked `sent`, but the in-memory rate-limiter and - upload-cache state are gone — meaning the retry uploads media - again and starts pacing from a full bucket. Acceptable for v1. -- **Pause / resume mid-run** controls. -- **Cross-day window resume** (current design hard-stops at window - end and reports partial; doesn't queue the remainder for tomorrow). 
+- **Crash resumability across bot restarts.** If the bot dies + mid-fan-out (mid-window), pg-boss will retry the job; the loop's + pre-loaded `pending` rows still pick up correctly, but the + in-memory rate-limiter and upload-cache state are gone — the + retry re-uploads media and starts pacing from a full bucket. The + paused-state resumability covered above is a different mechanism: + it handles the "window closed cleanly" case end-to-end. The + "bot crashed mid-window" case is degraded but not broken. +- **Auto-resume next morning** when window opens again (today the + operator clicks Resume manually). +- **Pause-by-operator** (only window-close pauses; user-triggered + pause mid-fan-out isn't wired). +- **Retry-failed-targets** action (paused-resume only re-attempts + `pending` rows; `failed` rows stay failed). - **Multi-account auto-split** of a single reminder. - **Adaptive rate limiting** (auto-back-off on WA rate-limit response codes; today the operator tunes the env var). @@ -210,7 +272,14 @@ default to 6/18 and can be widened (e.g. 0/24) for a specific big run. in roughly 30–50 minutes, comfortably inside a 6am–6pm window. - Two reminders on different accounts firing within seconds of each other: both progress simultaneously, neither blocks the other. -- A run that hits the window end: stops cleanly, marks remaining as - skipped, surfaces the partial-status message in the Activity tab - and via the browser notification. -- 355 existing tests still pass; ≈25 new tests cover the new helpers. +- A run that hits the window end mid-fan-out: stops cleanly, marks + the run `paused`, leaves un-started targets as `pending`, surfaces + the paused-status notification with delivered/total counts. +- The operator clicks **Resume** on a paused run — fan-out continues + from the unsent targets, respecting the same per-account rate + limit + window. If it again can't finish, it pauses again with an + updated count. 
+- A run that hits the window end BEFORE any send (fired too late): + resolves `failed`, no resume offered. +- 355 existing tests still pass; ≈30 new tests cover the new helpers + and the paused/resume flow.