Previously the name field auto-derived from the first text part when
the operator left it blank. That's brittle once reminders carry
multiple parts of varying provenance, and confusing in lists where
"Reminder" or partial sentences crowd in.
Now: every reminder must carry a non-empty name, capped at 60 chars.
- Zod schema on createReminder/updateReminder: name moves from
`z.string().nullable().optional()` to
`z.string().trim().min(1, "Give the reminder a name").max(60)`.
Stale-URL legacy callers that omit it now get a clear server error.
- Wizard compose step: input has `required` + `aria-required`,
placeholder + label simplified ("(optional)" tag and the helper
paragraph removed), Continue blocks on empty.
- Edit-message form: same — required, aria-required, save blocked
on empty, the "leave blank and we'll auto-derive" hint dropped.
- Review-submit client: defensive fail-fast for stale-bookmark URLs
that arrive at step 5 without a name — bounces back with
"Give the reminder a name (back on the Message step)" instead of
letting the server reject.
The resolveReminderName helper stays put — duplicateReminderAction
and any future caller still benefit from the trim+clamp+fallback
chain. Helper unit tests unaffected (they test the resolver in
isolation, the policy-tightening lives at the schema layer above).
298 web tests still passing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
64 KiB
Windowed, Pacing-Safe Reminder Fan-Out — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Deliver reminders to many groups (1000+) safely within a per-reminder delivery window (default 6am–6pm). On window-close mid-run, pause with a clear "account at capacity / consider offload or smaller media" message and let the operator resume later from the UI.
Architecture: Replace the current serial fan-out with a per-account isolation model. pg-boss teamSize raises so different-account reminders run in parallel; a per-key (accountId) async mutex serialises same-account fan-outs; a per-account token-bucket rate limiter paces sends; a per-run media-upload cache reuses each unique file across every group via Baileys' prepareWAMessageMedia + relayMessage; a window-end gate flips the run to paused (un-sent targets stay pending); a resumeReminderRunAction re-enters fire-reminder with the existing runId so the loop picks up only the still-pending targets.
Tech Stack: Node.js + TypeScript, Baileys 7.0.0-rc10 (prepareWAMessageMedia, relayMessage, generateMessageID), pg-boss v12 (boss.work with teamSize), Drizzle ORM + Postgres, vitest.
File Structure
| File | Role |
|---|---|
packages/db/migrations/0008_*.sql (generated) |
add delivery_window_start_hour, delivery_window_end_hour to reminders |
packages/db/src/schema.ts |
drizzle alignment for the two new columns |
apps/bot/src/env.ts |
three new env vars: BOT_FIRE_CONCURRENCY, BOT_GROUP_CONCURRENCY, BOT_MAX_SEND_PER_MINUTE |
apps/bot/src/scheduler/per-key-mutex.ts (new) |
accountId-keyed async mutex |
apps/bot/src/scheduler/per-key-mutex.test.ts (new) |
unit tests |
apps/bot/src/scheduler/rate-limiter.ts (new) |
per-account token bucket |
apps/bot/src/scheduler/rate-limiter.test.ts (new) |
fake-clock unit tests |
apps/bot/src/scheduler/delivery-window.ts (new) |
pure window-end calculator |
apps/bot/src/scheduler/delivery-window.test.ts (new) |
unit tests |
apps/bot/src/scheduler/media-upload-cache.ts (new) |
prepareWAMessageMedia results, keyed by mediaId |
apps/bot/src/scheduler/media-upload-cache.test.ts (new) |
mock-socket unit tests |
apps/bot/src/scheduler/fire-reminder.ts (rewrite) |
new loop using all of the above |
apps/bot/src/scheduler/reminder-jobs.ts |
pass teamSize config |
apps/web/src/actions/reminders.ts |
accept the two new fields on create/update |
apps/web/src/components/reminder-wizard/when-form-client.tsx |
"Delivery hours" inputs |
apps/web/src/components/reminder-edit/edit-when-form.tsx |
same |
apps/web/src/lib/notifications.ts |
partial-status notification body extension |
Task 1: Schema migration — delivery window columns
Files:
-
Modify:
packages/db/src/schema.ts(lines around thereminderstable — add 2 columns) -
Generate:
packages/db/migrations/0008_<name>.sql -
Step 1: Edit
packages/db/src/schema.ts— add the two columns toreminders
Find the reminders table block and append the two new columns just before the closing });. The existing block ends with lastFiredAt: timestamp(...). Add:
export const reminders = pgTable("reminders", {
id: uuid("id").primaryKey().defaultRandom(),
accountId: uuid("account_id").notNull().references(() => whatsappAccounts.id, { onDelete: "cascade" }),
name: text("name").notNull(),
scheduleKind: text("schedule_kind").notNull(),
scheduledAt: timestamp("scheduled_at", { withTimezone: true }),
rrule: text("rrule"),
timezone: text("timezone").notNull(),
endsAt: timestamp("ends_at", { withTimezone: true }),
maxRuns: integer("max_runs"),
status: text("status").notNull().default("active"),
createdBy: uuid("created_by").notNull().references(() => operators.id),
createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(),
lastFiredAt: timestamp("last_fired_at", { withTimezone: true }),
// Delivery window — operator-supplied hours (in the row's timezone).
// Only the END hour is enforced at runtime in v1: the run loop stops
// sending once `now()` crosses today's end hour. The START hour is
// documented for the UI; not gated. See spec
// docs/superpowers/specs/2026-05-10-windowed-fanout-design.md.
deliveryWindowStartHour: integer("delivery_window_start_hour").notNull().default(6),
deliveryWindowEndHour: integer("delivery_window_end_hour").notNull().default(18),
});
- Step 2: Generate the migration
Run:
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db generate
Expected: a file appears at packages/db/migrations/0008_<name>.sql containing ALTER TABLE "reminders" ADD COLUMN "delivery_window_start_hour" integer DEFAULT 6 NOT NULL; and a similar line for delivery_window_end_hour.
- Step 3: Apply the migration
Run:
NO_SUDO=1 ./scripts/db.sh migrate
Expected: Migrations applied. Verify with:
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db build
Expected: clean tsc output (the @cmbot/db package re-builds so downstream consumers pick up the new columns).
- Step 4: Typecheck the bot + web to confirm no schema-consumer regressions
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck
Expected: both pass with no errors.
- Step 5: Commit
git add packages/db
git commit -m "feat(db): add delivery_window_start_hour / end_hour to reminders
Both default 6 / 18, stored as int (hour-of-day in the row's
timezone). Only the end hour is gated at runtime — see spec
docs/superpowers/specs/2026-05-10-windowed-fanout-design.md."
Task 2: Bot env vars — concurrency + rate caps
Files:
-
Modify:
apps/bot/src/env.ts -
Step 1: Replace
apps/bot/src/env.tscontents
Whole file:
import { z } from "zod";
const numberFromString = z.string().regex(/^\d+$/).transform((s) => Number(s));
const envSchema = z.object({
DATABASE_URL: z.string().url(),
DATA_DIR: z.string().min(1),
SESSIONS_DIR: z.string().min(1),
MEDIA_DIR: z.string().min(1),
BOT_HEALTH_PORT: numberFromString,
BOT_LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).default("info"),
// Reminder fan-out tuning. Defaults aim for an established WhatsApp
// account (~30-60 msg/min safe band).
// - BOT_FIRE_CONCURRENCY: pg-boss worker pool size for the
// `reminder.fire` queue. Sets the max number of accounts that can
// run their fan-outs simultaneously. 8 is a sane default; raise if
// you have more paired accounts firing concurrently.
// - BOT_GROUP_CONCURRENCY: per-account parallel group sends. Each
// group's parts stay serial (preserves visible order in the chat).
// 3 has been empirically stable on real traffic; anything above 5
// without observation is asking for trouble.
// - BOT_MAX_SEND_PER_MINUTE: per-account token-bucket rate. 40 is the
// safe default; loosen to 60 only after a few weeks of running
// without flags. Tighten to 20 if WA returns rate-limit errors.
BOT_FIRE_CONCURRENCY: numberFromString.default("8"),
BOT_GROUP_CONCURRENCY: numberFromString.default("3"),
BOT_MAX_SEND_PER_MINUTE: numberFromString.default("40"),
});
export type Env = z.infer<typeof envSchema>;
export function parseEnv(input: Record<string, string | undefined>): Env {
return envSchema.parse(input);
}
export const env = parseEnv(process.env);
- Step 2: Typecheck
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck
Expected: passes.
- Step 3: Run the bot tests to confirm
parseEnvdefaults still validate
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run env
Expected: all env.test.ts tests pass (existing tests don't supply the new vars; defaults must kick in).
- Step 4: Commit
git add apps/bot/src/env.ts
git commit -m "feat(bot): add fan-out tuning env vars
BOT_FIRE_CONCURRENCY (8) — pg-boss worker pool, gates max accounts
running fan-outs simultaneously.
BOT_GROUP_CONCURRENCY (3) — per-account parallel group sends.
BOT_MAX_SEND_PER_MINUTE (40) — per-account rate-limit budget.
Defaults are tuned for an established WhatsApp account. See spec for
the safe band rationale."
Task 3: PerKeyMutex (TDD)
Files:
-
Create:
apps/bot/src/scheduler/per-key-mutex.test.ts -
Create:
apps/bot/src/scheduler/per-key-mutex.ts -
Step 1: Write the failing test file at
apps/bot/src/scheduler/per-key-mutex.test.ts
import { describe, it, expect } from "vitest";
import { PerKeyMutex } from "./per-key-mutex.js";
/** Tiny clock-free helper: returns a Promise that resolves after
* `n` microtasks. Lets us check ordering without real timers. */
function tickN(n: number): Promise<void> {
let p: Promise<void> = Promise.resolve();
for (let i = 0; i < n; i++) p = p.then();
return p;
}
describe("PerKeyMutex", () => {
it("allows a single call against one key to run immediately", async () => {
const m = new PerKeyMutex();
const result = await m.run("k1", async () => 42);
expect(result).toBe(42);
});
it("serialises two calls against the same key", async () => {
const m = new PerKeyMutex();
const order: string[] = [];
const a = m.run("k1", async () => {
order.push("a-start");
await tickN(5);
order.push("a-end");
});
const b = m.run("k1", async () => {
order.push("b-start");
order.push("b-end");
});
await Promise.all([a, b]);
// b cannot start until a has finished.
expect(order).toEqual(["a-start", "a-end", "b-start", "b-end"]);
});
it("runs different keys in parallel", async () => {
const m = new PerKeyMutex();
const order: string[] = [];
const a = m.run("k1", async () => {
order.push("a-start");
await tickN(5);
order.push("a-end");
});
const b = m.run("k2", async () => {
order.push("b-start");
order.push("b-end");
});
await Promise.all([a, b]);
// b doesn't wait for a — interleaving expected.
expect(order[0]).toBe("a-start");
expect(order).toContain("b-start");
expect(order).toContain("b-end");
// b's pair lands before a's end.
expect(order.indexOf("b-end")).toBeLessThan(order.indexOf("a-end"));
});
it("releases the lock when the handler throws", async () => {
const m = new PerKeyMutex();
await expect(
m.run("k1", async () => {
throw new Error("boom");
}),
).rejects.toThrow("boom");
// Next call on the same key must NOT hang.
const result = await m.run("k1", async () => "after");
expect(result).toBe("after");
});
it("forwards the resolved value of the handler", async () => {
const m = new PerKeyMutex();
const out = await m.run("k1", async () => ({ ok: true, n: 7 }));
expect(out).toEqual({ ok: true, n: 7 });
});
it("cleans up internal state for keys with no waiters", async () => {
const m = new PerKeyMutex();
await m.run("k1", async () => {});
expect(m.activeKeyCount()).toBe(0);
});
it("retains a key while a chain is in flight, then drops it", async () => {
const m = new PerKeyMutex();
let release!: () => void;
const gate = new Promise<void>((r) => (release = r));
const inFlight = m.run("k1", () => gate);
expect(m.activeKeyCount()).toBe(1);
release();
await inFlight;
expect(m.activeKeyCount()).toBe(0);
});
});
- Step 2: Run the failing test to confirm it fails for the right reason
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex
Expected: FAIL with "Cannot find module './per-key-mutex.js'".
- Step 3: Implement
apps/bot/src/scheduler/per-key-mutex.ts
/**
* Async mutex keyed by a string. Different keys run in parallel;
* same-key calls serialise.
*
* Used by fire-reminder so two reminders on the SAME WhatsApp account
* take turns (running them concurrently would double the effective
* send rate and risk a ban), while reminders on DIFFERENT accounts
* proceed in parallel.
*
* The implementation is a chain-per-key Promise: each call appends
* its work to the key's tail. Empty chains are cleaned up so the
* Map doesn't grow unbounded across the bot's lifetime.
*/
export class PerKeyMutex {
private chains = new Map<string, Promise<void>>();
/** Run `fn` exclusively against `key`. Returns whatever `fn` returns. */
async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
const prev = this.chains.get(key) ?? Promise.resolve();
let release!: () => void;
const completion = new Promise<void>((r) => (release = r));
// The chain we publish is "what was already there + this completion".
// The next caller awaits THIS, so order is preserved.
const chained = prev.then(() => completion);
this.chains.set(key, chained);
try {
await prev;
return await fn();
} finally {
release();
// Drop the entry only if no later caller has appended in the
// meantime — otherwise we'd evict the in-flight chain.
if (this.chains.get(key) === chained) {
this.chains.delete(key);
}
}
}
/** Diagnostic: how many keys currently have an in-flight or queued chain. */
activeKeyCount(): number {
return this.chains.size;
}
}
/**
* Singleton mutex used by fire-reminder, keyed by accountId. Lives at
* module scope so multiple pg-boss workers in the same process share
* state.
*/
export const accountMutex = new PerKeyMutex();
- Step 4: Run the test to confirm it passes
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex
Expected: PASS, 7 tests.
- Step 5: Commit
git add apps/bot/src/scheduler/per-key-mutex.ts apps/bot/src/scheduler/per-key-mutex.test.ts
git commit -m "feat(bot): PerKeyMutex for accountId-keyed serialisation
Same key serialises, different keys run in parallel. Used by
fire-reminder to prevent two same-account fan-outs from doubling
the effective send rate (which would risk a WhatsApp ban). Chains
auto-clean empty entries so the Map doesn't leak."
Task 4: TokenBucket rate limiter (TDD)
Files:
-
Create:
apps/bot/src/scheduler/rate-limiter.test.ts -
Create:
apps/bot/src/scheduler/rate-limiter.ts -
Step 1: Write the failing test file at
apps/bot/src/scheduler/rate-limiter.test.ts
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { TokenBucket, accountRateLimiter } from "./rate-limiter.js";
describe("TokenBucket", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-10T10:00:00Z"));
});
afterEach(() => {
vi.useRealTimers();
});
it("starts full: first N=capacity acquires resolve immediately", async () => {
const b = new TokenBucket({ ratePerMinute: 60, capacity: 5 });
for (let i = 0; i < 5; i++) {
await b.acquire(); // should not stall
}
// The 6th is a different test — see below.
});
it("blocks the (capacity+1)th acquire until a token regenerates", async () => {
// 60/min = 1 token per second. Capacity 2.
const b = new TokenBucket({ ratePerMinute: 60, capacity: 2 });
await b.acquire();
await b.acquire();
let resolved = false;
const pending = b.acquire().then(() => {
resolved = true;
});
// Immediately after draining, no token available.
await Promise.resolve();
expect(resolved).toBe(false);
// Advance the clock by exactly 1 second — one token should be back.
await vi.advanceTimersByTimeAsync(1000);
await pending;
expect(resolved).toBe(true);
});
it("FIFO: pending acquires resolve in the order they arrived", async () => {
const b = new TokenBucket({ ratePerMinute: 60, capacity: 1 });
await b.acquire(); // bucket empty
const order: number[] = [];
const a = b.acquire().then(() => order.push(1));
const c = b.acquire().then(() => order.push(2));
// Two seconds → two tokens regenerate, in order.
await vi.advanceTimersByTimeAsync(2000);
await Promise.all([a, c]);
expect(order).toEqual([1, 2]);
});
it("does not over-fill past capacity even if the clock leaps forward", async () => {
const b = new TokenBucket({ ratePerMinute: 60, capacity: 3 });
// Drain.
await b.acquire();
await b.acquire();
await b.acquire();
// Leap an hour. Naive impl would credit 3600 tokens; we should cap at 3.
await vi.advanceTimersByTimeAsync(3_600_000);
// 3 immediate acquires should resolve.
await b.acquire();
await b.acquire();
await b.acquire();
// The 4th waits for fresh regeneration.
let resolved = false;
b.acquire().then(() => (resolved = true));
await Promise.resolve();
expect(resolved).toBe(false);
});
it("ratePerMinute=0 is rejected at construction (caller bug, fail loud)", () => {
expect(() => new TokenBucket({ ratePerMinute: 0, capacity: 1 })).toThrow();
});
});
describe("accountRateLimiter (singleton)", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-10T10:00:00Z"));
});
afterEach(() => {
vi.useRealTimers();
});
it("returns the SAME bucket for repeated lookups of one accountId", () => {
const a1 = accountRateLimiter.get("acct-1");
const a2 = accountRateLimiter.get("acct-1");
expect(a1).toBe(a2);
});
it("returns DIFFERENT buckets for different accountIds (isolation)", () => {
const a = accountRateLimiter.get("acct-A");
const b = accountRateLimiter.get("acct-B");
expect(a).not.toBe(b);
});
it("a drained account A bucket does not block account B", async () => {
const a = accountRateLimiter.get("acct-A");
const b = accountRateLimiter.get("acct-B");
// Drain A entirely (capacity defaults to ratePerMinute, so we
// need to pull at least that many).
for (let i = 0; i < 40; i++) await a.acquire();
// B should still grant immediately.
let bResolved = false;
b.acquire().then(() => (bResolved = true));
await Promise.resolve();
expect(bResolved).toBe(true);
});
});
- Step 2: Run the failing test
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter
Expected: FAIL with "Cannot find module './rate-limiter.js'".
- Step 3: Implement
apps/bot/src/scheduler/rate-limiter.ts
import { env } from "../env.js";
/**
* Token bucket for per-account send pacing.
*
* Tokens regenerate at `ratePerMinute / 60` per second. Capacity caps
* how many can accumulate during idle periods (so the operator can't
* burst 1000 messages just because the account was quiet for a day).
*
* `acquire()` resolves when a token is available, FIFO across waiters.
* Used by fire-reminder to gate every `socket.sendMessage` call.
*/
export interface TokenBucketOptions {
ratePerMinute: number;
/** Defaults to ratePerMinute (one minute's worth). */
capacity?: number;
}
export class TokenBucket {
private readonly ratePerMs: number;
private readonly capacity: number;
private tokens: number;
private lastRefillMs: number;
private waiters: Array<() => void> = [];
constructor(opts: TokenBucketOptions) {
if (opts.ratePerMinute <= 0) {
throw new Error(`TokenBucket: ratePerMinute must be > 0, got ${opts.ratePerMinute}`);
}
this.ratePerMs = opts.ratePerMinute / 60_000;
this.capacity = opts.capacity ?? opts.ratePerMinute;
this.tokens = this.capacity;
this.lastRefillMs = Date.now();
}
/** Resolve when a token is available. FIFO across concurrent waiters. */
async acquire(): Promise<void> {
this.refill();
if (this.tokens >= 1 && this.waiters.length === 0) {
this.tokens -= 1;
return;
}
return new Promise<void>((resolve) => {
this.waiters.push(resolve);
this.scheduleNext();
});
}
private refill(): void {
const now = Date.now();
const elapsed = now - this.lastRefillMs;
if (elapsed <= 0) return;
const gained = elapsed * this.ratePerMs;
this.tokens = Math.min(this.capacity, this.tokens + gained);
this.lastRefillMs = now;
}
private scheduleNext(): void {
// Wait until at least one token is available, then drain waiters
// FIFO. We compute the gap from current fractional token deficit.
this.refill();
while (this.tokens >= 1 && this.waiters.length > 0) {
this.tokens -= 1;
const w = this.waiters.shift()!;
w();
}
if (this.waiters.length === 0) return;
const tokensShort = 1 - this.tokens;
const waitMs = Math.max(1, Math.ceil(tokensShort / this.ratePerMs));
setTimeout(() => this.scheduleNext(), waitMs);
}
}
/**
* Per-accountId TokenBucket registry. Each account gets its own
* pacing budget, so a slow account A never throttles account B.
*/
class AccountRateLimiter {
private buckets = new Map<string, TokenBucket>();
private ratePerMinute: number;
constructor(ratePerMinute: number) {
this.ratePerMinute = ratePerMinute;
}
get(accountId: string): TokenBucket {
let b = this.buckets.get(accountId);
if (!b) {
b = new TokenBucket({ ratePerMinute: this.ratePerMinute });
this.buckets.set(accountId, b);
}
return b;
}
}
export const accountRateLimiter = new AccountRateLimiter(env.BOT_MAX_SEND_PER_MINUTE);
- Step 4: Run the tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter
Expected: PASS, 8 tests.
- Step 5: Commit
git add apps/bot/src/scheduler/rate-limiter.ts apps/bot/src/scheduler/rate-limiter.test.ts
git commit -m "feat(bot): per-account token-bucket rate limiter
TokenBucket gates each socket.sendMessage call. Tokens regenerate at
ratePerMinute/60 per second, capped at one minute's worth so quiet
accounts can't burst. FIFO drain across concurrent waiters.
accountRateLimiter (singleton) hands out one bucket per accountId, so
account A's drain never throttles account B. Default rate is
BOT_MAX_SEND_PER_MINUTE (40) — the safe band for an established
WhatsApp account."
Task 5: Delivery window helper (TDD)
Files:
-
Create:
apps/bot/src/scheduler/delivery-window.test.ts -
Create:
apps/bot/src/scheduler/delivery-window.ts -
Step 1: Write the failing test file at
apps/bot/src/scheduler/delivery-window.test.ts
import { describe, it, expect } from "vitest";
import { windowEndAt } from "./delivery-window.js";
const TZ = "Asia/Kuala_Lumpur"; // UTC+8
describe("windowEndAt", () => {
it("returns today's end-hour boundary in the given timezone", () => {
// Fire at 2026-05-10 10:00 KL == 02:00 UTC. End hour 18 == 18:00 KL == 10:00 UTC.
const fireAt = new Date("2026-05-10T02:00:00.000Z");
const out = windowEndAt(TZ, 18, fireAt);
expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z");
});
it("returns a past timestamp when fireAt is already after the end hour", () => {
// Fire at 2026-05-10 19:00 KL == 11:00 UTC. End hour 18 → today's 18:00 KL == 10:00 UTC.
// That's BEFORE fireAt. The caller's first window-gate check trips immediately.
const fireAt = new Date("2026-05-10T11:00:00.000Z");
const out = windowEndAt(TZ, 18, fireAt);
expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z");
expect(out.getTime()).toBeLessThan(fireAt.getTime());
});
it("respects the timezone (UTC+0 vs UTC+8)", () => {
const fireAt = new Date("2026-05-10T02:00:00.000Z");
const inUtc = windowEndAt("UTC", 18, fireAt);
expect(inUtc.toISOString()).toBe("2026-05-10T18:00:00.000Z");
const inKl = windowEndAt("Asia/Kuala_Lumpur", 18, fireAt);
expect(inKl.toISOString()).toBe("2026-05-10T10:00:00.000Z");
});
it("handles end hour 24 as midnight of the same calendar day's end", () => {
// 2026-05-10 in KL ends at 2026-05-11 00:00 KL == 2026-05-10 16:00 UTC.
const fireAt = new Date("2026-05-10T02:00:00.000Z");
const out = windowEndAt(TZ, 24, fireAt);
expect(out.toISOString()).toBe("2026-05-10T16:00:00.000Z");
});
it("DST transition day stays on the SAME calendar day (no skipping forward)", () => {
// US/Eastern has DST. On 2026-03-08 (DST starts), 18:00 EDT is a real time.
// Fire at 2026-03-08 10:00 EST (15:00 UTC). End at 2026-03-08 18:00 EDT (22:00 UTC).
const fireAt = new Date("2026-03-08T15:00:00.000Z");
const out = windowEndAt("America/New_York", 18, fireAt);
expect(out.toISOString()).toBe("2026-03-08T22:00:00.000Z");
});
it("rejects end hour outside 0..24", () => {
const fireAt = new Date("2026-05-10T00:00:00Z");
expect(() => windowEndAt(TZ, -1, fireAt)).toThrow();
expect(() => windowEndAt(TZ, 25, fireAt)).toThrow();
});
});
- Step 2: Run the failing test
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run delivery-window
Expected: FAIL with "Cannot find module './delivery-window.js'".
- Step 3: Implement
apps/bot/src/scheduler/delivery-window.ts
import { DateTime } from "luxon";
/**
* Returns the end-of-window timestamp for the calendar day `fireAt`
* falls on, in the operator's timezone.
*
* windowEndAt("Asia/Kuala_Lumpur", 18, fireAt)
* → today's 18:00 KL (which may be in the past if fireAt is already
* past 18:00 KL — caller's first window-gate fires immediately).
*
* `endHour` is 0..24. Hour 24 is treated as midnight of the next
* calendar day (i.e. "end of today" inclusive).
*
* Pure: no I/O, no Date.now() reads, no clock dependency. Easy to
* test with fixture inputs.
*/
export function windowEndAt(
timezone: string,
endHour: number,
fireAt: Date,
): Date {
if (!Number.isInteger(endHour) || endHour < 0 || endHour > 24) {
throw new Error(`windowEndAt: endHour must be 0..24, got ${endHour}`);
}
const dt = DateTime.fromJSDate(fireAt).setZone(timezone);
if (!dt.isValid) {
throw new Error(`windowEndAt: invalid timezone "${timezone}"`);
}
// For hour 24, "end of day" is the next midnight. Luxon's `set` with
// hour=24 normalises into hour=0 of the next day, which is exactly
// what we want.
const end = dt.set({ hour: endHour, minute: 0, second: 0, millisecond: 0 });
return end.toJSDate();
}
- Step 4: Run the tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run delivery-window
Expected: PASS, 6 tests.
- Step 5: Commit
git add apps/bot/src/scheduler/delivery-window.ts apps/bot/src/scheduler/delivery-window.test.ts
git commit -m "feat(bot): pure delivery-window end calculator
windowEndAt(timezone, endHour, fireAt) returns the end-of-window for
the day fireAt is on. If fireAt is already past, the result is a
past timestamp — the run loop's first window gate trips immediately
and the entire run resolves as failed (zero sent), which is the
right behaviour for 'we can't send after window close'."
Task 6: MediaUploadCache (TDD with mock socket)
Files:
-
Create:
apps/bot/src/scheduler/media-upload-cache.test.ts -
Create:
apps/bot/src/scheduler/media-upload-cache.ts -
Step 1: Write the failing test file at
apps/bot/src/scheduler/media-upload-cache.test.ts
import { describe, it, expect, vi } from "vitest";
import { MediaUploadCache } from "./media-upload-cache.js";
describe("MediaUploadCache", () => {
it("uploads each unique mediaId exactly once across N gets", async () => {
const prepare = vi.fn(async (mediaId: string) => ({
kind: "prepared",
mediaId,
}));
const cache = new MediaUploadCache(prepare);
const a1 = await cache.get("media-A");
const a2 = await cache.get("media-A");
const b1 = await cache.get("media-B");
expect(prepare).toHaveBeenCalledTimes(2);
expect(prepare).toHaveBeenCalledWith("media-A");
expect(prepare).toHaveBeenCalledWith("media-B");
// Same handle returned for repeated lookups of A.
expect(a1).toBe(a2);
expect(a1).not.toBe(b1);
});
it("coalesces concurrent gets of the same mediaId into ONE prepare call", async () => {
let resolveA: (v: unknown) => void = () => {};
const aPromise = new Promise((r) => (resolveA = r));
const prepare = vi.fn(async (mediaId: string) => {
if (mediaId === "media-A") return aPromise;
return { kind: "prepared", mediaId };
});
const cache = new MediaUploadCache(prepare);
const p1 = cache.get("media-A");
const p2 = cache.get("media-A");
const p3 = cache.get("media-A");
// Resolve the in-flight prepare.
resolveA({ kind: "prepared", mediaId: "media-A" });
const [r1, r2, r3] = await Promise.all([p1, p2, p3]);
expect(prepare).toHaveBeenCalledTimes(1); // de-duplicated
expect(r1).toBe(r2);
expect(r2).toBe(r3);
});
it("a thrown prepare is NOT cached — next get retries", async () => {
let attempt = 0;
const prepare = vi.fn(async (_mediaId: string) => {
attempt++;
if (attempt === 1) throw new Error("upload network blip");
return { kind: "prepared", attempt };
});
const cache = new MediaUploadCache(prepare);
await expect(cache.get("media-A")).rejects.toThrow("upload network blip");
// Second attempt must call prepare again — DON'T cache failures.
const r = await cache.get("media-A");
expect(prepare).toHaveBeenCalledTimes(2);
expect(r).toEqual({ kind: "prepared", attempt: 2 });
});
it("size() reflects the number of cached unique mediaIds", async () => {
const prepare = async (mediaId: string) => ({ mediaId });
const cache = new MediaUploadCache(prepare);
expect(cache.size()).toBe(0);
await cache.get("a");
expect(cache.size()).toBe(1);
await cache.get("b");
expect(cache.size()).toBe(2);
await cache.get("a"); // already cached
expect(cache.size()).toBe(2);
});
});
- Step 2: Run the failing test
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache
Expected: FAIL with "Cannot find module './media-upload-cache.js'".
- Step 3: Implement
apps/bot/src/scheduler/media-upload-cache.ts
/**
* Per-run cache of `prepareWAMessageMedia` results, keyed by
* `mediaId`. The point: when a reminder fans out to 1000 groups with
* one image, we want to upload that image to WhatsApp's CDN ONCE, not
* 1000 times. Subsequent group sends reuse the prepared message
* (with embedded directPath / mediaKey) via socket.relayMessage.
*
* Lifecycle: one cache instance per fire-reminder run. After the run
* completes, the cache is dropped — we don't share uploads across
* runs because WA media tokens are short-lived.
*
* Concurrent gets of the same mediaId are coalesced into a single
* prepare call. Failed prepares are NOT cached so the next attempt
* retries (network blips at upload time shouldn't poison the cache).
*/
export class MediaUploadCache<T> {
private readonly prepare: (mediaId: string) => Promise<T>;
private readonly entries = new Map<string, Promise<T>>();
constructor(prepare: (mediaId: string) => Promise<T>) {
this.prepare = prepare;
}
async get(mediaId: string): Promise<T> {
const existing = this.entries.get(mediaId);
if (existing) return existing;
const inflight = this.prepare(mediaId);
// Insert eagerly so concurrent gets dedupe.
this.entries.set(mediaId, inflight);
try {
return await inflight;
} catch (err) {
// Don't cache failures — the next caller should retry.
this.entries.delete(mediaId);
throw err;
}
}
size(): number {
return this.entries.size;
}
}
- Step 4: Run the tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache
Expected: PASS, 4 tests.
- Step 5: Commit
git add apps/bot/src/scheduler/media-upload-cache.ts apps/bot/src/scheduler/media-upload-cache.test.ts
git commit -m "feat(bot): MediaUploadCache for once-per-run media prepare
One cache instance per fire-reminder run. Each unique mediaId gets
prepared (uploaded to WA CDN) exactly once, and subsequent group
sends within the run reuse the prepared message via relayMessage.
Concurrent gets coalesce into a single prepare. Failed prepares
don't poison the cache — next caller retries."
Task 7: Rewrite fire-reminder.ts and bump pg-boss teamSize
Files:
-
Modify:
apps/bot/src/scheduler/reminder-jobs.ts -
Rewrite:
apps/bot/src/scheduler/fire-reminder.ts -
Step 1: Update
apps/bot/src/scheduler/reminder-jobs.ts— passteamSize
Whole file:
import type { PgBoss } from "pg-boss";
import { logger } from "../logger.js";
import { env } from "../env.js";
import { fireReminder, type FireReminderPayload } from "./fire-reminder.js";
export const REMINDER_FIRE_QUEUE = "reminder.fire";
export async function registerReminderJobs(boss: PgBoss): Promise<void> {
await boss.createQueue(REMINDER_FIRE_QUEUE);
await boss.work<FireReminderPayload>(
REMINDER_FIRE_QUEUE,
{
// Up to BOT_FIRE_CONCURRENCY jobs in flight at once. Combined with
// the per-account mutex inside fireReminder, this lets reminders
// on DIFFERENT accounts run in parallel while same-account
// reminders take turns.
teamSize: env.BOT_FIRE_CONCURRENCY,
teamConcurrency: 1,
},
async (jobs) => {
const job = jobs[0];
if (!job) return;
logger.debug({ jobId: job.id, payload: job.data }, "reminder.fire: handling");
await fireReminder(job.data);
},
);
logger.info(
{ teamSize: env.BOT_FIRE_CONCURRENCY },
"reminder.fire: handler registered",
);
}
export async function scheduleReminderFire(
boss: PgBoss,
reminderId: string,
scheduledAt: Date,
): Promise<string | null> {
const id = await boss.send(
REMINDER_FIRE_QUEUE,
{ reminderId },
{
startAfter: scheduledAt,
retryLimit: 3,
retryDelay: 30,
retryBackoff: true,
// Use the reminderId as a singleton key so re-scheduling cancels the old job
singletonKey: `reminder:${reminderId}`,
},
);
logger.info({ reminderId, jobId: id, scheduledAt }, "reminder.fire: scheduled");
return id;
}
export async function cancelReminderFire(_boss: PgBoss, reminderId: string): Promise<void> {
// Soft cancel: pg-boss doesn't expose a clean cancel-by-singleton API in v12.
// The scheduled job will still fire, but `fireReminder` exits early when the
// reminder row is gone. Hard cancel can be added later by storing the jobId.
logger.info({ reminderId }, "reminder.fire: cancel requested (soft, fizzles on fire)");
}
- Step 2: Rewrite
apps/bot/src/scheduler/fire-reminder.ts
Whole file:
import { and, eq, inArray } from "drizzle-orm";
import { reminderRuns, reminderRunTargets, reminders } from "@cmbot/db";
import {
generateWAMessageContent,
generateMessageID,
type AnyMessageContent,
type proto,
} from "@whiskeysockets/baileys";
import pLimit from "p-limit";
import { db } from "../db.js";
import { logger } from "../logger.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import { absoluteMediaPath, nextOccurrence, resolveDeliveryKind } from "@cmbot/shared";
import { open as fsOpen, readFile } from "node:fs/promises";
import { env } from "../env.js";
import { writeAuditLog } from "../audit.js";
import { getReminderWithDetails } from "../reminders/crud.js";
import { getBoss } from "./pgboss-client.js";
import { scheduleReminderFire } from "./reminder-jobs.js";
import { pgNotifyWeb } from "../ipc/notify.js";
import { accountMutex } from "./per-key-mutex.js";
import { accountRateLimiter } from "./rate-limiter.js";
import { windowEndAt } from "./delivery-window.js";
import { MediaUploadCache } from "./media-upload-cache.js";
export type FireReminderPayload = {
reminderId: string;
/** Optional resume hook. When present, fire-reminder ATTACHES to
* the existing run instead of creating a new one, and only
* re-attempts targets in `pending` status. Set by the resume
* server action. */
runId?: string;
};
/** Read the first N bytes of a file (used to sniff HEIF/AVIF/MOV brands). */
async function readHeadBytes(filePath: string, n: number): Promise<Uint8Array> {
const fh = await fsOpen(filePath, "r");
try {
const buf = new Uint8Array(n);
await fh.read(buf, 0, n, 0);
return buf;
} finally {
await fh.close();
}
}
/** Random delay between same-group message parts (ms). Just enough for
* visible ordering in the chat at WA's natural pace. */
function partJitterMs(): number {
return 200 + Math.floor(Math.random() * 300); // 200..499
}
export async function fireReminder(payload: FireReminderPayload): Promise<void> {
const reminder = await getReminderWithDetails(payload.reminderId);
if (!reminder) {
logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found");
return;
}
if (reminder.status !== "active") {
logger.info({ reminderId: reminder.id, status: reminder.status }, "fire-reminder: skipping (not active)");
return;
}
// Per-account mutex: two reminders on the SAME account take turns.
// Different accounts run in parallel (cross-account isolation).
await accountMutex.run(reminder.accountId, () => fireReminderInner(reminder, payload.runId));
}
async function fireReminderInner(
reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
resumeRunId?: string,
): Promise<void> {
// Resume path: attach to the existing run; fresh path: create one.
let runId: string;
if (resumeRunId) {
const existing = await db.query.reminderRuns.findFirst({
where: (r, { eq }) => eq(r.id, resumeRunId),
});
if (!existing) {
logger.warn({ reminderId: reminder.id, resumeRunId }, "fire-reminder: resume target run missing");
return;
}
runId = existing.id;
// Re-mark as in-flight so the UI shows the run is no longer paused.
await db
.update(reminderRuns)
.set({ status: "pending", errorSummary: null })
.where(eq(reminderRuns.id, runId));
} else {
const [run] = await db
.insert(reminderRuns)
.values({
reminderId: reminder.id,
reminderName: reminder.name,
status: "pending",
})
.returning({ id: reminderRuns.id });
runId = run!.id;
}
const session = sessionManager.getSession(reminder.accountId);
if (!session) {
logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected");
await markAllSkipped(runId, reminder, "account not connected");
await db.update(reminderRuns).set({ status: "skipped", errorSummary: "account not connected" }).where(eq(reminderRuns.id, runId));
await pgNotifyWeb({ type: "reminder.fired", reminderId: reminder.id, runId, status: "skipped" });
return;
}
// Up-front: load all groups + media rows in TWO bulk queries.
const groupIds = reminder.targets.map((t) => t.groupId);
const groupRows = groupIds.length
? await db.query.whatsappGroups.findMany({ where: (g) => inArray(g.id, groupIds) })
: [];
const groupById = new Map(groupRows.map((g) => [g.id, g]));
const mediaIds = Array.from(
new Set(reminder.messages.map((m) => m.mediaId).filter((id): id is string => Boolean(id))),
);
const mediaRows = mediaIds.length
? await db.query.mediaFiles.findMany({ where: (m) => inArray(m.id, mediaIds) })
: [];
const mediaById = new Map(mediaRows.map((m) => [m.id, m]));
// Pre-create run_targets rows so progress is observable mid-run.
if (reminder.targets.length > 0) {
await db.insert(reminderRunTargets).values(
reminder.targets.map((t) => ({
runId,
groupId: t.groupId,
groupLabel: groupById.get(t.groupId)?.name ?? null,
status: "pending" as const,
})),
);
}
// Window-end timestamp. If the reminder fires AFTER today's end-hour
// (e.g. cron miss-fired late) this is in the past — every iteration
// will trip the gate and the run resolves as failed.
const windowEnd = windowEndAt(
reminder.timezone,
reminder.deliveryWindowEndHour,
new Date(),
);
// Per-run media upload cache. Each unique mediaId is prepared via
// generateWAMessageContent ONCE (which uploads to WA's CDN); the
// resulting proto.Message is reused for every group via relayMessage.
// socket.waUploadToServer is the upload helper Baileys exposes.
const uploadCache = new MediaUploadCache<proto.Message>(async (mediaId) => {
const media = mediaById.get(mediaId);
if (!media) throw new Error(`media row missing: ${mediaId}`);
const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR);
const buffer = await readFile(filePath);
const head = buffer.subarray(0, 12);
const resolved = resolveDeliveryKind(media.mimeType, head);
const senderKind: "image" | "video" | "document" =
resolved === "image" || resolved === "video" ? resolved : "document";
const content: AnyMessageContent =
senderKind === "image"
? { image: buffer, mimetype: media.mimeType }
: senderKind === "video"
? { video: buffer, mimetype: media.mimeType }
: { document: buffer, fileName: media.filenameOriginal, mimetype: media.mimeType };
return generateWAMessageContent(content, {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
upload: (session.socket as any).waUploadToServer,
});
});
// Per-account rate limiter — gates each socket.sendMessage / relayMessage call.
const rateLimiter = accountRateLimiter.get(reminder.accountId);
let sentCount = 0;
let failedCount = 0;
let skippedCount = 0;
let windowClosed = false;
const groupConcurrency = pLimit(env.BOT_GROUP_CONCURRENCY);
await Promise.all(
reminder.targets.map((target) =>
groupConcurrency(async () => {
// Window-end gate — even if we already started other groups,
// any not-yet-started one stops here.
if (Date.now() >= windowEnd.getTime()) {
windowClosed = true;
await db
.update(reminderRunTargets)
.set({ status: "skipped", error: "delivery window closed" })
.where(
and(
eq(reminderRunTargets.runId, runId),
eq(reminderRunTargets.groupId, target.groupId),
),
);
skippedCount++;
return;
}
const group = groupById.get(target.groupId);
if (!group) {
await db
.update(reminderRunTargets)
.set({ status: "skipped", error: "group missing from db" })
.where(
and(
eq(reminderRunTargets.runId, runId),
eq(reminderRunTargets.groupId, target.groupId),
),
);
skippedCount++;
return;
}
const start = Date.now();
try {
let lastMessageId: string | undefined;
for (const part of reminder.messages) {
await rateLimiter.acquire();
if (part.kind === "text" && part.textContent) {
const r = await session.socket.sendMessage(group.waGroupJid, {
text: part.textContent,
});
lastMessageId = r?.key?.id ?? undefined;
} else if (part.mediaId) {
const prebuilt = await uploadCache.get(part.mediaId);
// Inject the caption (if any) just before relaying — the
// prebuilt content carries the media but each relay uses
// a fresh messageId.
if (part.textContent) {
injectCaption(prebuilt, part.textContent);
}
const messageId = generateMessageID();
await session.socket.relayMessage(group.waGroupJid, prebuilt, { messageId });
lastMessageId = messageId;
}
await new Promise((r) => setTimeout(r, partJitterMs()));
}
await db
.update(reminderRunTargets)
.set({
status: "sent",
waMessageId: lastMessageId ?? null,
latencyMs: Date.now() - start,
})
.where(
and(
eq(reminderRunTargets.runId, runId),
eq(reminderRunTargets.groupId, target.groupId),
),
);
sentCount++;
} catch (err) {
logger.error(
{ err, reminderId: reminder.id, groupId: target.groupId },
"fire-reminder: send failed",
);
await db
.update(reminderRunTargets)
.set({ status: "failed", error: (err as Error).message })
.where(
and(
eq(reminderRunTargets.runId, runId),
eq(reminderRunTargets.groupId, target.groupId),
),
);
failedCount++;
}
}),
),
);
const total = reminder.targets.length;
let status: "success" | "partial" | "failed";
let errorSummary: string | null = null;
if (sentCount === total) {
status = "success";
} else if (sentCount > 0) {
status = "partial";
errorSummary = windowClosed
? `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}). ${sentCount} of ${total} groups delivered. This account is at capacity for this fan-out — consider sending the remainder from another paired account.`
: `${sentCount} of ${total} groups delivered (${failedCount} failed, ${skippedCount} skipped).`;
} else {
status = "failed";
errorSummary = windowClosed
? `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}) before any group could be sent. The reminder fired too late in the day.`
: `All ${total} sends failed.`;
}
await db
.update(reminderRuns)
.set({ status, errorSummary })
.where(eq(reminderRuns.id, runId));
await pgNotifyWeb({
type: "reminder.fired",
reminderId: reminder.id,
runId,
status,
});
// One-off reminders end after firing. Recurring reminders re-arm.
if (reminder.scheduleKind === "one_off") {
await db
.update(reminders)
.set({ status: "ended", updatedAt: new Date() })
.where(eq(reminders.id, reminder.id));
} else if (reminder.scheduleKind === "recurring" && reminder.rrule) {
const next = nextOccurrence(reminder.rrule, reminder.timezone, new Date());
await db
.update(reminders)
.set({ lastFiredAt: new Date(), updatedAt: new Date() })
.where(eq(reminders.id, reminder.id));
if (next) {
try {
await scheduleReminderFire(getBoss(), reminder.id, next);
logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence");
} catch (err) {
logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence");
}
} else {
logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending");
await db.update(reminders).set({ status: "ended" }).where(eq(reminders.id, reminder.id));
}
}
await writeAuditLog(db, {
operatorId: reminder.createdBy,
source: "system",
action: "reminder.fired",
targetType: "reminder",
targetId: reminder.id,
payload: { runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount },
});
logger.info(
{ reminderId: reminder.id, runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount },
"fire-reminder: done",
);
}
/**
* Mark every target as skipped with the given error. Used when the
* account is offline before the loop even starts (no run_target rows
* have been created yet, so we INSERT instead of UPDATE).
*/
async function markAllSkipped(
runId: string,
reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
error: string,
): Promise<void> {
if (reminder.targets.length === 0) return;
const rows = await db.query.whatsappGroups.findMany({
where: (g) => inArray(g.id, reminder.targets.map((t) => t.groupId)),
columns: { id: true, name: true },
});
const labelById = new Map(rows.map((r) => [r.id, r.name]));
await db.insert(reminderRunTargets).values(
reminder.targets.map((t) => ({
runId,
groupId: t.groupId,
groupLabel: labelById.get(t.groupId) ?? null,
status: "skipped" as const,
error,
})),
);
}
/**
* Mutates the prebuilt proto.Message to set the caption on whichever
* media variant it carries. Baileys' relayMessage does not let us
* pass the caption alongside; the protobuf already carries the slot.
*/
function injectCaption(msg: proto.Message, caption: string): void {
if (msg.imageMessage) msg.imageMessage.caption = caption;
else if (msg.videoMessage) msg.videoMessage.caption = caption;
else if (msg.documentMessage) msg.documentMessage.caption = caption;
}
- Step 3: Add
p-limitto apps/bot deps
NO_SUDO=1 ./scripts/dev.sh exec pnpm add -F @cmbot/bot p-limit
Expected: pnpm reports +1 package. The lockfile updates.
- Step 4: Typecheck the bot
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck
Expected: passes. If type errors mention relayMessage overloads, double-check the @whiskeysockets/baileys import added — the function comes from the socket instance, not the package's named exports.
- Step 5: Run all bot tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run
Expected: PASS for all (existing 26 + 7 PerKeyMutex + 8 RateLimiter + 6 DeliveryWindow + 4 MediaUploadCache = 51).
- Step 6: Restart the bot to pick up the rewrite
NO_SUDO=1 ./scripts/dev.sh restart-bot
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=40 bot 2>&1 | grep -i "reminder.fire"
Expected: A line like reminder.fire: handler registered teamSize=8.
- Step 7: Commit
git add apps/bot/src/scheduler/fire-reminder.ts apps/bot/src/scheduler/reminder-jobs.ts apps/bot/package.json pnpm-lock.yaml
git commit -m "feat(bot): windowed, pacing-safe fan-out
Rewrites the per-target loop to:
- Wrap inner work in PerKeyMutex(accountId) so two reminders on the
same account take turns; different accounts run in parallel.
- Bulk-load groups and media rows up front (drops ~3000 round-trips
to ~3 for a 1000-group run).
- Pre-create run_target rows with status='pending' so the Activity
tab shows progress mid-run.
- Pre-upload each unique media via MediaUploadCache (one
generateWAMessageContent call per mediaId, then relayMessage to
every group). For 1000 groups × 5 MB image, this turns 5 GB of
upload into 5 MB.
- Run BOT_GROUP_CONCURRENCY (default 3) groups in parallel within
one account; each group's parts stay serial for visible order.
- Gate every socket call on a per-account TokenBucket
(BOT_MAX_SEND_PER_MINUTE, default 40).
- Replace the rigid 1.5s inter-part sleep with 200..499 ms jitter.
- Window-end gate: stop sending once today's end-hour (in the
reminder's timezone) has passed; mark unstarted targets skipped;
surface a partial-status message that names the timezone, the
delivered/total count, and points at multi-account offload.
reminder-jobs.ts now passes teamSize=BOT_FIRE_CONCURRENCY (default 8)
to boss.work so up to 8 different-account reminders run concurrently."
Task 8: Web wiring — schema, action, wizard input, notification body
Files:
-
Modify:
apps/web/src/actions/reminders.ts -
Modify:
apps/web/src/components/reminder-wizard/when-form-client.tsx -
Modify:
apps/web/src/components/reminder-edit/edit-when-form.tsx -
Modify:
apps/web/src/lib/notifications.ts -
Create:
apps/web/src/lib/notifications-body.test.ts(extends existing notifications tests with the partial-status body) -
Step 1: Update the create/update Zod schemas in
apps/web/src/actions/reminders.ts
Find the createReminderSchema definition (around lines 220–250) and add the two fields. Schema chunk:
const createReminderSchema = z
.object({
accountId: z.string().uuid(),
groupIds: z.array(z.string().uuid()),
messages: z.array(messagePartSchema).optional(),
name: z.string().nullable().optional(),
text: z.string().nullable().optional(),
mediaId: z.string().uuid().nullable().optional(),
caption: z.string().nullable().optional(),
scheduledAtIso: z.string().datetime({ offset: true }),
rrule: z.string().nullable().optional(),
timezone: z.string().default(DEFAULT_TIMEZONE),
// Delivery window. End hour is enforced at runtime by fire-reminder;
// start hour is documented but not gated in v1.
deliveryWindowStartHour: z.number().int().min(0).max(24).default(6),
deliveryWindowEndHour: z.number().int().min(0).max(24).default(18),
})
.refine(
(d) =>
(d.messages && d.messages.length > 0) ||
Boolean(d.text?.trim()) ||
Boolean(d.mediaId),
{
message: "Add a message or attach a file",
path: ["messages"],
},
)
.refine((d) => d.deliveryWindowStartHour < d.deliveryWindowEndHour, {
message: "Delivery window start must be earlier than end",
path: ["deliveryWindowStartHour"],
});
Then in BOTH createReminderAction and updateReminderAction, where the db.insert(reminders).values({...}) / db.update(reminders).set({...}) happens, include the two new fields. Find the values({ blocks (around line 350 and line 460) and add:
deliveryWindowStartHour: parsed.data.deliveryWindowStartHour,
deliveryWindowEndHour: parsed.data.deliveryWindowEndHour,
immediately after timezone,.
- Step 2: Add the input controls to
apps/web/src/components/reminder-wizard/when-form-client.tsx
Add to the props interface and component state. Top-of-file imports stay; add a new state and a new section in the JSX. The whole edit:
Find the interface WhenFormClientProps block. Add:
interface WhenFormClientProps {
accountId: string;
groupIds: string;
timezone: string;
initialDefaultIso: string;
initialSpec?: RecurrenceSpec;
initialDeliveryStartHour?: number;
initialDeliveryEndHour?: number;
passThroughParams: PassThroughParams;
}
In the component body (around state declarations near the top), add:
const [deliveryStartHour, setDeliveryStartHour] = useState<number>(
props.initialDeliveryStartHour ?? 6,
);
const [deliveryEndHour, setDeliveryEndHour] = useState<number>(
props.initialDeliveryEndHour ?? 18,
);
In handleContinue, when building the URLSearchParams, add (after the existing params):
sp.set("deliveryStartHour", String(deliveryStartHour));
sp.set("deliveryEndHour", String(deliveryEndHour));
In the JSX, just before the existing <RecurrencePicker> block, add a new "Delivery hours" card:
<div className="space-y-2">
<Label className="flex items-center gap-1.5">
<ClockIcon className="size-3.5" />
Delivery hours
</Label>
<div className="flex items-center gap-2">
<Input
type="number"
min={0}
max={24}
step={1}
value={deliveryStartHour}
onChange={(e) => setDeliveryStartHour(Number(e.target.value))}
className="h-9 w-20"
aria-label="Delivery start hour"
/>
<span className="text-sm text-muted-foreground">to</span>
<Input
type="number"
min={0}
max={24}
step={1}
value={deliveryEndHour}
onChange={(e) => setDeliveryEndHour(Number(e.target.value))}
className="h-9 w-20"
aria-label="Delivery end hour"
/>
<span className="text-xs text-muted-foreground">
(24-hour, in {timezone})
</span>
</div>
<p className="text-xs text-muted-foreground">
The bot stops sending after the end hour. Long fan-outs that don't
finish in this window get reported as partial.
</p>
</div>
Add ClockIcon to the lucide-react imports if it's not already there (file currently imports CalendarIcon, ClockIcon, AlertCircleIcon — already there).
- Step 3: Wire the new params into
apps/web/src/components/reminder-wizard/step-when.tsx
The wizard reads initialDeliveryStartHour / initialDeliveryEndHour from URL and passes them to <WhenFormClient>. Find where <WhenFormClient> is rendered and add the two new props:
<WhenFormClient
accountId={accountId}
groupIds={groupIds}
timezone={op.defaultTimezone ?? "UTC"}
initialDefaultIso={initialDefaultIso}
initialSpec={initialSpec}
initialDeliveryStartHour={
sp.deliveryStartHour ? Number(sp.deliveryStartHour) : undefined
}
initialDeliveryEndHour={
sp.deliveryEndHour ? Number(sp.deliveryEndHour) : undefined
}
passThroughParams={passThroughParams}
/>
Also update the interface StepWhenParams (or wherever the searchParams type lives in step-when.tsx) to include deliveryStartHour?: string; deliveryEndHour?: string;.
- Step 4: Pass-through in step-groups.tsx and step-review.tsx
Both step pages thread URL params through. Open each and find where they re-emit the URL (look for the URLSearchParams builder or editLink helper). Add:
if (sp.deliveryStartHour) backParams.set("deliveryStartHour", sp.deliveryStartHour);
if (sp.deliveryEndHour) backParams.set("deliveryEndHour", sp.deliveryEndHour);
Add the same to editLink in step-review.tsx so the "Edit when" link from the review page round-trips the values.
In review-submit-client.tsx, where the action payload is built, include the two fields:
const payload = {
accountId,
groupIds: groupIds ? groupIds.split(",").filter(Boolean) : [],
name: name?.trim() || null,
messages,
scheduledAtIso: scheduledAt,
rrule: rrule ?? null,
timezone,
deliveryWindowStartHour: deliveryStartHour ?? 6,
deliveryWindowEndHour: deliveryEndHour ?? 18,
};
(And add deliveryStartHour?: number; deliveryEndHour?: number; to the props interface and pass them in from step-review.tsx after parsing the URL.)
- Step 5: Update
apps/web/src/components/reminder-edit/edit-when-form.tsxsimilarly
Add the same Delivery hours card into the form, same state vars, and include the two fields in the updateReminderAction payload (find the existing updateReminderAction({...}) call). Pull the initial values from the loaded reminder row in the parent edit page (apps/web/src/app/reminders/[id]/edit/when/page.tsx):
<EditWhenForm
reminderId={reminder.id}
accountId={reminder.accountId}
groupIds={targets.map((t) => t.groupId)}
messages={initialMessages}
name={reminder.name}
initialIso={(reminder.scheduledAt ?? new Date()).toISOString()}
initialSpec={specFromRrule(reminder.rrule)}
timezone={reminder.timezone}
initialDeliveryStartHour={reminder.deliveryWindowStartHour}
initialDeliveryEndHour={reminder.deliveryWindowEndHour}
/>
The form's prop interface gets:
initialDeliveryStartHour?: number;
initialDeliveryEndHour?: number;
- Step 6: Extend the partial-status notification body in
apps/web/src/lib/notifications.ts
Find reminderFiredToNotification. The existing function takes { reminderId, runId, status }. Update the body to mention the delivered/total when status is partial:
export function reminderFiredToNotification(event: {
type: "reminder.fired";
reminderId: string;
runId: string;
status: string;
sent?: number;
total?: number;
}): ShowNotificationOptions | null {
if (event.status === "skipped") return null;
const headline =
event.status === "success"
? "Reminder sent"
: event.status === "partial"
? "Reminder partly sent"
: "Reminder failed";
let body =
event.status === "success"
? "All groups received the message."
: event.status === "partial"
? "Some groups received the message; others failed. See activity."
: "No groups received the message. See activity.";
if (event.status === "partial" && event.sent !== undefined && event.total !== undefined) {
body = `${event.sent} of ${event.total} groups delivered. See activity for details.`;
}
return {
title: headline,
body,
tag: `reminder:${event.reminderId}`,
href: `/reminders/${event.reminderId}`,
};
}
- Step 7: Add tests for the extended notification body
Append to apps/web/src/lib/notifications.test.ts (inside the existing describe("reminderFiredToNotification mapping", () => {...}) block):
it("partial with sent/total renders 'X of Y groups delivered' instead of the generic body", () => {
const args = reminderFiredToNotification({
type: "reminder.fired",
reminderId: "r-1",
runId: "run-1",
status: "partial",
sent: 412,
total: 1000,
});
expect(args?.body).toBe("412 of 1000 groups delivered. See activity for details.");
});
it("partial without sent/total falls back to the generic body", () => {
const args = reminderFiredToNotification({
type: "reminder.fired",
reminderId: "r-1",
runId: "run-1",
status: "partial",
});
expect(args?.body).toMatch(/Some groups received/);
});
- Step 8: Update existing edit-section-forms.test.tsx fixtures so they don't break on the new required props
Find apps/web/src/components/reminder-edit/edit-section-forms.test.tsx. Both EditAccountForm and EditGroupsForm may need additional defaults. If their props don't reference the new fields, the tests still pass — re-run them and patch only if compilation fails. Run:
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck
If typecheck reports Property 'initialDeliveryStartHour' is missing on any test, add initialDeliveryStartHour: 6, initialDeliveryEndHour: 18 to that fixture.
- Step 9: Run web tests + bot tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run
Expected: web 298+ pass, bot 26+25 = 51 pass. Total ~380 across the project including shared.
- Step 10: Restart web container so the server bundle picks up the schema change
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml restart web
- Step 11: Commit
git add apps/web
git commit -m "feat(web): delivery-window inputs in wizard + edit; partial body extension
Wizard 'When' step and the per-section edit-when page get two number
inputs for the delivery window hours (default 6/18). Server actions
accept them on the create/update Zod schemas, validate
0 <= start < end <= 24, and persist to the new reminders columns.
Notification body for partial-status reminders now reads
'412 of 1000 groups delivered. See activity for details.' when the
SSE event carries sent/total counts."
Acceptance check (manual)
After all 8 tasks land:
-
Smoke 1. Create a reminder for 1 group with default delivery hours and a 30-second future fire time. Verify it lands; verify the run's
error_summaryis null and statussuccess. -
Smoke 2. Create a reminder with end hour set to a value that's already past in the operator's timezone. Schedule it to fire NOW. Verify the run resolves
failedand theerror_summaryreads "Delivery window closed at H:00 (TZ) before any group could be sent." -
Smoke 3. Create two reminders on TWO different paired accounts with the same
scheduledAt. Watch bot logs:NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=60 bot 2>&1 | grep -E "reminder|fire-reminder"Expected: both reminders'
fire-reminder: doneentries land within seconds of each other (parallel), not sequentially separated by a full fan-out. -
Smoke 4. Create a reminder with a 5-MB JPEG and 3 groups. Fire it. Bot logs should show only ONE
prepareWAMessageMedia/ upload entry (look for the upload size in the Baileys debug log) followed by 3 relayMessage events. -
Final test sweep:
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --runExpected: all green. ~380 total.