cm_whatsapp_bot_v1/docs/superpowers/plans/2026-05-10-windowed-fanout.md
yiekheng 082a70db06 docs: consolidate windowed-fanout spec/plan with ETA + paused/resume
Folds in three rounds of requirement evolution:

* Pause/resume on window close (was stop-and-report-partial).
* ETA preview pill at compose / edit time so the operator sees
  whether their chosen window will fit before scheduling.
* Interactive paused-run banner with Resume / Cancel buttons on the
  detail page; pause notification deep-links to it.

Helper relocations:

* windowEndAt() moves to packages/shared so both bot fire-reminder
  and the web ETA pill can import the same calculator.

Plan grows from 8 to 10 tasks: adds Task 9 (run-eta + RunEtaPill,
TDD) and Task 10 (resume/cancel actions + PausedRunBanner).
Acceptance gains two paused-flow smoke tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 14:33:51 +08:00


# Windowed, Pacing-Safe Reminder Fan-Out: Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Deliver reminders to many groups (1000+) safely within a per-reminder delivery window (default 6am to 6pm). If the window closes mid-run, **pause** the run with a clear "account at capacity / consider offload or smaller media" message and let the operator **resume** it later from the UI.

**Architecture:** Replace the current serial fan-out with a per-account isolation model:

- pg-boss `teamSize` is raised so reminders on different accounts run in parallel.
- A per-key (accountId) async mutex serialises fan-outs on the same account.
- A per-account token-bucket rate limiter paces sends.
- A per-run media-upload cache uploads each unique file once and reuses it for every group via Baileys' `prepareWAMessageMedia` + `relayMessage`.
- A window-end gate flips the run to `paused`; un-sent targets stay `pending`.
- `resumeReminderRunAction` re-enters fire-reminder with the existing `runId`, so the loop picks up only the still-pending targets.

**Tech Stack:** Node.js + TypeScript, Baileys 7.0.0-rc10 (`prepareWAMessageMedia`, `relayMessage`, `generateMessageID`), pg-boss v12 (`boss.work` with `teamSize`), Drizzle ORM + Postgres, vitest.

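
The pause/resume contract can be illustrated with a minimal, dependency-free sketch, assuming the targets are held in memory. `Target`, `runFanout`, the injected `now()` clock and the `send` callback (which stands in for the mutex, token bucket and relay together) are hypothetical names, not the Task 7 code. Because the gate is checked before every send and only `pending` targets are attempted, calling it again with the same list behaves like a resume, and a run whose window has already closed pauses without sending anything.

```ts
// Hypothetical shapes for illustration; the real rows live in Postgres.
type TargetStatus = "pending" | "sent" | "failed";

interface Target {
  groupJid: string;
  status: TargetStatus;
}

type RunOutcome = "completed" | "paused";

/**
 * Walks the target list once. Only "pending" targets are attempted, so a
 * second call with the same list (a resume) skips finished work. The
 * window gate runs before every send: once now() reaches windowEnd the
 * run pauses and every remaining target stays "pending".
 */
async function runFanout(
  targets: Target[],
  windowEnd: Date,
  now: () => Date,
  send: (groupJid: string) => Promise<void>, // stand-in for mutex + bucket + relay
): Promise<RunOutcome> {
  for (const target of targets) {
    if (target.status !== "pending") continue;
    if (now().getTime() >= windowEnd.getTime()) return "paused";
    try {
      await send(target.groupJid);
      target.status = "sent";
    } catch {
      target.status = "failed";
    }
  }
  return "completed";
}
```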
---
## File Structure
| File | Role |
| --- | --- |
| `packages/db/migrations/0008_*.sql` (generated) | add `delivery_window_start_hour`, `delivery_window_end_hour` to `reminders` |
| `packages/db/src/schema.ts` | drizzle alignment for the two new columns |
| `apps/bot/src/env.ts` | three new env vars: `BOT_FIRE_CONCURRENCY`, `BOT_GROUP_CONCURRENCY`, `BOT_MAX_SEND_PER_MINUTE` |
| `apps/bot/src/scheduler/per-key-mutex.ts` (new) | accountId-keyed async mutex |
| `apps/bot/src/scheduler/per-key-mutex.test.ts` (new) | unit tests |
| `apps/bot/src/scheduler/rate-limiter.ts` (new) | per-account token bucket |
| `apps/bot/src/scheduler/rate-limiter.test.ts` (new) | fake-clock unit tests |
| `packages/shared/src/delivery-window.ts` (new) | pure window-end calculator (shared bot+web) |
| `packages/shared/src/delivery-window.test.ts` (new) | unit tests |
| `apps/bot/src/scheduler/media-upload-cache.ts` (new) | per-run cache of `prepareWAMessageMedia` results, keyed by mediaId |
| `apps/bot/src/scheduler/media-upload-cache.test.ts` (new) | mock-socket unit tests |
| `apps/bot/src/scheduler/fire-reminder.ts` (rewrite) | new loop using all of the above; accepts optional `runId` for resume |
| `apps/bot/src/scheduler/reminder-jobs.ts` | pass `teamSize` config |
| `apps/web/src/actions/reminders.ts` | accept the two new fields; add `resumeReminderRunAction`, `cancelReminderRunAction` |
| `apps/web/src/components/reminder-wizard/when-form-client.tsx` | "Delivery hours" inputs |
| `apps/web/src/components/reminder-edit/edit-when-form.tsx` | same |
| `apps/web/src/lib/run-eta.ts` (new) | pure ETA calculator |
| `apps/web/src/components/reminder-wizard/run-eta-pill.tsx` (new) | green/amber ETA pill |
| `apps/web/src/components/reminder-detail/paused-run-banner.tsx` (new) | Resume / Cancel run banner on detail page |
| `apps/web/src/lib/notifications.ts` | extend the notification body for paused and partial runs |
---
## Task 1: Schema migration — delivery window columns
**Files:**
- Modify: `packages/db/src/schema.ts` (lines around the `reminders` table — add 2 columns)
- Generate: `packages/db/migrations/0008_<name>.sql`
- [ ] **Step 1: Edit `packages/db/src/schema.ts` to add the two columns to `reminders`**

Find the `reminders` table block and append the two new columns just before the closing `});`. The existing block ends with `lastFiredAt: timestamp(...)`. The full block after the edit is shown below; only the last two columns (and their comment) are new:
```ts
export const reminders = pgTable("reminders", {
  id: uuid("id").primaryKey().defaultRandom(),
  accountId: uuid("account_id").notNull().references(() => whatsappAccounts.id, { onDelete: "cascade" }),
  name: text("name").notNull(),
  scheduleKind: text("schedule_kind").notNull(),
  scheduledAt: timestamp("scheduled_at", { withTimezone: true }),
  rrule: text("rrule"),
  timezone: text("timezone").notNull(),
  endsAt: timestamp("ends_at", { withTimezone: true }),
  maxRuns: integer("max_runs"),
  status: text("status").notNull().default("active"),
  createdBy: uuid("created_by").notNull().references(() => operators.id),
  createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
  updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(),
  lastFiredAt: timestamp("last_fired_at", { withTimezone: true }),
  // Delivery window: operator-supplied hours (in the row's timezone).
  // Only the END hour is enforced at runtime in v1: the run loop stops
  // sending once `now()` crosses today's end hour. The START hour is
  // stored for the UI only and is not gated. See spec
  // docs/superpowers/specs/2026-05-10-windowed-fanout-design.md.
  deliveryWindowStartHour: integer("delivery_window_start_hour").notNull().default(6),
  deliveryWindowEndHour: integer("delivery_window_end_hour").notNull().default(18),
});
```
- [ ] **Step 2: Generate the migration**

Run:
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db generate
```
Expected: a file appears at `packages/db/migrations/0008_<name>.sql` containing `ALTER TABLE "reminders" ADD COLUMN "delivery_window_start_hour" integer DEFAULT 6 NOT NULL;` and a similar line for `delivery_window_end_hour`.
- [ ] **Step 3: Apply the migration**

Run:
```bash
NO_SUDO=1 ./scripts/db.sh migrate
```
Expected: `Migrations applied.` Verify with:
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db build
```
Expected: clean tsc output (the @cmbot/db package re-builds so downstream consumers pick up the new columns).
- [ ] **Step 4: Typecheck the bot + web to confirm no schema-consumer regressions**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck
```
Expected: both pass with no errors.
- [ ] **Step 5: Commit**
```bash
git add packages/db
git commit -m "feat(db): add delivery_window_start_hour / end_hour to reminders

Both default 6 / 18, stored as int (hour-of-day in the row's
timezone). Only the end hour is gated at runtime; see spec
docs/superpowers/specs/2026-05-10-windowed-fanout-design.md."
```
---
## Task 2: Bot env vars — concurrency + rate caps
**Files:**
- Modify: `apps/bot/src/env.ts`
- [ ] **Step 1: Replace `apps/bot/src/env.ts` contents**

Whole file:
```ts
import { z } from "zod";

const numberFromString = z.string().regex(/^\d+$/).transform((s) => Number(s));

const envSchema = z.object({
  DATABASE_URL: z.string().url(),
  DATA_DIR: z.string().min(1),
  SESSIONS_DIR: z.string().min(1),
  MEDIA_DIR: z.string().min(1),
  BOT_HEALTH_PORT: numberFromString,
  BOT_LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).default("info"),

  // Reminder fan-out tuning. Defaults aim for an established WhatsApp
  // account (~30-60 msg/min safe band).
  // - BOT_FIRE_CONCURRENCY: pg-boss worker pool size for the
  //   `reminder.fire` queue. Sets the max number of accounts that can
  //   run their fan-outs simultaneously. 8 is a sane default; raise it
  //   if you have more paired accounts firing concurrently.
  // - BOT_GROUP_CONCURRENCY: per-account parallel group sends. Each
  //   group's parts stay serial (preserves visible order in the chat).
  //   3 has been empirically stable on real traffic; going above 5
  //   without observation is asking for trouble.
  // - BOT_MAX_SEND_PER_MINUTE: per-account token-bucket rate. 40 is the
  //   safe default; loosen to 60 only after a few weeks of running
  //   without flags. Tighten to 20 if WA returns rate-limit errors.
  // The defaults are strings because, in zod 3, `.default()` supplies a
  // value of the schema's input type, which then runs through the
  // transform and comes out as a number.
  BOT_FIRE_CONCURRENCY: numberFromString.default("8"),
  BOT_GROUP_CONCURRENCY: numberFromString.default("3"),
  BOT_MAX_SEND_PER_MINUTE: numberFromString.default("40"),
});

export type Env = z.infer<typeof envSchema>;

export function parseEnv(input: Record<string, string | undefined>): Env {
  return envSchema.parse(input);
}

export const env = parseEnv(process.env);
```
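To see what these defaults mean for a large fan-out, the back-of-the-envelope arithmetic fits in a small helper. It is a sketch under stated assumptions: one account, a bucket that starts full with `capacity` equal to one minute's worth of sends (as the Task 4 limiter does), and the per-account bucket as the binding limit, so upload time, send latency and `BOT_GROUP_CONCURRENCY` are ignored. `estimateFanoutMs` and `fitsInWindow` are hypothetical names.

```ts
/**
 * Lower-bound estimate of how long ONE account needs to push
 * `totalSends` messages through a token bucket that starts full.
 * Hypothetical helper: ignores upload time, send latency and retries.
 */
function estimateFanoutMs(
  totalSends: number,
  ratePerMinute: number,
  capacity: number = ratePerMinute, // same default as the Task 4 bucket
): number {
  if (ratePerMinute <= 0) throw new Error("ratePerMinute must be > 0");
  // The first `capacity` sends spend the initial burst without waiting...
  const paced = Math.max(0, totalSends - capacity);
  // ...and every later send waits for one regenerated token.
  return Math.ceil((paced / ratePerMinute) * 60_000);
}

/** True when the estimate finishes at or before the window end. */
function fitsInWindow(
  totalSends: number,
  ratePerMinute: number,
  fireAt: Date,
  windowEnd: Date,
): boolean {
  return fireAt.getTime() + estimateFanoutMs(totalSends, ratePerMinute) <= windowEnd.getTime();
}
```

With the defaults, 1000 groups with 2 parts each is 2000 sends: (2000 - 40) / 40 = 49 minutes, comfortably inside a 12-hour window, but not inside the last 30 minutes before the end hour.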
- [ ] **Step 2: Typecheck**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck
```
Expected: passes.
- [ ] **Step 3: Run the bot tests to confirm `parseEnv` defaults still validate**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run env
```
Expected: all `env.test.ts` tests pass (existing tests don't supply the new vars; defaults must kick in).
- [ ] **Step 4: Commit**
```bash
git add apps/bot/src/env.ts
git commit -m "feat(bot): add fan-out tuning env vars

BOT_FIRE_CONCURRENCY (8): pg-boss worker pool, gates max accounts
running fan-outs simultaneously.
BOT_GROUP_CONCURRENCY (3): per-account parallel group sends.
BOT_MAX_SEND_PER_MINUTE (40): per-account rate-limit budget.

Defaults are tuned for an established WhatsApp account. See spec for
the safe-band rationale."
```
---
## Task 3: PerKeyMutex (TDD)
**Files:**
- Create: `apps/bot/src/scheduler/per-key-mutex.test.ts`
- Create: `apps/bot/src/scheduler/per-key-mutex.ts`
- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/per-key-mutex.test.ts`**
```ts
import { describe, it, expect } from "vitest";
import { PerKeyMutex } from "./per-key-mutex.js";

/** Tiny clock-free helper: returns a Promise that resolves after
 *  `n` microtasks. Lets us check ordering without real timers. */
function tickN(n: number): Promise<void> {
  let p: Promise<void> = Promise.resolve();
  for (let i = 0; i < n; i++) p = p.then();
  return p;
}

describe("PerKeyMutex", () => {
  it("allows a single call against one key to run immediately", async () => {
    const m = new PerKeyMutex();
    const result = await m.run("k1", async () => 42);
    expect(result).toBe(42);
  });

  it("serialises two calls against the same key", async () => {
    const m = new PerKeyMutex();
    const order: string[] = [];
    const a = m.run("k1", async () => {
      order.push("a-start");
      await tickN(5);
      order.push("a-end");
    });
    const b = m.run("k1", async () => {
      order.push("b-start");
      order.push("b-end");
    });
    await Promise.all([a, b]);
    // b cannot start until a has finished.
    expect(order).toEqual(["a-start", "a-end", "b-start", "b-end"]);
  });

  it("runs different keys in parallel", async () => {
    const m = new PerKeyMutex();
    const order: string[] = [];
    const a = m.run("k1", async () => {
      order.push("a-start");
      await tickN(5);
      order.push("a-end");
    });
    const b = m.run("k2", async () => {
      order.push("b-start");
      order.push("b-end");
    });
    await Promise.all([a, b]);
    // b doesn't wait for a, so interleaving is expected.
    expect(order[0]).toBe("a-start");
    expect(order).toContain("b-start");
    expect(order).toContain("b-end");
    // b's pair lands before a's end.
    expect(order.indexOf("b-end")).toBeLessThan(order.indexOf("a-end"));
  });

  it("releases the lock when the handler throws", async () => {
    const m = new PerKeyMutex();
    await expect(
      m.run("k1", async () => {
        throw new Error("boom");
      }),
    ).rejects.toThrow("boom");
    // Next call on the same key must NOT hang.
    const result = await m.run("k1", async () => "after");
    expect(result).toBe("after");
  });

  it("forwards the resolved value of the handler", async () => {
    const m = new PerKeyMutex();
    const out = await m.run("k1", async () => ({ ok: true, n: 7 }));
    expect(out).toEqual({ ok: true, n: 7 });
  });

  it("cleans up internal state for keys with no waiters", async () => {
    const m = new PerKeyMutex();
    await m.run("k1", async () => {});
    expect(m.activeKeyCount()).toBe(0);
  });

  it("retains a key while a chain is in flight, then drops it", async () => {
    const m = new PerKeyMutex();
    let release!: () => void;
    const gate = new Promise<void>((r) => (release = r));
    const inFlight = m.run("k1", () => gate);
    expect(m.activeKeyCount()).toBe(1);
    release();
    await inFlight;
    expect(m.activeKeyCount()).toBe(0);
  });
});
```
- [ ] **Step 2: Run the failing test to confirm it fails for the right reason**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex
```
Expected: FAIL with "Cannot find module './per-key-mutex.js'".
- [ ] **Step 3: Implement `apps/bot/src/scheduler/per-key-mutex.ts`**
```ts
/**
 * Async mutex keyed by a string. Different keys run in parallel;
 * same-key calls serialise.
 *
 * Used by fire-reminder so two reminders on the SAME WhatsApp account
 * take turns (running them concurrently would double the effective
 * send rate and risk a ban), while reminders on DIFFERENT accounts
 * proceed in parallel.
 *
 * The implementation is a chain-per-key Promise: each call appends
 * its work to the key's tail. Empty chains are cleaned up so the
 * Map doesn't grow unbounded across the bot's lifetime.
 */
export class PerKeyMutex {
  private chains = new Map<string, Promise<void>>();

  /** Run `fn` exclusively against `key`. Returns whatever `fn` returns. */
  async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const prev = this.chains.get(key) ?? Promise.resolve();
    let release!: () => void;
    const completion = new Promise<void>((r) => (release = r));
    // The chain we publish is "what was already there + this completion".
    // The next caller awaits THIS, so order is preserved.
    const chained = prev.then(() => completion);
    this.chains.set(key, chained);
    try {
      await prev;
      return await fn();
    } finally {
      release();
      // Drop the entry only if no later caller has appended in the
      // meantime; otherwise we'd evict the in-flight chain.
      if (this.chains.get(key) === chained) {
        this.chains.delete(key);
      }
    }
  }

  /** Diagnostic: how many keys currently have an in-flight or queued chain. */
  activeKeyCount(): number {
    return this.chains.size;
  }
}

/**
 * Singleton mutex used by fire-reminder, keyed by accountId. Lives at
 * module scope so multiple pg-boss workers in the same process share
 * state.
 */
export const accountMutex = new PerKeyMutex();
```
- [ ] **Step 4: Run the test to confirm it passes**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex
```
Expected: PASS, 7 tests.
- [ ] **Step 5: Commit**
```bash
git add apps/bot/src/scheduler/per-key-mutex.ts apps/bot/src/scheduler/per-key-mutex.test.ts
git commit -m "feat(bot): PerKeyMutex for accountId-keyed serialisation

Same key serialises, different keys run in parallel. Used by
fire-reminder to prevent two same-account fan-outs from doubling
the effective send rate (which would risk a WhatsApp ban). Chains
auto-clean empty entries so the Map doesn't leak."
```
---
## Task 4: TokenBucket rate limiter (TDD)
**Files:**
- Create: `apps/bot/src/scheduler/rate-limiter.test.ts`
- Create: `apps/bot/src/scheduler/rate-limiter.ts`
- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/rate-limiter.test.ts`**
```ts
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { TokenBucket, accountRateLimiter } from "./rate-limiter.js";

describe("TokenBucket", () => {
  beforeEach(() => {
    vi.useFakeTimers();
    vi.setSystemTime(new Date("2026-05-10T10:00:00Z"));
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  it("starts full: first N=capacity acquires resolve immediately", async () => {
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 5 });
    for (let i = 0; i < 5; i++) {
      await b.acquire(); // should not stall
    }
    // The (capacity+1)th acquire is covered by the next test.
  });

  it("blocks the (capacity+1)th acquire until a token regenerates", async () => {
    // 60/min = 1 token per second. Capacity 2.
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 2 });
    await b.acquire();
    await b.acquire();
    let resolved = false;
    const pending = b.acquire().then(() => {
      resolved = true;
    });
    // Immediately after draining, no token is available.
    await Promise.resolve();
    expect(resolved).toBe(false);
    // Advance the clock by exactly 1 second: one token should be back.
    await vi.advanceTimersByTimeAsync(1000);
    await pending;
    expect(resolved).toBe(true);
  });

  it("FIFO: pending acquires resolve in the order they arrived", async () => {
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 1 });
    await b.acquire(); // bucket empty
    const order: number[] = [];
    const a = b.acquire().then(() => order.push(1));
    const c = b.acquire().then(() => order.push(2));
    // Two seconds → two tokens regenerate, in order.
    await vi.advanceTimersByTimeAsync(2000);
    await Promise.all([a, c]);
    expect(order).toEqual([1, 2]);
  });

  it("does not over-fill past capacity even if the clock leaps forward", async () => {
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 3 });
    // Drain.
    await b.acquire();
    await b.acquire();
    await b.acquire();
    // Leap an hour. A naive impl would credit 3600 tokens; we should cap at 3.
    await vi.advanceTimersByTimeAsync(3_600_000);
    // 3 immediate acquires should resolve.
    await b.acquire();
    await b.acquire();
    await b.acquire();
    // The 4th waits for fresh regeneration.
    let resolved = false;
    b.acquire().then(() => (resolved = true));
    await Promise.resolve();
    expect(resolved).toBe(false);
  });

  it("ratePerMinute=0 is rejected at construction (caller bug, fail loud)", () => {
    expect(() => new TokenBucket({ ratePerMinute: 0, capacity: 1 })).toThrow();
  });
});

describe("accountRateLimiter (singleton)", () => {
  beforeEach(() => {
    vi.useFakeTimers();
    vi.setSystemTime(new Date("2026-05-10T10:00:00Z"));
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  it("returns the SAME bucket for repeated lookups of one accountId", () => {
    const a1 = accountRateLimiter.get("acct-1");
    const a2 = accountRateLimiter.get("acct-1");
    expect(a1).toBe(a2);
  });

  it("returns DIFFERENT buckets for different accountIds (isolation)", () => {
    const a = accountRateLimiter.get("acct-A");
    const b = accountRateLimiter.get("acct-B");
    expect(a).not.toBe(b);
  });

  it("a drained account A bucket does not block account B", async () => {
    const a = accountRateLimiter.get("acct-A");
    const b = accountRateLimiter.get("acct-B");
    // Drain A entirely. Capacity defaults to ratePerMinute, which is 40
    // with the default BOT_MAX_SEND_PER_MINUTE, so pull that many.
    for (let i = 0; i < 40; i++) await a.acquire();
    // B should still grant immediately.
    let bResolved = false;
    b.acquire().then(() => (bResolved = true));
    await Promise.resolve();
    expect(bResolved).toBe(true);
  });
});
```
- [ ] **Step 2: Run the failing test**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter
```
Expected: FAIL with "Cannot find module './rate-limiter.js'".
- [ ] **Step 3: Implement `apps/bot/src/scheduler/rate-limiter.ts`**
```ts
import { env } from "../env.js";

export interface TokenBucketOptions {
  ratePerMinute: number;
  /** Defaults to ratePerMinute (one minute's worth). */
  capacity?: number;
}

/**
 * Token bucket for per-account send pacing.
 *
 * Tokens regenerate at `ratePerMinute / 60` per second. Capacity caps
 * how many can accumulate during idle periods (so the operator can't
 * burst 1000 messages just because the account was quiet for a day).
 *
 * `acquire()` resolves when a token is available, FIFO across waiters.
 * Used by fire-reminder to gate every outbound send.
 */
export class TokenBucket {
  private readonly ratePerMs: number;
  private readonly capacity: number;
  private tokens: number;
  private lastRefillMs: number;
  private waiters: Array<() => void> = [];
  /** At most one armed drain timer, however many waiters are queued. */
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(opts: TokenBucketOptions) {
    if (opts.ratePerMinute <= 0) {
      throw new Error(`TokenBucket: ratePerMinute must be > 0, got ${opts.ratePerMinute}`);
    }
    this.ratePerMs = opts.ratePerMinute / 60_000;
    this.capacity = opts.capacity ?? opts.ratePerMinute;
    this.tokens = this.capacity;
    this.lastRefillMs = Date.now();
  }

  /** Resolve when a token is available. FIFO across concurrent waiters. */
  async acquire(): Promise<void> {
    this.refill();
    if (this.tokens >= 1 && this.waiters.length === 0) {
      this.tokens -= 1;
      return;
    }
    return new Promise<void>((resolve) => {
      this.waiters.push(resolve);
      this.scheduleNext();
    });
  }

  private refill(): void {
    const now = Date.now();
    const elapsed = now - this.lastRefillMs;
    if (elapsed <= 0) return;
    const gained = elapsed * this.ratePerMs;
    this.tokens = Math.min(this.capacity, this.tokens + gained);
    this.lastRefillMs = now;
  }

  private scheduleNext(): void {
    // Hand out every token that is already available, FIFO.
    this.refill();
    while (this.tokens >= 1 && this.waiters.length > 0) {
      this.tokens -= 1;
      const w = this.waiters.shift()!;
      w();
    }
    // Nothing left to wait for, or a timer is already armed. Without the
    // guard, every queued acquire would arm its own redundant timer.
    if (this.waiters.length === 0 || this.timer !== null) return;
    // Sleep until the current fractional token deficit has regenerated.
    const tokensShort = 1 - this.tokens;
    const waitMs = Math.max(1, Math.ceil(tokensShort / this.ratePerMs));
    this.timer = setTimeout(() => {
      this.timer = null;
      this.scheduleNext();
    }, waitMs);
  }
}

/**
 * Per-accountId TokenBucket registry. Each account gets its own
 * pacing budget, so a slow account A never throttles account B.
 */
class AccountRateLimiter {
  private buckets = new Map<string, TokenBucket>();
  private ratePerMinute: number;

  constructor(ratePerMinute: number) {
    this.ratePerMinute = ratePerMinute;
  }

  get(accountId: string): TokenBucket {
    let b = this.buckets.get(accountId);
    if (!b) {
      b = new TokenBucket({ ratePerMinute: this.ratePerMinute });
      this.buckets.set(accountId, b);
    }
    return b;
  }
}

export const accountRateLimiter = new AccountRateLimiter(env.BOT_MAX_SEND_PER_MINUTE);
```
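When tuning `BOT_MAX_SEND_PER_MINUTE` it helps to see when each send would actually go out. The following is a timer-free model of the bucket above, assuming every acquire is queued at t=0 and ignoring timer granularity; `grantTimesMs` is a hypothetical helper for reasoning about pacing and is not used at runtime.

```ts
/**
 * Timer-free model of a token bucket that starts full: `n` acquires all
 * queued at t=0. Returns the offset (ms) at which each one is granted.
 * Hypothetical helper; not part of rate-limiter.ts.
 */
function grantTimesMs(n: number, ratePerMinute: number, capacity: number = ratePerMinute): number[] {
  if (ratePerMinute <= 0) throw new Error("ratePerMinute must be > 0");
  const ratePerMs = ratePerMinute / 60_000;
  let tokens = capacity; // the bucket starts full
  let t = 0;
  const grants: number[] = [];
  for (let i = 0; i < n; i++) {
    if (tokens < 1) {
      // Advance the virtual clock just far enough to regenerate the deficit.
      t += (1 - tokens) / ratePerMs;
      tokens = 1;
    }
    tokens -= 1;
    grants.push(Math.round(t));
  }
  return grants;
}
```

At the default 40/min, the first 40 sends of a run go out immediately, the 41st follows 1.5 s later, and each send after that is another 1.5 s apart (60 000 ms / 40).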
- [ ] **Step 4: Run the tests**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter
```
Expected: PASS, 8 tests.
- [ ] **Step 5: Commit**
```bash
git add apps/bot/src/scheduler/rate-limiter.ts apps/bot/src/scheduler/rate-limiter.test.ts
git commit -m "feat(bot): per-account token-bucket rate limiter

TokenBucket gates each outbound send. Tokens regenerate at
ratePerMinute/60 per second, capped at one minute's worth so quiet
accounts can't burst. FIFO drain across concurrent waiters.

accountRateLimiter (singleton) hands out one bucket per accountId, so
account A's drain never throttles account B. Default rate is
BOT_MAX_SEND_PER_MINUTE (40), the safe band for an established
WhatsApp account."
```
---
## Task 5: Delivery window helper (TDD)
The helper lives in `packages/shared` (NOT the bot) because both bundles need it: the bot's fire-reminder loop gates on it, and the web ETA pill (Task 9) compares the ETA against it to choose between its green and amber states.

**Files:**
- Create: `packages/shared/src/delivery-window.test.ts`
- Create: `packages/shared/src/delivery-window.ts`
- Modify: `packages/shared/src/index.ts` (re-export `windowEndAt`)
- [ ] **Step 1: Write the failing test file at `packages/shared/src/delivery-window.test.ts`**
```ts
import { describe, it, expect } from "vitest";
import { windowEndAt } from "./delivery-window.js";

const TZ = "Asia/Kuala_Lumpur"; // UTC+8

describe("windowEndAt", () => {
  it("returns today's end-hour boundary in the given timezone", () => {
    // Fire at 2026-05-10 10:00 KL == 02:00 UTC. End hour 18 == 18:00 KL == 10:00 UTC.
    const fireAt = new Date("2026-05-10T02:00:00.000Z");
    const out = windowEndAt(TZ, 18, fireAt);
    expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z");
  });

  it("returns a past timestamp when fireAt is already after the end hour", () => {
    // Fire at 2026-05-10 19:00 KL == 11:00 UTC. End hour 18 → today's 18:00 KL == 10:00 UTC.
    // That's BEFORE fireAt. The caller's first window-gate check trips immediately.
    const fireAt = new Date("2026-05-10T11:00:00.000Z");
    const out = windowEndAt(TZ, 18, fireAt);
    expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z");
    expect(out.getTime()).toBeLessThan(fireAt.getTime());
  });

  it("respects the timezone (UTC+0 vs UTC+8)", () => {
    const fireAt = new Date("2026-05-10T02:00:00.000Z");
    const inUtc = windowEndAt("UTC", 18, fireAt);
    expect(inUtc.toISOString()).toBe("2026-05-10T18:00:00.000Z");
    const inKl = windowEndAt("Asia/Kuala_Lumpur", 18, fireAt);
    expect(inKl.toISOString()).toBe("2026-05-10T10:00:00.000Z");
  });

  it("handles end hour 24 as midnight of the same calendar day's end", () => {
    // 2026-05-10 in KL ends at 2026-05-11 00:00 KL == 2026-05-10 16:00 UTC.
    const fireAt = new Date("2026-05-10T02:00:00.000Z");
    const out = windowEndAt(TZ, 24, fireAt);
    expect(out.toISOString()).toBe("2026-05-10T16:00:00.000Z");
  });

  it("DST transition day stays on the SAME calendar day (no skipping forward)", () => {
    // America/New_York switches to EDT at 02:00 local on 2026-03-08.
    // Fire at 2026-03-08 11:00 EDT (15:00 UTC), after the switch.
    // End at 2026-03-08 18:00 EDT (22:00 UTC).
    const fireAt = new Date("2026-03-08T15:00:00.000Z");
    const out = windowEndAt("America/New_York", 18, fireAt);
    expect(out.toISOString()).toBe("2026-03-08T22:00:00.000Z");
  });

  it("rejects end hour outside 0..24", () => {
    const fireAt = new Date("2026-05-10T00:00:00Z");
    expect(() => windowEndAt(TZ, -1, fireAt)).toThrow();
    expect(() => windowEndAt(TZ, 25, fireAt)).toThrow();
  });
});
```
- [ ] **Step 2: Run the failing test**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run delivery-window
```
Expected: FAIL with "Cannot find module './delivery-window.js'".
- [ ] **Step 3: Implement `packages/shared/src/delivery-window.ts`**
```ts
import { DateTime } from "luxon";

/**
 * Returns the end-of-window timestamp for the calendar day `fireAt`
 * falls on, in the operator's timezone.
 *
 *   windowEndAt("Asia/Kuala_Lumpur", 18, fireAt)
 *   → today's 18:00 KL (which may be in the past if fireAt is already
 *     past 18:00 KL; the caller's first window gate then fires immediately).
 *
 * `endHour` is 0..24. Hour 24 is treated as midnight of the next
 * calendar day (i.e. "end of today" inclusive).
 *
 * Pure: no I/O, no Date.now() reads, no clock dependency. Easy to
 * test with fixture inputs.
 */
export function windowEndAt(
  timezone: string,
  endHour: number,
  fireAt: Date,
): Date {
  if (!Number.isInteger(endHour) || endHour < 0 || endHour > 24) {
    throw new Error(`windowEndAt: endHour must be 0..24, got ${endHour}`);
  }
  const dt = DateTime.fromJSDate(fireAt).setZone(timezone);
  if (!dt.isValid) {
    throw new Error(`windowEndAt: invalid timezone "${timezone}"`);
  }
  // For hour 24, "end of day" is the next midnight. Luxon's `set` with
  // hour=24 normalises into hour=0 of the next day, which is exactly
  // what we want.
  const end = dt.set({ hour: endHour, minute: 0, second: 0, millisecond: 0 });
  return end.toJSDate();
}
```
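The Luxon version leans on two library behaviours: `set({ hour: 24 })` rolling over into the next day, and correct offset resolution on DST days. If you want an independent cross-check in tests without another dependency, the same calculation can be sketched with the built-in `Intl.DateTimeFormat`. This is a hypothetical alternative, not part of the plan: `tzOffsetMs` and `windowEndAtIntl` are illustrative names, the sketch assumes whole-second UTC offsets and a runtime with full ICU time-zone data (the Node.js default), and it does not special-case end hours that land inside a DST gap.

```ts
/** UTC offset of `timeZone` at `instant`, in ms (e.g. +8h for Asia/Kuala_Lumpur). */
function tzOffsetMs(timeZone: string, instant: Date): number {
  const parts = new Intl.DateTimeFormat("en-US", {
    timeZone,
    hourCycle: "h23",
    year: "numeric",
    month: "2-digit",
    day: "2-digit",
    hour: "2-digit",
    minute: "2-digit",
    second: "2-digit",
  }).formatToParts(instant);
  const field = (type: string): number => Number(parts.find((p) => p.type === type)?.value);
  // Read the zone's wall-clock fields back as if they were UTC.
  // `% 24` guards against runtimes that print midnight as "24".
  const wallAsUtc = Date.UTC(
    field("year"), field("month") - 1, field("day"),
    field("hour") % 24, field("minute"), field("second"),
  );
  // Drop sub-second precision so the difference is a whole-second offset.
  return wallAsUtc - Math.floor(instant.getTime() / 1000) * 1000;
}

/** Hypothetical Intl-based twin of windowEndAt, for cross-checking only. */
function windowEndAtIntl(timeZone: string, endHour: number, fireAt: Date): Date {
  if (!Number.isInteger(endHour) || endHour < 0 || endHour > 24) {
    throw new Error(`windowEndAtIntl: endHour must be 0..24, got ${endHour}`);
  }
  // Calendar date of fireAt as seen in the target zone.
  const local = new Date(fireAt.getTime() + tzOffsetMs(timeZone, fireAt));
  // Target wall-clock time encoded as if it were UTC; Date.UTC rolls
  // hour 24 over to 00:00 of the next day.
  const targetWall = Date.UTC(local.getUTCFullYear(), local.getUTCMonth(), local.getUTCDate(), endHour);
  // Guess with fireAt's offset, then correct once in case a DST change
  // sits between fireAt and the end hour.
  const guess = targetWall - tzOffsetMs(timeZone, fireAt);
  return new Date(targetWall - tzOffsetMs(timeZone, new Date(guess)));
}
```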
- [ ] **Step 4: Re-export from `packages/shared/src/index.ts`**

Append:
```ts
export { windowEndAt } from "./delivery-window.js";
```
- [ ] **Step 5: Run the tests**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run delivery-window
```
Expected: PASS, 6 tests.
- [ ] **Step 6: Commit**
```bash
git add packages/shared/src/delivery-window.ts packages/shared/src/delivery-window.test.ts packages/shared/src/index.ts
git commit -m "feat(shared): pure delivery-window end calculator

windowEndAt(timezone, endHour, fireAt) returns the end-of-window for
the day fireAt falls on. If fireAt is already past it, the result is a
past timestamp: the run loop's first window gate trips immediately
and the run pauses with zero sent, leaving every target pending,
which is the right behaviour for 'we can't send after window close'."
```
---
## Task 6: MediaUploadCache (TDD with mock socket)
**Files:**
- Create: `apps/bot/src/scheduler/media-upload-cache.test.ts`
- Create: `apps/bot/src/scheduler/media-upload-cache.ts`
- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/media-upload-cache.test.ts`**
```ts
import { describe, it, expect, vi } from "vitest";
import { MediaUploadCache } from "./media-upload-cache.js";

describe("MediaUploadCache", () => {
  it("uploads each unique mediaId exactly once across N gets", async () => {
    const prepare = vi.fn(async (mediaId: string) => ({
      kind: "prepared",
      mediaId,
    }));
    const cache = new MediaUploadCache(prepare);
    const a1 = await cache.get("media-A");
    const a2 = await cache.get("media-A");
    const b1 = await cache.get("media-B");
    expect(prepare).toHaveBeenCalledTimes(2);
    expect(prepare).toHaveBeenCalledWith("media-A");
    expect(prepare).toHaveBeenCalledWith("media-B");
    // Same handle returned for repeated lookups of A.
    expect(a1).toBe(a2);
    expect(a1).not.toBe(b1);
  });

  it("coalesces concurrent gets of the same mediaId into ONE prepare call", async () => {
    let resolveA: (v: unknown) => void = () => {};
    const aPromise = new Promise((r) => (resolveA = r));
    const prepare = vi.fn(async (mediaId: string) => {
      if (mediaId === "media-A") return aPromise;
      return { kind: "prepared", mediaId };
    });
    const cache = new MediaUploadCache(prepare);
    const p1 = cache.get("media-A");
    const p2 = cache.get("media-A");
    const p3 = cache.get("media-A");
    // Resolve the in-flight prepare.
    resolveA({ kind: "prepared", mediaId: "media-A" });
    const [r1, r2, r3] = await Promise.all([p1, p2, p3]);
    expect(prepare).toHaveBeenCalledTimes(1); // de-duplicated
    expect(r1).toBe(r2);
    expect(r2).toBe(r3);
  });

  it("a thrown prepare is NOT cached; the next get retries", async () => {
    let attempt = 0;
    const prepare = vi.fn(async (_mediaId: string) => {
      attempt++;
      if (attempt === 1) throw new Error("upload network blip");
      return { kind: "prepared", attempt };
    });
    const cache = new MediaUploadCache(prepare);
    await expect(cache.get("media-A")).rejects.toThrow("upload network blip");
    // Second attempt must call prepare again: DON'T cache failures.
    const r = await cache.get("media-A");
    expect(prepare).toHaveBeenCalledTimes(2);
    expect(r).toEqual({ kind: "prepared", attempt: 2 });
  });

  it("size() reflects the number of cached unique mediaIds", async () => {
    const prepare = async (mediaId: string) => ({ mediaId });
    const cache = new MediaUploadCache(prepare);
    expect(cache.size()).toBe(0);
    await cache.get("a");
    expect(cache.size()).toBe(1);
    await cache.get("b");
    expect(cache.size()).toBe(2);
    await cache.get("a"); // already cached
    expect(cache.size()).toBe(2);
  });
});
```
- [ ] **Step 2: Run the failing test**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache
```
Expected: FAIL with "Cannot find module './media-upload-cache.js'".
- [ ] **Step 3: Implement `apps/bot/src/scheduler/media-upload-cache.ts`**
```ts
/**
* Per-run cache of `prepareWAMessageMedia` results, keyed by
* `mediaId`. The point: when a reminder fans out to 1000 groups with
* one image, we want to upload that image to WhatsApp's CDN ONCE, not
* 1000 times. Subsequent group sends reuse the prepared message
* (with embedded directPath / mediaKey) via socket.relayMessage.
*
* Lifecycle: one cache instance per fire-reminder run. After the run
* completes, the cache is dropped — we don't share uploads across
* runs because WA media tokens are short-lived.
*
* Concurrent gets of the same mediaId are coalesced into a single
* prepare call. Failed prepares are NOT cached so the next attempt
* retries (network blips at upload time shouldn't poison the cache).
*/
export class MediaUploadCache<T> {
private readonly prepare: (mediaId: string) => Promise<T>;
private readonly entries = new Map<string, Promise<T>>();
constructor(prepare: (mediaId: string) => Promise<T>) {
this.prepare = prepare;
}
async get(mediaId: string): Promise<T> {
const existing = this.entries.get(mediaId);
if (existing) return existing;
const inflight = this.prepare(mediaId);
// Insert eagerly so concurrent gets dedupe.
this.entries.set(mediaId, inflight);
try {
return await inflight;
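} catch (err) {
// Don't cache failures; the next caller should retry. Only evict our
// own entry, in case a newer attempt has already replaced it.
if (this.entries.get(mediaId) === inflight) this.entries.delete(mediaId);
throw err;
}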
}
size(): number {
return this.entries.size;
}
}
```
- [ ] **Step 4: Run the tests**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache
```
Expected: PASS, 4 tests.
- [ ] **Step 5: Commit**
```bash
git add apps/bot/src/scheduler/media-upload-cache.ts apps/bot/src/scheduler/media-upload-cache.test.ts
git commit -m "feat(bot): MediaUploadCache for once-per-run media prepare
One cache instance per fire-reminder run. Each unique mediaId gets
prepared (uploaded to WA CDN) exactly once, and subsequent group
sends within the run reuse the prepared message via relayMessage.
Concurrent gets coalesce into a single prepare. Failed prepares
don't poison the cache — next caller retries."
```
---
## Task 7: Rewrite `fire-reminder.ts` and bump pg-boss `teamSize`
**Files:**
- Modify: `apps/bot/src/scheduler/reminder-jobs.ts`
- Rewrite: `apps/bot/src/scheduler/fire-reminder.ts`
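- [ ] **Step 1: Update `apps/bot/src/scheduler/reminder-jobs.ts` — pass `teamSize`**
Whole file. Note: `teamSize`/`teamConcurrency` are the pre-v10 pg-boss worker options, while the array-of-jobs handler below is the v10+ shape. If the Step 4 typecheck rejects them, switch to the installed version's per-worker concurrency option (e.g. `localConcurrency`) after checking its docs.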
```ts
import type { PgBoss } from "pg-boss";
import { logger } from "../logger.js";
import { env } from "../env.js";
import { fireReminder, type FireReminderPayload } from "./fire-reminder.js";
export const REMINDER_FIRE_QUEUE = "reminder.fire";
export async function registerReminderJobs(boss: PgBoss): Promise<void> {
await boss.createQueue(REMINDER_FIRE_QUEUE);
await boss.work<FireReminderPayload>(
REMINDER_FIRE_QUEUE,
{
// Up to BOT_FIRE_CONCURRENCY jobs in flight at once. Combined with
// the per-account mutex inside fireReminder, this lets reminders
// on DIFFERENT accounts run in parallel while same-account
// reminders take turns.
teamSize: env.BOT_FIRE_CONCURRENCY,
teamConcurrency: 1,
},
async (jobs) => {
const job = jobs[0];
if (!job) return;
logger.debug({ jobId: job.id, payload: job.data }, "reminder.fire: handling");
await fireReminder(job.data);
},
);
logger.info(
{ teamSize: env.BOT_FIRE_CONCURRENCY },
"reminder.fire: handler registered",
);
}
export async function scheduleReminderFire(
boss: PgBoss,
reminderId: string,
scheduledAt: Date,
): Promise<string | null> {
const id = await boss.send(
REMINDER_FIRE_QUEUE,
{ reminderId },
{
startAfter: scheduledAt,
retryLimit: 3,
retryDelay: 30,
retryBackoff: true,
// Singleton key per reminder: pg-boss dedupes on it (send() returns null on a conflict). It does NOT cancel an already-queued job; see cancelReminderFire.
singletonKey: `reminder:${reminderId}`,
},
);
logger.info({ reminderId, jobId: id, scheduledAt }, "reminder.fire: scheduled");
return id;
}
export async function cancelReminderFire(_boss: PgBoss, reminderId: string): Promise<void> {
// Soft cancel: pg-boss doesn't expose a clean cancel-by-singleton API in v12.
// The scheduled job will still fire, but `fireReminder` exits early when the
// reminder row is gone. Hard cancel can be added later by storing the jobId.
logger.info({ reminderId }, "reminder.fire: cancel requested (soft, fizzles on fire)");
}
```
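To make the "parallel across accounts, serial within one account" shape concrete, here is a hypothetical minimal model of what a per-key mutex does once pg-boss hands the worker several jobs at once. It is a sketch for illustration only, NOT the repo's `PerKeyMutex`; the `KeyedSerialiser` class and `demo` function are invented names.

```typescript
// Hypothetical model of the concurrency shape above; NOT the repo's
// PerKeyMutex. Each key owns a promise chain: work for a key starts only
// after the previous work for that key settles, while different keys
// never wait on each other.
class KeyedSerialiser {
  private readonly tails = new Map<string, Promise<unknown>>();

  run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    // Swallow the previous task's error so one failed fire doesn't
    // block later fires queued for the same account.
    const next = prev.catch(() => undefined).then(fn);
    this.tails.set(key, next);
    // Forget the key once its last queued task settles.
    next
      .finally(() => {
        if (this.tails.get(key) === next) this.tails.delete(key);
      })
      .catch(() => undefined);
    return next;
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Three jobs handed to the worker at once: two on account A, one on B.
async function demo(): Promise<string[]> {
  const mutex = new KeyedSerialiser();
  const log: string[] = [];
  const job = (name: string, account: string) =>
    mutex.run(account, async () => {
      log.push(`${name}:start`);
      await sleep(10);
      log.push(`${name}:end`);
    });
  await Promise.all([job("A1", "acct-A"), job("A2", "acct-A"), job("B1", "acct-B")]);
  return log;
}
```

In the resulting log, `B1:start` appears before `A1:end` (different accounts overlap), while `A2:start` only appears after `A1:end` (same account takes turns).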
- [ ] **Step 2: Rewrite `apps/bot/src/scheduler/fire-reminder.ts`**
Whole file:
```ts
import { and, eq, inArray } from "drizzle-orm";
import { reminderRuns, reminderRunTargets, reminders } from "@cmbot/db";
import {
generateWAMessageContent,
generateMessageID,
type AnyMessageContent,
type proto,
} from "@whiskeysockets/baileys";
import pLimit from "p-limit";
import { db } from "../db.js";
import { logger } from "../logger.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import { absoluteMediaPath, nextOccurrence, resolveDeliveryKind, windowEndAt } from "@cmbot/shared";
import { readFile } from "node:fs/promises";
import { env } from "../env.js";
import { writeAuditLog } from "../audit.js";
import { getReminderWithDetails } from "../reminders/crud.js";
import { getBoss } from "./pgboss-client.js";
import { scheduleReminderFire } from "./reminder-jobs.js";
import { pgNotifyWeb } from "../ipc/notify.js";
import { accountMutex } from "./per-key-mutex.js";
import { accountRateLimiter } from "./rate-limiter.js";
import { MediaUploadCache } from "./media-upload-cache.js";
export type FireReminderPayload = {
reminderId: string;
/** Optional resume hook. When present, fire-reminder ATTACHES to
* the existing run instead of creating a new one, and only
* re-attempts targets in `pending` status. Set by the resume
* server action. */
runId?: string;
};
/** Random delay between same-group message parts (ms). Just enough for
* visible ordering in the chat at WA's natural pace. */
function partJitterMs(): number {
return 200 + Math.floor(Math.random() * 300); // 200..499
}
export async function fireReminder(payload: FireReminderPayload): Promise<void> {
const reminder = await getReminderWithDetails(payload.reminderId);
if (!reminder) {
logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found");
return;
}
if (reminder.status !== "active") {
logger.info({ reminderId: reminder.id, status: reminder.status }, "fire-reminder: skipping (not active)");
return;
}
// Per-account mutex: two reminders on the SAME account take turns.
// Different accounts run in parallel (cross-account isolation).
await accountMutex.run(reminder.accountId, () => fireReminderInner(reminder, payload.runId));
}
async function fireReminderInner(
reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
resumeRunId?: string,
): Promise<void> {
// Resume path: attach to the existing run; fresh path: create one.
let runId: string;
if (resumeRunId) {
const existing = await db.query.reminderRuns.findFirst({
where: (r, { eq }) => eq(r.id, resumeRunId),
});
if (!existing) {
logger.warn({ reminderId: reminder.id, resumeRunId }, "fire-reminder: resume target run missing");
return;
}
runId = existing.id;
// Re-mark as in-flight so the UI shows the run is no longer paused.
await db
.update(reminderRuns)
.set({ status: "pending", errorSummary: null })
.where(eq(reminderRuns.id, runId));
} else {
const [run] = await db
.insert(reminderRuns)
.values({
reminderId: reminder.id,
reminderName: reminder.name,
status: "pending",
})
.returning({ id: reminderRuns.id });
runId = run!.id;
}
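const session = sessionManager.getSession(reminder.accountId);
if (!session) {
logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected");
if (resumeRunId) {
// Resume with the account offline: the run_target rows already exist
// (markAllSkipped would INSERT duplicates), so leave them `pending` and
// put the run back to `paused` so the operator can resume once the
// account reconnects.
await db.update(reminderRuns).set({ status: "paused", errorSummary: "account not connected; resume once it reconnects" }).where(eq(reminderRuns.id, runId));
await pgNotifyWeb({ type: "reminder.fired", reminderId: reminder.id, runId, status: "paused" });
return;
}
await markAllSkipped(runId, reminder, "account not connected");
await db.update(reminderRuns).set({ status: "skipped", errorSummary: "account not connected" }).where(eq(reminderRuns.id, runId));
await pgNotifyWeb({ type: "reminder.fired", reminderId: reminder.id, runId, status: "skipped" });
return;
}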
// Up-front: load all groups + media rows in TWO bulk queries.
const groupIds = reminder.targets.map((t) => t.groupId);
const groupRows = groupIds.length
? await db.query.whatsappGroups.findMany({ where: (g) => inArray(g.id, groupIds) })
: [];
const groupById = new Map(groupRows.map((g) => [g.id, g]));
const mediaIds = Array.from(
new Set(reminder.messages.map((m) => m.mediaId).filter((id): id is string => Boolean(id))),
);
const mediaRows = mediaIds.length
? await db.query.mediaFiles.findMany({ where: (m) => inArray(m.id, mediaIds) })
: [];
const mediaById = new Map(mediaRows.map((m) => [m.id, m]));
// Pre-create run_targets rows so progress is observable mid-run.
// On a RESUME, the rows already exist — only the original fire path
// inserts them. The resume path skips this; the loop below filters
// to only the still-pending rows.
if (!resumeRunId && reminder.targets.length > 0) {
await db.insert(reminderRunTargets).values(
reminder.targets.map((t) => ({
runId,
groupId: t.groupId,
groupLabel: groupById.get(t.groupId)?.name ?? null,
status: "pending" as const,
})),
);
}
// On a RESUME, only the still-pending targets need attention. On
// a fresh fire, every target is pending. Either way we read the
// current run_target rows from the DB to be the source of truth
// about what's left to do.
const pendingRows = await db.query.reminderRunTargets.findMany({
where: (t, { eq, and: drizzleAnd }) =>
drizzleAnd(eq(t.runId, runId), eq(t.status, "pending")),
});
const pendingGroupIds = new Set(pendingRows.map((r) => r.groupId));
const targetsToProcess = reminder.targets.filter((t) =>
pendingGroupIds.has(t.groupId),
);
// Already-sent count from prior run (so the final tally adds to total).
const priorSentCount = resumeRunId
? (
await db.query.reminderRunTargets.findMany({
where: (t, { eq, and: drizzleAnd }) =>
drizzleAnd(eq(t.runId, runId), eq(t.status, "sent")),
})
).length
: 0;
const priorFailedCount = resumeRunId
? (
await db.query.reminderRunTargets.findMany({
where: (t, { eq, and: drizzleAnd }) =>
drizzleAnd(eq(t.runId, runId), eq(t.status, "failed")),
})
).length
: 0;
// Window-end timestamp. If the reminder fires AFTER today's end-hour
// (e.g. cron miss-fired late) this is in the past: every iteration
// trips the gate, so a fresh run resolves as failed and a resumed run
// with prior progress stays paused.
const windowEnd = windowEndAt(
reminder.timezone,
reminder.deliveryWindowEndHour,
new Date(),
);
// Per-run media upload cache. Each unique mediaId is prepared via
// generateWAMessageContent ONCE (which uploads to WA's CDN); the
// resulting proto.Message is reused for every group via relayMessage.
// socket.waUploadToServer is the upload helper Baileys exposes.
const uploadCache = new MediaUploadCache<proto.Message>(async (mediaId) => {
const media = mediaById.get(mediaId);
if (!media) throw new Error(`media row missing: ${mediaId}`);
const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR);
const buffer = await readFile(filePath);
const head = buffer.subarray(0, 12);
const resolved = resolveDeliveryKind(media.mimeType, head);
const senderKind: "image" | "video" | "document" =
resolved === "image" || resolved === "video" ? resolved : "document";
const content: AnyMessageContent =
senderKind === "image"
? { image: buffer, mimetype: media.mimeType }
: senderKind === "video"
? { video: buffer, mimetype: media.mimeType }
: { document: buffer, fileName: media.filenameOriginal, mimetype: media.mimeType };
return generateWAMessageContent(content, {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
upload: (session.socket as any).waUploadToServer,
});
});
// Per-account rate limiter — gates each socket.sendMessage / relayMessage call.
const rateLimiter = accountRateLimiter.get(reminder.accountId);
let sentCount = 0;
let failedCount = 0;
let skippedCount = 0;
let windowClosed = false;
const groupConcurrency = pLimit(env.BOT_GROUP_CONCURRENCY);
await Promise.all(
targetsToProcess.map((target) =>
groupConcurrency(async () => {
// Window-end gate. CRITICAL: leave the row as `pending` (NOT
// `skipped`) so the run can be resumed later. The run as a
// whole flips to `paused` after this loop.
if (Date.now() >= windowEnd.getTime()) {
windowClosed = true;
// Don't touch the row — it's already `pending`. Just count.
return;
}
const group = groupById.get(target.groupId);
if (!group) {
await db
.update(reminderRunTargets)
.set({ status: "skipped", error: "group missing from db" })
.where(
and(
eq(reminderRunTargets.runId, runId),
eq(reminderRunTargets.groupId, target.groupId),
),
);
skippedCount++;
return;
}
const start = Date.now();
try {
let lastMessageId: string | undefined;
for (const part of reminder.messages) {
await rateLimiter.acquire();
if (part.kind === "text" && part.textContent) {
const r = await session.socket.sendMessage(group.waGroupJid, {
text: part.textContent,
});
lastMessageId = r?.key?.id ?? undefined;
} else if (part.mediaId) {
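const prebuilt = await uploadCache.get(part.mediaId);
// The cached message is shared by every group in the run
// (and by any other part reusing this mediaId), and up to
// BOT_GROUP_CONCURRENCY groups send at once. Mutating it
// would leak one part's caption into another send, so
// caption a per-send copy instead. structuredClone yields
// a plain object, which protobuf encoding accepts.
const message = structuredClone(prebuilt);
if (part.textContent) {
injectCaption(message, part.textContent);
}
// Each relay uses a fresh messageId.
const messageId = generateMessageID();
await session.socket.relayMessage(group.waGroupJid, message, { messageId });
lastMessageId = messageId;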
}
await new Promise((r) => setTimeout(r, partJitterMs()));
}
await db
.update(reminderRunTargets)
.set({
status: "sent",
waMessageId: lastMessageId ?? null,
latencyMs: Date.now() - start,
})
.where(
and(
eq(reminderRunTargets.runId, runId),
eq(reminderRunTargets.groupId, target.groupId),
),
);
sentCount++;
} catch (err) {
logger.error(
{ err, reminderId: reminder.id, groupId: target.groupId },
"fire-reminder: send failed",
);
await db
.update(reminderRunTargets)
.set({ status: "failed", error: (err as Error).message })
.where(
and(
eq(reminderRunTargets.runId, runId),
eq(reminderRunTargets.groupId, target.groupId),
),
);
failedCount++;
}
}),
),
);
// Final status. The four shapes:
// - paused : window closed with at least one row STILL pending.
// Resumable. Sent rows stay sent, pending stays pending.
// - success : every target sent (no failures, no pending).
// - partial : every target was attempted; some sent, some failed
// or skipped. NOT resumable; failures are real.
// - failed : zero sent. Either every send errored, or the window
// was already closed when the run began (nothing
// attempted, but no pending-with-progress to resume).
const total = reminder.targets.length;
const totalSent = priorSentCount + sentCount;
const totalFailed = priorFailedCount + failedCount;
// Re-read pending count from the DB so the count reflects whatever
// the loop left behind (any window-skipped rows are still pending).
const remainingPending = (
await db.query.reminderRunTargets.findMany({
where: (t, { eq, and: drizzleAnd }) =>
drizzleAnd(eq(t.runId, runId), eq(t.status, "pending")),
})
).length;
let status: "success" | "partial" | "failed" | "paused";
let errorSummary: string | null = null;
if (windowClosed && remainingPending > 0 && totalSent > 0) {
status = "paused";
errorSummary = `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}). ${totalSent} of ${total} groups delivered, ${remainingPending} still pending. Resume from the Activity tab. If this happens repeatedly, consider offloading to another paired account, or shrinking the message body / media size to fit more groups in your daily window.`;
} else if (windowClosed && totalSent === 0) {
// Window was closed before any send happened. Not paused — there's
// nothing meaningful to resume. Counts as a hard failure.
status = "failed";
errorSummary = `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}) before any group could be sent. The reminder fired too late in the day.`;
} else if (totalSent === total) {
status = "success";
} else if (totalSent > 0) {
status = "partial";
errorSummary = `${totalSent} of ${total} groups delivered (${totalFailed} failed, ${skippedCount} skipped).`;
} else {
status = "failed";
errorSummary = `All ${total} sends failed.`;
}
await db
.update(reminderRuns)
.set({ status, errorSummary })
.where(eq(reminderRuns.id, runId));
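await pgNotifyWeb({
type: "reminder.fired",
reminderId: reminder.id,
runId,
status,
// Delivered/total feed the "X of Y groups delivered" notification
// body (Task 8). Widen pgNotifyWeb's event type if it rejects them.
sent: totalSent,
total,
});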
// Lifecycle bookkeeping. Skip when the run is paused — the reminder
// shouldn't end or re-arm while a resume is still possible.
const runIsTerminal = status !== "paused";
if (runIsTerminal) {
if (reminder.scheduleKind === "one_off") {
await db
.update(reminders)
.set({ status: "ended", updatedAt: new Date() })
.where(eq(reminders.id, reminder.id));
} else if (reminder.scheduleKind === "recurring" && reminder.rrule) {
const next = nextOccurrence(reminder.rrule, reminder.timezone, new Date());
await db
.update(reminders)
.set({ lastFiredAt: new Date(), updatedAt: new Date() })
.where(eq(reminders.id, reminder.id));
if (next) {
try {
await scheduleReminderFire(getBoss(), reminder.id, next);
logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence");
} catch (err) {
logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence");
}
} else {
logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending");
await db.update(reminders).set({ status: "ended" }).where(eq(reminders.id, reminder.id));
}
}
} else {
logger.info(
{ reminderId: reminder.id, runId },
"fire-reminder: paused — leaving reminder lifecycle unchanged for resume",
);
}
await writeAuditLog(db, {
operatorId: reminder.createdBy,
source: "system",
action: "reminder.fired",
targetType: "reminder",
targetId: reminder.id,
payload: { runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount },
});
logger.info(
{ reminderId: reminder.id, runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount },
"fire-reminder: done",
);
}
/**
* Mark every target as skipped with the given error. Used when the
* account is offline before the loop even starts (no run_target rows
* have been created yet, so we INSERT instead of UPDATE).
*/
async function markAllSkipped(
runId: string,
reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
error: string,
): Promise<void> {
if (reminder.targets.length === 0) return;
const rows = await db.query.whatsappGroups.findMany({
where: (g) => inArray(g.id, reminder.targets.map((t) => t.groupId)),
columns: { id: true, name: true },
});
const labelById = new Map(rows.map((r) => [r.id, r.name]));
await db.insert(reminderRunTargets).values(
reminder.targets.map((t) => ({
runId,
groupId: t.groupId,
groupLabel: labelById.get(t.groupId) ?? null,
status: "skipped" as const,
error,
})),
);
}
/**
* Mutates the prebuilt proto.Message to set the caption on whichever
* media variant it carries. Baileys' relayMessage does not let us
* pass the caption alongside; the protobuf already carries the slot.
*/
function injectCaption(msg: proto.Message, caption: string): void {
if (msg.imageMessage) msg.imageMessage.caption = caption;
else if (msg.videoMessage) msg.videoMessage.caption = caption;
else if (msg.documentMessage) msg.documentMessage.caption = caption;
}
```
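The end-of-run status decision in `fireReminderInner` is easier to read, and can be unit-tested without a database, when lifted into a pure function. A sketch with a hypothetical `classifyRun` helper that mirrors the inline branches above in the same order (the plan itself keeps the logic inline):

```typescript
type RunStatus = "success" | "partial" | "failed" | "paused";

// Hypothetical helper mirroring the inline status branches in fireReminderInner.
interface RunTally {
  total: number;            // every target on the reminder
  sent: number;             // prior attempts + this attempt
  remainingPending: number; // rows still `pending` after the loop
  windowClosed: boolean;    // did any target hit the window-end gate?
}

function classifyRun(t: RunTally): RunStatus {
  // Window closed with progress made and work left: resumable.
  if (t.windowClosed && t.remainingPending > 0 && t.sent > 0) return "paused";
  // Window closed before anything went out: nothing worth resuming.
  if (t.windowClosed && t.sent === 0) return "failed";
  if (t.sent === t.total) return "success";
  if (t.sent > 0) return "partial";
  return "failed";
}
```

A table-driven test over `RunTally` values then pins each of the four shapes, e.g. `{ total: 1000, sent: 412, remainingPending: 588, windowClosed: true }` classifies as `paused`.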
- [ ] **Step 3: Add `p-limit` to apps/bot deps**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm add -F @cmbot/bot p-limit
```
Expected: pnpm adds `p-limit` (plus its `yocto-queue` dependency if it isn't already installed) and updates the lockfile.
- [ ] **Step 4: Typecheck the bot**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck
```
Expected: passes. If type errors mention `relayMessage`, note that it is a method on the socket (`session.socket.relayMessage`), not a named export of `@whiskeysockets/baileys`; only `generateWAMessageContent`, `generateMessageID`, `AnyMessageContent` and `proto` come from the package import.
- [ ] **Step 5: Run all bot tests**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run
```
Expected: PASS for all (existing 26 + 7 PerKeyMutex + 8 RateLimiter + 6 DeliveryWindow + 4 MediaUploadCache = 51).
- [ ] **Step 6: Restart the bot to pick up the rewrite**
```bash
NO_SUDO=1 ./scripts/dev.sh restart-bot
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=40 bot 2>&1 | grep -i "reminder.fire"
```
Expected: A line like `reminder.fire: handler registered teamSize=8`.
- [ ] **Step 7: Commit**
```bash
git add apps/bot/src/scheduler/fire-reminder.ts apps/bot/src/scheduler/reminder-jobs.ts apps/bot/package.json pnpm-lock.yaml
git commit -m "feat(bot): windowed, pacing-safe fan-out
Rewrites the per-target loop to:
- Wrap inner work in PerKeyMutex(accountId) so two reminders on the
same account take turns; different accounts run in parallel.
- Bulk-load groups and media rows up front (drops ~3000 round-trips
to ~3 for a 1000-group run).
- Pre-create run_target rows with status='pending' so the Activity
tab shows progress mid-run.
- Pre-upload each unique media via MediaUploadCache (one
generateWAMessageContent call per mediaId, then relayMessage to
every group). For 1000 groups × 5 MB image, this turns 5 GB of
upload into 5 MB.
- Run BOT_GROUP_CONCURRENCY (default 3) groups in parallel within
one account; each group's parts stay serial for visible order.
- Gate every socket call on a per-account TokenBucket
(BOT_MAX_SEND_PER_MINUTE, default 40).
- Replace the rigid 1.5s inter-part sleep with 200..499 ms jitter.
- Window-end gate: stop starting new groups once today's end-hour (in
the reminder's timezone) has passed; leave unstarted targets
'pending' and flip the run to 'paused' with a message that names
the timezone and the delivered/total count and points at
multi-account offload. A resume re-enters with the existing runId
and only retries the still-pending targets.
reminder-jobs.ts now passes teamSize=BOT_FIRE_CONCURRENCY (default 8)
to boss.work so up to 8 different-account reminders run concurrently."
```
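As a rough check on whether the defaults in the commit message above fit a 1000-group run inside the default 6am to 6pm window, the pacing math can be sketched as below. It assumes each message part consumes one token and the per-account bucket refills steadily at BOT_MAX_SEND_PER_MINUTE; the function names are hypothetical and separate from Task 9's `estimateRunDuration`.

```typescript
// Back-of-envelope pacing math under stated assumptions:
//  - every message part costs one token from the per-account bucket;
//  - the bucket refills at a steady `sendsPerMinute` (no initial burst);
//  - upload time, part jitter and BOT_GROUP_CONCURRENCY are ignored.
// The results are rough bounds, not predictions.

/** Minutes to deliver every part to every group at the paced rate. */
function minutesToDeliver(groups: number, partsPerGroup: number, sendsPerMinute: number): number {
  return Math.ceil((groups * partsPerGroup) / sendsPerMinute);
}

/** Most groups that fit in `windowMinutes` at the paced rate. */
function maxGroupsInWindow(windowMinutes: number, partsPerGroup: number, sendsPerMinute: number): number {
  return Math.floor((windowMinutes * sendsPerMinute) / partsPerGroup);
}

// Default 40 sends/minute, a 2-part reminder (text + image):
const fanOutMinutes = minutesToDeliver(1000, 2, 40); // 2000 sends / 40 = 50 minutes
const fullWindowCapacity = maxGroupsInWindow(12 * 60, 2, 40); // 28,800 sends = 14,400 groups
```

The usable budget runs from the fire time to the end hour, not across the whole window, so a reminder that fires at 5pm with an 18:00 end hour has only 60 minutes to work with.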
---
## Task 8: Web wiring — schema, action, wizard input, notification body
**Files:**
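- Modify: `apps/web/src/actions/reminders.ts`
- Modify: `apps/web/src/components/reminder-wizard/when-form-client.tsx`
- Modify: `apps/web/src/components/reminder-wizard/step-when.tsx`
- Modify: `step-groups.tsx` and `step-review.tsx` (URL pass-through, Step 4)
- Modify: `apps/web/src/components/reminder-wizard/review-submit-client.tsx`
- Modify: `apps/web/src/components/reminder-edit/edit-when-form.tsx`
- Modify: `apps/web/src/app/reminders/[id]/edit/when/page.tsx`
- Modify: `apps/web/src/lib/notifications.ts`
- Modify: `apps/web/src/lib/notifications.test.ts` (adds the paused/partial body tests)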
- [ ] **Step 1: Update the create/update Zod schemas in `apps/web/src/actions/reminders.ts`**
Find the `createReminderSchema` definition (around lines 220–250) and add the two fields. Schema chunk:
```ts
const createReminderSchema = z
.object({
accountId: z.string().uuid(),
groupIds: z.array(z.string().uuid()),
messages: z.array(messagePartSchema).optional(),
name: z.string().nullable().optional(),
text: z.string().nullable().optional(),
mediaId: z.string().uuid().nullable().optional(),
caption: z.string().nullable().optional(),
scheduledAtIso: z.string().datetime({ offset: true }),
rrule: z.string().nullable().optional(),
timezone: z.string().default(DEFAULT_TIMEZONE),
// Delivery window. End hour is enforced at runtime by fire-reminder;
// start hour is documented but not gated in v1.
deliveryWindowStartHour: z.number().int().min(0).max(24).default(6),
deliveryWindowEndHour: z.number().int().min(0).max(24).default(18),
})
.refine(
(d) =>
(d.messages && d.messages.length > 0) ||
Boolean(d.text?.trim()) ||
Boolean(d.mediaId),
{
message: "Add a message or attach a file",
path: ["messages"],
},
)
.refine((d) => d.deliveryWindowStartHour < d.deliveryWindowEndHour, {
message: "Delivery window start must be earlier than end",
path: ["deliveryWindowStartHour"],
});
```
Then in BOTH `createReminderAction` and `updateReminderAction`, include the two new fields where the row is written: the `db.insert(reminders).values({...})` block (around line 350) and the `db.update(reminders).set({...})` block (around line 460). Add:
```ts
deliveryWindowStartHour: parsed.data.deliveryWindowStartHour,
deliveryWindowEndHour: parsed.data.deliveryWindowEndHour,
```
immediately after `timezone,`.
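If the wizard should flag a bad window before the round trip to the server action, the same two rules can be mirrored in a small client-side check. A hypothetical sketch (`checkDeliveryWindow` is an invented name); the server-side Zod schema stays the source of truth:

```typescript
// Hypothetical client-side mirror of the schema's window rules.
type WindowCheck = { ok: true } | { ok: false; message: string };

function checkDeliveryWindow(startHour: number, endHour: number): WindowCheck {
  // Same bounds as z.number().int().min(0).max(24).
  const isHour = (h: number) => Number.isInteger(h) && h >= 0 && h <= 24;
  if (!isHour(startHour) || !isHour(endHour)) {
    return { ok: false, message: "Delivery hours must be whole numbers from 0 to 24" };
  }
  // Same rule and message as the schema's second .refine().
  if (startHour >= endHour) {
    return { ok: false, message: "Delivery window start must be earlier than end" };
  }
  return { ok: true };
}
```

The wizard could, for instance, disable Continue while `!checkDeliveryWindow(deliveryStartHour, deliveryEndHour).ok` and show the returned message under the inputs.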
- [ ] **Step 2: Add the input controls to `apps/web/src/components/reminder-wizard/when-form-client.tsx`**
The edit touches three places: the props interface, the component state, and the JSX. Top-of-file imports stay as they are.
Extend the `interface WhenFormClientProps` block with the two optional `initialDelivery*` props:
```ts
interface WhenFormClientProps {
accountId: string;
groupIds: string;
timezone: string;
initialDefaultIso: string;
initialSpec?: RecurrenceSpec;
initialDeliveryStartHour?: number;
initialDeliveryEndHour?: number;
passThroughParams: PassThroughParams;
}
```
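In the component body, next to the existing state declarations, add the following (it reads from `props`; if the component destructures its props, use the destructured names instead):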
```ts
const [deliveryStartHour, setDeliveryStartHour] = useState<number>(
props.initialDeliveryStartHour ?? 6,
);
const [deliveryEndHour, setDeliveryEndHour] = useState<number>(
props.initialDeliveryEndHour ?? 18,
);
```
In `handleContinue`, when building the URLSearchParams, add (after the existing params):
```ts
sp.set("deliveryStartHour", String(deliveryStartHour));
sp.set("deliveryEndHour", String(deliveryEndHour));
```
In the JSX, just before the existing `<RecurrencePicker>` block, add a new "Delivery hours" card:
```tsx
<div className="space-y-2">
<Label className="flex items-center gap-1.5">
<ClockIcon className="size-3.5" />
Delivery hours
</Label>
<div className="flex items-center gap-2">
<Input
type="number"
min={0}
max={24}
step={1}
value={deliveryStartHour}
onChange={(e) => setDeliveryStartHour(Number(e.target.value))}
className="h-9 w-20"
aria-label="Delivery start hour"
/>
<span className="text-sm text-muted-foreground">to</span>
<Input
type="number"
min={0}
max={24}
step={1}
value={deliveryEndHour}
onChange={(e) => setDeliveryEndHour(Number(e.target.value))}
className="h-9 w-20"
aria-label="Delivery end hour"
/>
<span className="text-xs text-muted-foreground">
(24-hour, in {timezone})
</span>
</div>
<p className="text-xs text-muted-foreground">
The bot stops sending after the end hour. Long fan-outs that don&apos;t
finish in this window are paused. You can resume them from the
Activity tab.
</p>
</div>
```
`ClockIcon` is already in this file's lucide-react import (alongside `CalendarIcon` and `AlertCircleIcon`), so no import change is needed for the icon.
- [ ] **Step 3: Wire the new params into `apps/web/src/components/reminder-wizard/step-when.tsx`**
The wizard reads the `deliveryStartHour` / `deliveryEndHour` search params from the URL and passes them to `<WhenFormClient>` as `initialDeliveryStartHour` / `initialDeliveryEndHour`. Find where `<WhenFormClient>` is rendered and add the two new props:
```tsx
<WhenFormClient
accountId={accountId}
groupIds={groupIds}
timezone={op.defaultTimezone ?? "UTC"}
initialDefaultIso={initialDefaultIso}
initialSpec={initialSpec}
initialDeliveryStartHour={
sp.deliveryStartHour ? Number(sp.deliveryStartHour) : undefined
}
initialDeliveryEndHour={
sp.deliveryEndHour ? Number(sp.deliveryEndHour) : undefined
}
passThroughParams={passThroughParams}
/>
```
Also update the `interface StepWhenParams` (or wherever the searchParams type lives in step-when.tsx) to include `deliveryStartHour?: string; deliveryEndHour?: string;`.
- [ ] **Step 4: Pass-through in step-groups.tsx and step-review.tsx**
Both step pages thread URL params through. Open each and find where they re-emit the URL (look for the `URLSearchParams` builder or `editLink` helper). Add:
```ts
if (sp.deliveryStartHour) backParams.set("deliveryStartHour", sp.deliveryStartHour);
if (sp.deliveryEndHour) backParams.set("deliveryEndHour", sp.deliveryEndHour);
```
Add the same to `editLink` in step-review.tsx so the "Edit when" link from the review page round-trips the values.
In `review-submit-client.tsx`, where the action payload is built, include the two fields:
```ts
const payload = {
accountId,
groupIds: groupIds ? groupIds.split(",").filter(Boolean) : [],
name: name?.trim() || null,
messages,
scheduledAtIso: scheduledAt,
rrule: rrule ?? null,
timezone,
deliveryWindowStartHour: deliveryStartHour ?? 6,
deliveryWindowEndHour: deliveryEndHour ?? 18,
};
```
(And add `deliveryStartHour?: number; deliveryEndHour?: number;` to the props interface and pass them in from `step-review.tsx` after parsing the URL.)
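Steps 3 and 4 turn the URL strings into numbers with a bare `Number(...)`, so a hand-edited `?deliveryStartHour=abc` becomes `NaN`; `NaN` is not nullish, so `deliveryStartHour ?? 6` passes it straight to the server action, where Zod rejects it with a less helpful error. A hypothetical `parseHourParam` helper (not part of the plan) that falls back to the default instead:

```typescript
/**
 * Parse an hour from a URL search param, falling back to `fallback` when
 * the param is absent, blank, or not a whole number in 0..24.
 * Hypothetical helper; the plan itself uses Number(...) directly.
 */
function parseHourParam(raw: string | undefined, fallback: number): number {
  if (raw === undefined || raw.trim() === "") return fallback;
  const n = Number(raw);
  return Number.isInteger(n) && n >= 0 && n <= 24 ? n : fallback;
}
```

Used as `parseHourParam(sp.deliveryStartHour, 6)` and `parseHourParam(sp.deliveryEndHour, 18)`, it keeps the wizard's defaults in one place and never emits `NaN`. It does not check start < end; that stays with the schema.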
- [ ] **Step 5: Update `apps/web/src/components/reminder-edit/edit-when-form.tsx` similarly**
Add the same `Delivery hours` card into the form, same state vars, and include the two fields in the `updateReminderAction` payload (find the existing `updateReminderAction({...})` call). Pull the initial values from the loaded reminder row in the parent edit page (`apps/web/src/app/reminders/[id]/edit/when/page.tsx`):
```tsx
<EditWhenForm
reminderId={reminder.id}
accountId={reminder.accountId}
groupIds={targets.map((t) => t.groupId)}
messages={initialMessages}
name={reminder.name}
initialIso={(reminder.scheduledAt ?? new Date()).toISOString()}
initialSpec={specFromRrule(reminder.rrule)}
timezone={reminder.timezone}
initialDeliveryStartHour={reminder.deliveryWindowStartHour}
initialDeliveryEndHour={reminder.deliveryWindowEndHour}
/>
```
The form's prop interface gets:
```ts
initialDeliveryStartHour?: number;
initialDeliveryEndHour?: number;
```
- [ ] **Step 6: Extend the notification body for paused + partial in `apps/web/src/lib/notifications.ts`**
Find `reminderFiredToNotification`. The existing function takes `{ reminderId, runId, status }`. Extend the event shape to carry `sent`/`total` and handle `paused` as a first-class status:
```ts
export function reminderFiredToNotification(event: {
type: "reminder.fired";
reminderId: string;
runId: string;
status: string;
sent?: number;
total?: number;
}): ShowNotificationOptions | null {
if (event.status === "skipped") return null;
const headline =
event.status === "success"
? "Reminder sent"
: event.status === "paused"
? "Reminder paused"
: event.status === "partial"
? "Reminder partly sent"
: "Reminder failed";
let body =
event.status === "success"
? "All groups received the message."
: event.status === "paused"
? "Delivery window closed before all groups got the message."
: event.status === "partial"
? "Some groups received the message; others failed. See activity."
: "No groups received the message. See activity.";
if (event.status === "paused" && event.sent !== undefined && event.total !== undefined) {
body = `${event.sent} of ${event.total} groups delivered. Tap to resume or cancel.`;
} else if (event.status === "partial" && event.sent !== undefined && event.total !== undefined) {
body = `${event.sent} of ${event.total} groups delivered. See activity for details.`;
}
return {
title: headline,
body,
tag: `reminder:${event.reminderId}`,
href: `/reminders/${event.reminderId}`,
};
}
```
Also confirm that the bot's `reminder.fired` event (the `pgNotifyWeb` call in `apps/bot/src/scheduler/fire-reminder.ts`) carries `sent` and `total`, and add them there if they're missing. Then confirm the web-side receiver passes those fields through to `reminderFiredToNotification`.
- [ ] **Step 7: Add tests for the extended notification body**
Append to `apps/web/src/lib/notifications.test.ts` (inside the existing `describe("reminderFiredToNotification mapping", () => {...})` block):
```ts
it("paused with sent/total renders 'Tap to resume or cancel'", () => {
const args = reminderFiredToNotification({
type: "reminder.fired",
reminderId: "r-1",
runId: "run-1",
status: "paused",
sent: 412,
total: 1000,
});
expect(args?.title).toBe("Reminder paused");
expect(args?.body).toBe("412 of 1000 groups delivered. Tap to resume or cancel.");
});
it("paused without sent/total falls back to a generic paused body", () => {
const args = reminderFiredToNotification({
type: "reminder.fired",
reminderId: "r-1",
runId: "run-1",
status: "paused",
});
expect(args?.title).toBe("Reminder paused");
expect(args?.body).toMatch(/Delivery window closed/);
});
it("partial with sent/total renders 'X of Y groups delivered' instead of the generic body", () => {
const args = reminderFiredToNotification({
type: "reminder.fired",
reminderId: "r-1",
runId: "run-1",
status: "partial",
sent: 412,
total: 1000,
});
expect(args?.body).toBe("412 of 1000 groups delivered. See activity for details.");
});
it("partial without sent/total falls back to the generic body", () => {
const args = reminderFiredToNotification({
type: "reminder.fired",
reminderId: "r-1",
runId: "run-1",
status: "partial",
});
expect(args?.body).toMatch(/Some groups received/);
});
```
- [ ] **Step 8: Confirm existing edit-section-forms.test.tsx fixtures still compile**
Open `apps/web/src/components/reminder-edit/edit-section-forms.test.tsx`. The new `initialDelivery*` props are optional, so the existing `EditAccountForm` and `EditGroupsForm` fixtures should compile unchanged; patch them only if compilation fails. Run:
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck
```
If typecheck reports `Property 'initialDeliveryStartHour' is missing` on any test, add `initialDeliveryStartHour: 6, initialDeliveryEndHour: 18` to that fixture.
- [ ] **Step 9: Run web tests + bot tests**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run
```
Expected: web 298+ pass, bot 26+25 = 51 pass. Total ~380 across the project including shared.
- [ ] **Step 10: Restart web container so the server bundle picks up the schema change**
```bash
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml restart web
```
- [ ] **Step 11: Commit**
```bash
git add apps/web
git commit -m "feat(web): delivery-window inputs in wizard + edit; partial body extension
Wizard 'When' step and the per-section edit-when page get two number
inputs for the delivery window hours (default 6/18). Server actions
accept them on the create/update Zod schemas, validate
0 <= start < end <= 24, and persist to the new reminders columns.
Notification body for paused-status now reads
'412 of 1000 groups delivered. Tap to resume or cancel.'; partial
status uses the same delivered/total wording when sent/total are
present."
```
---
## Task 9: Run-ETA helper + ETA pill in wizard review (TDD)
**Files:**
- Create: `apps/web/src/lib/run-eta.ts`
- Create: `apps/web/src/lib/run-eta.test.ts`
- Create: `apps/web/src/components/reminder-wizard/run-eta-pill.tsx`
- Create: `apps/web/src/components/reminder-wizard/run-eta-pill.test.tsx`
- Modify: `apps/web/src/components/reminder-wizard/review-submit-client.tsx`
- Modify: `apps/web/src/components/reminder-edit/edit-when-form.tsx`
- Modify: `apps/web/src/components/reminder-edit/edit-groups-form.tsx`
- [ ] **Step 1: Write the failing run-eta test**
Create `apps/web/src/lib/run-eta.test.ts`:
```ts
import { describe, it, expect } from "vitest";
import { estimateRunDuration, ASSUMED_RATE_PER_MINUTE } from "./run-eta";
describe("estimateRunDuration", () => {
it("uses target count / rate plus a 15% buffer, ceiling-rounded to whole minutes", () => {
const r = estimateRunDuration({
targetCount: 1000,
ratePerMinute: 40,
fireAt: new Date("2026-05-13T09:00:00.000+08:00"),
});
// 1000 / 40 = 25 min; +15% = 28.75 → ceil = 29
expect(r.durationMinutes).toBe(29);
expect(r.estimatedFinishAt.toISOString()).toBe(
new Date("2026-05-13T09:29:00.000+08:00").toISOString(),
);
});
it("returns a 1-minute floor for very small runs", () => {
const r = estimateRunDuration({
targetCount: 1,
ratePerMinute: 40,
fireAt: new Date("2026-05-13T09:00:00.000+08:00"),
});
expect(r.durationMinutes).toBe(1);
});
it("returns 0 minutes and finishAt = fireAt when targetCount is 0", () => {
const fireAt = new Date("2026-05-13T09:00:00.000+08:00");
const r = estimateRunDuration({ targetCount: 0, ratePerMinute: 40, fireAt });
expect(r.durationMinutes).toBe(0);
expect(r.estimatedFinishAt.toISOString()).toBe(fireAt.toISOString());
});
it("throws when ratePerMinute is 0 or negative", () => {
expect(() =>
estimateRunDuration({
targetCount: 100,
ratePerMinute: 0,
fireAt: new Date(),
}),
).toThrow();
expect(() =>
estimateRunDuration({
targetCount: 100,
ratePerMinute: -1,
fireAt: new Date(),
}),
).toThrow();
});
it("exports the configured default rate constant", () => {
expect(typeof ASSUMED_RATE_PER_MINUTE).toBe("number");
expect(ASSUMED_RATE_PER_MINUTE).toBeGreaterThan(0);
});
});
```
- [ ] **Step 2: Run test to verify it fails**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run run-eta
```
Expected: FAIL — `Cannot find module './run-eta'`.
- [ ] **Step 3: Write the helper to pass**
Create `apps/web/src/lib/run-eta.ts`:
```ts
/**
 * Default per-account send rate, mirroring `BOT_MAX_SEND_PER_MINUTE`
 * in the bot env. The web bundle hardcodes this value, so an operator
 * who changes the bot's rate must also update this constant and
 * redeploy web; otherwise the ETA pill drifts from real send times.
*/
export const ASSUMED_RATE_PER_MINUTE = 40;
const ETA_BUFFER = 1.15;
export function estimateRunDuration(opts: {
targetCount: number;
ratePerMinute?: number;
fireAt: Date;
}): { durationMinutes: number; estimatedFinishAt: Date } {
const rate = opts.ratePerMinute ?? ASSUMED_RATE_PER_MINUTE;
if (rate <= 0) throw new Error("ratePerMinute must be > 0");
if (opts.targetCount <= 0) {
return { durationMinutes: 0, estimatedFinishAt: new Date(opts.fireAt) };
}
const raw = (opts.targetCount / rate) * ETA_BUFFER;
const durationMinutes = Math.max(1, Math.ceil(raw));
const estimatedFinishAt = new Date(
opts.fireAt.getTime() + durationMinutes * 60_000,
);
return { durationMinutes, estimatedFinishAt };
}
```
- [ ] **Step 4: Run test to verify it passes**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run run-eta
```
Expected: PASS (5/5).
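Because the buffered ETA only grows with the target count, the formula can be inverted to answer the operator's practical question: how many groups fit in a given window? The helper below is a hypothetical addition, not one of the plan's files. It copies the 1.15 buffer because `ETA_BUFFER` is not exported from `run-eta.ts`, and it uses the same `ceil((n / rate) * 1.15)` minutes as `estimateRunDuration()`.

```typescript
// Hypothetical helper, not part of the plan: the inverse of
// estimateRunDuration(). Returns the largest target count whose buffered
// ETA, ceil((n / rate) * 1.15) minutes, still fits in windowMinutes.
const ETA_BUFFER = 1.15; // copied from run-eta.ts, where it is not exported

function etaMinutes(targetCount: number, ratePerMinute: number): number {
  // Same formula as estimateRunDuration() for targetCount > 0.
  return Math.ceil((targetCount / ratePerMinute) * ETA_BUFFER);
}

export function maxTargetsInWindow(
  windowMinutes: number,
  ratePerMinute: number,
): number {
  if (ratePerMinute <= 0) throw new Error("ratePerMinute must be > 0");
  if (windowMinutes <= 0) return 0;
  // Closed-form first guess, then nudge it to absorb floating-point edges.
  let n = Math.floor((windowMinutes * ratePerMinute) / ETA_BUFFER);
  while (n > 0 && etaMinutes(n, ratePerMinute) > windowMinutes) n--;
  while (etaMinutes(n + 1, ratePerMinute) <= windowMinutes) n++;
  return n;
}
```

At the default 40 sends/min, a run firing at the start of the default 6am to 6pm window (720 minutes) fits 25,043 groups by this estimate, while a run firing one hour before the end hour fits 2,086; anything larger would show the amber pill.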
- [ ] **Step 5: Write the failing pill component test**
Create `apps/web/src/components/reminder-wizard/run-eta-pill.test.tsx`:
```tsx
import { describe, it, expect } from "vitest";
import { renderToStaticMarkup } from "react-dom/server";
import { RunEtaPill } from "./run-eta-pill";
describe("RunEtaPill", () => {
it("renders green 'Fits in window' when estimatedFinishAt <= windowEndAt", () => {
const html = renderToStaticMarkup(
<RunEtaPill
targetCount={500}
fireAt={new Date("2026-05-13T09:00:00.000+08:00")}
windowEndAt={new Date("2026-05-13T18:00:00.000+08:00")}
timezone="Asia/Kuala_Lumpur"
/>,
);
expect(html).toMatch(/Fits in window/);
expect(html).toMatch(/min/);
expect(html).not.toMatch(/Likely to pause/);
});
it("renders amber 'Likely to pause' when ETA exceeds window", () => {
const html = renderToStaticMarkup(
<RunEtaPill
targetCount={5000}
fireAt={new Date("2026-05-13T17:00:00.000+08:00")}
windowEndAt={new Date("2026-05-13T18:00:00.000+08:00")}
timezone="Asia/Kuala_Lumpur"
/>,
);
expect(html).toMatch(/Likely to pause/);
expect(html).toMatch(/Widen the window/);
});
it("renders nothing for zero targets", () => {
const html = renderToStaticMarkup(
<RunEtaPill
targetCount={0}
fireAt={new Date("2026-05-13T09:00:00.000+08:00")}
windowEndAt={new Date("2026-05-13T18:00:00.000+08:00")}
timezone="Asia/Kuala_Lumpur"
/>,
);
expect(html).toBe("");
});
});
```
- [ ] **Step 6: Implement the pill**
Create `apps/web/src/components/reminder-wizard/run-eta-pill.tsx`:
```tsx
import { ClockIcon, AlertTriangleIcon } from "lucide-react";
import { estimateRunDuration } from "@/lib/run-eta";
interface RunEtaPillProps {
targetCount: number;
fireAt: Date;
windowEndAt: Date;
timezone: string;
}
/**
* Visible at the wizard's review step and on the per-section edit
* pages that affect ETA (groups, when). Advisory only — does NOT
* block submission. The operator can still schedule a run that
* pauses; the pause-and-resume flow covers that case.
*/
export function RunEtaPill({
targetCount,
fireAt,
windowEndAt,
timezone,
}: RunEtaPillProps) {
if (targetCount <= 0) return null;
const { durationMinutes, estimatedFinishAt } = estimateRunDuration({
targetCount,
fireAt,
});
const fits = estimatedFinishAt.getTime() <= windowEndAt.getTime();
const finishLocal = new Intl.DateTimeFormat("en-GB", {
hour: "2-digit",
minute: "2-digit",
timeZone: timezone,
}).format(estimatedFinishAt);
if (fits) {
return (
<div className="flex items-center gap-2 rounded-lg bg-emerald-500/10 px-3 py-2 text-xs text-emerald-700 dark:text-emerald-400">
<ClockIcon className="size-3.5" />
<span>
~{durationMinutes} min · finishes ~{finishLocal} · Fits in window
</span>
</div>
);
}
return (
<div className="flex items-start gap-2 rounded-lg bg-amber-500/10 px-3 py-2 text-xs text-amber-700 dark:text-amber-400">
<AlertTriangleIcon className="size-3.5 mt-0.5 shrink-0" />
<div className="space-y-0.5">
<div>
~{durationMinutes} min · finishes ~{finishLocal} · Likely to pause
</div>
<div className="text-[11px] opacity-80">
Widen the window or split into smaller runs.
</div>
</div>
</div>
);
}
```
- [ ] **Step 7: Run pill tests**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run run-eta-pill
```
Expected: 3/3 PASS.
- [ ] **Step 8: Wire the pill into review-submit-client**
In `apps/web/src/components/reminder-wizard/review-submit-client.tsx`, add the pill above the Schedule button. The component already has access to `groupIds`, `scheduledAt`, `timezone`, and now `deliveryEndHour` (from Task 8). Compute the windowEndAt inline:
```tsx
import { RunEtaPill } from "./run-eta-pill";
import { windowEndAt as computeWindowEndAt } from "@cmbot/shared"; // see note below

// Inside the component body, before `return`:
const etaGroupIds = groupIds ? groupIds.split(",").filter(Boolean) : [];
const etaFireAt = scheduledAt ? new Date(scheduledAt) : null;

// In the JSX, just before the action button (the pill renders nothing for 0 groups):
{etaFireAt && (
  <RunEtaPill
    targetCount={etaGroupIds.length}
    fireAt={etaFireAt}
    windowEndAt={computeWindowEndAt(timezone, deliveryEndHour ?? 18, etaFireAt)}
    timezone={timezone}
  />
)}
```
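`computeWindowEndAt` is `windowEndAt`, the helper Task 5 created in `packages/shared/src/delivery-window.ts`, imported under an alias so it doesn't read like the pill's `windowEndAt` prop. The bot and web bundles both import it from `@cmbot/shared`.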
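- [ ] **Step 9: Wire the pill into edit-groups-form and edit-when-form**
Each form needs the target count, the fire time, the reminder's timezone, and its delivery-window end hour (`deliveryWindowEndHour` on the reminder); pass any of these the form doesn't already receive as props. In `edit-groups-form` the count is the live selection; in `edit-when-form` it is the reminder's saved group count, and the fire time and end hour come from the form's current inputs. Add the pill above each form's Save button:
```tsx
import { RunEtaPill } from "@/components/reminder-wizard/run-eta-pill";
import { windowEndAt as computeWindowEndAt } from "@cmbot/shared";

// Shown with edit-groups-form's names; substitute each form's own state/props.
<RunEtaPill
  targetCount={selectedGroupIds.length}
  fireAt={new Date(scheduledAtIso)}
  windowEndAt={computeWindowEndAt(
    timezone,
    deliveryEndHour,
    new Date(scheduledAtIso),
  )}
  timezone={timezone}
/>
```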
- [ ] **Step 10: Run all web tests**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run
```
Expected: all green. Web ≈306 (the Task 8 baseline of 298 plus 5 `estimateRunDuration` tests and 3 `RunEtaPill` tests). Shared adds the windowEndAt tests that moved from bot.
- [ ] **Step 11: Commit**
```bash
git add apps/web packages/shared apps/bot
git commit -m "feat(web): ETA preview pill in wizard + edit-groups + edit-when
estimateRunDuration() computes a per-run ETA from BOT_MAX_SEND_PER_MINUTE
(hardcoded as ASSUMED_RATE_PER_MINUTE in the web bundle) plus a 15%
buffer. The RunEtaPill component shows a green 'Fits in window' or
amber 'Likely to pause' badge with a one-line suggestion. windowEndAt
moves from apps/bot/src/scheduler/delivery-window.ts to
packages/shared/src/delivery-window.ts so both bundles can import it."
```
---
## Task 10: Resume + cancel actions and PausedRunBanner
**Files:**
- Modify: `apps/web/src/actions/reminders.ts` (add `resumeReminderRunAction`, `cancelReminderRunAction`)
- Create: `apps/web/src/components/reminder-detail/paused-run-banner.tsx`
- Create: `apps/web/src/components/reminder-detail/paused-run-banner.test.tsx`
- Modify: `apps/web/src/app/reminders/[id]/page.tsx` (mount the banner)
- Modify: `apps/web/src/app/activity/page.tsx` (add Paused filter + Resume button per row)
- [ ] **Step 1: Write the failing banner test**
Create `apps/web/src/components/reminder-detail/paused-run-banner.test.tsx`:
```tsx
import { describe, it, expect, vi, beforeEach } from "vitest";
import { renderToStaticMarkup } from "react-dom/server";
import { PausedRunBanner } from "./paused-run-banner";
const resumeMock = vi.fn();
const cancelMock = vi.fn();
vi.mock("@/actions/reminders", () => ({
resumeReminderRunAction: (...args: unknown[]) => resumeMock(...args),
cancelReminderRunAction: (...args: unknown[]) => cancelMock(...args),
}));
describe("PausedRunBanner", () => {
beforeEach(() => {
resumeMock.mockReset();
cancelMock.mockReset();
});
it("renders 'Resume' and 'Cancel run' buttons when latest run is paused", () => {
const html = renderToStaticMarkup(
<PausedRunBanner
runId="run-1"
sent={412}
total={1000}
windowEndHour={18}
timezone="Asia/Kuala_Lumpur"
/>,
);
expect(html).toMatch(/Reminder paused/);
expect(html).toMatch(/412 of 1000/);
expect(html).toMatch(/Resume/);
expect(html).toMatch(/Cancel run/);
});
});
```
- [ ] **Step 2: Run to verify it fails**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run paused-run-banner
```
Expected: FAIL — module not found.
- [ ] **Step 3: Add resume + cancel actions to `apps/web/src/actions/reminders.ts`**
Append (near the other run-related actions):
```ts
const runIdSchema = z.object({ runId: z.string().uuid() });
export async function resumeReminderRunAction(input: { runId: string }) {
const op = await getSeededOperator();
const parsed = runIdSchema.safeParse(input);
if (!parsed.success) {
return { ok: false as const, error: "Invalid runId" };
}
const run = await db.query.reminderRuns.findFirst({
where: (r, { eq }) => eq(r.id, parsed.data.runId),
with: { reminder: { columns: { operatorId: true, id: true } } },
});
if (!run || run.reminder.operatorId !== op.id) {
return { ok: false as const, error: "Run not found" };
}
if (run.status !== "paused") {
return { ok: false as const, error: `Cannot resume a ${run.status} run` };
}
await getBoss().send("reminder.fire", {
reminderId: run.reminder.id,
runId: run.id,
});
await writeAudit(op.id, "reminder.run.resumed", { runId: run.id });
revalidatePath(`/reminders/${run.reminder.id}`);
revalidatePath(`/activity`);
return { ok: true as const };
}
export async function cancelReminderRunAction(input: { runId: string }) {
const op = await getSeededOperator();
const parsed = runIdSchema.safeParse(input);
if (!parsed.success) {
return { ok: false as const, error: "Invalid runId" };
}
const run = await db.query.reminderRuns.findFirst({
where: (r, { eq }) => eq(r.id, parsed.data.runId),
with: { reminder: { columns: { operatorId: true, id: true } } },
});
if (!run || run.reminder.operatorId !== op.id) {
return { ok: false as const, error: "Run not found" };
}
if (run.status !== "paused") {
return { ok: false as const, error: `Cannot cancel a ${run.status} run` };
}
await db.transaction(async (tx) => {
await tx
.update(reminderRunTargets)
.set({ status: "skipped", error: "canceled by operator" })
.where(
and(
eq(reminderRunTargets.runId, run.id),
eq(reminderRunTargets.status, "pending"),
),
);
await tx
.update(reminderRuns)
.set({
status: "partial",
endedAt: new Date(),
errorSummary: "Canceled by operator before all groups received the message.",
})
.where(eq(reminderRuns.id, run.id));
});
await writeAudit(op.id, "reminder.run.canceled", { runId: run.id });
revalidatePath(`/reminders/${run.reminder.id}`);
revalidatePath(`/activity`);
return { ok: true as const };
}
```
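(Imports needed at top of file, if not already present: `revalidatePath` from `next/cache`, `getBoss` from `@/lib/boss`, `writeAudit` from `@/lib/audit`, `reminderRuns` and `reminderRunTargets` from `@cmbot/db`, `and`/`eq` from `drizzle-orm`. `z`, `db`, and `getSeededOperator` are assumed to be imported already for the existing actions.)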
- [ ] **Step 4: Implement PausedRunBanner**
Create `apps/web/src/components/reminder-detail/paused-run-banner.tsx`:
```tsx
"use client";
import { useState, useTransition } from "react";
import { AlertCircleIcon, PlayIcon, XIcon, Loader2Icon } from "lucide-react";
import { Button } from "@/components/ui/button";
import {
resumeReminderRunAction,
cancelReminderRunAction,
} from "@/actions/reminders";
interface PausedRunBannerProps {
runId: string;
sent: number;
total: number;
windowEndHour: number;
timezone: string;
}
export function PausedRunBanner({
runId,
sent,
total,
windowEndHour,
timezone,
}: PausedRunBannerProps) {
const [pending, startTransition] = useTransition();
const [error, setError] = useState<string | null>(null);
const onResume = () =>
startTransition(async () => {
setError(null);
const r = await resumeReminderRunAction({ runId });
if (!r.ok) setError(r.error);
});
const onCancel = () =>
startTransition(async () => {
setError(null);
const r = await cancelReminderRunAction({ runId });
if (!r.ok) setError(r.error);
});
return (
<div className="rounded-lg border border-amber-500/40 bg-amber-500/5 p-4 space-y-3">
<div className="flex items-start gap-2">
<AlertCircleIcon className="size-4 text-amber-600 dark:text-amber-400 mt-0.5 shrink-0" />
<div className="space-y-1">
<p className="text-sm font-medium">Reminder paused</p>
<p className="text-xs text-muted-foreground">
{sent} of {total} groups delivered. The delivery window
closed at {windowEndHour}:00 ({timezone}). Resume to send
the remaining {total - sent} groups, or cancel the run.
</p>
</div>
</div>
{error && (
<div className="text-xs text-destructive">{error}</div>
)}
<div className="flex gap-2">
<Button size="sm" onClick={onResume} disabled={pending} className="gap-2">
{pending ? <Loader2Icon className="size-3.5 animate-spin" /> : <PlayIcon className="size-3.5" />}
Resume
</Button>
<Button size="sm" variant="outline" onClick={onCancel} disabled={pending} className="gap-2">
<XIcon className="size-3.5" />
Cancel run
</Button>
</div>
</div>
);
}
```
- [ ] **Step 5: Run banner test**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run paused-run-banner
```
Expected: PASS.
- [ ] **Step 6: Mount the banner on the reminder detail page**
In `apps/web/src/app/reminders/[id]/page.tsx`, after loading `reminder` and `runs`, find the latest paused run and conditionally render the banner above the section list. The snippet assumes `runs` is ordered newest-first, so `find` returns the most recent paused run:
```tsx
const latestPausedRun = runs.find((r) => r.status === "paused");
// in JSX, near the top of the page content:
{latestPausedRun && (
<PausedRunBanner
runId={latestPausedRun.id}
sent={latestPausedRun.sentCount ?? 0}
total={latestPausedRun.totalCount ?? 0}
windowEndHour={reminder.deliveryWindowEndHour}
timezone={reminder.timezone}
/>
)}
```
(`sentCount` and `totalCount` must be derived in the query: adjust `getReminderWithRuns` to count each run's `reminderRunTargets` rows by status, adding that count step if it isn't there yet.)
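A minimal sketch of that count step, assuming `getReminderWithRuns` issues one `GROUP BY run_id, status` query over `reminderRunTargets` and folds the rows in TypeScript. The row shape, the `foldTargetCounts` name, and the `"sent"` status for delivered targets are assumptions; `"pending"` and `"skipped"` come from the cancel action above. Postgres `count(*)` is a `bigint`, which node-postgres returns as a string, so convert it with `Number(...)` before folding.

```typescript
// Sketch only: folds GROUP BY (run_id, status) rows into per-run counts.
// Assumption: a delivered target has status "sent"; "pending" and "skipped"
// appear in the plan's cancel action.
type TargetStatus = "pending" | "sent" | "failed" | "skipped";

interface TargetCountRow {
  runId: string;
  status: TargetStatus;
  n: number; // already converted with Number(...): Postgres count(*) is bigint
}

interface RunCounts {
  sentCount: number;
  pendingCount: number;
  totalCount: number;
}

export function foldTargetCounts(rows: TargetCountRow[]): Map<string, RunCounts> {
  const byRun = new Map<string, RunCounts>();
  for (const row of rows) {
    const counts = byRun.get(row.runId) ?? { sentCount: 0, pendingCount: 0, totalCount: 0 };
    counts.totalCount += row.n;
    if (row.status === "sent") counts.sentCount += row.n;
    if (row.status === "pending") counts.pendingCount += row.n;
    byRun.set(row.runId, counts);
  }
  return byRun;
}
```

The page could then pass `sent={counts.get(latestPausedRun.id)?.sentCount ?? 0}`. The banner's "remaining {total - sent}" also counts failed targets, which resume does not retry (it only picks up still-pending targets), so `pendingCount` is the more accurate remaining figure when failures are possible.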
- [ ] **Step 7: Add Paused filter + Resume button on Activity page**
In `apps/web/src/app/activity/page.tsx`, extend the status filter pills to include `"paused"` (amber). For each row whose status is `paused`, render a small inline Resume button that calls the same `resumeReminderRunAction`.
Put that button in a small client component, `ResumeRunButton`, at `apps/web/src/components/activity/resume-run-button.tsx`, mirroring the existing inline buttons elsewhere (a `useTransition` wrapper around the action, like `PausedRunBanner`'s `onResume`).
- [ ] **Step 8: Run all web tests**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
```
Expected: all green.
- [ ] **Step 9: Restart web container so SSR picks up the new client components**
```bash
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml restart web
```
- [ ] **Step 10: Commit**
```bash
git add apps/web
git commit -m "feat(web): paused-run banner with Resume / Cancel buttons
resumeReminderRunAction re-enqueues the existing run via pg-boss with
runId in the payload (Task 7's fire-reminder accepts that). Cancel
action flips remaining pending targets to skipped and resolves the
run to partial. Activity tab gets a Paused filter and inline Resume
button on each paused row."
```
---
## Acceptance check (manual)
After all 10 tasks land:
- [ ] **Smoke 1.** Create a reminder for 1 group with default delivery hours and a 30-second future fire time. Verify it lands; verify the run's `error_summary` is null and status `success`.
- [ ] **Smoke 2.** Create a reminder with end hour set to a value that's already past in the operator's timezone. Schedule it to fire NOW. Verify the run resolves `failed` and the `error_summary` reads "Delivery window closed at H:00 (TZ) before any group could be sent."
- [ ] **Smoke 3.** Create two reminders on TWO different paired accounts with the same `scheduledAt`. Watch bot logs:
```bash
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=60 bot 2>&1 | grep -E "reminder|fire-reminder"
```
Expected: both reminders' `fire-reminder: done` entries land within seconds of each other (parallel), not sequentially separated by a full fan-out.
- [ ] **Smoke 4.** Create a reminder with a 5-MB JPEG and 3 groups. Fire it. Bot logs should show only ONE `prepareWAMessageMedia` / upload entry (look for the upload size in the Baileys debug log) followed by 3 relayMessage events.
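- [ ] **Smoke 5.** Wizard ETA pill: pick 5 groups + default 6/18 hours. Pill should be green ("Fits in window"). Then set the fire time to 17:00 and pick 5000 groups (clone an existing one): pill should flip amber ("Likely to pause") with the "Widen the window" hint. The fire time matters: at the default 40/min the pill estimates ~144 min for 5000 groups, which still fits if the run fires at 09:00.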
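- [ ] **Smoke 6.** Trigger a paused run on purpose: set `BOT_MAX_SEND_PER_MINUTE=2` in `.env.development` and restart the bot. Because the end hour is a whole hour, schedule a 10-group reminder about 3 minutes before the top of an hour and set the end hour to that hour (e.g. fire at 14:57 with end hour 15). Verify: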
- The run resolves `paused` (~6 groups sent).
- A "Reminder paused" notification appears (with sent/total in body).
- The detail page shows the banner with **Resume** and **Cancel run**.
- Click **Resume** → run continues from the unsent targets, eventually resolving to `success` (or `paused` again if the window closes again).
- Reset `BOT_MAX_SEND_PER_MINUTE` and restart the bot after the test.
- [ ] **Smoke 7.** Cancel-run: same setup as Smoke 6, but click **Cancel run** instead of Resume. Verify remaining pending targets become `skipped`, run resolves `partial` with errorSummary "Canceled by operator before all groups received the message.", banner disappears.
- [ ] **Final test sweep:**
```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run
```
Expected: all green. ~395 total (web ≈315 with the new ETA + banner + notification tests; shared adds windowEndAt suite that moved from bot).