cm_whatsapp_bot_v1/docs/superpowers/plans/2026-05-10-windowed-fanout.md
yiekheng 082a70db06 docs: consolidate windowed-fanout spec/plan with ETA + paused/resume
Folds in three rounds of requirement evolution:

* Pause/resume on window close (was stop-and-report-partial).
* ETA preview pill at compose / edit time so the operator sees
  whether their chosen window will fit before scheduling.
* Interactive paused-run banner with Resume / Cancel buttons on the
  detail page; pause notification deep-links to it.

Helper relocations:

* windowEndAt() moves to packages/shared so both bot fire-reminder
  and the web ETA pill can import the same calculator.

Plan grows from 8 to 10 tasks: adds Task 9 (run-eta + RunEtaPill,
TDD) and Task 10 (resume/cancel actions + PausedRunBanner).
Acceptance gains two paused-flow smoke tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 14:33:51 +08:00


Windowed, Pacing-Safe Reminder Fan-Out — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Deliver reminders to many groups (1000+) safely within a per-reminder delivery window (default 6am to 6pm). On window-close mid-run, pause with a clear "account at capacity / consider offload or smaller media" message and let the operator resume later from the UI.

Architecture: Replace the current serial fan-out with a per-account isolation model. The pg-boss teamSize is raised so reminders on different accounts run in parallel; a per-key (accountId) async mutex serialises same-account fan-outs; a per-account token-bucket rate limiter paces sends; a per-run media-upload cache uploads each unique file once and reuses it across every group via Baileys' prepareWAMessageMedia + relayMessage; a window-end gate flips the run to paused (un-sent targets stay pending); and resumeReminderRunAction re-enters fire-reminder with the existing runId so the loop picks up only the still-pending targets.

Tech Stack: Node.js + TypeScript, Baileys 7.0.0-rc10 (prepareWAMessageMedia, relayMessage, generateMessageID), pg-boss v12 (boss.work with teamSize), Drizzle ORM + Postgres, vitest.
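
The pause-and-resume behaviour described under Architecture can be sketched as a loop that checks the window end before each send. This is a minimal synchronous illustration, not the fire-reminder implementation: Target, runFanOut, and the injected now/send callbacks are hypothetical names, and the real loop is asynchronous and rate-limited.

```typescript
// Hypothetical types for illustration only; the real schema lives in packages/db.
type TargetStatus = "pending" | "sent";

interface Target {
  groupId: string;
  status: TargetStatus;
}

type RunOutcome = "completed" | "paused";

/**
 * Sends to every still-pending target until the window closes.
 * `now` and `send` are injected so the sketch is deterministic.
 */
function runFanOut(
  targets: Target[],
  windowEnd: Date,
  now: () => Date,
  send: (groupId: string) => void,
): RunOutcome {
  for (const target of targets) {
    if (target.status !== "pending") continue; // a resumed run skips finished work
    if (now().getTime() >= windowEnd.getTime()) return "paused"; // window gate
    send(target.groupId);
    target.status = "sent";
  }
  return "completed";
}

// Demo: a fake clock that advances one minute per send.
const targets: Target[] = ["g1", "g2", "g3"].map(
  (groupId): Target => ({ groupId, status: "pending" }),
);
let clockMs = Date.parse("2026-05-10T09:58:00.000Z");
const sent: string[] = [];
const send = (groupId: string): void => {
  sent.push(groupId);
  clockMs += 60_000;
};

// First attempt: the window closes after two sends, so g3 stays pending.
const firstWindowEnd = new Date("2026-05-10T10:00:00.000Z");
const first = runFanOut(targets, firstWindowEnd, () => new Date(clockMs), send);

// The operator resumes later, inside a new window; only g3 is sent.
clockMs = Date.parse("2026-05-10T22:30:00.000Z");
const nextWindowEnd = new Date("2026-05-11T10:00:00.000Z");
const second = runFanOut(targets, nextWindowEnd, () => new Date(clockMs), send);
```

Because the gate only returns early and never rewrites target status, re-entering with the same target list is enough to resume; no separate "resume cursor" is needed in this sketch.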


File Structure

| File | Role |
| --- | --- |
| packages/db/migrations/0008_*.sql (generated) | add delivery_window_start_hour, delivery_window_end_hour to reminders |
| packages/db/src/schema.ts | drizzle alignment for the two new columns |
| apps/bot/src/env.ts | three new env vars: BOT_FIRE_CONCURRENCY, BOT_GROUP_CONCURRENCY, BOT_MAX_SEND_PER_MINUTE |
| apps/bot/src/scheduler/per-key-mutex.ts (new) | accountId-keyed async mutex |
| apps/bot/src/scheduler/per-key-mutex.test.ts (new) | unit tests |
| apps/bot/src/scheduler/rate-limiter.ts (new) | per-account token bucket |
| apps/bot/src/scheduler/rate-limiter.test.ts (new) | fake-clock unit tests |
| packages/shared/src/delivery-window.ts (new) | pure window-end calculator (shared bot+web) |
| packages/shared/src/delivery-window.test.ts (new) | unit tests |
| apps/bot/src/scheduler/media-upload-cache.ts (new) | prepareWAMessageMedia results, keyed by mediaId |
| apps/bot/src/scheduler/media-upload-cache.test.ts (new) | mock-socket unit tests |
| apps/bot/src/scheduler/fire-reminder.ts (rewrite) | new loop using all of the above; accepts optional runId for resume |
| apps/bot/src/scheduler/reminder-jobs.ts | pass teamSize config |
| apps/web/src/actions/reminders.ts | accept the two new fields; add resumeReminderRunAction, cancelReminderRunAction |
| apps/web/src/components/reminder-wizard/when-form-client.tsx | "Delivery hours" inputs |
| apps/web/src/components/reminder-edit/edit-when-form.tsx | same |
| apps/web/src/lib/run-eta.ts (new) | pure ETA calculator |
| apps/web/src/components/reminder-wizard/run-eta-pill.tsx (new) | green/amber ETA pill |
| apps/web/src/components/reminder-detail/paused-run-banner.tsx (new) | Resume / Cancel run banner on detail page |
| apps/web/src/lib/notifications.ts | paused + partial notification body extension |

Task 1: Schema migration — delivery window columns

Files:

  • Modify: packages/db/src/schema.ts (lines around the reminders table — add 2 columns)

  • Generate: packages/db/migrations/0008_<name>.sql

  • Step 1: Edit packages/db/src/schema.ts — add the two columns to reminders

Find the reminders table block and append the two new columns just before the closing });. The existing block ends with lastFiredAt: timestamp(...). After the edit, the full block reads:

export const reminders = pgTable("reminders", {
  id: uuid("id").primaryKey().defaultRandom(),
  accountId: uuid("account_id").notNull().references(() => whatsappAccounts.id, { onDelete: "cascade" }),
  name: text("name").notNull(),
  scheduleKind: text("schedule_kind").notNull(),
  scheduledAt: timestamp("scheduled_at", { withTimezone: true }),
  rrule: text("rrule"),
  timezone: text("timezone").notNull(),
  endsAt: timestamp("ends_at", { withTimezone: true }),
  maxRuns: integer("max_runs"),
  status: text("status").notNull().default("active"),
  createdBy: uuid("created_by").notNull().references(() => operators.id),
  createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
  updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(),
  lastFiredAt: timestamp("last_fired_at", { withTimezone: true }),
  // Delivery window — operator-supplied hours (in the row's timezone).
  // Only the END hour is enforced at runtime in v1: the run loop stops
  // sending once `now()` crosses today's end hour. The START hour is
  // documented for the UI; not gated. See spec
  // docs/superpowers/specs/2026-05-10-windowed-fanout-design.md.
  deliveryWindowStartHour: integer("delivery_window_start_hour").notNull().default(6),
  deliveryWindowEndHour: integer("delivery_window_end_hour").notNull().default(18),
});
  • Step 2: Generate the migration

Run:

NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db generate

Expected: a file appears at packages/db/migrations/0008_<name>.sql containing ALTER TABLE "reminders" ADD COLUMN "delivery_window_start_hour" integer DEFAULT 6 NOT NULL; and a similar line for delivery_window_end_hour.

  • Step 3: Apply the migration

Run:

NO_SUDO=1 ./scripts/db.sh migrate

Expected: Migrations applied. Then rebuild the package:

NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db build

Expected: clean tsc output (the @cmbot/db package re-builds so downstream consumers pick up the new columns).

  • Step 4: Typecheck the bot + web to confirm no schema-consumer regressions
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck

Expected: both pass with no errors.

  • Step 5: Commit
git add packages/db
git commit -m "feat(db): add delivery_window_start_hour / end_hour to reminders

Both default 6 / 18, stored as int (hour-of-day in the row's
timezone). Only the end hour is gated at runtime — see spec
docs/superpowers/specs/2026-05-10-windowed-fanout-design.md."
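
The two columns are plain integers with defaults, so any range checking presumably happens before the write. The sketch below shows one way a caller might validate the pair. parseDeliveryWindow is a hypothetical helper, and both the 0..23 start range and the "start before end" rule are assumptions, not requirements taken from the spec.

```typescript
interface DeliveryWindow {
  startHour: number;
  endHour: number;
}

/**
 * Validates operator-supplied delivery hours (hour-of-day in the
 * reminder's timezone). Throws RangeError on bad input.
 */
function parseDeliveryWindow(startHour: number, endHour: number): DeliveryWindow {
  if (!Number.isInteger(startHour) || startHour < 0 || startHour > 23) {
    throw new RangeError(`startHour must be an integer 0..23, got ${startHour}`);
  }
  // 24 is allowed for the end hour and means "end of the day".
  if (!Number.isInteger(endHour) || endHour < 0 || endHour > 24) {
    throw new RangeError(`endHour must be an integer 0..24, got ${endHour}`);
  }
  // Assumption: overnight windows (e.g. 22 to 6) are out of scope.
  if (startHour >= endHour) {
    throw new RangeError(`startHour (${startHour}) must be before endHour (${endHour})`);
  }
  return { startHour, endHour };
}

// The column defaults from the schema pass validation.
const defaults = parseDeliveryWindow(6, 18);

// Helper for the demo: reports whether a call throws.
function throws(fn: () => unknown): boolean {
  try {
    fn();
    return false;
  } catch {
    return true;
  }
}

const reversedRejected = throws(() => parseDeliveryWindow(18, 6));
const outOfRangeRejected = throws(() => parseDeliveryWindow(6, 25));
```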

Task 2: Bot env vars — concurrency + rate caps

Files:

  • Modify: apps/bot/src/env.ts

  • Step 1: Replace apps/bot/src/env.ts contents

Whole file:

import { z } from "zod";

const numberFromString = z.string().regex(/^\d+$/).transform((s) => Number(s));

const envSchema = z.object({
  DATABASE_URL: z.string().url(),
  DATA_DIR: z.string().min(1),
  SESSIONS_DIR: z.string().min(1),
  MEDIA_DIR: z.string().min(1),
  BOT_HEALTH_PORT: numberFromString,
  BOT_LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).default("info"),

  // Reminder fan-out tuning. Defaults aim for an established WhatsApp
  // account (~30-60 msg/min safe band).
  // - BOT_FIRE_CONCURRENCY: pg-boss worker pool size for the
  //   `reminder.fire` queue. Sets the max number of accounts that can
  //   run their fan-outs simultaneously. 8 is a sane default; raise if
  //   you have more paired accounts firing concurrently.
  // - BOT_GROUP_CONCURRENCY: per-account parallel group sends. Each
  //   group's parts stay serial (preserves visible order in the chat).
  //   3 has been empirically stable on real traffic; anything above 5
  //   without observation is asking for trouble.
  // - BOT_MAX_SEND_PER_MINUTE: per-account token-bucket rate. 40 is the
  //   safe default; loosen to 60 only after a few weeks of running
  //   without flags. Tighten to 20 if WA returns rate-limit errors.
  BOT_FIRE_CONCURRENCY: numberFromString.default("8"),
  BOT_GROUP_CONCURRENCY: numberFromString.default("3"),
  BOT_MAX_SEND_PER_MINUTE: numberFromString.default("40"),
});

export type Env = z.infer<typeof envSchema>;

export function parseEnv(input: Record<string, string | undefined>): Env {
  return envSchema.parse(input);
}

export const env = parseEnv(process.env);
  • Step 2: Typecheck
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck

Expected: passes.

  • Step 3: Run the bot tests to confirm parseEnv defaults still validate
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run env

Expected: all env.test.ts tests pass (existing tests don't supply the new vars; defaults must kick in).

  • Step 4: Commit
git add apps/bot/src/env.ts
git commit -m "feat(bot): add fan-out tuning env vars

BOT_FIRE_CONCURRENCY (8) — pg-boss worker pool, gates max accounts
running fan-outs simultaneously.
BOT_GROUP_CONCURRENCY (3) — per-account parallel group sends.
BOT_MAX_SEND_PER_MINUTE (40) — per-account rate-limit budget.

Defaults are tuned for an established WhatsApp account. See spec for
the safe band rationale."
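
BOT_GROUP_CONCURRENCY caps how many groups one account sends to at the same time. A minimal sketch of such a cap follows, using a fixed number of "lanes" that pull from a shared index. mapWithConcurrency is a hypothetical helper name and this is not necessarily how fire-reminder implements the limit.

```typescript
/**
 * Runs `worker` over `items` with at most `limit` calls in flight.
 * Results keep the input order. If any worker rejects, the returned
 * promise rejects (other lanes are not cancelled in this sketch).
 */
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError(`limit must be a positive integer, got ${limit}`);
  }
  const results = new Array<R>(items.length);
  let next = 0;

  // Each lane claims the next unclaimed index, runs it, and repeats.
  async function lane(): Promise<void> {
    while (next < items.length) {
      const i = next++;
      results[i] = await worker(items[i] as T, i);
    }
  }

  const laneCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}

// Demo: 7 fake groups, limit 3 (the BOT_GROUP_CONCURRENCY default).
let inFlight = 0;
let peak = 0;
const groups = ["g1", "g2", "g3", "g4", "g5", "g6", "g7"];
const demo = mapWithConcurrency(groups, 3, async (groupId) => {
  inFlight += 1;
  peak = Math.max(peak, inFlight);
  await Promise.resolve(); // stand-in for sending the group's parts serially
  inFlight -= 1;
  return `${groupId}:sent`;
});
```

Inside the worker, a group's parts would still be awaited one after another, which keeps their visible order in the chat while different groups overlap.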

Task 3: PerKeyMutex (TDD)

Files:

  • Create: apps/bot/src/scheduler/per-key-mutex.test.ts

  • Create: apps/bot/src/scheduler/per-key-mutex.ts

  • Step 1: Write the failing test file at apps/bot/src/scheduler/per-key-mutex.test.ts

import { describe, it, expect } from "vitest";
import { PerKeyMutex } from "./per-key-mutex.js";

/** Tiny clock-free helper: returns a Promise that resolves after
 *  `n` microtasks. Lets us check ordering without real timers. */
function tickN(n: number): Promise<void> {
  let p: Promise<void> = Promise.resolve();
  for (let i = 0; i < n; i++) p = p.then();
  return p;
}

describe("PerKeyMutex", () => {
  it("allows a single call against one key to run immediately", async () => {
    const m = new PerKeyMutex();
    const result = await m.run("k1", async () => 42);
    expect(result).toBe(42);
  });

  it("serialises two calls against the same key", async () => {
    const m = new PerKeyMutex();
    const order: string[] = [];

    const a = m.run("k1", async () => {
      order.push("a-start");
      await tickN(5);
      order.push("a-end");
    });
    const b = m.run("k1", async () => {
      order.push("b-start");
      order.push("b-end");
    });

    await Promise.all([a, b]);
    // b cannot start until a has finished.
    expect(order).toEqual(["a-start", "a-end", "b-start", "b-end"]);
  });

  it("runs different keys in parallel", async () => {
    const m = new PerKeyMutex();
    const order: string[] = [];

    const a = m.run("k1", async () => {
      order.push("a-start");
      await tickN(5);
      order.push("a-end");
    });
    const b = m.run("k2", async () => {
      order.push("b-start");
      order.push("b-end");
    });

    await Promise.all([a, b]);
    // b doesn't wait for a — interleaving expected.
    expect(order[0]).toBe("a-start");
    expect(order).toContain("b-start");
    expect(order).toContain("b-end");
    // b's pair lands before a's end.
    expect(order.indexOf("b-end")).toBeLessThan(order.indexOf("a-end"));
  });

  it("releases the lock when the handler throws", async () => {
    const m = new PerKeyMutex();
    await expect(
      m.run("k1", async () => {
        throw new Error("boom");
      }),
    ).rejects.toThrow("boom");
    // Next call on the same key must NOT hang.
    const result = await m.run("k1", async () => "after");
    expect(result).toBe("after");
  });

  it("forwards the resolved value of the handler", async () => {
    const m = new PerKeyMutex();
    const out = await m.run("k1", async () => ({ ok: true, n: 7 }));
    expect(out).toEqual({ ok: true, n: 7 });
  });

  it("cleans up internal state for keys with no waiters", async () => {
    const m = new PerKeyMutex();
    await m.run("k1", async () => {});
    expect(m.activeKeyCount()).toBe(0);
  });

  it("retains a key while a chain is in flight, then drops it", async () => {
    const m = new PerKeyMutex();
    let release!: () => void;
    const gate = new Promise<void>((r) => (release = r));

    const inFlight = m.run("k1", () => gate);
    expect(m.activeKeyCount()).toBe(1);
    release();
    await inFlight;
    expect(m.activeKeyCount()).toBe(0);
  });
});
  • Step 2: Run the failing test to confirm it fails for the right reason
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex

Expected: FAIL with "Cannot find module './per-key-mutex.js'".

  • Step 3: Implement apps/bot/src/scheduler/per-key-mutex.ts
/**
 * Async mutex keyed by a string. Different keys run in parallel;
 * same-key calls serialise.
 *
 * Used by fire-reminder so two reminders on the SAME WhatsApp account
 * take turns (running them concurrently would double the effective
 * send rate and risk a ban), while reminders on DIFFERENT accounts
 * proceed in parallel.
 *
 * The implementation is a chain-per-key Promise: each call appends
 * its work to the key's tail. Empty chains are cleaned up so the
 * Map doesn't grow unbounded across the bot's lifetime.
 */
export class PerKeyMutex {
  private chains = new Map<string, Promise<void>>();

  /** Run `fn` exclusively against `key`. Returns whatever `fn` returns. */
  async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const prev = this.chains.get(key) ?? Promise.resolve();

    let release!: () => void;
    const completion = new Promise<void>((r) => (release = r));
    // The chain we publish is "what was already there + this completion".
    // The next caller awaits THIS, so order is preserved.
    const chained = prev.then(() => completion);
    this.chains.set(key, chained);

    try {
      await prev;
      return await fn();
    } finally {
      release();
      // Drop the entry only if no later caller has appended in the
      // meantime — otherwise we'd evict the in-flight chain.
      if (this.chains.get(key) === chained) {
        this.chains.delete(key);
      }
    }
  }

  /** Diagnostic: how many keys currently have an in-flight or queued chain. */
  activeKeyCount(): number {
    return this.chains.size;
  }
}

/**
 * Singleton mutex used by fire-reminder, keyed by accountId. Lives at
 * module scope so multiple pg-boss workers in the same process share
 * state.
 */
export const accountMutex = new PerKeyMutex();
  • Step 4: Run the test to confirm it passes
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex

Expected: PASS, 7 tests.

  • Step 5: Commit
git add apps/bot/src/scheduler/per-key-mutex.ts apps/bot/src/scheduler/per-key-mutex.test.ts
git commit -m "feat(bot): PerKeyMutex for accountId-keyed serialisation

Same key serialises, different keys run in parallel. Used by
fire-reminder to prevent two same-account fan-outs from doubling
the effective send rate (which would risk a WhatsApp ban). Chains
auto-clean empty entries so the Map doesn't leak."
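
To show the intended call pattern, the sketch below wraps three fake fan-outs in the mutex: two on the same account and one on another. It carries a condensed copy of PerKeyMutex so it runs standalone; fakeFanOut and the account ids are hypothetical.

```typescript
// Condensed copy of the PerKeyMutex from this task, so the snippet is standalone.
class PerKeyMutex {
  private chains = new Map<string, Promise<void>>();

  async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const prev = this.chains.get(key) ?? Promise.resolve();
    let release!: () => void;
    const completion = new Promise<void>((r) => (release = r));
    const chained = prev.then(() => completion);
    this.chains.set(key, chained);
    try {
      await prev;
      return await fn();
    } finally {
      release();
      if (this.chains.get(key) === chained) this.chains.delete(key);
    }
  }
}

const mutex = new PerKeyMutex();
const log: string[] = [];

// Hypothetical stand-in for one reminder's fan-out.
async function fakeFanOut(label: string): Promise<void> {
  log.push(`${label}:start`);
  await Promise.resolve(); // yield, as a real send would
  await Promise.resolve();
  log.push(`${label}:end`);
}

// r1 and r2 share an account and must take turns; r3 is independent.
const done = Promise.all([
  mutex.run("acct-1", () => fakeFanOut("r1")),
  mutex.run("acct-1", () => fakeFanOut("r2")),
  mutex.run("acct-2", () => fakeFanOut("r3")),
]);
```

In the log, r2 starts only after r1 ends, while r3 starts without waiting for r1.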

Task 4: TokenBucket rate limiter (TDD)

Files:

  • Create: apps/bot/src/scheduler/rate-limiter.test.ts

  • Create: apps/bot/src/scheduler/rate-limiter.ts

  • Step 1: Write the failing test file at apps/bot/src/scheduler/rate-limiter.test.ts

import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { TokenBucket, accountRateLimiter } from "./rate-limiter.js";

describe("TokenBucket", () => {
  beforeEach(() => {
    vi.useFakeTimers();
    vi.setSystemTime(new Date("2026-05-10T10:00:00Z"));
  });
  afterEach(() => {
    vi.useRealTimers();
  });

  it("starts full: first N=capacity acquires resolve immediately", async () => {
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 5 });
    for (let i = 0; i < 5; i++) {
      await b.acquire(); // should not stall
    }
    // The 6th is a different test — see below.
  });

  it("blocks the (capacity+1)th acquire until a token regenerates", async () => {
    // 60/min = 1 token per second. Capacity 2.
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 2 });
    await b.acquire();
    await b.acquire();

    let resolved = false;
    const pending = b.acquire().then(() => {
      resolved = true;
    });

    // Immediately after draining, no token available.
    await Promise.resolve();
    expect(resolved).toBe(false);

    // Advance the clock by exactly 1 second — one token should be back.
    await vi.advanceTimersByTimeAsync(1000);
    await pending;
    expect(resolved).toBe(true);
  });

  it("FIFO: pending acquires resolve in the order they arrived", async () => {
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 1 });
    await b.acquire(); // bucket empty

    const order: number[] = [];
    const a = b.acquire().then(() => order.push(1));
    const c = b.acquire().then(() => order.push(2));

    // Two seconds → two tokens regenerate, in order.
    await vi.advanceTimersByTimeAsync(2000);
    await Promise.all([a, c]);
    expect(order).toEqual([1, 2]);
  });

  it("does not over-fill past capacity even if the clock leaps forward", async () => {
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 3 });
    // Drain.
    await b.acquire();
    await b.acquire();
    await b.acquire();
    // Leap an hour. Naive impl would credit 3600 tokens; we should cap at 3.
    await vi.advanceTimersByTimeAsync(3_600_000);
    // 3 immediate acquires should resolve.
    await b.acquire();
    await b.acquire();
    await b.acquire();
    // The 4th waits for fresh regeneration.
    let resolved = false;
    b.acquire().then(() => (resolved = true));
    await Promise.resolve();
    expect(resolved).toBe(false);
  });

  it("ratePerMinute=0 is rejected at construction (caller bug, fail loud)", () => {
    expect(() => new TokenBucket({ ratePerMinute: 0, capacity: 1 })).toThrow();
  });
});

describe("accountRateLimiter (singleton)", () => {
  beforeEach(() => {
    vi.useFakeTimers();
    vi.setSystemTime(new Date("2026-05-10T10:00:00Z"));
  });
  afterEach(() => {
    vi.useRealTimers();
  });

  it("returns the SAME bucket for repeated lookups of one accountId", () => {
    const a1 = accountRateLimiter.get("acct-1");
    const a2 = accountRateLimiter.get("acct-1");
    expect(a1).toBe(a2);
  });

  it("returns DIFFERENT buckets for different accountIds (isolation)", () => {
    const a = accountRateLimiter.get("acct-A");
    const b = accountRateLimiter.get("acct-B");
    expect(a).not.toBe(b);
  });

  it("a drained account A bucket does not block account B", async () => {
    const a = accountRateLimiter.get("acct-A");
    const b = accountRateLimiter.get("acct-B");
    // Drain A entirely (capacity defaults to ratePerMinute, so we
    // need to pull at least that many).
    for (let i = 0; i < 40; i++) await a.acquire();

    // B should still grant immediately.
    let bResolved = false;
    b.acquire().then(() => (bResolved = true));
    await Promise.resolve();
    expect(bResolved).toBe(true);
  });
});
  • Step 2: Run the failing test
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter

Expected: FAIL with "Cannot find module './rate-limiter.js'".

  • Step 3: Implement apps/bot/src/scheduler/rate-limiter.ts
import { env } from "../env.js";

/**
 * Token bucket for per-account send pacing.
 *
 * Tokens regenerate at `ratePerMinute / 60` per second. Capacity caps
 * how many can accumulate during idle periods (so the operator can't
 * burst 1000 messages just because the account was quiet for a day).
 *
 * `acquire()` resolves when a token is available, FIFO across waiters.
 * Used by fire-reminder to gate every `socket.sendMessage` call.
 */
export interface TokenBucketOptions {
  ratePerMinute: number;
  /** Defaults to ratePerMinute (one minute's worth). */
  capacity?: number;
}

export class TokenBucket {
  private readonly ratePerMs: number;
  private readonly capacity: number;
  private tokens: number;
  private lastRefillMs: number;
  private waiters: Array<() => void> = [];

  constructor(opts: TokenBucketOptions) {
    if (opts.ratePerMinute <= 0) {
      throw new Error(`TokenBucket: ratePerMinute must be > 0, got ${opts.ratePerMinute}`);
    }
    this.ratePerMs = opts.ratePerMinute / 60_000;
    this.capacity = opts.capacity ?? opts.ratePerMinute;
    this.tokens = this.capacity;
    this.lastRefillMs = Date.now();
  }

  /** Resolve when a token is available. FIFO across concurrent waiters. */
  async acquire(): Promise<void> {
    this.refill();
    if (this.tokens >= 1 && this.waiters.length === 0) {
      this.tokens -= 1;
      return;
    }
    return new Promise<void>((resolve) => {
      this.waiters.push(resolve);
      this.scheduleNext();
    });
  }

  private refill(): void {
    const now = Date.now();
    const elapsed = now - this.lastRefillMs;
    if (elapsed <= 0) return;
    const gained = elapsed * this.ratePerMs;
    this.tokens = Math.min(this.capacity, this.tokens + gained);
    this.lastRefillMs = now;
  }

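  // At most one drain timer is pending at a time. Without this guard
  // every queued waiter would start its own self-re-arming timer chain.
  private timer: ReturnType<typeof setTimeout> | null = null;

  private scheduleNext(): void {
    // Drain waiters FIFO while whole tokens are available, then (if any
    // remain) sleep until the fractional token deficit has regenerated.
    this.refill();
    while (this.tokens >= 1 && this.waiters.length > 0) {
      this.tokens -= 1;
      const w = this.waiters.shift()!;
      w();
    }
    if (this.waiters.length === 0 || this.timer !== null) return;

    const tokensShort = 1 - this.tokens;
    const waitMs = Math.max(1, Math.ceil(tokensShort / this.ratePerMs));
    this.timer = setTimeout(() => {
      this.timer = null;
      this.scheduleNext();
    }, waitMs);
  }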
}

/**
 * Per-accountId TokenBucket registry. Each account gets its own
 * pacing budget, so a slow account A never throttles account B.
 */
class AccountRateLimiter {
  private buckets = new Map<string, TokenBucket>();
  private ratePerMinute: number;

  constructor(ratePerMinute: number) {
    this.ratePerMinute = ratePerMinute;
  }

  get(accountId: string): TokenBucket {
    let b = this.buckets.get(accountId);
    if (!b) {
      b = new TokenBucket({ ratePerMinute: this.ratePerMinute });
      this.buckets.set(accountId, b);
    }
    return b;
  }
}

export const accountRateLimiter = new AccountRateLimiter(env.BOT_MAX_SEND_PER_MINUTE);
  • Step 4: Run the tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter

Expected: PASS, 8 tests.

  • Step 5: Commit
git add apps/bot/src/scheduler/rate-limiter.ts apps/bot/src/scheduler/rate-limiter.test.ts
git commit -m "feat(bot): per-account token-bucket rate limiter

TokenBucket gates each socket.sendMessage call. Tokens regenerate at
ratePerMinute/60 per second, capped at one minute's worth so quiet
accounts can't burst. FIFO drain across concurrent waiters.

accountRateLimiter (singleton) hands out one bucket per accountId, so
account A's drain never throttles account B. Default rate is
BOT_MAX_SEND_PER_MINUTE (40) — the safe band for an established
WhatsApp account."
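
The bucket's pacing implies a simple lower bound on run time: the first `capacity` sends go out as a burst, and every later send waits for regeneration. The sketch below works that arithmetic through. estimateDrainMs is a hypothetical helper (not the run-eta.ts calculator), and it assumes a full bucket at start, no other consumers, and zero network latency.

```typescript
/**
 * Rough time, in ms, to acquire `sends` tokens from a bucket that
 * starts full. Capacity defaults to one minute's worth of tokens,
 * matching the TokenBucket default.
 */
function estimateDrainMs(
  sends: number,
  ratePerMinute: number,
  capacity: number = ratePerMinute,
): number {
  if (ratePerMinute <= 0) {
    throw new RangeError(`ratePerMinute must be > 0, got ${ratePerMinute}`);
  }
  // Sends covered by the initial burst cost no waiting time.
  const paced = Math.max(0, sends - capacity);
  return Math.ceil((paced * 60_000) / ratePerMinute);
}

// 1000 sends at the default 40/min: (1000 - 40) / 40 = 24 minutes.
const thousandSendsMs = estimateDrainMs(1000, 40);
const thousandSendsMinutes = thousandSendsMs / 60_000;

// A run smaller than the burst capacity is not paced at all.
const smallRunMs = estimateDrainMs(30, 40);
```

Media uploads and per-message latency come on top of this figure, so it is best read as a floor rather than a prediction.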

Task 5: Delivery window helper (TDD)

The helper lives in packages/shared (NOT the bot) because both bundles need it: bot's fire-reminder loop gates on it, and web's ETA pill (Task 9) compares ETA against it to flip the green/amber state.

Files:

  • Create: packages/shared/src/delivery-window.test.ts

  • Create: packages/shared/src/delivery-window.ts

  • Modify: packages/shared/src/index.ts (re-export windowEndAt)

  • Step 1: Write the failing test file at packages/shared/src/delivery-window.test.ts

import { describe, it, expect } from "vitest";
import { windowEndAt } from "./delivery-window.js";

const TZ = "Asia/Kuala_Lumpur"; // UTC+8

describe("windowEndAt", () => {
  it("returns today's end-hour boundary in the given timezone", () => {
    // Fire at 2026-05-10 10:00 KL == 02:00 UTC. End hour 18 == 18:00 KL == 10:00 UTC.
    const fireAt = new Date("2026-05-10T02:00:00.000Z");
    const out = windowEndAt(TZ, 18, fireAt);
    expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z");
  });

  it("returns a past timestamp when fireAt is already after the end hour", () => {
    // Fire at 2026-05-10 19:00 KL == 11:00 UTC. End hour 18 → today's 18:00 KL == 10:00 UTC.
    // That's BEFORE fireAt. The caller's first window-gate check trips immediately.
    const fireAt = new Date("2026-05-10T11:00:00.000Z");
    const out = windowEndAt(TZ, 18, fireAt);
    expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z");
    expect(out.getTime()).toBeLessThan(fireAt.getTime());
  });

  it("respects the timezone (UTC+0 vs UTC+8)", () => {
    const fireAt = new Date("2026-05-10T02:00:00.000Z");
    const inUtc = windowEndAt("UTC", 18, fireAt);
    expect(inUtc.toISOString()).toBe("2026-05-10T18:00:00.000Z");
    const inKl = windowEndAt("Asia/Kuala_Lumpur", 18, fireAt);
    expect(inKl.toISOString()).toBe("2026-05-10T10:00:00.000Z");
  });

  it("treats end hour 24 as midnight at the end of the same calendar day", () => {
    // 2026-05-10 in KL ends at 2026-05-11 00:00 KL == 2026-05-10 16:00 UTC.
    const fireAt = new Date("2026-05-10T02:00:00.000Z");
    const out = windowEndAt(TZ, 24, fireAt);
    expect(out.toISOString()).toBe("2026-05-10T16:00:00.000Z");
  });

  it("DST transition day stays on the SAME calendar day (no skipping forward)", () => {
    // US/Eastern has DST. On 2026-03-08 (DST starts), 18:00 EDT is a real time.
    // Fire at 2026-03-08 11:00 EDT (15:00 UTC). End at 2026-03-08 18:00 EDT (22:00 UTC).
    const fireAt = new Date("2026-03-08T15:00:00.000Z");
    const out = windowEndAt("America/New_York", 18, fireAt);
    expect(out.toISOString()).toBe("2026-03-08T22:00:00.000Z");
  });

  it("rejects end hour outside 0..24", () => {
    const fireAt = new Date("2026-05-10T00:00:00Z");
    expect(() => windowEndAt(TZ, -1, fireAt)).toThrow();
    expect(() => windowEndAt(TZ, 25, fireAt)).toThrow();
  });
});
  • Step 2: Run the failing test
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run delivery-window

Expected: FAIL with "Cannot find module './delivery-window.js'".

  • Step 3: Implement packages/shared/src/delivery-window.ts
import { DateTime } from "luxon";

/**
 * Returns the end-of-window timestamp for the calendar day `fireAt`
 * falls on, in the operator's timezone.
 *
 *   windowEndAt("Asia/Kuala_Lumpur", 18, fireAt)
 *   → today's 18:00 KL (which may be in the past if fireAt is already
 *     past 18:00 KL — caller's first window-gate fires immediately).
 *
 * `endHour` is 0..24. Hour 24 is treated as midnight of the next
 * calendar day (i.e. "end of today" inclusive).
 *
 * Pure: no I/O, no Date.now() reads, no clock dependency. Easy to
 * test with fixture inputs.
 */
export function windowEndAt(
  timezone: string,
  endHour: number,
  fireAt: Date,
): Date {
  if (!Number.isInteger(endHour) || endHour < 0 || endHour > 24) {
    throw new Error(`windowEndAt: endHour must be 0..24, got ${endHour}`);
  }

  const dt = DateTime.fromJSDate(fireAt).setZone(timezone);
  if (!dt.isValid) {
    throw new Error(`windowEndAt: invalid timezone "${timezone}"`);
  }

  // For hour 24, "end of day" is the next midnight. Luxon's `set` with
  // hour=24 normalises into hour=0 of the next day, which is exactly
  // what we want.
  const end = dt.set({ hour: endHour, minute: 0, second: 0, millisecond: 0 });
  return end.toJSDate();
}
  • Step 4: Re-export from packages/shared/src/index.ts

Append:

export { windowEndAt } from "./delivery-window.js";
  • Step 5: Run the tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run delivery-window

Expected: PASS, 6 tests.

  • Step 6: Commit
git add packages/shared/src/delivery-window.ts packages/shared/src/delivery-window.test.ts packages/shared/src/index.ts
git commit -m "feat(shared): pure delivery-window end calculator

windowEndAt(timezone, endHour, fireAt) returns the end-of-window for
the day fireAt is on. If fireAt is already past, the result is a
past timestamp — the run loop's first window gate trips immediately
and the entire run resolves as failed (zero sent), which is the
right behaviour for 'we can't send after window close'."
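
On the web side, the ETA pill compares a projected finish time against the timestamp windowEndAt returns. A minimal sketch of that comparison follows. etaPillState is a hypothetical name, and mapping "fits in the window" to green and "does not fit" to amber is an assumption about the pill's semantics.

```typescript
type PillState = "green" | "amber";

/**
 * Green when a run starting at `startAt` and lasting `etaMs` finishes
 * at or before `windowEnd`; amber otherwise.
 */
function etaPillState(startAt: Date, etaMs: number, windowEnd: Date): PillState {
  const finishAtMs = startAt.getTime() + etaMs;
  return finishAtMs <= windowEnd.getTime() ? "green" : "amber";
}

// Values from the tests above: 10:00 KL start, 18:00 KL window end.
const startAt = new Date("2026-05-10T02:00:00.000Z");
const windowEnd = new Date("2026-05-10T10:00:00.000Z");

const shortRun = etaPillState(startAt, 24 * 60_000, windowEnd); // 24-minute run
const longRun = etaPillState(startAt, 9 * 60 * 60_000, windowEnd); // 9-hour run

// Starting after the window has closed never fits, even with a zero ETA.
const lateStart = etaPillState(new Date("2026-05-10T11:00:00.000Z"), 0, windowEnd);
```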

Task 6: MediaUploadCache (TDD with mock socket)

Files:

  • Create: apps/bot/src/scheduler/media-upload-cache.test.ts

  • Create: apps/bot/src/scheduler/media-upload-cache.ts

  • Step 1: Write the failing test file at apps/bot/src/scheduler/media-upload-cache.test.ts

import { describe, it, expect, vi } from "vitest";
import { MediaUploadCache } from "./media-upload-cache.js";

describe("MediaUploadCache", () => {
  it("uploads each unique mediaId exactly once across N gets", async () => {
    const prepare = vi.fn(async (mediaId: string) => ({
      kind: "prepared",
      mediaId,
    }));
    const cache = new MediaUploadCache(prepare);

    const a1 = await cache.get("media-A");
    const a2 = await cache.get("media-A");
    const b1 = await cache.get("media-B");

    expect(prepare).toHaveBeenCalledTimes(2);
    expect(prepare).toHaveBeenCalledWith("media-A");
    expect(prepare).toHaveBeenCalledWith("media-B");
    // Same handle returned for repeated lookups of A.
    expect(a1).toBe(a2);
    expect(a1).not.toBe(b1);
  });

  it("coalesces concurrent gets of the same mediaId into ONE prepare call", async () => {
    let resolveA: (v: unknown) => void = () => {};
    const aPromise = new Promise((r) => (resolveA = r));
    const prepare = vi.fn(async (mediaId: string) => {
      if (mediaId === "media-A") return aPromise;
      return { kind: "prepared", mediaId };
    });
    const cache = new MediaUploadCache(prepare);

    const p1 = cache.get("media-A");
    const p2 = cache.get("media-A");
    const p3 = cache.get("media-A");

    // Resolve the in-flight prepare.
    resolveA({ kind: "prepared", mediaId: "media-A" });

    const [r1, r2, r3] = await Promise.all([p1, p2, p3]);
    expect(prepare).toHaveBeenCalledTimes(1); // de-duplicated
    expect(r1).toBe(r2);
    expect(r2).toBe(r3);
  });

  it("a thrown prepare is NOT cached — next get retries", async () => {
    let attempt = 0;
    const prepare = vi.fn(async (_mediaId: string) => {
      attempt++;
      if (attempt === 1) throw new Error("upload network blip");
      return { kind: "prepared", attempt };
    });
    const cache = new MediaUploadCache(prepare);

    await expect(cache.get("media-A")).rejects.toThrow("upload network blip");
    // Second attempt must call prepare again — DON'T cache failures.
    const r = await cache.get("media-A");
    expect(prepare).toHaveBeenCalledTimes(2);
    expect(r).toEqual({ kind: "prepared", attempt: 2 });
  });

  it("size() reflects the number of cached unique mediaIds", async () => {
    const prepare = async (mediaId: string) => ({ mediaId });
    const cache = new MediaUploadCache(prepare);
    expect(cache.size()).toBe(0);
    await cache.get("a");
    expect(cache.size()).toBe(1);
    await cache.get("b");
    expect(cache.size()).toBe(2);
    await cache.get("a"); // already cached
    expect(cache.size()).toBe(2);
  });
});
  • Step 2: Run the failing test
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache

Expected: FAIL with "Cannot find module './media-upload-cache.js'".

  • Step 3: Implement apps/bot/src/scheduler/media-upload-cache.ts
/**
 * Per-run cache of `prepareWAMessageMedia` results, keyed by
 * `mediaId`. The point: when a reminder fans out to 1000 groups with
 * one image, we want to upload that image to WhatsApp's CDN ONCE, not
 * 1000 times. Subsequent group sends reuse the prepared message
 * (with embedded directPath / mediaKey) via socket.relayMessage.
 *
 * Lifecycle: one cache instance per fire-reminder run. After the run
 * completes, the cache is dropped — we don't share uploads across
 * runs because WA media tokens are short-lived.
 *
 * Concurrent gets of the same mediaId are coalesced into a single
 * prepare call. Failed prepares are NOT cached so the next attempt
 * retries (network blips at upload time shouldn't poison the cache).
 */
export class MediaUploadCache<T> {
  private readonly prepare: (mediaId: string) => Promise<T>;
  private readonly entries = new Map<string, Promise<T>>();

  constructor(prepare: (mediaId: string) => Promise<T>) {
    this.prepare = prepare;
  }

  async get(mediaId: string): Promise<T> {
    const existing = this.entries.get(mediaId);
    if (existing) return existing;

    const inflight = this.prepare(mediaId);
    // Insert eagerly so concurrent gets dedupe.
    this.entries.set(mediaId, inflight);

    try {
      return await inflight;
    } catch (err) {
      // Don't cache failures — the next caller should retry.
      this.entries.delete(mediaId);
      throw err;
    }
  }

  size(): number {
    return this.entries.size;
  }
}
  • Step 4: Run the tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache

Expected: PASS, 4 tests.
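
For orientation, here is a minimal sketch of how the fan-out loop is expected to lean on the cache. `fakeUpload` and the group names are hypothetical stand-ins for the real Baileys upload and the real targets; the class body repeats the Step 3 listing so the snippet runs on its own.

```typescript
// Sketch only: `fakeUpload` is a stand-in for the real WhatsApp upload and
// is NOT part of the plan. The class mirrors the Step 3 implementation.
class MediaUploadCache<T> {
  private readonly prepare: (mediaId: string) => Promise<T>;
  private readonly entries = new Map<string, Promise<T>>();

  constructor(prepare: (mediaId: string) => Promise<T>) {
    this.prepare = prepare;
  }

  async get(mediaId: string): Promise<T> {
    const existing = this.entries.get(mediaId);
    if (existing) return existing;
    const inflight = this.prepare(mediaId);
    this.entries.set(mediaId, inflight); // insert eagerly so concurrent gets dedupe
    try {
      return await inflight;
    } catch (err) {
      this.entries.delete(mediaId); // failures are not cached
      throw err;
    }
  }
}

let uploads = 0;
async function fakeUpload(mediaId: string): Promise<{ mediaId: string }> {
  uploads++; // count how many times the "upload" actually happens
  return { mediaId };
}

async function fanOut(groups: string[]): Promise<number> {
  const cache = new MediaUploadCache(fakeUpload);
  // Every group asks for the same media; only the first call uploads.
  await Promise.all(groups.map(() => cache.get("media-A")));
  return uploads;
}
```

With three groups and one shared `mediaId`, `fanOut` resolves with a single upload.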

  • Step 5: Commit
git add apps/bot/src/scheduler/media-upload-cache.ts apps/bot/src/scheduler/media-upload-cache.test.ts
git commit -m "feat(bot): MediaUploadCache for once-per-run media prepare

One cache instance per fire-reminder run. Each unique mediaId gets
prepared (uploaded to WA CDN) exactly once, and subsequent group
sends within the run reuse the prepared message via relayMessage.
Concurrent gets coalesce into a single prepare. Failed prepares
don't poison the cache — next caller retries."

Task 7: Rewrite fire-reminder.ts and bump pg-boss teamSize

Files:

  • Modify: apps/bot/src/scheduler/reminder-jobs.ts

  • Rewrite: apps/bot/src/scheduler/fire-reminder.ts

  • Step 1: Update apps/bot/src/scheduler/reminder-jobs.ts — pass teamSize

Whole file:

import type { PgBoss } from "pg-boss";
import { logger } from "../logger.js";
import { env } from "../env.js";
import { fireReminder, type FireReminderPayload } from "./fire-reminder.js";

export const REMINDER_FIRE_QUEUE = "reminder.fire";

export async function registerReminderJobs(boss: PgBoss): Promise<void> {
  await boss.createQueue(REMINDER_FIRE_QUEUE);
  await boss.work<FireReminderPayload>(
    REMINDER_FIRE_QUEUE,
    {
      // Up to BOT_FIRE_CONCURRENCY jobs in flight at once. Combined with
      // the per-account mutex inside fireReminder, this lets reminders
      // on DIFFERENT accounts run in parallel while same-account
      // reminders take turns.
      teamSize: env.BOT_FIRE_CONCURRENCY,
      teamConcurrency: 1,
    },
    async (jobs) => {
      const job = jobs[0];
      if (!job) return;
      logger.debug({ jobId: job.id, payload: job.data }, "reminder.fire: handling");
      await fireReminder(job.data);
    },
  );
  logger.info(
    { teamSize: env.BOT_FIRE_CONCURRENCY },
    "reminder.fire: handler registered",
  );
}

export async function scheduleReminderFire(
  boss: PgBoss,
  reminderId: string,
  scheduledAt: Date,
): Promise<string | null> {
  const id = await boss.send(
    REMINDER_FIRE_QUEUE,
    { reminderId },
    {
      startAfter: scheduledAt,
      retryLimit: 3,
      retryDelay: 30,
      retryBackoff: true,
      // Use the reminderId as a singleton key so re-scheduling cancels the old job
      singletonKey: `reminder:${reminderId}`,
    },
  );
  logger.info({ reminderId, jobId: id, scheduledAt }, "reminder.fire: scheduled");
  return id;
}

export async function cancelReminderFire(_boss: PgBoss, reminderId: string): Promise<void> {
  // Soft cancel: pg-boss doesn't expose a clean cancel-by-singleton API in v12.
  // The scheduled job will still fire, but `fireReminder` exits early when the
  // reminder row is gone. Hard cancel can be added later by storing the jobId.
  logger.info({ reminderId }, "reminder.fire: cancel requested (soft, fizzles on fire)");
}
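
The comment on `teamSize` relies on the per-account mutex to keep same-account reminders serial while different accounts overlap. The sketch below illustrates that behaviour with a hypothetical `PerKeyMutexSketch`; the plan's real implementation lives in per-key-mutex.ts and may differ in detail.

```typescript
// Hypothetical sketch, not the plan's per-key-mutex.ts. Each key keeps a
// "tail" promise; a new caller chains behind the tail for its own key only.
class PerKeyMutexSketch {
  private readonly tails = new Map<string, Promise<void>>();

  run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    const result = prev.then(() => fn());
    // The stored tail never rejects, so one failed holder cannot block the
    // next caller. Entries are not cleaned up here; the key set (accounts)
    // is assumed to be small and bounded.
    const tail = result.then(
      () => undefined,
      () => undefined,
    );
    this.tails.set(key, tail);
    return result;
  }
}

const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

async function demo(): Promise<string[]> {
  const order: string[] = [];
  const mutex = new PerKeyMutexSketch();
  const job = (label: string, ms: number) => async () => {
    order.push(`${label}:start`);
    await sleep(ms);
    order.push(`${label}:end`);
  };
  await Promise.all([
    mutex.run("acct-1", job("a1", 20)),
    mutex.run("acct-1", job("a2", 5)), // waits for a1 (same account)
    mutex.run("acct-2", job("b", 5)), // overlaps with a1 (different account)
  ]);
  return order;
}
```

In the resulting order, `a2:start` appears only after `a1:end`, while `b:start` appears before `a1:end`.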
  • Step 2: Rewrite apps/bot/src/scheduler/fire-reminder.ts

Whole file:

import { and, eq, inArray } from "drizzle-orm";
import { reminderRuns, reminderRunTargets, reminders } from "@cmbot/db";
import {
  generateWAMessageContent,
  generateMessageID,
  type AnyMessageContent,
  type proto,
} from "@whiskeysockets/baileys";
import pLimit from "p-limit";
import { db } from "../db.js";
import { logger } from "../logger.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import { absoluteMediaPath, nextOccurrence, resolveDeliveryKind, windowEndAt } from "@cmbot/shared";
import { open as fsOpen, readFile } from "node:fs/promises";
import { env } from "../env.js";
import { writeAuditLog } from "../audit.js";
import { getReminderWithDetails } from "../reminders/crud.js";
import { getBoss } from "./pgboss-client.js";
import { scheduleReminderFire } from "./reminder-jobs.js";
import { pgNotifyWeb } from "../ipc/notify.js";
import { accountMutex } from "./per-key-mutex.js";
import { accountRateLimiter } from "./rate-limiter.js";
import { MediaUploadCache } from "./media-upload-cache.js";
import { MediaUploadCache } from "./media-upload-cache.js";

export type FireReminderPayload = {
  reminderId: string;
  /** Optional resume hook. When present, fire-reminder ATTACHES to
   *  the existing run instead of creating a new one, and only
   *  re-attempts targets in `pending` status. Set by the resume
   *  server action. */
  runId?: string;
};

/** Read the first N bytes of a file (used to sniff HEIF/AVIF/MOV brands). */
async function readHeadBytes(filePath: string, n: number): Promise<Uint8Array> {
  const fh = await fsOpen(filePath, "r");
  try {
    const buf = new Uint8Array(n);
    await fh.read(buf, 0, n, 0);
    return buf;
  } finally {
    await fh.close();
  }
}

/** Random delay between same-group message parts (ms). Just enough for
 *  visible ordering in the chat at WA's natural pace. */
function partJitterMs(): number {
  return 200 + Math.floor(Math.random() * 300); // 200..499
}

export async function fireReminder(payload: FireReminderPayload): Promise<void> {
  const reminder = await getReminderWithDetails(payload.reminderId);
  if (!reminder) {
    logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found");
    return;
  }
  if (reminder.status !== "active") {
    logger.info({ reminderId: reminder.id, status: reminder.status }, "fire-reminder: skipping (not active)");
    return;
  }

  // Per-account mutex: two reminders on the SAME account take turns.
  // Different accounts run in parallel (cross-account isolation).
  await accountMutex.run(reminder.accountId, () => fireReminderInner(reminder, payload.runId));
}

async function fireReminderInner(
  reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
  resumeRunId?: string,
): Promise<void> {
  // Resume path: attach to the existing run; fresh path: create one.
  let runId: string;
  if (resumeRunId) {
    const existing = await db.query.reminderRuns.findFirst({
      where: (r, { eq }) => eq(r.id, resumeRunId),
    });
    if (!existing) {
      logger.warn({ reminderId: reminder.id, resumeRunId }, "fire-reminder: resume target run missing");
      return;
    }
    runId = existing.id;
    // Re-mark as in-flight so the UI shows the run is no longer paused.
    await db
      .update(reminderRuns)
      .set({ status: "pending", errorSummary: null })
      .where(eq(reminderRuns.id, runId));
  } else {
    const [run] = await db
      .insert(reminderRuns)
      .values({
        reminderId: reminder.id,
        reminderName: reminder.name,
        status: "pending",
      })
      .returning({ id: reminderRuns.id });
    runId = run!.id;
  }

  const session = sessionManager.getSession(reminder.accountId);
  if (!session) {
    logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected");
    await markAllSkipped(runId, reminder, "account not connected");
    await db.update(reminderRuns).set({ status: "skipped", errorSummary: "account not connected" }).where(eq(reminderRuns.id, runId));
    await pgNotifyWeb({ type: "reminder.fired", reminderId: reminder.id, runId, status: "skipped" });
    return;
  }

  // Up-front: load all groups + media rows in TWO bulk queries.
  const groupIds = reminder.targets.map((t) => t.groupId);
  const groupRows = groupIds.length
    ? await db.query.whatsappGroups.findMany({ where: (g) => inArray(g.id, groupIds) })
    : [];
  const groupById = new Map(groupRows.map((g) => [g.id, g]));

  const mediaIds = Array.from(
    new Set(reminder.messages.map((m) => m.mediaId).filter((id): id is string => Boolean(id))),
  );
  const mediaRows = mediaIds.length
    ? await db.query.mediaFiles.findMany({ where: (m) => inArray(m.id, mediaIds) })
    : [];
  const mediaById = new Map(mediaRows.map((m) => [m.id, m]));

  // Pre-create run_targets rows so progress is observable mid-run.
  // On a RESUME, the rows already exist — only the original fire path
  // inserts them. The resume path skips this; the loop below filters
  // to only the still-pending rows.
  if (!resumeRunId && reminder.targets.length > 0) {
    await db.insert(reminderRunTargets).values(
      reminder.targets.map((t) => ({
        runId,
        groupId: t.groupId,
        groupLabel: groupById.get(t.groupId)?.name ?? null,
        status: "pending" as const,
      })),
    );
  }

  // On a RESUME, only the still-pending targets need attention. On
  // a fresh fire, every target is pending. Either way we read the
  // current run_target rows from the DB to be the source of truth
  // about what's left to do.
  const pendingRows = await db.query.reminderRunTargets.findMany({
    where: (t, { eq, and: drizzleAnd }) =>
      drizzleAnd(eq(t.runId, runId), eq(t.status, "pending")),
  });
  const pendingGroupIds = new Set(pendingRows.map((r) => r.groupId));
  const targetsToProcess = reminder.targets.filter((t) =>
    pendingGroupIds.has(t.groupId),
  );

  // Already-sent count from prior run (so the final tally adds to total).
  const priorSentCount = resumeRunId
    ? (
        await db.query.reminderRunTargets.findMany({
          where: (t, { eq, and: drizzleAnd }) =>
            drizzleAnd(eq(t.runId, runId), eq(t.status, "sent")),
        })
      ).length
    : 0;
  const priorFailedCount = resumeRunId
    ? (
        await db.query.reminderRunTargets.findMany({
          where: (t, { eq, and: drizzleAnd }) =>
            drizzleAnd(eq(t.runId, runId), eq(t.status, "failed")),
        })
      ).length
    : 0;

  // Window-end timestamp. If the reminder fires AFTER today's end-hour
  // (e.g. cron miss-fired late) this is in the past — every iteration
  // will trip the gate and the run resolves as failed.
  const windowEnd = windowEndAt(
    reminder.timezone,
    reminder.deliveryWindowEndHour,
    new Date(),
  );

  // Per-run media upload cache. Each unique mediaId is prepared via
  // generateWAMessageContent ONCE (which uploads to WA's CDN); the
  // resulting proto.Message is reused for every group via relayMessage.
  // socket.waUploadToServer is the upload helper Baileys exposes.
  const uploadCache = new MediaUploadCache<proto.Message>(async (mediaId) => {
    const media = mediaById.get(mediaId);
    if (!media) throw new Error(`media row missing: ${mediaId}`);
    const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR);
    const buffer = await readFile(filePath);
    const head = buffer.subarray(0, 12);
    const resolved = resolveDeliveryKind(media.mimeType, head);
    const senderKind: "image" | "video" | "document" =
      resolved === "image" || resolved === "video" ? resolved : "document";
    const content: AnyMessageContent =
      senderKind === "image"
        ? { image: buffer, mimetype: media.mimeType }
        : senderKind === "video"
        ? { video: buffer, mimetype: media.mimeType }
        : { document: buffer, fileName: media.filenameOriginal, mimetype: media.mimeType };
    return generateWAMessageContent(content, {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      upload: (session.socket as any).waUploadToServer,
    });
  });

  // Per-account rate limiter — gates each socket.sendMessage / relayMessage call.
  const rateLimiter = accountRateLimiter.get(reminder.accountId);

  let sentCount = 0;
  let failedCount = 0;
  let skippedCount = 0;
  let windowClosed = false;

  const groupConcurrency = pLimit(env.BOT_GROUP_CONCURRENCY);

  await Promise.all(
    targetsToProcess.map((target) =>
      groupConcurrency(async () => {
        // Window-end gate. CRITICAL: leave the row as `pending` (NOT
        // `skipped`) so the run can be resumed later. The run as a
        // whole flips to `paused` after this loop.
        if (Date.now() >= windowEnd.getTime()) {
          windowClosed = true;
          // Don't touch the row; it's already `pending`. Just flag the run.
          return;
        }

        const group = groupById.get(target.groupId);
        if (!group) {
          await db
            .update(reminderRunTargets)
            .set({ status: "skipped", error: "group missing from db" })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          skippedCount++;
          return;
        }

        const start = Date.now();
        try {
          let lastMessageId: string | undefined;
          for (const part of reminder.messages) {
            await rateLimiter.acquire();
            if (part.kind === "text" && part.textContent) {
              const r = await session.socket.sendMessage(group.waGroupJid, {
                text: part.textContent,
              });
              lastMessageId = r?.key?.id ?? undefined;
            } else if (part.mediaId) {
              const prebuilt = await uploadCache.get(part.mediaId);
              // Inject the caption (if any) just before relaying — the
              // prebuilt content carries the media but each relay uses
              // a fresh messageId.
              if (part.textContent) {
                injectCaption(prebuilt, part.textContent);
              }
              const messageId = generateMessageID();
              await session.socket.relayMessage(group.waGroupJid, prebuilt, { messageId });
              lastMessageId = messageId;
            }
            await new Promise((r) => setTimeout(r, partJitterMs()));
          }
          await db
            .update(reminderRunTargets)
            .set({
              status: "sent",
              waMessageId: lastMessageId ?? null,
              latencyMs: Date.now() - start,
            })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          sentCount++;
        } catch (err) {
          logger.error(
            { err, reminderId: reminder.id, groupId: target.groupId },
            "fire-reminder: send failed",
          );
          await db
            .update(reminderRunTargets)
            .set({ status: "failed", error: (err as Error).message })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          failedCount++;
        }
      }),
    ),
  );

  // Final status. The four shapes:
  //   - paused  : window closed with at least one row STILL pending.
  //               Resumable. Sent rows stay sent, pending stays pending.
  //   - success : every target sent (no failures, no pending).
  //   - partial : every target was attempted; some sent, some failed
  //               or skipped. NOT resumable; failures are real.
  //   - failed  : zero sent. Either every send errored, or the window
  //               was already closed when the run began (nothing
  //               attempted, but no pending-with-progress to resume).
  const total = reminder.targets.length;
  const totalSent = priorSentCount + sentCount;
  const totalFailed = priorFailedCount + failedCount;
  // Re-read pending count from the DB so the count reflects whatever
  // the loop left behind (any window-skipped rows are still pending).
  const remainingPending = (
    await db.query.reminderRunTargets.findMany({
      where: (t, { eq, and: drizzleAnd }) =>
        drizzleAnd(eq(t.runId, runId), eq(t.status, "pending")),
    })
  ).length;

  let status: "success" | "partial" | "failed" | "paused";
  let errorSummary: string | null = null;
  if (windowClosed && remainingPending > 0 && totalSent > 0) {
    status = "paused";
    errorSummary = `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}). ${totalSent} of ${total} groups delivered, ${remainingPending} still pending. Resume from the Activity tab. If this happens repeatedly, consider offloading to another paired account, or shrinking the message body / media size to fit more groups in your daily window.`;
  } else if (windowClosed && totalSent === 0) {
    // Window was closed before any send happened. Not paused — there's
    // nothing meaningful to resume. Counts as a hard failure.
    status = "failed";
    errorSummary = `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}) before any group could be sent. The reminder fired too late in the day.`;
  } else if (totalSent === total) {
    status = "success";
  } else if (totalSent > 0) {
    status = "partial";
    errorSummary = `${totalSent} of ${total} groups delivered (${totalFailed} failed, ${skippedCount} skipped).`;
  } else {
    status = "failed";
    errorSummary = `All ${total} sends failed.`;
  }

  await db
    .update(reminderRuns)
    .set({ status, errorSummary })
    .where(eq(reminderRuns.id, runId));

  await pgNotifyWeb({
    type: "reminder.fired",
    reminderId: reminder.id,
    runId,
    status,
    // Cumulative counts so the web notification can render "N of M".
    sent: totalSent,
    total,
  });

  // Lifecycle bookkeeping. Skip when the run is paused — the reminder
  // shouldn't end or re-arm while a resume is still possible.
  const runIsTerminal = status !== "paused";

  if (runIsTerminal) {
    if (reminder.scheduleKind === "one_off") {
      await db
        .update(reminders)
        .set({ status: "ended", updatedAt: new Date() })
        .where(eq(reminders.id, reminder.id));
    } else if (reminder.scheduleKind === "recurring" && reminder.rrule) {
      const next = nextOccurrence(reminder.rrule, reminder.timezone, new Date());
      await db
        .update(reminders)
        .set({ lastFiredAt: new Date(), updatedAt: new Date() })
        .where(eq(reminders.id, reminder.id));
      if (next) {
        try {
          await scheduleReminderFire(getBoss(), reminder.id, next);
          logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence");
        } catch (err) {
          logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence");
        }
      } else {
        logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending");
        await db.update(reminders).set({ status: "ended" }).where(eq(reminders.id, reminder.id));
      }
    }
  } else {
    logger.info(
      { reminderId: reminder.id, runId },
      "fire-reminder: paused — leaving reminder lifecycle unchanged for resume",
    );
  }

  await writeAuditLog(db, {
    operatorId: reminder.createdBy,
    source: "system",
    action: "reminder.fired",
    targetType: "reminder",
    targetId: reminder.id,
    payload: { runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount },
  });

  logger.info(
    { reminderId: reminder.id, runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount },
    "fire-reminder: done",
  );
}

/**
 * Mark every target as skipped with the given error. Used when the
 * account is offline before the loop even starts (no run_target rows
 * have been created yet, so we INSERT instead of UPDATE).
 */
async function markAllSkipped(
  runId: string,
  reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
  error: string,
): Promise<void> {
  if (reminder.targets.length === 0) return;
  const rows = await db.query.whatsappGroups.findMany({
    where: (g) => inArray(g.id, reminder.targets.map((t) => t.groupId)),
    columns: { id: true, name: true },
  });
  const labelById = new Map(rows.map((r) => [r.id, r.name]));
  await db.insert(reminderRunTargets).values(
    reminder.targets.map((t) => ({
      runId,
      groupId: t.groupId,
      groupLabel: labelById.get(t.groupId) ?? null,
      status: "skipped" as const,
      error,
    })),
  );
}

/**
 * Mutates the prebuilt proto.Message to set the caption on whichever
 * media variant it carries. Baileys' relayMessage does not let us
 * pass the caption alongside; the protobuf already carries the slot.
 */
function injectCaption(msg: proto.Message, caption: string): void {
  if (msg.imageMessage) msg.imageMessage.caption = caption;
  else if (msg.videoMessage) msg.videoMessage.caption = caption;
  else if (msg.documentMessage) msg.documentMessage.caption = caption;
}
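
The final-status branch in the listing above is easier to reason about (and to unit-test) when restated as a pure function. The sketch below is only a restatement: `resolveRunStatus` and `RunTally` are hypothetical names, not something the plan defines.

```typescript
// Hypothetical helper mirroring the if/else chain in fireReminderInner.
type RunStatus = "success" | "partial" | "failed" | "paused";

interface RunTally {
  total: number; // reminder.targets.length
  totalSent: number; // prior attempts + this attempt
  remainingPending: number; // rows still `pending` after the loop
  windowClosed: boolean; // the window-end gate tripped at least once
}

function resolveRunStatus(t: RunTally): RunStatus {
  // Window closed with progress made and work left: resumable.
  if (t.windowClosed && t.remainingPending > 0 && t.totalSent > 0) return "paused";
  // Window closed before anything was sent: hard failure.
  if (t.windowClosed && t.totalSent === 0) return "failed";
  if (t.totalSent === t.total) return "success";
  if (t.totalSent > 0) return "partial";
  return "failed";
}
```

For example, 412 sent out of 1000 with the window closed resolves to `paused`, while zero sent with the window closed resolves to `failed`.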
  • Step 3: Add p-limit to apps/bot deps
NO_SUDO=1 ./scripts/dev.sh exec pnpm add -F @cmbot/bot p-limit

Expected: pnpm reports p-limit added to the bot's dependencies (the package count may include its transitive dependency). The lockfile updates.

  • Step 4: Typecheck the bot
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck

Expected: passes. If type errors mention relayMessage overloads, double-check the @whiskeysockets/baileys import added — the function comes from the socket instance, not the package's named exports.
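
While checking the relay path, note that the prebuilt message returned by the upload cache is shared by every group and every part that reuses the same mediaId, so setting a caption in place changes the cached object for later callers. One option is a non-mutating helper. The sketch below uses a hypothetical `MediaMessageLike` type that models only the fields it touches; whether relayMessage accepts the plain-object copy should be verified against the Baileys version in use.

```typescript
// Hypothetical stand-in for the media variants of a prebuilt message.
interface MediaPartLike {
  caption?: string | null;
  url?: string | null;
}
interface MediaMessageLike {
  imageMessage?: MediaPartLike | null;
  videoMessage?: MediaPartLike | null;
  documentMessage?: MediaPartLike | null;
}

/** Returns a shallow copy with the caption set; the input is left untouched. */
function withCaption(msg: MediaMessageLike, caption: string): MediaMessageLike {
  if (msg.imageMessage) {
    return { ...msg, imageMessage: { ...msg.imageMessage, caption } };
  }
  if (msg.videoMessage) {
    return { ...msg, videoMessage: { ...msg.videoMessage, caption } };
  }
  if (msg.documentMessage) {
    return { ...msg, documentMessage: { ...msg.documentMessage, caption } };
  }
  return msg; // no media variant present: nothing to caption
}
```

Because only the touched variant is copied, the uploaded media fields are still shared by reference and nothing is re-uploaded.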

  • Step 5: Run all bot tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run

Expected: PASS for all (existing 26 + 7 PerKeyMutex + 8 RateLimiter + 6 DeliveryWindow + 4 MediaUploadCache = 51).

  • Step 6: Restart the bot to pick up the rewrite
NO_SUDO=1 ./scripts/dev.sh restart-bot
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=40 bot 2>&1 | grep -i "reminder.fire"

Expected: A line like reminder.fire: handler registered teamSize=8.

  • Step 7: Commit
git add apps/bot/src/scheduler/fire-reminder.ts apps/bot/src/scheduler/reminder-jobs.ts apps/bot/package.json pnpm-lock.yaml
git commit -m "feat(bot): windowed, pacing-safe fan-out

Rewrites the per-target loop to:
  - Wrap inner work in PerKeyMutex(accountId) so two reminders on the
    same account take turns; different accounts run in parallel.
  - Bulk-load groups and media rows up front (drops ~3000 round-trips
    to ~3 for a 1000-group run).
  - Pre-create run_target rows with status='pending' so the Activity
    tab shows progress mid-run.
  - Pre-upload each unique media via MediaUploadCache (one
    generateWAMessageContent call per mediaId, then relayMessage to
    every group). For 1000 groups × 5 MB image, this turns 5 GB of
    upload into 5 MB.
  - Run BOT_GROUP_CONCURRENCY (default 3) groups in parallel within
    one account; each group's parts stay serial for visible order.
  - Gate every socket call on a per-account TokenBucket
    (BOT_MAX_SEND_PER_MINUTE, default 40).
  - Replace the rigid 1.5s inter-part sleep with 200..499 ms jitter.
  - Window-end gate: stop sending once today's end-hour (in the
    reminder's timezone) has passed; leave unstarted targets pending
    and flip the run to paused so it can be resumed; surface a
    paused-status message that names the timezone, the
    delivered/total count, and points at multi-account offload.

reminder-jobs.ts now passes teamSize=BOT_FIRE_CONCURRENCY (default 8)
to boss.work so up to 8 different-account reminders run concurrently."
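
The defaults named in the commit message allow a quick back-of-envelope check. The sketch below assumes the token bucket is the only bottleneck and ignores upload time, per-part jitter, and network latency; both function names are hypothetical.

```typescript
// Rough estimate only: treats BOT_MAX_SEND_PER_MINUTE as a steady rate.
function estimateFanOutMinutes(
  groups: number,
  partsPerGroup: number,
  maxSendPerMinute: number,
): number {
  if (maxSendPerMinute <= 0) throw new RangeError("maxSendPerMinute must be positive");
  return (groups * partsPerGroup) / maxSendPerMinute;
}

// Upload volume with and without the per-run media cache.
function uploadMegabytes(groups: number, mediaMb: number, cached: boolean): number {
  return cached ? mediaMb : groups * mediaMb;
}

// 1000 groups, one part each, at the default 40 sends per minute.
const minutes = estimateFanOutMinutes(1000, 1, 40);
// 1000 groups with a 5 MB image, uncached versus cached.
const uncachedMb = uploadMegabytes(1000, 5, false);
const cachedMb = uploadMegabytes(1000, 5, true);
```

Under these assumptions a single-part reminder to 1000 groups takes about 25 minutes, and a 12-hour window (720 minutes) admits at most about 28,800 sends per account.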

Task 8: Web wiring — schema, action, wizard input, notification body

Files:

  • Modify: apps/web/src/actions/reminders.ts

  • Modify: apps/web/src/components/reminder-wizard/when-form-client.tsx

  • Modify: apps/web/src/components/reminder-edit/edit-when-form.tsx

  • Modify: apps/web/src/lib/notifications.ts

  • Modify: apps/web/src/lib/notifications.test.ts (extends the existing notifications tests with the paused and partial-status bodies)

  • Step 1: Update the create/update Zod schemas in apps/web/src/actions/reminders.ts

Find the createReminderSchema definition (around lines 220-250) and add the two delivery-window fields. Schema chunk:

const createReminderSchema = z
  .object({
    accountId: z.string().uuid(),
    groupIds: z.array(z.string().uuid()),
    messages: z.array(messagePartSchema).optional(),
    name: z.string().nullable().optional(),
    text: z.string().nullable().optional(),
    mediaId: z.string().uuid().nullable().optional(),
    caption: z.string().nullable().optional(),
    scheduledAtIso: z.string().datetime({ offset: true }),
    rrule: z.string().nullable().optional(),
    timezone: z.string().default(DEFAULT_TIMEZONE),
    // Delivery window. End hour is enforced at runtime by fire-reminder;
    // start hour is documented but not gated in v1.
    deliveryWindowStartHour: z.number().int().min(0).max(24).default(6),
    deliveryWindowEndHour: z.number().int().min(0).max(24).default(18),
  })
  .refine(
    (d) =>
      (d.messages && d.messages.length > 0) ||
      Boolean(d.text?.trim()) ||
      Boolean(d.mediaId),
    {
      message: "Add a message or attach a file",
      path: ["messages"],
    },
  )
  .refine((d) => d.deliveryWindowStartHour < d.deliveryWindowEndHour, {
    message: "Delivery window start must be earlier than end",
    path: ["deliveryWindowStartHour"],
  });

Then in BOTH createReminderAction and updateReminderAction, where the db.insert(reminders).values({...}) / db.update(reminders).set({...}) happens, include the two new fields. Find the values({ blocks (around line 350 and line 460) and add:

deliveryWindowStartHour: parsed.data.deliveryWindowStartHour,
deliveryWindowEndHour: parsed.data.deliveryWindowEndHour,

immediately after timezone,.
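
To make the window rules easy to check by hand, here they are restated as a dependency-free function. This is a sketch of the same constraints the schema expresses (integers from 0 to 24, start earlier than end, defaults 6 and 18); `validateDeliveryWindow` is a hypothetical name and does not replace the Zod schema.

```typescript
// Hypothetical restatement of the schema's delivery-window rules.
interface DeliveryWindow {
  startHour: number;
  endHour: number;
}

/** Returns the normalised window, or an error message string. */
function validateDeliveryWindow(input: Partial<DeliveryWindow>): DeliveryWindow | string {
  const startHour = input.startHour ?? 6; // schema default
  const endHour = input.endHour ?? 18; // schema default
  for (const h of [startHour, endHour]) {
    if (!Number.isInteger(h) || h < 0 || h > 24) {
      return "Hours must be integers from 0 to 24";
    }
  }
  if (startHour >= endHour) {
    return "Delivery window start must be earlier than end";
  }
  return { startHour, endHour };
}
```

One edge case worth noticing: supplying only `endHour: 6` fails, because the default start hour of 6 is not earlier than 6.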

  • Step 2: Add the input controls to apps/web/src/components/reminder-wizard/when-form-client.tsx

This step adds two props, two state variables, two URL params, and one new JSX section. The top-of-file imports stay as they are.

In the interface WhenFormClientProps block, add initialDeliveryStartHour and initialDeliveryEndHour:

interface WhenFormClientProps {
  accountId: string;
  groupIds: string;
  timezone: string;
  initialDefaultIso: string;
  initialSpec?: RecurrenceSpec;
  initialDeliveryStartHour?: number;
  initialDeliveryEndHour?: number;
  passThroughParams: PassThroughParams;
}

In the component body (around state declarations near the top), add:

const [deliveryStartHour, setDeliveryStartHour] = useState<number>(
  props.initialDeliveryStartHour ?? 6,
);
const [deliveryEndHour, setDeliveryEndHour] = useState<number>(
  props.initialDeliveryEndHour ?? 18,
);

In handleContinue, when building the URLSearchParams, add (after the existing params):

sp.set("deliveryStartHour", String(deliveryStartHour));
sp.set("deliveryEndHour", String(deliveryEndHour));

In the JSX, just before the existing <RecurrencePicker> block, add a new "Delivery hours" card:

<div className="space-y-2">
  <Label className="flex items-center gap-1.5">
    <ClockIcon className="size-3.5" />
    Delivery hours
  </Label>
  <div className="flex items-center gap-2">
    <Input
      type="number"
      min={0}
      max={24}
      step={1}
      value={deliveryStartHour}
      onChange={(e) => setDeliveryStartHour(Number(e.target.value))}
      className="h-9 w-20"
      aria-label="Delivery start hour"
    />
    <span className="text-sm text-muted-foreground">to</span>
    <Input
      type="number"
      min={0}
      max={24}
      step={1}
      value={deliveryEndHour}
      onChange={(e) => setDeliveryEndHour(Number(e.target.value))}
      className="h-9 w-20"
      aria-label="Delivery end hour"
    />
    <span className="text-xs text-muted-foreground">
      (24-hour, in {timezone})
    </span>
  </div>
  <p className="text-xs text-muted-foreground">
    The bot stops sending after the end hour. Long fan-outs that don&apos;t
    finish in this window are paused; you can resume them from the
    Activity tab.
  </p>
</div>

ClockIcon is already among the file's lucide-react imports (CalendarIcon, ClockIcon, AlertCircleIcon), so no import change is needed for the icon.

  • Step 3: Wire the new params into apps/web/src/components/reminder-wizard/step-when.tsx

The wizard reads initialDeliveryStartHour / initialDeliveryEndHour from URL and passes them to <WhenFormClient>. Find where <WhenFormClient> is rendered and add the two new props:

<WhenFormClient
  accountId={accountId}
  groupIds={groupIds}
  timezone={op.defaultTimezone ?? "UTC"}
  initialDefaultIso={initialDefaultIso}
  initialSpec={initialSpec}
  initialDeliveryStartHour={
    sp.deliveryStartHour ? Number(sp.deliveryStartHour) : undefined
  }
  initialDeliveryEndHour={
    sp.deliveryEndHour ? Number(sp.deliveryEndHour) : undefined
  }
  passThroughParams={passThroughParams}
/>

Also update the interface StepWhenParams (or wherever the searchParams type lives in step-when.tsx) to include deliveryStartHour?: string; deliveryEndHour?: string;.
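
The `sp.x ? Number(sp.x) : undefined` pattern lets `NaN` through when the URL is hand-edited (for example `?deliveryStartHour=abc`). A stricter parse is one option; `parseHourParam` below is a hypothetical helper, not part of the plan's code.

```typescript
// Hypothetical helper: returns a whole hour in 0..24, or undefined so the
// caller falls back to the form's default.
function parseHourParam(raw: string | undefined): number | undefined {
  if (raw === undefined || raw.trim() === "") return undefined;
  const n = Number(raw);
  return Number.isInteger(n) && n >= 0 && n <= 24 ? n : undefined;
}
```

It could be used as `initialDeliveryStartHour={parseHourParam(sp.deliveryStartHour)}`, leaving the `?? 6` and `?? 18` fallbacks in the client component unchanged.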

  • Step 4: Pass-through in step-groups.tsx and step-review.tsx

Both step pages thread URL params through. Open each and find where they re-emit the URL (look for the URLSearchParams builder or editLink helper). Add:

if (sp.deliveryStartHour) backParams.set("deliveryStartHour", sp.deliveryStartHour);
if (sp.deliveryEndHour) backParams.set("deliveryEndHour", sp.deliveryEndHour);

Add the same to editLink in step-review.tsx so the "Edit when" link from the review page round-trips the values.

In review-submit-client.tsx, where the action payload is built, include the two fields:

const payload = {
  accountId,
  groupIds: groupIds ? groupIds.split(",").filter(Boolean) : [],
  name: name?.trim() || null,
  messages,
  scheduledAtIso: scheduledAt,
  rrule: rrule ?? null,
  timezone,
  deliveryWindowStartHour: deliveryStartHour ?? 6,
  deliveryWindowEndHour: deliveryEndHour ?? 18,
};

(And add deliveryStartHour?: number; deliveryEndHour?: number; to the props interface and pass them in from step-review.tsx after parsing the URL.)

  • Step 5: Update apps/web/src/components/reminder-edit/edit-when-form.tsx similarly

Add the same Delivery hours card into the form, same state vars, and include the two fields in the updateReminderAction payload (find the existing updateReminderAction({...}) call). Pull the initial values from the loaded reminder row in the parent edit page (apps/web/src/app/reminders/[id]/edit/when/page.tsx):

<EditWhenForm
  reminderId={reminder.id}
  accountId={reminder.accountId}
  groupIds={targets.map((t) => t.groupId)}
  messages={initialMessages}
  name={reminder.name}
  initialIso={(reminder.scheduledAt ?? new Date()).toISOString()}
  initialSpec={specFromRrule(reminder.rrule)}
  timezone={reminder.timezone}
  initialDeliveryStartHour={reminder.deliveryWindowStartHour}
  initialDeliveryEndHour={reminder.deliveryWindowEndHour}
/>

The form's prop interface gets:

initialDeliveryStartHour?: number;
initialDeliveryEndHour?: number;
  • Step 6: Extend the notification body for paused + partial in apps/web/src/lib/notifications.ts

Find reminderFiredToNotification. The existing function takes { reminderId, runId, status }. Extend the event shape to carry sent/total and handle paused as a first-class status:

export function reminderFiredToNotification(event: {
  type: "reminder.fired";
  reminderId: string;
  runId: string;
  status: string;
  sent?: number;
  total?: number;
}): ShowNotificationOptions | null {
  if (event.status === "skipped") return null;
  const headline =
    event.status === "success"
      ? "Reminder sent"
      : event.status === "paused"
        ? "Reminder paused"
        : event.status === "partial"
          ? "Reminder partly sent"
          : "Reminder failed";
  let body =
    event.status === "success"
      ? "All groups received the message."
      : event.status === "paused"
        ? "Delivery window closed before all groups got the message."
        : event.status === "partial"
          ? "Some groups received the message; others failed. See activity."
          : "No groups received the message. See activity.";
  if (event.status === "paused" && event.sent !== undefined && event.total !== undefined) {
    body = `${event.sent} of ${event.total} groups delivered. Tap to resume or cancel.`;
  } else if (event.status === "partial" && event.sent !== undefined && event.total !== undefined) {
    body = `${event.sent} of ${event.total} groups delivered. See activity for details.`;
  }
  return {
    title: headline,
    body,
    tag: `reminder:${event.reminderId}`,
    href: `/reminders/${event.reminderId}`,
  };
}

Task 7's new fire-reminder (apps/bot/src/scheduler/fire-reminder.ts; search for reminder.fired) already publishes sent and total on the event payload, so no bot change is needed in this task. Verify that the payload carries both fields, and confirm the web-side receiver passes them through to reminderFiredToNotification.
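
One way to check the pass-through is to have the receiver forward sent and total only when they are real numbers. The sketch below is illustrative: parseReminderFired and the raw JSON payload shape are assumptions, not the project's actual SSE receiver.

```typescript
// Mirrors the parameter type of reminderFiredToNotification.
interface ReminderFiredEvent {
  type: "reminder.fired";
  reminderId: string;
  runId: string;
  status: string;
  sent?: number;
  total?: number;
}

// Hypothetical helper: parses one SSE data payload and keeps sent/total
// only when they are finite numbers, so the notification falls back to
// its generic body otherwise.
function parseReminderFired(raw: string): ReminderFiredEvent | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null; // malformed payloads are ignored
  }
  if (typeof data !== "object" || data === null) return null;
  const d = data as Record<string, unknown>;

  const type = d["type"];
  const reminderId = d["reminderId"];
  const runId = d["runId"];
  const status = d["status"];
  if (type !== "reminder.fired") return null;
  if (typeof reminderId !== "string") return null;
  if (typeof runId !== "string") return null;
  if (typeof status !== "string") return null;

  const event: ReminderFiredEvent = {
    type: "reminder.fired",
    reminderId,
    runId,
    status,
  };
  const sent = d["sent"];
  const total = d["total"];
  if (typeof sent === "number" && Number.isFinite(sent)) event.sent = sent;
  if (typeof total === "number" && Number.isFinite(total)) event.total = total;
  return event;
}
```

Dropping non-numeric counts at the boundary means reminderFiredToNotification never renders "undefined of 1000" style bodies.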

  • Step 7: Add tests for the extended notification body

Append to apps/web/src/lib/notifications.test.ts (inside the existing describe("reminderFiredToNotification mapping", () => {...}) block):

it("paused with sent/total renders 'Tap to resume or cancel'", () => {
  const args = reminderFiredToNotification({
    type: "reminder.fired",
    reminderId: "r-1",
    runId: "run-1",
    status: "paused",
    sent: 412,
    total: 1000,
  });
  expect(args?.title).toBe("Reminder paused");
  expect(args?.body).toBe("412 of 1000 groups delivered. Tap to resume or cancel.");
});

it("paused without sent/total falls back to a generic paused body", () => {
  const args = reminderFiredToNotification({
    type: "reminder.fired",
    reminderId: "r-1",
    runId: "run-1",
    status: "paused",
  });
  expect(args?.title).toBe("Reminder paused");
  expect(args?.body).toMatch(/Delivery window closed/);
});

it("partial with sent/total renders 'X of Y groups delivered' instead of the generic body", () => {
  const args = reminderFiredToNotification({
    type: "reminder.fired",
    reminderId: "r-1",
    runId: "run-1",
    status: "partial",
    sent: 412,
    total: 1000,
  });
  expect(args?.body).toBe("412 of 1000 groups delivered. See activity for details.");
});

it("partial without sent/total falls back to the generic body", () => {
  const args = reminderFiredToNotification({
    type: "reminder.fired",
    reminderId: "r-1",
    runId: "run-1",
    status: "partial",
  });
  expect(args?.body).toMatch(/Some groups received/);
});
  • Step 8: Check that the existing edit-section-forms.test.tsx fixtures still compile with the new props

Find apps/web/src/components/reminder-edit/edit-section-forms.test.tsx. The new props are declared optional, so the EditAccountForm and EditGroupsForm fixtures should not need changes; re-run typecheck and patch only if compilation fails. Run:

NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck

If typecheck does report Property 'initialDeliveryStartHour' is missing on any test (for example, because the props ended up required), add initialDeliveryStartHour: 6, initialDeliveryEndHour: 18 to that fixture.

  • Step 9: Run web tests + bot tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run

Expected: web 298+ pass, bot 26+25 = 51 pass. Total ~380 across the project including shared.

  • Step 10: Restart web container so the server bundle picks up the schema change
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml restart web
  • Step 11: Commit
git add apps/web
git commit -m "feat(web): delivery-window inputs in wizard + edit; partial body extension

Wizard 'When' step and the per-section edit-when page get two number
inputs for the delivery window hours (default 6/18). Server actions
accept them on the create/update Zod schemas, validate
0 <= start < end <= 24, and persist to the new reminders columns.

Notification body for paused-status now reads
'412 of 1000 groups delivered. Tap to resume or cancel.'; partial
status uses the same delivered/total wording when sent/total are
present."
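
The window rule named in the commit message (0 <= start < end <= 24) can be sketched as a plain predicate. This is a stand-in for the Zod refinement, which is not shown in this section; the function name and the whole-hour (integer) requirement are assumptions.

```typescript
// Hypothetical helper: true when the delivery window hours are valid.
// Assumes hours are whole numbers; 24 is allowed only as an end hour.
function isValidDeliveryWindow(startHour: number, endHour: number): boolean {
  return (
    Number.isInteger(startHour) &&
    Number.isInteger(endHour) &&
    0 <= startHour &&
    startHour < endHour &&
    endHour <= 24
  );
}
```

The default 6/18 window passes, as does the full-day 0/24 window; an inverted window such as 18/6 is rejected because windows that wrap past midnight are outside this rule.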

Task 9: Run-ETA helper + ETA pill in wizard review (TDD)

Files:

  • Create: apps/web/src/lib/run-eta.ts

  • Create: apps/web/src/lib/run-eta.test.ts

  • Create: apps/web/src/components/reminder-wizard/run-eta-pill.tsx

  • Create: apps/web/src/components/reminder-wizard/run-eta-pill.test.tsx

  • Modify: apps/web/src/components/reminder-wizard/review-submit-client.tsx

  • Modify: apps/web/src/components/reminder-edit/edit-when-form.tsx

  • Modify: apps/web/src/components/reminder-edit/edit-groups-form.tsx

  • Step 1: Write the failing run-eta test

Create apps/web/src/lib/run-eta.test.ts:

import { describe, it, expect } from "vitest";
import { estimateRunDuration, ASSUMED_RATE_PER_MINUTE } from "./run-eta";

describe("estimateRunDuration", () => {
  it("uses target count / rate plus a 15% buffer, ceiling-rounded to whole minutes", () => {
    const r = estimateRunDuration({
      targetCount: 1000,
      ratePerMinute: 40,
      fireAt: new Date("2026-05-13T09:00:00.000+08:00"),
    });
    // 1000 / 40 = 25 min; +15% = 28.75 → ceil = 29
    expect(r.durationMinutes).toBe(29);
    expect(r.estimatedFinishAt.toISOString()).toBe(
      new Date("2026-05-13T09:29:00.000+08:00").toISOString(),
    );
  });

  it("returns a 1-minute floor for very small runs", () => {
    const r = estimateRunDuration({
      targetCount: 1,
      ratePerMinute: 40,
      fireAt: new Date("2026-05-13T09:00:00.000+08:00"),
    });
    expect(r.durationMinutes).toBe(1);
  });

  it("returns 0 minutes and finishAt = fireAt when targetCount is 0", () => {
    const fireAt = new Date("2026-05-13T09:00:00.000+08:00");
    const r = estimateRunDuration({ targetCount: 0, ratePerMinute: 40, fireAt });
    expect(r.durationMinutes).toBe(0);
    expect(r.estimatedFinishAt.toISOString()).toBe(fireAt.toISOString());
  });

  it("throws when ratePerMinute is 0 or negative", () => {
    expect(() =>
      estimateRunDuration({
        targetCount: 100,
        ratePerMinute: 0,
        fireAt: new Date(),
      }),
    ).toThrow();
    expect(() =>
      estimateRunDuration({
        targetCount: 100,
        ratePerMinute: -1,
        fireAt: new Date(),
      }),
    ).toThrow();
  });

  it("exports the configured default rate constant", () => {
    expect(typeof ASSUMED_RATE_PER_MINUTE).toBe("number");
    expect(ASSUMED_RATE_PER_MINUTE).toBeGreaterThan(0);
  });
});
  • Step 2: Run test to verify it fails
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run run-eta

Expected: FAIL — Cannot find module './run-eta'.

  • Step 3: Write the helper to pass

Create apps/web/src/lib/run-eta.ts:

/**
 * Default per-account send rate, mirroring `BOT_MAX_SEND_PER_MINUTE`
 * in the bot env. The web bundle hardcodes this — operators who tune
 * the bot env are expected to redeploy web with the matching value.
 */
export const ASSUMED_RATE_PER_MINUTE = 40;

const ETA_BUFFER = 1.15;

export function estimateRunDuration(opts: {
  targetCount: number;
  ratePerMinute?: number;
  fireAt: Date;
}): { durationMinutes: number; estimatedFinishAt: Date } {
  const rate = opts.ratePerMinute ?? ASSUMED_RATE_PER_MINUTE;
  if (rate <= 0) throw new Error("ratePerMinute must be > 0");
  if (opts.targetCount <= 0) {
    return { durationMinutes: 0, estimatedFinishAt: new Date(opts.fireAt) };
  }
  const raw = (opts.targetCount / rate) * ETA_BUFFER;
  const durationMinutes = Math.max(1, Math.ceil(raw));
  const estimatedFinishAt = new Date(
    opts.fireAt.getTime() + durationMinutes * 60_000,
  );
  return { durationMinutes, estimatedFinishAt };
}
  • Step 4: Run test to verify it passes
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run run-eta

Expected: PASS (5/5).
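
The same arithmetic can be inverted to answer "how many groups fit in the remaining window?". The sketch below is not part of the plan's file list: estimateMinutes repeats the helper's formula so the block is self-contained, and maxTargetsThatFit is a hypothetical name.

```typescript
const RATE_PER_MINUTE = 40; // same value as ASSUMED_RATE_PER_MINUTE
const BUFFER = 1.15; // same value as ETA_BUFFER

// Same formula as estimateRunDuration, reduced to the minutes figure.
function estimateMinutes(
  targetCount: number,
  ratePerMinute: number = RATE_PER_MINUTE,
): number {
  if (ratePerMinute <= 0) throw new Error("ratePerMinute must be > 0");
  if (targetCount <= 0) return 0;
  return Math.max(1, Math.ceil((targetCount / ratePerMinute) * BUFFER));
}

// Hypothetical inverse: the largest target count whose estimate still
// fits within windowMinutes.
function maxTargetsThatFit(
  windowMinutes: number,
  ratePerMinute: number = RATE_PER_MINUTE,
): number {
  if (ratePerMinute <= 0) throw new Error("ratePerMinute must be > 0");
  if (windowMinutes < 1) return 0;
  // Closed-form first guess, then correct for floating-point rounding.
  let n = Math.floor((Math.floor(windowMinutes) * ratePerMinute) / BUFFER);
  while (n > 0 && estimateMinutes(n, ratePerMinute) > windowMinutes) n--;
  while (estimateMinutes(n + 1, ratePerMinute) <= windowMinutes) n++;
  return n;
}
```

At the default rate, a run fired 60 minutes before the window closes fits up to 2086 groups under this estimate; one more group pushes the rounded estimate to 61 minutes.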

  • Step 5: Write the failing pill component test

Create apps/web/src/components/reminder-wizard/run-eta-pill.test.tsx:

import { describe, it, expect } from "vitest";
import { renderToStaticMarkup } from "react-dom/server";
import { RunEtaPill } from "./run-eta-pill";

describe("RunEtaPill", () => {
  it("renders green 'Fits in window' when estimatedFinishAt <= windowEndAt", () => {
    const html = renderToStaticMarkup(
      <RunEtaPill
        targetCount={500}
        fireAt={new Date("2026-05-13T09:00:00.000+08:00")}
        windowEndAt={new Date("2026-05-13T18:00:00.000+08:00")}
        timezone="Asia/Kuala_Lumpur"
      />,
    );
    expect(html).toMatch(/Fits in window/);
    expect(html).toMatch(/min/);
    expect(html).not.toMatch(/Likely to pause/);
  });

  it("renders amber 'Likely to pause' when ETA exceeds window", () => {
    const html = renderToStaticMarkup(
      <RunEtaPill
        targetCount={5000}
        fireAt={new Date("2026-05-13T17:00:00.000+08:00")}
        windowEndAt={new Date("2026-05-13T18:00:00.000+08:00")}
        timezone="Asia/Kuala_Lumpur"
      />,
    );
    expect(html).toMatch(/Likely to pause/);
    expect(html).toMatch(/Widen the window/);
  });

  it("renders nothing for zero targets", () => {
    const html = renderToStaticMarkup(
      <RunEtaPill
        targetCount={0}
        fireAt={new Date("2026-05-13T09:00:00.000+08:00")}
        windowEndAt={new Date("2026-05-13T18:00:00.000+08:00")}
        timezone="Asia/Kuala_Lumpur"
      />,
    );
    expect(html).toBe("");
  });
});
  • Step 6: Implement the pill

Create apps/web/src/components/reminder-wizard/run-eta-pill.tsx:

import { ClockIcon, AlertTriangleIcon } from "lucide-react";
import { estimateRunDuration } from "@/lib/run-eta";

interface RunEtaPillProps {
  targetCount: number;
  fireAt: Date;
  windowEndAt: Date;
  timezone: string;
}

/**
 * Visible at the wizard's review step and on the per-section edit
 * pages that affect ETA (groups, when). Advisory only — does NOT
 * block submission. The operator can still schedule a run that
 * pauses; the pause-and-resume flow covers that case.
 */
export function RunEtaPill({
  targetCount,
  fireAt,
  windowEndAt,
  timezone,
}: RunEtaPillProps) {
  if (targetCount <= 0) return null;

  const { durationMinutes, estimatedFinishAt } = estimateRunDuration({
    targetCount,
    fireAt,
  });
  const fits = estimatedFinishAt.getTime() <= windowEndAt.getTime();

  const finishLocal = new Intl.DateTimeFormat("en-GB", {
    hour: "2-digit",
    minute: "2-digit",
    timeZone: timezone,
  }).format(estimatedFinishAt);

  if (fits) {
    return (
      <div className="flex items-center gap-2 rounded-lg bg-emerald-500/10 px-3 py-2 text-xs text-emerald-700 dark:text-emerald-400">
        <ClockIcon className="size-3.5" />
        <span>
          ~{durationMinutes} min · finishes ~{finishLocal} · Fits in window
        </span>
      </div>
    );
  }
  return (
    <div className="flex items-start gap-2 rounded-lg bg-amber-500/10 px-3 py-2 text-xs text-amber-700 dark:text-amber-400">
      <AlertTriangleIcon className="size-3.5 mt-0.5 shrink-0" />
      <div className="space-y-0.5">
        <div>
          ~{durationMinutes} min · finishes ~{finishLocal} · Likely to pause
        </div>
        <div className="text-[11px] opacity-80">
          Widen the window or split into smaller runs.
        </div>
      </div>
    </div>
  );
}
  • Step 7: Run pill tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run run-eta-pill

Expected: 3/3 PASS.
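
The pill formats the finish time in the reminder's timezone, not the viewer's. A minimal sketch of that behaviour, using the same Intl options as the component; formatFinishLocal is a hypothetical extraction for illustration only.

```typescript
// Same locale and options as the pill's Intl.DateTimeFormat call.
function formatFinishLocal(at: Date, timeZone: string): string {
  return new Intl.DateTimeFormat("en-GB", {
    hour: "2-digit",
    minute: "2-digit",
    timeZone,
  }).format(at);
}

// One instant, rendered in two zones.
const sampleFinish = new Date("2026-05-13T09:29:00.000+08:00");
console.log(formatFinishLocal(sampleFinish, "Asia/Kuala_Lumpur"));
console.log(formatFinishLocal(sampleFinish, "UTC"));
```

Because timeZone is passed explicitly, the server-rendered and client-rendered text agree regardless of where either one runs.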

  • Step 8: Wire the pill into review-submit-client

In apps/web/src/components/reminder-wizard/review-submit-client.tsx, add the pill above the Schedule button. The component already has access to groupIds, scheduledAt, timezone, and now deliveryEndHour (from Task 8). Compute the windowEndAt inline:

import { RunEtaPill } from "./run-eta-pill";
import { windowEndAt as computeWindowEndAt } from "@cmbot/shared"; // see note below

// inside the component, just before the action button:
{groupIds && scheduledAt && (() => {
  const ids = groupIds.split(",").filter(Boolean);
  const fireAt = new Date(scheduledAt);
  const wEnd = computeWindowEndAt(timezone, deliveryEndHour ?? 18, fireAt);
  return (
    <RunEtaPill
      targetCount={ids.length}
      fireAt={fireAt}
      windowEndAt={wEnd}
      timezone={timezone}
    />
  );
})()}

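computeWindowEndAt is a local alias for windowEndAt, the helper Task 5 created in packages/shared/src/delivery-window.ts; both bundles import it from @cmbot/shared. The alias keeps the function visually distinct from the pill's windowEndAt prop.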
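
The shared helper itself is not reproduced in this section. For orientation, here is a hypothetical sketch with the same argument order as the call above (timezone, end hour, reference instant). It assumes the window ends on the same local calendar day as the reference instant and that the zone offset does not change in between, so DST switch days are not handled; the real helper may differ.

```typescript
// Hypothetical stand-in, not the actual @cmbot/shared implementation.
function windowEndAtSketch(timeZone: string, endHour: number, at: Date): Date {
  const parts = new Intl.DateTimeFormat("en-GB", {
    timeZone,
    year: "numeric",
    month: "2-digit",
    day: "2-digit",
    hour: "2-digit",
    minute: "2-digit",
    second: "2-digit",
  }).formatToParts(at);
  const num = (type: string): number =>
    Number(parts.find((p) => p.type === type)?.value ?? "0");

  // Wall-clock fields of `at` in the target zone, re-read as if they were UTC.
  const wallAsUtc = Date.UTC(
    num("year"),
    num("month") - 1,
    num("day"),
    num("hour") % 24,
    num("minute"),
    num("second"),
  );
  // Zone offset at `at`, positive east of UTC (whole seconds).
  const offsetMs = wallAsUtc - Math.floor(at.getTime() / 1000) * 1000;

  // endHour on the same local day; Date.UTC rolls hour 24 to next-day 00:00.
  const endAsUtc = Date.UTC(num("year"), num("month") - 1, num("day"), endHour);
  return new Date(endAsUtc - offsetMs);
}
```

When the reference instant is already past the end hour on that local day, the result lies in the past, which is the situation Smoke 2 in the acceptance check exercises.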

  • Step 9: Wire the pill into edit-groups-form and edit-when-form

Both forms know the reminder's targetCount, fireAt, timezone, and deliveryWindowEndHour. Import RunEtaPill and windowEndAt (aliased as computeWindowEndAt, as in Step 8) in each form, then add the pill above the Save button:

<RunEtaPill
  targetCount={selectedGroupIds.length}
  fireAt={new Date(scheduledAtIso)}
  windowEndAt={computeWindowEndAt(
    timezone,
    deliveryEndHour,
    new Date(scheduledAtIso),
  )}
  timezone={timezone}
/>
  • Step 10: Run all web tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run

Expected: all green. Web ≈306 (Task 8's 298 plus 5 run-eta tests and 3 pill tests; none removed). Shared adds the windowEndAt tests that moved from bot.

  • Step 11: Commit
git add apps/web packages/shared apps/bot
git commit -m "feat(web): ETA preview pill in wizard + edit-groups + edit-when

estimateRunDuration() computes a per-run ETA from BOT_MAX_SEND_PER_MINUTE
(hardcoded as ASSUMED_RATE_PER_MINUTE in the web bundle) plus a 15%
buffer. The RunEtaPill component shows a green 'Fits in window' or
amber 'Likely to pause' badge with a one-line suggestion. windowEndAt
moves from apps/bot/src/scheduler/delivery-window.ts to
packages/shared/src/delivery-window.ts so both bundles can import it."

Task 10: Resume + cancel actions and PausedRunBanner

Files:

  • Modify: apps/web/src/actions/reminders.ts (add resumeReminderRunAction, cancelReminderRunAction)

  • Create: apps/web/src/components/reminder-detail/paused-run-banner.tsx

  • Create: apps/web/src/components/reminder-detail/paused-run-banner.test.tsx

  • Modify: apps/web/src/app/reminders/[id]/page.tsx (mount the banner)

  • Modify: apps/web/src/app/activity/page.tsx (add Paused filter + Resume button per row)

  • Step 1: Write the failing banner test

Create apps/web/src/components/reminder-detail/paused-run-banner.test.tsx:

import { describe, it, expect, vi, beforeEach } from "vitest";
import { renderToStaticMarkup } from "react-dom/server";
import { PausedRunBanner } from "./paused-run-banner";

const resumeMock = vi.fn();
const cancelMock = vi.fn();
vi.mock("@/actions/reminders", () => ({
  resumeReminderRunAction: (...args: unknown[]) => resumeMock(...args),
  cancelReminderRunAction: (...args: unknown[]) => cancelMock(...args),
}));

describe("PausedRunBanner", () => {
  beforeEach(() => {
    resumeMock.mockReset();
    cancelMock.mockReset();
  });

  it("renders 'Resume' and 'Cancel run' buttons when latest run is paused", () => {
    const html = renderToStaticMarkup(
      <PausedRunBanner
        runId="run-1"
        sent={412}
        total={1000}
        windowEndHour={18}
        timezone="Asia/Kuala_Lumpur"
      />,
    );
    expect(html).toMatch(/Reminder paused/);
    expect(html).toMatch(/412 of 1000/);
    expect(html).toMatch(/Resume/);
    expect(html).toMatch(/Cancel run/);
  });
});
  • Step 2: Run to verify it fails
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run paused-run-banner

Expected: FAIL — module not found.

  • Step 3: Add resume + cancel actions to apps/web/src/actions/reminders.ts

Append (near the other run-related actions):

const runIdSchema = z.object({ runId: z.string().uuid() });

export async function resumeReminderRunAction(input: { runId: string }) {
  const op = await getSeededOperator();
  const parsed = runIdSchema.safeParse(input);
  if (!parsed.success) {
    return { ok: false as const, error: "Invalid runId" };
  }

  const run = await db.query.reminderRuns.findFirst({
    where: (r, { eq }) => eq(r.id, parsed.data.runId),
    with: { reminder: { columns: { operatorId: true, id: true } } },
  });
  if (!run || run.reminder.operatorId !== op.id) {
    return { ok: false as const, error: "Run not found" };
  }
  if (run.status !== "paused") {
    return { ok: false as const, error: `Cannot resume a ${run.status} run` };
  }

  await getBoss().send("reminder.fire", {
    reminderId: run.reminder.id,
    runId: run.id,
  });
  await writeAudit(op.id, "reminder.run.resumed", { runId: run.id });

  revalidatePath(`/reminders/${run.reminder.id}`);
  revalidatePath(`/activity`);
  return { ok: true as const };
}

export async function cancelReminderRunAction(input: { runId: string }) {
  const op = await getSeededOperator();
  const parsed = runIdSchema.safeParse(input);
  if (!parsed.success) {
    return { ok: false as const, error: "Invalid runId" };
  }

  const run = await db.query.reminderRuns.findFirst({
    where: (r, { eq }) => eq(r.id, parsed.data.runId),
    with: { reminder: { columns: { operatorId: true, id: true } } },
  });
  if (!run || run.reminder.operatorId !== op.id) {
    return { ok: false as const, error: "Run not found" };
  }
  if (run.status !== "paused") {
    return { ok: false as const, error: `Cannot cancel a ${run.status} run` };
  }

  await db.transaction(async (tx) => {
    await tx
      .update(reminderRunTargets)
      .set({ status: "skipped", error: "canceled by operator" })
      .where(
        and(
          eq(reminderRunTargets.runId, run.id),
          eq(reminderRunTargets.status, "pending"),
        ),
      );
    await tx
      .update(reminderRuns)
      .set({
        status: "partial",
        endedAt: new Date(),
        errorSummary: "Canceled by operator before all groups received the message.",
      })
      .where(eq(reminderRuns.id, run.id));
  });
  await writeAudit(op.id, "reminder.run.canceled", { runId: run.id });

  revalidatePath(`/reminders/${run.reminder.id}`);
  revalidatePath(`/activity`);
  return { ok: true as const };
}

(Imports needed at top of file: revalidatePath from next/cache, getBoss from @/lib/boss, writeAudit from @/lib/audit, reminderRuns and reminderRunTargets from @cmbot/db, and/eq from drizzle-orm.)
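
The cancel semantics can be checked without a database by modelling the transaction as a pure function. This is a sketch for reasoning and unit tests, not a replacement for the action: cancelPausedRun and the row shapes are hypothetical, and the "sent" and "failed" target statuses are assumed names ("pending" and "skipped" come from the action).

```typescript
type TargetStatus = "pending" | "sent" | "failed" | "skipped";

interface TargetRow {
  groupId: string;
  status: TargetStatus;
  error?: string;
}

interface RunRow {
  id: string;
  status: string;
  endedAt?: Date;
  errorSummary?: string;
}

type CancelResult =
  | { ok: true; run: RunRow; targets: TargetRow[] }
  | { ok: false; error: string };

// In-memory model of the cancel transaction: pending targets become
// skipped, everything else is left untouched, and the run resolves partial.
function cancelPausedRun(run: RunRow, targets: TargetRow[], now: Date): CancelResult {
  if (run.status !== "paused") {
    return { ok: false, error: `Cannot cancel a ${run.status} run` };
  }
  const nextTargets = targets.map((t): TargetRow =>
    t.status === "pending"
      ? { ...t, status: "skipped", error: "canceled by operator" }
      : t,
  );
  const nextRun: RunRow = {
    ...run,
    status: "partial",
    endedAt: now,
    errorSummary: "Canceled by operator before all groups received the message.",
  };
  return { ok: true, run: nextRun, targets: nextTargets };
}
```

Targets that were already delivered keep their status, so the activity view still shows which groups received the message before the cancel.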

  • Step 4: Implement PausedRunBanner

Create apps/web/src/components/reminder-detail/paused-run-banner.tsx:

"use client";

import { useState, useTransition } from "react";
import { AlertCircleIcon, PlayIcon, XIcon, Loader2Icon } from "lucide-react";
import { Button } from "@/components/ui/button";
import {
  resumeReminderRunAction,
  cancelReminderRunAction,
} from "@/actions/reminders";

interface PausedRunBannerProps {
  runId: string;
  sent: number;
  total: number;
  windowEndHour: number;
  timezone: string;
}

export function PausedRunBanner({
  runId,
  sent,
  total,
  windowEndHour,
  timezone,
}: PausedRunBannerProps) {
  const [pending, startTransition] = useTransition();
  const [error, setError] = useState<string | null>(null);

  const onResume = () =>
    startTransition(async () => {
      setError(null);
      const r = await resumeReminderRunAction({ runId });
      if (!r.ok) setError(r.error);
    });

  const onCancel = () =>
    startTransition(async () => {
      setError(null);
      const r = await cancelReminderRunAction({ runId });
      if (!r.ok) setError(r.error);
    });

  return (
    <div className="rounded-lg border border-amber-500/40 bg-amber-500/5 p-4 space-y-3">
      <div className="flex items-start gap-2">
        <AlertCircleIcon className="size-4 text-amber-600 dark:text-amber-400 mt-0.5 shrink-0" />
        <div className="space-y-1">
          <p className="text-sm font-medium">Reminder paused</p>
          <p className="text-xs text-muted-foreground">
            {sent} of {total} groups delivered. The delivery window
            closed at {windowEndHour}:00 ({timezone}). Resume to send
            the remaining {total - sent} groups, or cancel the run.
          </p>
        </div>
      </div>
      {error && (
        <div className="text-xs text-destructive">{error}</div>
      )}
      <div className="flex gap-2">
        <Button size="sm" onClick={onResume} disabled={pending} className="gap-2">
          {pending ? <Loader2Icon className="size-3.5 animate-spin" /> : <PlayIcon className="size-3.5" />}
          Resume
        </Button>
        <Button size="sm" variant="outline" onClick={onCancel} disabled={pending} className="gap-2">
          <XIcon className="size-3.5" />
          Cancel run
        </Button>
      </div>
    </div>
  );
}
  • Step 5: Run banner test
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run paused-run-banner

Expected: PASS.

  • Step 6: Mount the banner on the reminder detail page

In apps/web/src/app/reminders/[id]/page.tsx, after loading reminder and runs, find the latest paused run and conditionally render the banner above the section list:

const latestPausedRun = runs.find((r) => r.status === "paused");

// in JSX, near the top of the page content:
{latestPausedRun && (
  <PausedRunBanner
    runId={latestPausedRun.id}
    sent={latestPausedRun.sentCount ?? 0}
    total={latestPausedRun.totalCount ?? 0}
    windowEndHour={reminder.deliveryWindowEndHour}
    timezone={reminder.timezone}
  />
)}

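(sentCount and totalCount need to be derived in the query: adjust getReminderWithRuns to count run-target rows by status, adding a count step if none exists. Note that runs.find returns the first match, so latestPausedRun is the latest paused run only if runs is ordered newest first.)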
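
The derivation can be sketched as a pure reduction over target statuses. The names here are assumptions: summarizeTargets is hypothetical, and "sent" and "failed" are assumed target status values alongside the "pending" and "skipped" values used by the cancel action.

```typescript
type TargetStatus = "pending" | "sent" | "failed" | "skipped";

interface RunCounts {
  sentCount: number;
  totalCount: number;
  pendingCount: number;
}

// Counts run-target rows by status for the banner props.
function summarizeTargets(statuses: readonly TargetStatus[]): RunCounts {
  let sentCount = 0;
  let pendingCount = 0;
  for (const s of statuses) {
    if (s === "sent") sentCount++;
    else if (s === "pending") pendingCount++;
  }
  return { sentCount, totalCount: statuses.length, pendingCount };
}
```

The banner computes the remaining groups as total - sent, which also counts any targets that failed or were skipped; if that distinction matters to operators, a pendingCount like the one above gives the number Resume will actually attempt.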

  • Step 7: Add Paused filter + Resume button on Activity page

In apps/web/src/app/activity/page.tsx, extend the status filter pills to include "paused" (amber). For each row whose status is paused, render a small Resume button inline (use the same resumeReminderRunAction).

Re-use a small client component ResumeRunButton that wraps the action call (mirrors the existing inline buttons elsewhere). Place it in apps/web/src/components/activity/resume-run-button.tsx.

  • Step 8: Run all web tests
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run

Expected: all green.

  • Step 9: Restart web container so SSR picks up the new client components
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml restart web
  • Step 10: Commit
git add apps/web
git commit -m "feat(web): paused-run banner with Resume / Cancel buttons

resumeReminderRunAction re-enqueues the existing run via pg-boss with
runId in the payload (Task 7's fire-reminder accepts that). Cancel
action flips remaining pending targets to skipped and resolves the
run to partial. Activity tab gets a Paused filter and inline Resume
button on each paused row."

Acceptance check (manual)

After all 10 tasks land:

  • Smoke 1. Create a reminder for 1 group with default delivery hours and a fire time 30 seconds in the future. Verify it lands; verify the run's error_summary is null and its status is success.

  • Smoke 2. Create a reminder with end hour set to a value that's already past in the operator's timezone. Schedule it to fire NOW. Verify the run resolves failed and the error_summary reads "Delivery window closed at H:00 (TZ) before any group could be sent."

  • Smoke 3. Create two reminders on TWO different paired accounts with the same scheduledAt. Watch bot logs:

    NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=60 bot 2>&1 | grep -E "reminder|fire-reminder"
    

    Expected: both reminders' fire-reminder: done entries land within seconds of each other (parallel), not sequentially separated by a full fan-out.

  • Smoke 4. Create a reminder with a 5-MB JPEG and 3 groups. Fire it. Bot logs should show only ONE prepareWAMessageMedia / upload entry (look for the upload size in the Baileys debug log) followed by 3 relayMessage events.

  • Smoke 5. Wizard ETA pill: pick 5 groups + default 6/18 hours. Pill should be green ("Fits in window"). Pick 5000 groups (clone an existing one): pill should flip amber ("Likely to pause") with the "Widen the window" hint.

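  • Smoke 6. Trigger a paused run on purpose: set BOT_MAX_SEND_PER_MINUTE=2 in .env.development, restart bot, then fire a 10-group reminder about 3 minutes before the top of an hour, with the end hour set to that upcoming hour (the window end is always a whole hour). Verify:

    • The run resolves paused with ~6 groups sent (2 per minute × 3 minutes).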
    • A "Reminder paused" notification appears (with sent/total in body).
    • The detail page shows the banner with Resume and Cancel run.
    • Click Resume → run continues from the unsent targets, eventually resolving to success (or paused again if the window closes again).
    • Reset BOT_MAX_SEND_PER_MINUTE after the test.
  • Smoke 7. Cancel-run: same setup as Smoke 6, but click Cancel run instead of Resume. Verify remaining pending targets become skipped, run resolves partial with errorSummary "Canceled by operator before all groups received the message.", banner disappears.

  • Final test sweep:

    NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run
    NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
    NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run
    

    Expected: all green. ~395 total (web ≈315 with the new ETA + banner + notification tests; shared adds windowEndAt suite that moved from bot).