Folds in three rounds of requirement evolution: * Pause/resume on window close (was stop-and-report-partial). * ETA preview pill at compose / edit time so the operator sees whether their chosen window will fit before scheduling. * Interactive paused-run banner with Resume / Cancel buttons on the detail page; pause notification deep-links to it. Helper relocations: * windowEndAt() moves to packages/shared so both bot fire-reminder and the web ETA pill can import the same calculator. Plan grows from 8 to 10 tasks: adds Task 9 (run-eta + RunEtaPill, TDD) and Task 10 (resume/cancel actions + PausedRunBanner). Acceptance gains two paused-flow smoke tests. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# Windowed, Pacing-Safe Reminder Fan-Out — Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Deliver reminders to many groups (1000+) safely within a per-reminder delivery window (default 6am–6pm). On window-close mid-run, **pause** with a clear "account at capacity / consider offload or smaller media" message and let the operator **resume** later from the UI.

**Architecture:** Replace the current serial fan-out with a per-account isolation model. pg-boss `teamSize` is raised so different-account reminders run in parallel; a per-key (accountId) async mutex serialises same-account fan-outs; a per-account token-bucket rate limiter paces sends; a per-run media-upload cache reuses each unique file across every group via Baileys' `prepareWAMessageMedia` + `relayMessage`; a window-end gate flips the run to `paused` (un-sent targets stay `pending`); a `resumeReminderRunAction` re-enters fire-reminder with the existing runId so the loop picks up only the still-pending targets.

**Tech Stack:** Node.js + TypeScript, Baileys 7.0.0-rc10 (`prepareWAMessageMedia`, `relayMessage`, `generateMessageID`), pg-boss v12 (`boss.work` with `teamSize`), Drizzle ORM + Postgres, vitest.

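The pause/resume behaviour described under **Architecture** can be sketched as a small pure loop. This is a minimal illustration under stated assumptions, not the plan's `fire-reminder` implementation: the `Target` shape, the `runWindowed` name, and the injected `now` / `send` callbacks are all hypothetical, and rate limiting, media caching and database writes are left out.

```typescript
// Hypothetical shapes for illustration only; the real run and target
// rows live in Postgres and carry more fields.
type TargetStatus = "pending" | "sent";

interface Target {
  groupId: string;
  status: TargetStatus;
}

type RunOutcome = "completed" | "paused";

/**
 * Send to every still-pending target until the delivery window closes.
 * `now` and `send` are injected so the sketch stays pure and testable.
 */
function runWindowed(
  targets: Target[],
  windowEndMs: number,
  now: () => number,
  send: (groupId: string) => void,
): RunOutcome {
  for (const target of targets) {
    // A resumed run re-enters here and skips targets that were already sent.
    if (target.status !== "pending") continue;
    // Window gate: stop before sending and leave the rest pending.
    if (now() >= windowEndMs) return "paused";
    send(target.groupId);
    target.status = "sent";
  }
  return "completed";
}
```

Calling `runWindowed` a second time with the same `targets` array and a later `windowEndMs` models a resume: only the targets still marked `pending` are sent.
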

---

## File Structure

| File | Role |
| --- | --- |
| `packages/db/migrations/0008_*.sql` (generated) | add `delivery_window_start_hour`, `delivery_window_end_hour` to `reminders` |
| `packages/db/src/schema.ts` | drizzle alignment for the two new columns |
| `apps/bot/src/env.ts` | three new env vars: `BOT_FIRE_CONCURRENCY`, `BOT_GROUP_CONCURRENCY`, `BOT_MAX_SEND_PER_MINUTE` |
| `apps/bot/src/scheduler/per-key-mutex.ts` (new) | accountId-keyed async mutex |
| `apps/bot/src/scheduler/per-key-mutex.test.ts` (new) | unit tests |
| `apps/bot/src/scheduler/rate-limiter.ts` (new) | per-account token bucket |
| `apps/bot/src/scheduler/rate-limiter.test.ts` (new) | fake-clock unit tests |
| `packages/shared/src/delivery-window.ts` (new) | pure window-end calculator (shared bot+web) |
| `packages/shared/src/delivery-window.test.ts` (new) | unit tests |
| `apps/bot/src/scheduler/media-upload-cache.ts` (new) | `prepareWAMessageMedia` results, keyed by mediaId |
| `apps/bot/src/scheduler/media-upload-cache.test.ts` (new) | mock-socket unit tests |
| `apps/bot/src/scheduler/fire-reminder.ts` (rewrite) | new loop using all of the above; accepts optional `runId` for resume |
| `apps/bot/src/scheduler/reminder-jobs.ts` | pass `teamSize` config |
| `apps/web/src/actions/reminders.ts` | accept the two new fields; add `resumeReminderRunAction`, `cancelReminderRunAction` |
| `apps/web/src/components/reminder-wizard/when-form-client.tsx` | "Delivery hours" inputs |
| `apps/web/src/components/reminder-edit/edit-when-form.tsx` | same |
| `apps/web/src/lib/run-eta.ts` (new) | pure ETA calculator |
| `apps/web/src/components/reminder-wizard/run-eta-pill.tsx` (new) | green/amber ETA pill |
| `apps/web/src/components/reminder-detail/paused-run-banner.tsx` (new) | Resume / Cancel run banner on detail page |
| `apps/web/src/lib/notifications.ts` | paused + partial notification body extension |

---

## Task 1: Schema migration — delivery window columns

**Files:**
- Modify: `packages/db/src/schema.ts` (lines around the `reminders` table — add 2 columns)
- Generate: `packages/db/migrations/0008_<name>.sql`

- [ ] **Step 1: Edit `packages/db/src/schema.ts` — add the two columns to `reminders`**

Find the `reminders` table block and append the two new columns just before the closing `});`. The existing block ends with `lastFiredAt: timestamp(...)`. After the edit, the table should read:

```ts
export const reminders = pgTable("reminders", {
  id: uuid("id").primaryKey().defaultRandom(),
  accountId: uuid("account_id").notNull().references(() => whatsappAccounts.id, { onDelete: "cascade" }),
  name: text("name").notNull(),
  scheduleKind: text("schedule_kind").notNull(),
  scheduledAt: timestamp("scheduled_at", { withTimezone: true }),
  rrule: text("rrule"),
  timezone: text("timezone").notNull(),
  endsAt: timestamp("ends_at", { withTimezone: true }),
  maxRuns: integer("max_runs"),
  status: text("status").notNull().default("active"),
  createdBy: uuid("created_by").notNull().references(() => operators.id),
  createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
  updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(),
  lastFiredAt: timestamp("last_fired_at", { withTimezone: true }),
  // Delivery window — operator-supplied hours (in the row's timezone).
  // Only the END hour is enforced at runtime in v1: the run loop stops
  // sending once `now()` crosses today's end hour. The START hour is
  // documented for the UI; not gated. See spec
  // docs/superpowers/specs/2026-05-10-windowed-fanout-design.md.
  deliveryWindowStartHour: integer("delivery_window_start_hour").notNull().default(6),
  deliveryWindowEndHour: integer("delivery_window_end_hour").notNull().default(18),
});
```

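The two columns are plain integers, so any range checking has to happen in application code. The following is a minimal sketch of what such a check could look like, for example in the web action that accepts the two new fields. The `validateDeliveryWindow` name, the error strings, and the rule that the start hour must be strictly before the end hour are assumptions of this sketch; the plan itself only states that the end hour is enforced at runtime.

```typescript
// Hypothetical input shape mirroring the two new columns.
interface DeliveryWindow {
  startHour: number;
  endHour: number;
}

/**
 * Returns a list of human-readable problems; an empty list means valid.
 * Assumed rules: start is 0..23, end is 0..24 (24 meaning end of day),
 * and the window does not wrap past midnight.
 */
function validateDeliveryWindow(w: DeliveryWindow): string[] {
  const errors: string[] = [];
  if (!Number.isInteger(w.startHour) || w.startHour < 0 || w.startHour > 23) {
    errors.push("startHour must be an integer in 0..23");
  }
  if (!Number.isInteger(w.endHour) || w.endHour < 0 || w.endHour > 24) {
    errors.push("endHour must be an integer in 0..24");
  }
  // Only compare the two once both are individually valid.
  if (errors.length === 0 && w.startHour >= w.endHour) {
    errors.push("startHour must be before endHour");
  }
  return errors;
}
```

With the column defaults, `validateDeliveryWindow({ startHour: 6, endHour: 18 })` returns an empty list.
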
- [ ] **Step 2: Generate the migration**

Run:

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db generate
```

Expected: a file appears at `packages/db/migrations/0008_<name>.sql` containing `ALTER TABLE "reminders" ADD COLUMN "delivery_window_start_hour" integer DEFAULT 6 NOT NULL;` and a similar line for `delivery_window_end_hour`.

- [ ] **Step 3: Apply the migration**

Run:

```bash
NO_SUDO=1 ./scripts/db.sh migrate
```

Expected: `Migrations applied.` Then rebuild `@cmbot/db` to verify the schema compiles:

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/db build
```

Expected: clean tsc output (the @cmbot/db package re-builds so downstream consumers pick up the new columns).

- [ ] **Step 4: Typecheck the bot + web to confirm no schema-consumer regressions**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck
```

Expected: both pass with no errors.

- [ ] **Step 5: Commit**

```bash
git add packages/db
git commit -m "feat(db): add delivery_window_start_hour / end_hour to reminders

Both default 6 / 18, stored as int (hour-of-day in the row's
timezone). Only the end hour is gated at runtime — see spec
docs/superpowers/specs/2026-05-10-windowed-fanout-design.md."
```

---

## Task 2: Bot env vars — concurrency + rate caps

**Files:**
- Modify: `apps/bot/src/env.ts`

- [ ] **Step 1: Replace `apps/bot/src/env.ts` contents**

Whole file:

```ts
import { z } from "zod";

const numberFromString = z.string().regex(/^\d+$/).transform((s) => Number(s));

const envSchema = z.object({
  DATABASE_URL: z.string().url(),
  DATA_DIR: z.string().min(1),
  SESSIONS_DIR: z.string().min(1),
  MEDIA_DIR: z.string().min(1),
  BOT_HEALTH_PORT: numberFromString,
  BOT_LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).default("info"),

  // Reminder fan-out tuning. Defaults aim for an established WhatsApp
  // account (~30-60 msg/min safe band).
  //   - BOT_FIRE_CONCURRENCY: pg-boss worker pool size for the
  //     `reminder.fire` queue. Sets the max number of accounts that can
  //     run their fan-outs simultaneously. 8 is a sane default; raise if
  //     you have more paired accounts firing concurrently.
  //   - BOT_GROUP_CONCURRENCY: per-account parallel group sends. Each
  //     group's parts stay serial (preserves visible order in the chat).
  //     3 has been empirically stable on real traffic; anything above 5
  //     without observation is asking for trouble.
  //   - BOT_MAX_SEND_PER_MINUTE: per-account token-bucket rate. 40 is the
  //     safe default; loosen to 60 only after a few weeks of running
  //     without flags. Tighten to 20 if WA returns rate-limit errors.
  BOT_FIRE_CONCURRENCY: numberFromString.default("8"),
  BOT_GROUP_CONCURRENCY: numberFromString.default("3"),
  BOT_MAX_SEND_PER_MINUTE: numberFromString.default("40"),
});

export type Env = z.infer<typeof envSchema>;

export function parseEnv(input: Record<string, string | undefined>): Env {
  return envSchema.parse(input);
}

export const env = parseEnv(process.env);
```

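To get a feel for what `BOT_MAX_SEND_PER_MINUTE` means for a large run, the arithmetic can be sketched as below. This is a rough back-of-the-envelope helper, not the plan's ETA calculator: the `estimateRunMinutes` name is hypothetical, and it assumes every message part consumes one token, ignores the initial full bucket, and ignores media upload and network time. Under those assumptions the per-account rate, not `BOT_GROUP_CONCURRENCY`, is the throughput ceiling.

```typescript
/**
 * Minutes needed to push `groups * partsPerGroup` sends through a
 * steady per-minute budget. Rounds up to whole minutes.
 */
function estimateRunMinutes(
  groups: number,
  partsPerGroup: number,
  maxSendPerMinute: number,
): number {
  if (maxSendPerMinute <= 0) {
    throw new Error("maxSendPerMinute must be > 0");
  }
  const totalSends = groups * partsPerGroup;
  return Math.ceil(totalSends / maxSendPerMinute);
}
```

For example, 1000 groups with 2 parts each at the default of 40 sends per minute works out to `estimateRunMinutes(1000, 2, 40)`, which is 50 minutes; tightening the rate to 20 doubles that to 100.
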
- [ ] **Step 2: Typecheck**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck
```

Expected: passes.

- [ ] **Step 3: Run the bot tests to confirm `parseEnv` defaults still validate**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run env
```

Expected: all `env.test.ts` tests pass (existing tests don't supply the new vars; defaults must kick in).

- [ ] **Step 4: Commit**

```bash
git add apps/bot/src/env.ts
git commit -m "feat(bot): add fan-out tuning env vars

BOT_FIRE_CONCURRENCY (8) — pg-boss worker pool, gates max accounts
running fan-outs simultaneously.
BOT_GROUP_CONCURRENCY (3) — per-account parallel group sends.
BOT_MAX_SEND_PER_MINUTE (40) — per-account rate-limit budget.

Defaults are tuned for an established WhatsApp account. See spec for
the safe band rationale."
```

---

## Task 3: PerKeyMutex (TDD)

**Files:**
- Create: `apps/bot/src/scheduler/per-key-mutex.test.ts`
- Create: `apps/bot/src/scheduler/per-key-mutex.ts`

- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/per-key-mutex.test.ts`**

```ts
import { describe, it, expect } from "vitest";
import { PerKeyMutex } from "./per-key-mutex.js";

/** Tiny clock-free helper: returns a Promise that resolves after
 *  `n` microtasks. Lets us check ordering without real timers. */
function tickN(n: number): Promise<void> {
  let p: Promise<void> = Promise.resolve();
  for (let i = 0; i < n; i++) p = p.then();
  return p;
}

describe("PerKeyMutex", () => {
  it("allows a single call against one key to run immediately", async () => {
    const m = new PerKeyMutex();
    const result = await m.run("k1", async () => 42);
    expect(result).toBe(42);
  });

  it("serialises two calls against the same key", async () => {
    const m = new PerKeyMutex();
    const order: string[] = [];

    const a = m.run("k1", async () => {
      order.push("a-start");
      await tickN(5);
      order.push("a-end");
    });
    const b = m.run("k1", async () => {
      order.push("b-start");
      order.push("b-end");
    });

    await Promise.all([a, b]);
    // b cannot start until a has finished.
    expect(order).toEqual(["a-start", "a-end", "b-start", "b-end"]);
  });

  it("runs different keys in parallel", async () => {
    const m = new PerKeyMutex();
    const order: string[] = [];

    const a = m.run("k1", async () => {
      order.push("a-start");
      await tickN(5);
      order.push("a-end");
    });
    const b = m.run("k2", async () => {
      order.push("b-start");
      order.push("b-end");
    });

    await Promise.all([a, b]);
    // b doesn't wait for a — interleaving expected.
    expect(order[0]).toBe("a-start");
    expect(order).toContain("b-start");
    expect(order).toContain("b-end");
    // b's pair lands before a's end.
    expect(order.indexOf("b-end")).toBeLessThan(order.indexOf("a-end"));
  });

  it("releases the lock when the handler throws", async () => {
    const m = new PerKeyMutex();
    await expect(
      m.run("k1", async () => {
        throw new Error("boom");
      }),
    ).rejects.toThrow("boom");
    // Next call on the same key must NOT hang.
    const result = await m.run("k1", async () => "after");
    expect(result).toBe("after");
  });

  it("forwards the resolved value of the handler", async () => {
    const m = new PerKeyMutex();
    const out = await m.run("k1", async () => ({ ok: true, n: 7 }));
    expect(out).toEqual({ ok: true, n: 7 });
  });

  it("cleans up internal state for keys with no waiters", async () => {
    const m = new PerKeyMutex();
    await m.run("k1", async () => {});
    expect(m.activeKeyCount()).toBe(0);
  });

  it("retains a key while a chain is in flight, then drops it", async () => {
    const m = new PerKeyMutex();
    let release!: () => void;
    const gate = new Promise<void>((r) => (release = r));

    const inFlight = m.run("k1", () => gate);
    expect(m.activeKeyCount()).toBe(1);
    release();
    await inFlight;
    expect(m.activeKeyCount()).toBe(0);
  });
});
```

- [ ] **Step 2: Run the failing test to confirm it fails for the right reason**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex
```

Expected: FAIL with "Cannot find module './per-key-mutex.js'".

- [ ] **Step 3: Implement `apps/bot/src/scheduler/per-key-mutex.ts`**

```ts
/**
 * Async mutex keyed by a string. Different keys run in parallel;
 * same-key calls serialise.
 *
 * Used by fire-reminder so two reminders on the SAME WhatsApp account
 * take turns (running them concurrently would double the effective
 * send rate and risk a ban), while reminders on DIFFERENT accounts
 * proceed in parallel.
 *
 * The implementation is a chain-per-key Promise: each call appends
 * its work to the key's tail. Empty chains are cleaned up so the
 * Map doesn't grow unbounded across the bot's lifetime.
 */
export class PerKeyMutex {
  private chains = new Map<string, Promise<void>>();

  /** Run `fn` exclusively against `key`. Returns whatever `fn` returns. */
  async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const prev = this.chains.get(key) ?? Promise.resolve();

    let release!: () => void;
    const completion = new Promise<void>((r) => (release = r));
    // The chain we publish is "what was already there + this completion".
    // The next caller awaits THIS, so order is preserved.
    const chained = prev.then(() => completion);
    this.chains.set(key, chained);

    try {
      await prev;
      return await fn();
    } finally {
      release();
      // Drop the entry only if no later caller has appended in the
      // meantime — otherwise we'd evict the in-flight chain.
      if (this.chains.get(key) === chained) {
        this.chains.delete(key);
      }
    }
  }

  /** Diagnostic: how many keys currently have an in-flight or queued chain. */
  activeKeyCount(): number {
    return this.chains.size;
  }
}

/**
 * Singleton mutex used by fire-reminder, keyed by accountId. Lives at
 * module scope so multiple pg-boss workers in the same process share
 * state.
 */
export const accountMutex = new PerKeyMutex();
```

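The intended effect on fan-outs can be seen in a small standalone demo. This is a sketch under assumptions: `KeyedLock` is a condensed stand-in that uses the same chain-per-key idea so the snippet runs on its own, and the account ids, reminder names and `sleep` durations are made up for illustration.

```typescript
// Condensed stand-in for the chain-per-key mutex (hypothetical name).
class KeyedLock {
  private tails = new Map<string, Promise<void>>();

  async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    let release!: () => void;
    const done = new Promise<void>((r) => (release = r));
    const tail = prev.then(() => done);
    this.tails.set(key, tail);
    try {
      await prev; // wait for earlier same-key work
      return await fn();
    } finally {
      release();
      if (this.tails.get(key) === tail) this.tails.delete(key);
    }
  }
}

const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

/** Two reminders share acct-A and must take turns; acct-B runs alongside. */
async function demo(): Promise<string[]> {
  const lock = new KeyedLock();
  const log: string[] = [];
  const fanOut = (accountId: string, reminder: string, ms: number) =>
    lock.run(accountId, async () => {
      log.push(`${reminder}:start`);
      await sleep(ms); // stands in for the paced send loop
      log.push(`${reminder}:end`);
    });

  await Promise.all([
    fanOut("acct-A", "r1", 20),
    fanOut("acct-A", "r2", 5),
    fanOut("acct-B", "r3", 5),
  ]);
  return log;
}
```

In the resulting log, `r2:start` always appears after `r1:end` because both share `acct-A`, while `r3` on `acct-B` starts and finishes without waiting for `r1`.
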
- [ ] **Step 4: Run the test to confirm it passes**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run per-key-mutex
```

Expected: PASS, 7 tests.

- [ ] **Step 5: Commit**

```bash
git add apps/bot/src/scheduler/per-key-mutex.ts apps/bot/src/scheduler/per-key-mutex.test.ts
git commit -m "feat(bot): PerKeyMutex for accountId-keyed serialisation

Same key serialises, different keys run in parallel. Used by
fire-reminder to prevent two same-account fan-outs from doubling
the effective send rate (which would risk a WhatsApp ban). Chains
auto-clean empty entries so the Map doesn't leak."
```

---

## Task 4: TokenBucket rate limiter (TDD)

**Files:**
- Create: `apps/bot/src/scheduler/rate-limiter.test.ts`
- Create: `apps/bot/src/scheduler/rate-limiter.ts`

- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/rate-limiter.test.ts`**

```ts
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { TokenBucket, accountRateLimiter } from "./rate-limiter.js";

describe("TokenBucket", () => {
  beforeEach(() => {
    vi.useFakeTimers();
    vi.setSystemTime(new Date("2026-05-10T10:00:00Z"));
  });
  afterEach(() => {
    vi.useRealTimers();
  });

  it("starts full: first N=capacity acquires resolve immediately", async () => {
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 5 });
    for (let i = 0; i < 5; i++) {
      await b.acquire(); // should not stall
    }
    // The 6th is a different test — see below.
  });

  it("blocks the (capacity+1)th acquire until a token regenerates", async () => {
    // 60/min = 1 token per second. Capacity 2.
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 2 });
    await b.acquire();
    await b.acquire();

    let resolved = false;
    const pending = b.acquire().then(() => {
      resolved = true;
    });

    // Immediately after draining, no token available.
    await Promise.resolve();
    expect(resolved).toBe(false);

    // Advance the clock by exactly 1 second — one token should be back.
    await vi.advanceTimersByTimeAsync(1000);
    await pending;
    expect(resolved).toBe(true);
  });

  it("FIFO: pending acquires resolve in the order they arrived", async () => {
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 1 });
    await b.acquire(); // bucket empty

    const order: number[] = [];
    const a = b.acquire().then(() => order.push(1));
    const c = b.acquire().then(() => order.push(2));

    // Two seconds → two tokens regenerate, in order.
    await vi.advanceTimersByTimeAsync(2000);
    await Promise.all([a, c]);
    expect(order).toEqual([1, 2]);
  });

  it("does not over-fill past capacity even if the clock leaps forward", async () => {
    const b = new TokenBucket({ ratePerMinute: 60, capacity: 3 });
    // Drain.
    await b.acquire();
    await b.acquire();
    await b.acquire();
    // Leap an hour. Naive impl would credit 3600 tokens; we should cap at 3.
    await vi.advanceTimersByTimeAsync(3_600_000);
    // 3 immediate acquires should resolve.
    await b.acquire();
    await b.acquire();
    await b.acquire();
    // The 4th waits for fresh regeneration.
    let resolved = false;
    b.acquire().then(() => (resolved = true));
    await Promise.resolve();
    expect(resolved).toBe(false);
  });

  it("ratePerMinute=0 is rejected at construction (caller bug, fail loud)", () => {
    expect(() => new TokenBucket({ ratePerMinute: 0, capacity: 1 })).toThrow();
  });
});

describe("accountRateLimiter (singleton)", () => {
  beforeEach(() => {
    vi.useFakeTimers();
    vi.setSystemTime(new Date("2026-05-10T10:00:00Z"));
  });
  afterEach(() => {
    vi.useRealTimers();
  });

  it("returns the SAME bucket for repeated lookups of one accountId", () => {
    const a1 = accountRateLimiter.get("acct-1");
    const a2 = accountRateLimiter.get("acct-1");
    expect(a1).toBe(a2);
  });

  it("returns DIFFERENT buckets for different accountIds (isolation)", () => {
    const a = accountRateLimiter.get("acct-A");
    const b = accountRateLimiter.get("acct-B");
    expect(a).not.toBe(b);
  });

  it("a drained account A bucket does not block account B", async () => {
    const a = accountRateLimiter.get("acct-A");
    const b = accountRateLimiter.get("acct-B");
    // Drain A entirely (capacity defaults to ratePerMinute, so we
    // need to pull at least that many). 40 is the
    // BOT_MAX_SEND_PER_MINUTE default; this test assumes it is not
    // overridden in the test environment.
    for (let i = 0; i < 40; i++) await a.acquire();

    // B should still grant immediately.
    let bResolved = false;
    b.acquire().then(() => (bResolved = true));
    await Promise.resolve();
    expect(bResolved).toBe(true);
  });
});
```

- [ ] **Step 2: Run the failing test**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter
```

Expected: FAIL with "Cannot find module './rate-limiter.js'".

- [ ] **Step 3: Implement `apps/bot/src/scheduler/rate-limiter.ts`**

```ts
import { env } from "../env.js";

/**
 * Token bucket for per-account send pacing.
 *
 * Tokens regenerate at `ratePerMinute / 60` per second. Capacity caps
 * how many can accumulate during idle periods (so the operator can't
 * burst 1000 messages just because the account was quiet for a day).
 *
 * `acquire()` resolves when a token is available, FIFO across waiters.
 * Used by fire-reminder to gate every `socket.sendMessage` call.
 */
export interface TokenBucketOptions {
  ratePerMinute: number;
  /** Defaults to ratePerMinute (one minute's worth). */
  capacity?: number;
}

export class TokenBucket {
  private readonly ratePerMs: number;
  private readonly capacity: number;
  private tokens: number;
  private lastRefillMs: number;
  private waiters: Array<() => void> = [];

  constructor(opts: TokenBucketOptions) {
    if (opts.ratePerMinute <= 0) {
      throw new Error(`TokenBucket: ratePerMinute must be > 0, got ${opts.ratePerMinute}`);
    }
    this.ratePerMs = opts.ratePerMinute / 60_000;
    this.capacity = opts.capacity ?? opts.ratePerMinute;
    this.tokens = this.capacity;
    this.lastRefillMs = Date.now();
  }

  /** Resolve when a token is available. FIFO across concurrent waiters. */
  async acquire(): Promise<void> {
    this.refill();
    if (this.tokens >= 1 && this.waiters.length === 0) {
      this.tokens -= 1;
      return;
    }
    return new Promise<void>((resolve) => {
      this.waiters.push(resolve);
      this.scheduleNext();
    });
  }

  private refill(): void {
    const now = Date.now();
    const elapsed = now - this.lastRefillMs;
    if (elapsed <= 0) return;
    const gained = elapsed * this.ratePerMs;
    this.tokens = Math.min(this.capacity, this.tokens + gained);
    this.lastRefillMs = now;
  }

  private scheduleNext(): void {
    // Wait until at least one token is available, then drain waiters
    // FIFO. We compute the gap from current fractional token deficit.
    this.refill();
    while (this.tokens >= 1 && this.waiters.length > 0) {
      this.tokens -= 1;
      const w = this.waiters.shift()!;
      w();
    }
    if (this.waiters.length === 0) return;

    const tokensShort = 1 - this.tokens;
    const waitMs = Math.max(1, Math.ceil(tokensShort / this.ratePerMs));
    setTimeout(() => this.scheduleNext(), waitMs);
  }
}

/**
 * Per-accountId TokenBucket registry. Each account gets its own
 * pacing budget, so a slow account A never throttles account B.
 */
class AccountRateLimiter {
  private buckets = new Map<string, TokenBucket>();
  private ratePerMinute: number;

  constructor(ratePerMinute: number) {
    this.ratePerMinute = ratePerMinute;
  }

  get(accountId: string): TokenBucket {
    let b = this.buckets.get(accountId);
    if (!b) {
      b = new TokenBucket({ ratePerMinute: this.ratePerMinute });
      this.buckets.set(accountId, b);
    }
    return b;
  }
}

export const accountRateLimiter = new AccountRateLimiter(env.BOT_MAX_SEND_PER_MINUTE);
```

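The wait computed in `scheduleNext()` is just the time it takes the fractional token deficit to regenerate. A standalone version of that arithmetic is sketched below; the `msUntilNextToken` name is hypothetical, and the sketch multiplies before dividing (rather than dividing by a per-millisecond rate) purely to keep the worked numbers free of floating-point drift.

```typescript
/**
 * Milliseconds until a bucket holding `tokens` (possibly fractional)
 * has at least one whole token, regenerating at `ratePerMinute`.
 */
function msUntilNextToken(tokens: number, ratePerMinute: number): number {
  if (ratePerMinute <= 0) {
    throw new Error("ratePerMinute must be > 0");
  }
  if (tokens >= 1) return 0; // a token is already available
  const deficit = 1 - tokens;
  // One token takes 60000 / ratePerMinute ms to regenerate.
  return Math.max(1, Math.ceil((deficit * 60000) / ratePerMinute));
}
```

At 60 per minute an empty bucket waits `msUntilNextToken(0, 60)`, which is 1000 ms; at the default of 40 per minute the same call gives 1500 ms, and a half-full token (`tokens = 0.5`) gives 750 ms.
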
- [ ] **Step 4: Run the tests**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run rate-limiter
```

Expected: PASS, 8 tests.

- [ ] **Step 5: Commit**

```bash
git add apps/bot/src/scheduler/rate-limiter.ts apps/bot/src/scheduler/rate-limiter.test.ts
git commit -m "feat(bot): per-account token-bucket rate limiter

TokenBucket gates each socket.sendMessage call. Tokens regenerate at
ratePerMinute/60 per second, capped at one minute's worth so quiet
accounts can't burst. FIFO drain across concurrent waiters.

accountRateLimiter (singleton) hands out one bucket per accountId, so
account A's drain never throttles account B. Default rate is
BOT_MAX_SEND_PER_MINUTE (40) — the safe band for an established
WhatsApp account."
```

---

## Task 5: Delivery window helper (TDD)

The helper lives in `packages/shared` (NOT the bot) because both bundles need it: the bot's fire-reminder loop gates on it, and the web's ETA pill (Task 9) compares the ETA against it to flip the green/amber state.

**Files:**
- Create: `packages/shared/src/delivery-window.test.ts`
- Create: `packages/shared/src/delivery-window.ts`
- Modify: `packages/shared/src/index.ts` (re-export `windowEndAt`)

- [ ] **Step 1: Write the failing test file at `packages/shared/src/delivery-window.test.ts`**

```ts
import { describe, it, expect } from "vitest";
import { windowEndAt } from "./delivery-window.js";

const TZ = "Asia/Kuala_Lumpur"; // UTC+8

describe("windowEndAt", () => {
  it("returns today's end-hour boundary in the given timezone", () => {
    // Fire at 2026-05-10 10:00 KL == 02:00 UTC. End hour 18 == 18:00 KL == 10:00 UTC.
    const fireAt = new Date("2026-05-10T02:00:00.000Z");
    const out = windowEndAt(TZ, 18, fireAt);
    expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z");
  });

  it("returns a past timestamp when fireAt is already after the end hour", () => {
    // Fire at 2026-05-10 19:00 KL == 11:00 UTC. End hour 18 → today's 18:00 KL == 10:00 UTC.
    // That's BEFORE fireAt. The caller's first window-gate check trips immediately.
    const fireAt = new Date("2026-05-10T11:00:00.000Z");
    const out = windowEndAt(TZ, 18, fireAt);
    expect(out.toISOString()).toBe("2026-05-10T10:00:00.000Z");
    expect(out.getTime()).toBeLessThan(fireAt.getTime());
  });

  it("respects the timezone (UTC+0 vs UTC+8)", () => {
    const fireAt = new Date("2026-05-10T02:00:00.000Z");
    const inUtc = windowEndAt("UTC", 18, fireAt);
    expect(inUtc.toISOString()).toBe("2026-05-10T18:00:00.000Z");
    const inKl = windowEndAt("Asia/Kuala_Lumpur", 18, fireAt);
    expect(inKl.toISOString()).toBe("2026-05-10T10:00:00.000Z");
  });

  it("handles end hour 24 as midnight of the same calendar day's end", () => {
    // 2026-05-10 in KL ends at 2026-05-11 00:00 KL == 2026-05-10 16:00 UTC.
    const fireAt = new Date("2026-05-10T02:00:00.000Z");
    const out = windowEndAt(TZ, 24, fireAt);
    expect(out.toISOString()).toBe("2026-05-10T16:00:00.000Z");
  });

  it("DST transition day stays on the SAME calendar day (no skipping forward)", () => {
    // America/New_York observes DST. On 2026-03-08 (DST starts at 02:00 local),
    // 18:00 EDT is a real time.
    // Fire at 2026-03-08 11:00 EDT (15:00 UTC). End at 2026-03-08 18:00 EDT (22:00 UTC).
    const fireAt = new Date("2026-03-08T15:00:00.000Z");
    const out = windowEndAt("America/New_York", 18, fireAt);
    expect(out.toISOString()).toBe("2026-03-08T22:00:00.000Z");
  });

  it("rejects end hour outside 0..24", () => {
    const fireAt = new Date("2026-05-10T00:00:00Z");
    expect(() => windowEndAt(TZ, -1, fireAt)).toThrow();
    expect(() => windowEndAt(TZ, 25, fireAt)).toThrow();
  });
});
```

- [ ] **Step 2: Run the failing test**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run delivery-window
```

Expected: FAIL with "Cannot find module './delivery-window.js'".

- [ ] **Step 3: Implement `packages/shared/src/delivery-window.ts`**

```ts
import { DateTime } from "luxon";

/**
 * Returns the end-of-window timestamp for the calendar day `fireAt`
 * falls on, in the operator's timezone.
 *
 *   windowEndAt("Asia/Kuala_Lumpur", 18, fireAt)
 *     → today's 18:00 KL (which may be in the past if fireAt is already
 *       past 18:00 KL — caller's first window-gate fires immediately).
 *
 * `endHour` is 0..24. Hour 24 is treated as midnight of the next
 * calendar day (i.e. "end of today" inclusive).
 *
 * Pure: no I/O, no Date.now() reads, no clock dependency. Easy to
 * test with fixture inputs.
 */
export function windowEndAt(
  timezone: string,
  endHour: number,
  fireAt: Date,
): Date {
  if (!Number.isInteger(endHour) || endHour < 0 || endHour > 24) {
    throw new Error(`windowEndAt: endHour must be 0..24, got ${endHour}`);
  }

  const dt = DateTime.fromJSDate(fireAt).setZone(timezone);
  if (!dt.isValid) {
    throw new Error(`windowEndAt: invalid timezone "${timezone}"`);
  }

  // For hour 24, "end of day" is the next midnight. Luxon's `set` with
  // hour=24 normalises into hour=0 of the next day, which is exactly
  // what we want.
  const end = dt.set({ hour: endHour, minute: 0, second: 0, millisecond: 0 });
  return end.toJSDate();
}
```

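When sanity-checking fixtures like the ones in the test file, it can help to ask "what hour is it in that timezone at this instant?" without pulling in Luxon. The sketch below uses the built-in `Intl.DateTimeFormat` instead, which is a different technique from the `windowEndAt` implementation and is meant only as a cross-check. The `hourInZone` and `isPastEndHour` names are hypothetical.

```typescript
/** Local hour-of-day (0..23) at instant `at` in the given IANA timezone. */
function hourInZone(timezone: string, at: Date): number {
  const parts = new Intl.DateTimeFormat("en-US", {
    timeZone: timezone,
    hour: "2-digit",
    hour12: false,
  }).formatToParts(at);
  const hourPart = parts.find((p) => p.type === "hour");
  if (!hourPart) {
    throw new Error(`hourInZone: no hour part for "${timezone}"`);
  }
  // Some engines render midnight as "24" when hour12 is false.
  return Number(hourPart.value) % 24;
}

/**
 * Coarse hour-granularity check: has the local clock reached `endHour`?
 * With endHour = 24 this is never true within the same calendar day.
 */
function isPastEndHour(timezone: string, endHour: number, at: Date): boolean {
  return hourInZone(timezone, at) >= endHour;
}
```

For instance, `hourInZone("Asia/Kuala_Lumpur", new Date("2026-05-10T02:00:00Z"))` is 10, and `hourInZone("America/New_York", new Date("2026-03-08T15:00:00Z"))` is 11 because daylight saving time is already in effect by then.
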
- [ ] **Step 4: Re-export from `packages/shared/src/index.ts`**

Append:

```ts
export { windowEndAt } from "./delivery-window.js";
```

- [ ] **Step 5: Run the tests**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run delivery-window
```

Expected: PASS, 6 tests.

- [ ] **Step 6: Commit**

```bash
git add packages/shared/src/delivery-window.ts packages/shared/src/delivery-window.test.ts packages/shared/src/index.ts
git commit -m "feat(shared): pure delivery-window end calculator

windowEndAt(timezone, endHour, fireAt) returns the end-of-window for
the day fireAt is on. If fireAt is already past, the result is a
past timestamp — the run loop's first window gate trips immediately
and the entire run resolves as failed (zero sent), which is the
right behaviour for 'we can't send after window close'."
```

---

## Task 6: MediaUploadCache (TDD with mock socket)

**Files:**
- Create: `apps/bot/src/scheduler/media-upload-cache.test.ts`
- Create: `apps/bot/src/scheduler/media-upload-cache.ts`

- [ ] **Step 1: Write the failing test file at `apps/bot/src/scheduler/media-upload-cache.test.ts`**

```ts
import { describe, it, expect, vi } from "vitest";
import { MediaUploadCache } from "./media-upload-cache.js";

describe("MediaUploadCache", () => {
  it("uploads each unique mediaId exactly once across N gets", async () => {
    const prepare = vi.fn(async (mediaId: string) => ({
      kind: "prepared",
      mediaId,
    }));
    const cache = new MediaUploadCache(prepare);

    const a1 = await cache.get("media-A");
    const a2 = await cache.get("media-A");
    const b1 = await cache.get("media-B");

    expect(prepare).toHaveBeenCalledTimes(2);
    expect(prepare).toHaveBeenCalledWith("media-A");
    expect(prepare).toHaveBeenCalledWith("media-B");
    // Same handle returned for repeated lookups of A.
    expect(a1).toBe(a2);
    expect(a1).not.toBe(b1);
  });

  it("coalesces concurrent gets of the same mediaId into ONE prepare call", async () => {
    let resolveA: (v: unknown) => void = () => {};
    const aPromise = new Promise((r) => (resolveA = r));
    const prepare = vi.fn(async (mediaId: string) => {
      if (mediaId === "media-A") return aPromise;
      return { kind: "prepared", mediaId };
    });
    const cache = new MediaUploadCache(prepare);

    const p1 = cache.get("media-A");
    const p2 = cache.get("media-A");
    const p3 = cache.get("media-A");

    // Resolve the in-flight prepare.
    resolveA({ kind: "prepared", mediaId: "media-A" });

    const [r1, r2, r3] = await Promise.all([p1, p2, p3]);
    expect(prepare).toHaveBeenCalledTimes(1); // de-duplicated
    expect(r1).toBe(r2);
    expect(r2).toBe(r3);
  });

  it("a thrown prepare is NOT cached — next get retries", async () => {
    let attempt = 0;
    const prepare = vi.fn(async (_mediaId: string) => {
      attempt++;
      if (attempt === 1) throw new Error("upload network blip");
      return { kind: "prepared", attempt };
    });
    const cache = new MediaUploadCache(prepare);

    await expect(cache.get("media-A")).rejects.toThrow("upload network blip");
|
||
// Second attempt must call prepare again — DON'T cache failures.
|
||
const r = await cache.get("media-A");
|
||
expect(prepare).toHaveBeenCalledTimes(2);
|
||
expect(r).toEqual({ kind: "prepared", attempt: 2 });
|
||
});
|
||
|
||
it("size() reflects the number of cached unique mediaIds", async () => {
|
||
const prepare = async (mediaId: string) => ({ mediaId });
|
||
const cache = new MediaUploadCache(prepare);
|
||
expect(cache.size()).toBe(0);
|
||
await cache.get("a");
|
||
expect(cache.size()).toBe(1);
|
||
await cache.get("b");
|
||
expect(cache.size()).toBe(2);
|
||
await cache.get("a"); // already cached
|
||
expect(cache.size()).toBe(2);
|
||
});
|
||
});
|
||
```
|
||
|
||
- [ ] **Step 2: Run the failing test**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache
```

Expected: FAIL with "Cannot find module './media-upload-cache.js'".

- [ ] **Step 3: Implement `apps/bot/src/scheduler/media-upload-cache.ts`**

```ts
/**
 * Per-run cache of `prepareWAMessageMedia` results, keyed by
 * `mediaId`. The point: when a reminder fans out to 1000 groups with
 * one image, we want to upload that image to WhatsApp's CDN ONCE, not
 * 1000 times. Subsequent group sends reuse the prepared message
 * (with embedded directPath / mediaKey) via socket.relayMessage.
 *
 * Lifecycle: one cache instance per fire-reminder run. After the run
 * completes, the cache is dropped — we don't share uploads across
 * runs because WA media tokens are short-lived.
 *
 * Concurrent gets of the same mediaId are coalesced into a single
 * prepare call. Failed prepares are NOT cached so the next attempt
 * retries (network blips at upload time shouldn't poison the cache).
 */
export class MediaUploadCache<T> {
  private readonly prepare: (mediaId: string) => Promise<T>;
  private readonly entries = new Map<string, Promise<T>>();

  constructor(prepare: (mediaId: string) => Promise<T>) {
    this.prepare = prepare;
  }

  async get(mediaId: string): Promise<T> {
    const existing = this.entries.get(mediaId);
    if (existing) return existing;

    const inflight = this.prepare(mediaId);
    // Insert eagerly so concurrent gets dedupe.
    this.entries.set(mediaId, inflight);

    try {
      return await inflight;
    } catch (err) {
      // Don't cache failures — the next caller should retry.
      this.entries.delete(mediaId);
      throw err;
    }
  }

  size(): number {
    return this.entries.size;
  }
}
```

- [ ] **Step 4: Run the tests**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run media-upload-cache
```

Expected: PASS, 4 tests.

- [ ] **Step 5: Commit**

```bash
git add apps/bot/src/scheduler/media-upload-cache.ts apps/bot/src/scheduler/media-upload-cache.test.ts
git commit -m "feat(bot): MediaUploadCache for once-per-run media prepare

One cache instance per fire-reminder run. Each unique mediaId gets
prepared (uploaded to WA CDN) exactly once, and subsequent group
sends within the run reuse the prepared message via relayMessage.
Concurrent gets coalesce into a single prepare. Failed prepares
don't poison the cache — next caller retries."
```

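To make the saving concrete, the following self-contained sketch fans one media part out to several groups through a promise cache and counts how many uploads actually run. `PromiseCache` is a trimmed stand-in for `MediaUploadCache` written for this example, and `fakeUpload`, `fanOut`, and the group ids are hypothetical; no Baileys call is involved.

```typescript
// Trimmed stand-in for MediaUploadCache: stores the in-flight promise per
// key and evicts it again if the prepare call rejects.
class PromiseCache<T> {
  private readonly prepare: (key: string) => Promise<T>;
  private readonly entries = new Map<string, Promise<T>>();

  constructor(prepare: (key: string) => Promise<T>) {
    this.prepare = prepare;
  }

  get(key: string): Promise<T> {
    const existing = this.entries.get(key);
    if (existing) return existing;
    const inflight = this.prepare(key).catch((err: unknown) => {
      this.entries.delete(key); // failures are not cached
      throw err;
    });
    this.entries.set(key, inflight);
    return inflight;
  }
}

let uploads = 0;

// Hypothetical uploader: counts how often the "CDN upload" really happens.
async function fakeUpload(mediaId: string): Promise<{ mediaId: string; handle: number }> {
  uploads += 1;
  return { mediaId, handle: uploads };
}

// Every group asks for the same media concurrently; only one upload runs
// and every group receives the same prepared handle.
async function fanOut(groupIds: string[]): Promise<number[]> {
  const cache = new PromiseCache(fakeUpload);
  const prepared = await Promise.all(groupIds.map(() => cache.get("media-A")));
  return prepared.map((p) => p.handle);
}
```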
---

## Task 7: Rewrite `fire-reminder.ts` and bump pg-boss `teamSize`

**Files:**
- Modify: `apps/bot/src/scheduler/reminder-jobs.ts`
- Rewrite: `apps/bot/src/scheduler/fire-reminder.ts`

- [ ] **Step 1: Update `apps/bot/src/scheduler/reminder-jobs.ts` — pass `teamSize`**

Whole file:

```ts
import type { PgBoss } from "pg-boss";
import { logger } from "../logger.js";
import { env } from "../env.js";
import { fireReminder, type FireReminderPayload } from "./fire-reminder.js";

export const REMINDER_FIRE_QUEUE = "reminder.fire";

export async function registerReminderJobs(boss: PgBoss): Promise<void> {
  await boss.createQueue(REMINDER_FIRE_QUEUE);
  await boss.work<FireReminderPayload>(
    REMINDER_FIRE_QUEUE,
    {
      // Up to BOT_FIRE_CONCURRENCY jobs in flight at once. Combined with
      // the per-account mutex inside fireReminder, this lets reminders
      // on DIFFERENT accounts run in parallel while same-account
      // reminders take turns.
      teamSize: env.BOT_FIRE_CONCURRENCY,
      teamConcurrency: 1,
    },
    async (jobs) => {
      const job = jobs[0];
      if (!job) return;
      logger.debug({ jobId: job.id, payload: job.data }, "reminder.fire: handling");
      await fireReminder(job.data);
    },
  );
  logger.info(
    { teamSize: env.BOT_FIRE_CONCURRENCY },
    "reminder.fire: handler registered",
  );
}

export async function scheduleReminderFire(
  boss: PgBoss,
  reminderId: string,
  scheduledAt: Date,
): Promise<string | null> {
  const id = await boss.send(
    REMINDER_FIRE_QUEUE,
    { reminderId },
    {
      startAfter: scheduledAt,
      retryLimit: 3,
      retryDelay: 30,
      retryBackoff: true,
      // Use the reminderId as a singleton key so re-scheduling cancels the old job
      singletonKey: `reminder:${reminderId}`,
    },
  );
  logger.info({ reminderId, jobId: id, scheduledAt }, "reminder.fire: scheduled");
  return id;
}

export async function cancelReminderFire(_boss: PgBoss, reminderId: string): Promise<void> {
  // Soft cancel: pg-boss doesn't expose a clean cancel-by-singleton API in v12.
  // The scheduled job will still fire, but `fireReminder` exits early when the
  // reminder row is gone. Hard cancel can be added later by storing the jobId.
  logger.info({ reminderId }, "reminder.fire: cancel requested (soft, fizzles on fire)");
}
```

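The interplay between worker concurrency and the per-account mutex can be sketched in isolation. `KeyedMutex` below is a hypothetical minimal stand-in for the `accountMutex` imported from `per-key-mutex.ts` (whose real implementation is defined in an earlier task), and `fakeFanOut` simulates a fan-out with a short timer. The sketch only aims to show the intended behaviour: two jobs on the same account take turns, while a job on another account starts immediately.

```typescript
// Hypothetical minimal per-key mutex: tasks sharing a key run one after
// another; tasks on different keys are free to interleave.
class KeyedMutex {
  private readonly tails = new Map<string, Promise<void>>();

  run<T>(key: string, task: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    // Chain behind the previous task for this key (the tail never rejects).
    const result = prev.then(() => task());
    const tail = result.then(
      () => undefined,
      () => undefined,
    );
    this.tails.set(key, tail);
    // Drop the entry once the queue for this key has drained.
    void tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return result;
  }
}

const events: string[] = [];
const tick = (): Promise<void> => new Promise((resolve) => setTimeout(resolve, 5));

// Simulated fan-out: logs when it starts and when it ends.
async function fakeFanOut(label: string): Promise<void> {
  events.push(`start ${label}`);
  await tick();
  events.push(`end ${label}`);
}

async function demo(): Promise<string[]> {
  const mutex = new KeyedMutex();
  await Promise.all([
    mutex.run("account-1", () => fakeFanOut("A1")),
    mutex.run("account-1", () => fakeFanOut("A2")), // waits for A1
    mutex.run("account-2", () => fakeFanOut("B1")), // runs alongside A1
  ]);
  return events;
}
```

In this sketch `A2` cannot start until `A1` has ended, whereas `B1` starts before `A1` finishes.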
- [ ] **Step 2: Rewrite `apps/bot/src/scheduler/fire-reminder.ts`**

Whole file:

```ts
import { and, eq, inArray } from "drizzle-orm";
import { reminderRuns, reminderRunTargets, reminders } from "@cmbot/db";
import {
  generateWAMessageContent,
  generateMessageID,
  type AnyMessageContent,
  type proto,
} from "@whiskeysockets/baileys";
import pLimit from "p-limit";
import { db } from "../db.js";
import { logger } from "../logger.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import { absoluteMediaPath, nextOccurrence, resolveDeliveryKind } from "@cmbot/shared";
import { open as fsOpen, readFile } from "node:fs/promises";
import { env } from "../env.js";
import { writeAuditLog } from "../audit.js";
import { getReminderWithDetails } from "../reminders/crud.js";
import { getBoss } from "./pgboss-client.js";
import { scheduleReminderFire } from "./reminder-jobs.js";
import { pgNotifyWeb } from "../ipc/notify.js";
import { accountMutex } from "./per-key-mutex.js";
import { accountRateLimiter } from "./rate-limiter.js";
import { windowEndAt } from "@cmbot/shared";
import { MediaUploadCache } from "./media-upload-cache.js";

export type FireReminderPayload = {
  reminderId: string;
  /** Optional resume hook. When present, fire-reminder ATTACHES to
   *  the existing run instead of creating a new one, and only
   *  re-attempts targets in `pending` status. Set by the resume
   *  server action. */
  runId?: string;
};

/** Read the first N bytes of a file (used to sniff HEIF/AVIF/MOV brands). */
async function readHeadBytes(filePath: string, n: number): Promise<Uint8Array> {
  const fh = await fsOpen(filePath, "r");
  try {
    const buf = new Uint8Array(n);
    await fh.read(buf, 0, n, 0);
    return buf;
  } finally {
    await fh.close();
  }
}

/** Random delay between same-group message parts (ms). Just enough for
 *  visible ordering in the chat at WA's natural pace. */
function partJitterMs(): number {
  return 200 + Math.floor(Math.random() * 300); // 200..499
}

export async function fireReminder(payload: FireReminderPayload): Promise<void> {
  const reminder = await getReminderWithDetails(payload.reminderId);
  if (!reminder) {
    logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found");
    return;
  }
  if (reminder.status !== "active") {
    logger.info({ reminderId: reminder.id, status: reminder.status }, "fire-reminder: skipping (not active)");
    return;
  }

  // Per-account mutex: two reminders on the SAME account take turns.
  // Different accounts run in parallel (cross-account isolation).
  await accountMutex.run(reminder.accountId, () => fireReminderInner(reminder, payload.runId));
}

async function fireReminderInner(
  reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
  resumeRunId?: string,
): Promise<void> {
  // Resume path: attach to the existing run; fresh path: create one.
  let runId: string;
  if (resumeRunId) {
    const existing = await db.query.reminderRuns.findFirst({
      where: (r, { eq }) => eq(r.id, resumeRunId),
    });
    if (!existing) {
      logger.warn({ reminderId: reminder.id, resumeRunId }, "fire-reminder: resume target run missing");
      return;
    }
    runId = existing.id;
    // Re-mark as in-flight so the UI shows the run is no longer paused.
    await db
      .update(reminderRuns)
      .set({ status: "pending", errorSummary: null })
      .where(eq(reminderRuns.id, runId));
  } else {
    const [run] = await db
      .insert(reminderRuns)
      .values({
        reminderId: reminder.id,
        reminderName: reminder.name,
        status: "pending",
      })
      .returning({ id: reminderRuns.id });
    runId = run!.id;
  }

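  const session = sessionManager.getSession(reminder.accountId);
  if (!session) {
    logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected");
    if (resumeRunId) {
      // Resume path: run_target rows already exist, so UPDATE the
      // still-pending ones instead of INSERTing duplicates via markAllSkipped.
      await db
        .update(reminderRunTargets)
        .set({ status: "skipped", error: "account not connected" })
        .where(and(eq(reminderRunTargets.runId, runId), eq(reminderRunTargets.status, "pending")));
    } else {
      await markAllSkipped(runId, reminder, "account not connected");
    }
    await db.update(reminderRuns).set({ status: "skipped", errorSummary: "account not connected" }).where(eq(reminderRuns.id, runId));
    await pgNotifyWeb({ type: "reminder.fired", reminderId: reminder.id, runId, status: "skipped" });
    return;
  }
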
  // Up-front: load all groups + media rows in TWO bulk queries.
  const groupIds = reminder.targets.map((t) => t.groupId);
  const groupRows = groupIds.length
    ? await db.query.whatsappGroups.findMany({ where: (g) => inArray(g.id, groupIds) })
    : [];
  const groupById = new Map(groupRows.map((g) => [g.id, g]));

  const mediaIds = Array.from(
    new Set(reminder.messages.map((m) => m.mediaId).filter((id): id is string => Boolean(id))),
  );
  const mediaRows = mediaIds.length
    ? await db.query.mediaFiles.findMany({ where: (m) => inArray(m.id, mediaIds) })
    : [];
  const mediaById = new Map(mediaRows.map((m) => [m.id, m]));

  // Pre-create run_targets rows so progress is observable mid-run.
  // On a RESUME, the rows already exist — only the original fire path
  // inserts them. The resume path skips this; the loop below filters
  // to only the still-pending rows.
  if (!resumeRunId && reminder.targets.length > 0) {
    await db.insert(reminderRunTargets).values(
      reminder.targets.map((t) => ({
        runId,
        groupId: t.groupId,
        groupLabel: groupById.get(t.groupId)?.name ?? null,
        status: "pending" as const,
      })),
    );
  }

  // On a RESUME, only the still-pending targets need attention. On
  // a fresh fire, every target is pending. Either way we read the
  // current run_target rows from the DB to be the source of truth
  // about what's left to do.
  const pendingRows = await db.query.reminderRunTargets.findMany({
    where: (t, { eq, and: drizzleAnd }) =>
      drizzleAnd(eq(t.runId, runId), eq(t.status, "pending")),
  });
  const pendingGroupIds = new Set(pendingRows.map((r) => r.groupId));
  const targetsToProcess = reminder.targets.filter((t) =>
    pendingGroupIds.has(t.groupId),
  );

  // Already-sent count from prior run (so the final tally adds to total).
  const priorSentCount = resumeRunId
    ? (
        await db.query.reminderRunTargets.findMany({
          where: (t, { eq, and: drizzleAnd }) =>
            drizzleAnd(eq(t.runId, runId), eq(t.status, "sent")),
        })
      ).length
    : 0;
  const priorFailedCount = resumeRunId
    ? (
        await db.query.reminderRunTargets.findMany({
          where: (t, { eq, and: drizzleAnd }) =>
            drizzleAnd(eq(t.runId, runId), eq(t.status, "failed")),
        })
      ).length
    : 0;

  // Window-end timestamp. If the reminder fires AFTER today's end-hour
  // (e.g. cron miss-fired late) this is in the past — every iteration
  // will trip the gate and the run resolves as failed.
  const windowEnd = windowEndAt(
    reminder.timezone,
    reminder.deliveryWindowEndHour,
    new Date(),
  );

  // Per-run media upload cache. Each unique mediaId is prepared via
  // generateWAMessageContent ONCE (which uploads to WA's CDN); the
  // resulting proto.Message is reused for every group via relayMessage.
  // socket.waUploadToServer is the upload helper Baileys exposes.
  const uploadCache = new MediaUploadCache<proto.Message>(async (mediaId) => {
    const media = mediaById.get(mediaId);
    if (!media) throw new Error(`media row missing: ${mediaId}`);
    const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR);
    const buffer = await readFile(filePath);
    const head = buffer.subarray(0, 12);
    const resolved = resolveDeliveryKind(media.mimeType, head);
    const senderKind: "image" | "video" | "document" =
      resolved === "image" || resolved === "video" ? resolved : "document";
    const content: AnyMessageContent =
      senderKind === "image"
        ? { image: buffer, mimetype: media.mimeType }
        : senderKind === "video"
          ? { video: buffer, mimetype: media.mimeType }
          : { document: buffer, fileName: media.filenameOriginal, mimetype: media.mimeType };
    return generateWAMessageContent(content, {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      upload: (session.socket as any).waUploadToServer,
    });
  });

  // Per-account rate limiter — gates each socket.sendMessage / relayMessage call.
  const rateLimiter = accountRateLimiter.get(reminder.accountId);

  let sentCount = 0;
  let failedCount = 0;
  let skippedCount = 0;
  let windowClosed = false;

  const groupConcurrency = pLimit(env.BOT_GROUP_CONCURRENCY);

  await Promise.all(
    targetsToProcess.map((target) =>
      groupConcurrency(async () => {
        // Window-end gate. CRITICAL: leave the row as `pending` (NOT
        // `skipped`) so the run can be resumed later. The run as a
        // whole flips to `paused` after this loop.
        if (Date.now() >= windowEnd.getTime()) {
          windowClosed = true;
          // Don't touch the row; it's already `pending`. Just flag the closure.
          return;
        }

        const group = groupById.get(target.groupId);
        if (!group) {
          await db
            .update(reminderRunTargets)
            .set({ status: "skipped", error: "group missing from db" })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          skippedCount++;
          return;
        }

        const start = Date.now();
        try {
          let lastMessageId: string | undefined;
          for (const part of reminder.messages) {
            await rateLimiter.acquire();
            if (part.kind === "text" && part.textContent) {
              const r = await session.socket.sendMessage(group.waGroupJid, {
                text: part.textContent,
              });
              lastMessageId = r?.key?.id ?? undefined;
            } else if (part.mediaId) {
              const prebuilt = await uploadCache.get(part.mediaId);
              // Inject the caption (if any) just before relaying — the
              // prebuilt content carries the media but each relay uses
              // a fresh messageId.
              if (part.textContent) {
                injectCaption(prebuilt, part.textContent);
              }
              const messageId = generateMessageID();
              await session.socket.relayMessage(group.waGroupJid, prebuilt, { messageId });
              lastMessageId = messageId;
            }
            await new Promise((r) => setTimeout(r, partJitterMs()));
          }
          await db
            .update(reminderRunTargets)
            .set({
              status: "sent",
              waMessageId: lastMessageId ?? null,
              latencyMs: Date.now() - start,
            })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          sentCount++;
        } catch (err) {
          logger.error(
            { err, reminderId: reminder.id, groupId: target.groupId },
            "fire-reminder: send failed",
          );
          await db
            .update(reminderRunTargets)
            .set({ status: "failed", error: (err as Error).message })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          failedCount++;
        }
      }),
    ),
  );

  // Final status. The four shapes:
  //   - paused  : window closed with at least one row STILL pending.
  //               Resumable. Sent rows stay sent, pending stays pending.
  //   - success : every target sent (no failures, no pending).
  //   - partial : every target was attempted; some sent, some failed
  //               or skipped. NOT resumable; failures are real.
  //   - failed  : zero sent. Either every send errored, or the window
  //               was already closed when the run began (nothing
  //               attempted, but no pending-with-progress to resume).
  const total = reminder.targets.length;
  const totalSent = priorSentCount + sentCount;
  const totalFailed = priorFailedCount + failedCount;
  // Re-read pending count from the DB so the count reflects whatever
  // the loop left behind (any window-skipped rows are still pending).
  const remainingPending = (
    await db.query.reminderRunTargets.findMany({
      where: (t, { eq, and: drizzleAnd }) =>
        drizzleAnd(eq(t.runId, runId), eq(t.status, "pending")),
    })
  ).length;

  let status: "success" | "partial" | "failed" | "paused";
  let errorSummary: string | null = null;
  if (windowClosed && remainingPending > 0 && totalSent > 0) {
    status = "paused";
    errorSummary = `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}). ${totalSent} of ${total} groups delivered, ${remainingPending} still pending. Resume from the Activity tab. If this happens repeatedly, consider offloading to another paired account, or shrinking the message body / media size to fit more groups in your daily window.`;
  } else if (windowClosed && totalSent === 0) {
    // Window was closed before any send happened. Not paused — there's
    // nothing meaningful to resume. Counts as a hard failure.
    status = "failed";
    errorSummary = `Delivery window closed at ${reminder.deliveryWindowEndHour}:00 (${reminder.timezone}) before any group could be sent. The reminder fired too late in the day.`;
  } else if (totalSent === total) {
    status = "success";
  } else if (totalSent > 0) {
    status = "partial";
    errorSummary = `${totalSent} of ${total} groups delivered (${totalFailed} failed, ${skippedCount} skipped).`;
  } else {
    status = "failed";
    errorSummary = `All ${total} sends failed.`;
  }

  await db
    .update(reminderRuns)
    .set({ status, errorSummary })
    .where(eq(reminderRuns.id, runId));

  await pgNotifyWeb({
    type: "reminder.fired",
    reminderId: reminder.id,
    runId,
    status,
  });

  // Lifecycle bookkeeping. Skip when the run is paused — the reminder
  // shouldn't end or re-arm while a resume is still possible.
  const runIsTerminal = status !== "paused";

  if (runIsTerminal) {
    if (reminder.scheduleKind === "one_off") {
      await db
        .update(reminders)
        .set({ status: "ended", updatedAt: new Date() })
        .where(eq(reminders.id, reminder.id));
    } else if (reminder.scheduleKind === "recurring" && reminder.rrule) {
      const next = nextOccurrence(reminder.rrule, reminder.timezone, new Date());
      await db
        .update(reminders)
        .set({ lastFiredAt: new Date(), updatedAt: new Date() })
        .where(eq(reminders.id, reminder.id));
      if (next) {
        try {
          await scheduleReminderFire(getBoss(), reminder.id, next);
          logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence");
        } catch (err) {
          logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence");
        }
      } else {
        logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending");
        await db.update(reminders).set({ status: "ended" }).where(eq(reminders.id, reminder.id));
      }
    }
  } else {
    logger.info(
      { reminderId: reminder.id, runId },
      "fire-reminder: paused — leaving reminder lifecycle unchanged for resume",
    );
  }

  await writeAuditLog(db, {
    operatorId: reminder.createdBy,
    source: "system",
    action: "reminder.fired",
    targetType: "reminder",
    targetId: reminder.id,
    payload: { runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount },
  });

  logger.info(
    { reminderId: reminder.id, runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount },
    "fire-reminder: done",
  );
}

/**
 * Mark every target as skipped with the given error. Used when the
 * account is offline before the loop even starts (no run_target rows
 * have been created yet, so we INSERT instead of UPDATE).
 */
async function markAllSkipped(
  runId: string,
  reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
  error: string,
): Promise<void> {
  if (reminder.targets.length === 0) return;
  const rows = await db.query.whatsappGroups.findMany({
    where: (g) => inArray(g.id, reminder.targets.map((t) => t.groupId)),
    columns: { id: true, name: true },
  });
  const labelById = new Map(rows.map((r) => [r.id, r.name]));
  await db.insert(reminderRunTargets).values(
    reminder.targets.map((t) => ({
      runId,
      groupId: t.groupId,
      groupLabel: labelById.get(t.groupId) ?? null,
      status: "skipped" as const,
      error,
    })),
  );
}

/**
 * Mutates the prebuilt proto.Message to set the caption on whichever
 * media variant it carries. Baileys' relayMessage does not let us
 * pass the caption alongside; the protobuf already carries the slot.
 */
function injectCaption(msg: proto.Message, caption: string): void {
  if (msg.imageMessage) msg.imageMessage.caption = caption;
  else if (msg.videoMessage) msg.videoMessage.caption = caption;
  else if (msg.documentMessage) msg.documentMessage.caption = caption;
}
```

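The final-status branching in `fireReminderInner` can be read as a pure function of a few counters. The sketch below restates it that way so the branches can be exercised in isolation; `resolveRunStatus` and `RunTally` are hypothetical names introduced for this example and are not among the plan's files.

```typescript
type RunStatus = "success" | "partial" | "failed" | "paused";

// Hypothetical tally shape; the real code keeps these as local variables.
type RunTally = {
  total: number;
  totalSent: number;
  remainingPending: number;
  windowClosed: boolean;
};

// Mirrors the branch order used in fireReminderInner.
function resolveRunStatus(t: RunTally): RunStatus {
  if (t.windowClosed && t.remainingPending > 0 && t.totalSent > 0) return "paused";
  if (t.windowClosed && t.totalSent === 0) return "failed";
  if (t.totalSent === t.total) return "success";
  if (t.totalSent > 0) return "partial";
  return "failed";
}

// Window closed mid-run with progress made: resumable.
const midRunClose = resolveRunStatus({
  total: 1000,
  totalSent: 600,
  remainingPending: 400,
  windowClosed: true,
});

// Window already closed when the run began: nothing sent, hard failure.
const firedTooLate = resolveRunStatus({
  total: 1000,
  totalSent: 0,
  remainingPending: 1000,
  windowClosed: true,
});
```

Branch order matters here: the `paused` check has to come before the `partial` check, otherwise a run interrupted by the window would be reported as a non-resumable partial.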
- [ ] **Step 3: Add `p-limit` to apps/bot deps**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm add -F @cmbot/bot p-limit
```

Expected: pnpm reports `+1` package. The lockfile updates.

- [ ] **Step 4: Typecheck the bot**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot typecheck
```

Expected: passes. If type errors mention `relayMessage` overloads, re-check the `@whiskeysockets/baileys` imports: `relayMessage` is a method on the socket instance, not one of the package's named exports.

- [ ] **Step 5: Run all bot tests**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run
```

Expected: PASS for all (existing 26 + 7 PerKeyMutex + 8 RateLimiter + 4 MediaUploadCache = 45). The 6 DeliveryWindow tests live in `packages/shared` and run under the `@cmbot/shared` filter, so they are not part of this count.

- [ ] **Step 6: Restart the bot to pick up the rewrite**

```bash
NO_SUDO=1 ./scripts/dev.sh restart-bot
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=40 bot 2>&1 | grep -i "reminder.fire"
```

Expected: A line like `reminder.fire: handler registered teamSize=8`.

- [ ] **Step 7: Commit**

```bash
git add apps/bot/src/scheduler/fire-reminder.ts apps/bot/src/scheduler/reminder-jobs.ts apps/bot/package.json pnpm-lock.yaml
git commit -m "feat(bot): windowed, pacing-safe fan-out

Rewrites the per-target loop to:
- Wrap inner work in PerKeyMutex(accountId) so two reminders on the
  same account take turns; different accounts run in parallel.
- Bulk-load groups and media rows up front (drops ~3000 round-trips
  to ~3 for a 1000-group run).
- Pre-create run_target rows with status='pending' so the Activity
  tab shows progress mid-run.
- Pre-upload each unique media via MediaUploadCache (one
  generateWAMessageContent call per mediaId, then relayMessage to
  every group). For 1000 groups × 5 MB image, this turns 5 GB of
  upload into 5 MB.
- Run BOT_GROUP_CONCURRENCY (default 3) groups in parallel within
  one account; each group's parts stay serial for visible order.
- Gate every socket call on a per-account TokenBucket
  (BOT_MAX_SEND_PER_MINUTE, default 40).
- Replace the rigid 1.5s inter-part sleep with 200..499 ms jitter.
- Window-end gate: stop sending once today's end-hour (in the
  reminder's timezone) has passed; leave unstarted targets pending
  and flip the run to paused so it can be resumed; surface a
  paused-status message that names the timezone, the delivered/total
  count, and points at multi-account offload.
- Resume path: when the payload carries a runId, attach to that run
  and re-attempt only the targets still in status='pending'.

reminder-jobs.ts now passes teamSize=BOT_FIRE_CONCURRENCY (default 8)
to boss.work so up to 8 different-account reminders run concurrently."
```

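For a rough sense of whether a fan-out fits its window under these defaults, the sketch below estimates send time from the token-bucket rate alone, since every message part acquires one token. `estimateFanOutMinutes` is a hypothetical helper written for this example. It ignores upload time, part jitter, and retries, so it gives a lower bound and is not the plan's ETA calculator from Task 9.

```typescript
// Lower-bound estimate: one rate-limiter token per message part per group.
function estimateFanOutMinutes(
  groups: number,
  partsPerGroup: number,
  maxSendPerMinute: number,
): number {
  if (maxSendPerMinute <= 0) {
    throw new RangeError("maxSendPerMinute must be positive");
  }
  return (groups * partsPerGroup) / maxSendPerMinute;
}

// 1000 groups at the default 40 sends/minute.
const oneImage = estimateFanOutMinutes(1000, 1, 40); // 25 minutes
const textPlusImage = estimateFanOutMinutes(1000, 2, 40); // 50 minutes

// Default 6am-6pm window: 12 hours = 720 minutes of sending capacity.
const windowMinutes = (18 - 6) * 60;
// Upper bound on groups that fit in one window with two parts each.
const maxGroupsTwoParts = Math.floor((windowMinutes * 40) / 2); // 14400
```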
---

## Task 8: Web wiring — schema, action, wizard input, notification body

**Files:**
- Modify: `apps/web/src/actions/reminders.ts`
- Modify: `apps/web/src/components/reminder-wizard/when-form-client.tsx`
- Modify: `apps/web/src/components/reminder-wizard/step-when.tsx`
- Modify: `apps/web/src/components/reminder-edit/edit-when-form.tsx`
- Modify: `apps/web/src/lib/notifications.ts`
- Create: `apps/web/src/lib/notifications-body.test.ts` (extends existing notifications tests with the partial-status body)

- [ ] **Step 1: Update the create/update Zod schemas in `apps/web/src/actions/reminders.ts`**

Find the `createReminderSchema` definition (around lines 220–250) and add the two delivery-window fields plus the ordering refinement. The schema chunk becomes:

```ts
const createReminderSchema = z
  .object({
    accountId: z.string().uuid(),
    groupIds: z.array(z.string().uuid()),
    messages: z.array(messagePartSchema).optional(),
    name: z.string().nullable().optional(),
    text: z.string().nullable().optional(),
    mediaId: z.string().uuid().nullable().optional(),
    caption: z.string().nullable().optional(),
    scheduledAtIso: z.string().datetime({ offset: true }),
    rrule: z.string().nullable().optional(),
    timezone: z.string().default(DEFAULT_TIMEZONE),
    // Delivery window. End hour is enforced at runtime by fire-reminder;
    // start hour is documented but not gated in v1.
    deliveryWindowStartHour: z.number().int().min(0).max(24).default(6),
    deliveryWindowEndHour: z.number().int().min(0).max(24).default(18),
  })
  .refine(
    (d) =>
      (d.messages && d.messages.length > 0) ||
      Boolean(d.text?.trim()) ||
      Boolean(d.mediaId),
    {
      message: "Add a message or attach a file",
      path: ["messages"],
    },
  )
  .refine((d) => d.deliveryWindowStartHour < d.deliveryWindowEndHour, {
    message: "Delivery window start must be earlier than end",
    path: ["deliveryWindowStartHour"],
  });
```

Then, in BOTH `createReminderAction` and `updateReminderAction`, include the two new fields where the `db.insert(reminders).values({...})` / `db.update(reminders).set({...})` call happens. Find the `values({` and `set({` blocks (around line 350 and line 460) and add:

```ts
deliveryWindowStartHour: parsed.data.deliveryWindowStartHour,
deliveryWindowEndHour: parsed.data.deliveryWindowEndHour,
```

immediately after `timezone,`.

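The two constraints the schema adds (integer hours in 0..24, start strictly before end) can be restated without Zod, which may help when reasoning about edge cases such as a `0`-to-`24` window. `validateDeliveryWindow` and `WindowCheck` are hypothetical names for this sketch; the plan itself enforces these rules through `createReminderSchema`.

```typescript
type WindowCheck =
  | { ok: true }
  | { ok: false; path: string; message: string };

// Plain-TypeScript restatement of the delivery-window rules in the schema.
function validateDeliveryWindow(startHour: number, endHour: number): WindowCheck {
  const inRange = (h: number): boolean => Number.isInteger(h) && h >= 0 && h <= 24;
  if (!inRange(startHour)) {
    return { ok: false, path: "deliveryWindowStartHour", message: "Start hour must be an integer from 0 to 24" };
  }
  if (!inRange(endHour)) {
    return { ok: false, path: "deliveryWindowEndHour", message: "End hour must be an integer from 0 to 24" };
  }
  if (startHour >= endHour) {
    // Same message and path as the schema's second refinement.
    return { ok: false, path: "deliveryWindowStartHour", message: "Delivery window start must be earlier than end" };
  }
  return { ok: true };
}

const defaultWindow = validateDeliveryWindow(6, 18); // accepted
const invertedWindow = validateDeliveryWindow(18, 6); // rejected: start >= end
const fullDay = validateDeliveryWindow(0, 24); // accepted: whole day
```

Note that an overnight window such as 22 to 6 is rejected by these rules, which matches the schema's strict `start < end` refinement.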
- [ ] **Step 2: Add the input controls to `apps/web/src/components/reminder-wizard/when-form-client.tsx`**

This step adds two optional props, two pieces of component state, two URL params, and a new section in the JSX. The top-of-file imports stay unchanged.

Find the `interface WhenFormClientProps` block and add the two `initialDelivery*Hour` props:

```ts
interface WhenFormClientProps {
  accountId: string;
  groupIds: string;
  timezone: string;
  initialDefaultIso: string;
  initialSpec?: RecurrenceSpec;
  initialDeliveryStartHour?: number;
  initialDeliveryEndHour?: number;
  passThroughParams: PassThroughParams;
}
```

In the component body (around state declarations near the top), add:

```ts
const [deliveryStartHour, setDeliveryStartHour] = useState<number>(
  props.initialDeliveryStartHour ?? 6,
);
const [deliveryEndHour, setDeliveryEndHour] = useState<number>(
  props.initialDeliveryEndHour ?? 18,
);
```

In `handleContinue`, when building the URLSearchParams, add (after the existing params):

```ts
sp.set("deliveryStartHour", String(deliveryStartHour));
sp.set("deliveryEndHour", String(deliveryEndHour));
```

In the JSX, just before the existing `<RecurrencePicker>` block, add a new "Delivery hours" card:
|
||
|
||
```tsx
|
||
<div className="space-y-2">
|
||
<Label className="flex items-center gap-1.5">
|
||
<ClockIcon className="size-3.5" />
|
||
Delivery hours
|
||
</Label>
|
||
<div className="flex items-center gap-2">
|
||
<Input
|
||
type="number"
|
||
min={0}
|
||
max={24}
|
||
step={1}
|
||
value={deliveryStartHour}
|
||
onChange={(e) => setDeliveryStartHour(Number(e.target.value))}
|
||
className="h-9 w-20"
|
||
aria-label="Delivery start hour"
|
||
/>
|
||
<span className="text-sm text-muted-foreground">to</span>
|
||
<Input
|
||
type="number"
|
||
min={0}
|
||
max={24}
|
||
step={1}
|
||
value={deliveryEndHour}
|
||
onChange={(e) => setDeliveryEndHour(Number(e.target.value))}
|
||
className="h-9 w-20"
|
||
aria-label="Delivery end hour"
|
||
/>
|
||
<span className="text-xs text-muted-foreground">
|
||
(24-hour, in {timezone})
|
||
</span>
|
||
</div>
|
||
<p className="text-xs text-muted-foreground">
|
||
The bot stops sending after the end hour. Long fan-outs that don't
|
||
finish in this window are paused — you can resume them from the
|
||
Activity tab.
|
||
</p>
|
||
</div>
|
||
```
|
||
|
||
Add `ClockIcon` to the lucide-react imports if it's not already there (file currently imports `CalendarIcon`, `ClockIcon`, `AlertCircleIcon` — already there).
|
||
|
- [ ] **Step 3: Wire the new params into `apps/web/src/components/reminder-wizard/step-when.tsx`**

The wizard reads `deliveryStartHour` / `deliveryEndHour` from the URL search params and passes them to `<WhenFormClient>` as `initialDeliveryStartHour` / `initialDeliveryEndHour`. Find where `<WhenFormClient>` is rendered and add the two new props:

```tsx
<WhenFormClient
  accountId={accountId}
  groupIds={groupIds}
  timezone={op.defaultTimezone ?? "UTC"}
  initialDefaultIso={initialDefaultIso}
  initialSpec={initialSpec}
  initialDeliveryStartHour={
    sp.deliveryStartHour ? Number(sp.deliveryStartHour) : undefined
  }
  initialDeliveryEndHour={
    sp.deliveryEndHour ? Number(sp.deliveryEndHour) : undefined
  }
  passThroughParams={passThroughParams}
/>
```
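
One caveat with the inline `Number(...)` conversion: a hand-edited URL such as `?deliveryStartHour=abc` produces `NaN`, and `NaN` is not nullish, so the `?? 6` default in the client component would not kick in. A stricter parser could look like the sketch below. The name `parseHourParam` and its bounds arguments are hypothetical, not something the plan defines:

```ts
// Hypothetical helper: parses an hour from a URL search param and returns
// undefined for missing, empty, non-numeric, fractional, or out-of-range
// input, so the caller's `?? default` fallback applies.
export function parseHourParam(
  raw: string | undefined,
  min = 0,
  max = 24,
): number | undefined {
  if (raw === undefined || raw.trim() === "") return undefined;
  const n = Number(raw);
  if (!Number.isInteger(n) || n < min || n > max) return undefined;
  return n;
}
```

With it, the prop would read `initialDeliveryStartHour={parseHourParam(sp.deliveryStartHour)}`.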

Also update the `interface StepWhenParams` (or wherever the searchParams type lives in step-when.tsx) to include `deliveryStartHour?: string; deliveryEndHour?: string;`.

- [ ] **Step 4: Pass-through in step-groups.tsx and step-review.tsx**

Both step pages thread URL params through. Open each and find where they re-emit the URL (look for the `URLSearchParams` builder or `editLink` helper). Add:

```ts
if (sp.deliveryStartHour) backParams.set("deliveryStartHour", sp.deliveryStartHour);
if (sp.deliveryEndHour) backParams.set("deliveryEndHour", sp.deliveryEndHour);
```

Add the same to `editLink` in step-review.tsx so the "Edit when" link from the review page round-trips the values.
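
Because the same two lines end up in several files, one option is to centralise them. The sketch below is only an illustration: `copyDeliveryParams` and `DELIVERY_PARAM_KEYS` are hypothetical names, and the plan itself simply repeats the two `set` calls.

```ts
// Hypothetical helper: copies the two delivery-window params from a parsed
// searchParams object onto an outgoing URLSearchParams, skipping absent ones.
const DELIVERY_PARAM_KEYS = ["deliveryStartHour", "deliveryEndHour"] as const;

type DeliveryParams = Partial<Record<(typeof DELIVERY_PARAM_KEYS)[number], string>>;

export function copyDeliveryParams(
  from: DeliveryParams,
  to: URLSearchParams,
): URLSearchParams {
  for (const key of DELIVERY_PARAM_KEYS) {
    const value = from[key];
    if (value) to.set(key, value);
  }
  return to;
}
```

Each call site would then become `copyDeliveryParams(sp, backParams)`.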

In `review-submit-client.tsx`, where the action payload is built, include the two fields:

```ts
const payload = {
  accountId,
  groupIds: groupIds ? groupIds.split(",").filter(Boolean) : [],
  name: name?.trim() || null,
  messages,
  scheduledAtIso: scheduledAt,
  rrule: rrule ?? null,
  timezone,
  deliveryWindowStartHour: deliveryStartHour ?? 6,
  deliveryWindowEndHour: deliveryEndHour ?? 18,
};
```

(And add `deliveryStartHour?: number; deliveryEndHour?: number;` to the props interface and pass them in from `step-review.tsx` after parsing the URL.)

- [ ] **Step 5: Update `apps/web/src/components/reminder-edit/edit-when-form.tsx` similarly**

Add the same `Delivery hours` card into the form, same state vars, and include the two fields in the `updateReminderAction` payload (find the existing `updateReminderAction({...})` call). Pull the initial values from the loaded reminder row in the parent edit page (`apps/web/src/app/reminders/[id]/edit/when/page.tsx`):

```tsx
<EditWhenForm
  reminderId={reminder.id}
  accountId={reminder.accountId}
  groupIds={targets.map((t) => t.groupId)}
  messages={initialMessages}
  name={reminder.name}
  initialIso={(reminder.scheduledAt ?? new Date()).toISOString()}
  initialSpec={specFromRrule(reminder.rrule)}
  timezone={reminder.timezone}
  initialDeliveryStartHour={reminder.deliveryWindowStartHour}
  initialDeliveryEndHour={reminder.deliveryWindowEndHour}
/>
```

The form's prop interface gets:

```ts
initialDeliveryStartHour?: number;
initialDeliveryEndHour?: number;
```

- [ ] **Step 6: Extend the notification body for paused + partial in `apps/web/src/lib/notifications.ts`**

Find `reminderFiredToNotification`. The existing function takes `{ reminderId, runId, status }`. Extend the event shape to carry `sent`/`total` and handle `paused` as a first-class status:

```ts
export function reminderFiredToNotification(event: {
  type: "reminder.fired";
  reminderId: string;
  runId: string;
  status: string;
  sent?: number;
  total?: number;
}): ShowNotificationOptions | null {
  if (event.status === "skipped") return null;
  const headline =
    event.status === "success"
      ? "Reminder sent"
      : event.status === "paused"
        ? "Reminder paused"
        : event.status === "partial"
          ? "Reminder partly sent"
          : "Reminder failed";
  let body =
    event.status === "success"
      ? "All groups received the message."
      : event.status === "paused"
        ? "Delivery window closed before all groups got the message."
        : event.status === "partial"
          ? "Some groups received the message; others failed. See activity."
          : "No groups received the message. See activity.";
  if (event.status === "paused" && event.sent !== undefined && event.total !== undefined) {
    body = `${event.sent} of ${event.total} groups delivered. Tap to resume or cancel.`;
  } else if (event.status === "partial" && event.sent !== undefined && event.total !== undefined) {
    body = `${event.sent} of ${event.total} groups delivered. See activity for details.`;
  }
  return {
    title: headline,
    body,
    tag: `reminder:${event.reminderId}`,
    href: `/reminders/${event.reminderId}`,
  };
}
```

The SSE event emitter (search for `reminder.fired` in `apps/bot/src/scheduler/fire-reminder.ts`) must include `sent` and `total` on the published event payload. Task 7 already wires this in the new fire-reminder, so no bot change is needed here; just confirm the web-side receiver passes those fields through to `reminderFiredToNotification`.

- [ ] **Step 7: Add tests for the extended notification body**

Append to `apps/web/src/lib/notifications.test.ts` (inside the existing `describe("reminderFiredToNotification mapping", () => {...})` block):

```ts
it("paused with sent/total renders 'Tap to resume or cancel'", () => {
  const args = reminderFiredToNotification({
    type: "reminder.fired",
    reminderId: "r-1",
    runId: "run-1",
    status: "paused",
    sent: 412,
    total: 1000,
  });
  expect(args?.title).toBe("Reminder paused");
  expect(args?.body).toBe("412 of 1000 groups delivered. Tap to resume or cancel.");
});

it("paused without sent/total falls back to a generic paused body", () => {
  const args = reminderFiredToNotification({
    type: "reminder.fired",
    reminderId: "r-1",
    runId: "run-1",
    status: "paused",
  });
  expect(args?.title).toBe("Reminder paused");
  expect(args?.body).toMatch(/Delivery window closed/);
});

it("partial with sent/total renders 'X of Y groups delivered' instead of the generic body", () => {
  const args = reminderFiredToNotification({
    type: "reminder.fired",
    reminderId: "r-1",
    runId: "run-1",
    status: "partial",
    sent: 412,
    total: 1000,
  });
  expect(args?.body).toBe("412 of 1000 groups delivered. See activity for details.");
});

it("partial without sent/total falls back to the generic body", () => {
  const args = reminderFiredToNotification({
    type: "reminder.fired",
    reminderId: "r-1",
    runId: "run-1",
    status: "partial",
  });
  expect(args?.body).toMatch(/Some groups received/);
});
```

- [ ] **Step 8: Update existing edit-section-forms.test.tsx fixtures so they don't break on the new props**

Find `apps/web/src/components/reminder-edit/edit-section-forms.test.tsx`. Both `EditAccountForm` and `EditGroupsForm` may need additional defaults. If their props don't reference the new fields, the tests still pass; re-run them and patch only if compilation fails. Run:

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web typecheck
```

If typecheck reports `Property 'initialDeliveryStartHour' is missing` on any test, add `initialDeliveryStartHour: 6, initialDeliveryEndHour: 18` to that fixture.

- [ ] **Step 9: Run web tests + bot tests**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run
```

Expected: web 298+ pass, bot 26+25 = 51 pass. Total ~380 across the project including shared.

- [ ] **Step 10: Restart web container so the server bundle picks up the schema change**

```bash
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml restart web
```

- [ ] **Step 11: Commit**

```bash
git add apps/web
git commit -m "feat(web): delivery-window inputs in wizard + edit; partial body extension

Wizard 'When' step and the per-section edit-when page get two number
inputs for the delivery window hours (default 6/18). Server actions
accept them on the create/update Zod schemas, validate
0 <= start < end <= 24, and persist to the new reminders columns.

Notification body for paused-status now reads
'412 of 1000 groups delivered. Tap to resume or cancel.'; partial
status uses the same delivered/total wording when sent/total are
present."
```

---

## Task 9: Run-ETA helper + ETA pill in wizard review (TDD)

**Files:**
- Create: `apps/web/src/lib/run-eta.ts`
- Create: `apps/web/src/lib/run-eta.test.ts`
- Create: `apps/web/src/components/reminder-wizard/run-eta-pill.tsx`
- Create: `apps/web/src/components/reminder-wizard/run-eta-pill.test.tsx`
- Modify: `apps/web/src/components/reminder-wizard/review-submit-client.tsx`
- Modify: `apps/web/src/components/reminder-edit/edit-when-form.tsx`
- Modify: `apps/web/src/components/reminder-edit/edit-groups-form.tsx`

- [ ] **Step 1: Write the failing run-eta test**

Create `apps/web/src/lib/run-eta.test.ts`:

```ts
import { describe, it, expect } from "vitest";
import { estimateRunDuration, ASSUMED_RATE_PER_MINUTE } from "./run-eta";

describe("estimateRunDuration", () => {
  it("uses target count / rate plus a 15% buffer, ceiling-rounded to whole minutes", () => {
    const r = estimateRunDuration({
      targetCount: 1000,
      ratePerMinute: 40,
      fireAt: new Date("2026-05-13T09:00:00.000+08:00"),
    });
    // 1000 / 40 = 25 min; +15% = 28.75 → ceil = 29
    expect(r.durationMinutes).toBe(29);
    expect(r.estimatedFinishAt.toISOString()).toBe(
      new Date("2026-05-13T09:29:00.000+08:00").toISOString(),
    );
  });

  it("returns a 1-minute floor for very small runs", () => {
    const r = estimateRunDuration({
      targetCount: 1,
      ratePerMinute: 40,
      fireAt: new Date("2026-05-13T09:00:00.000+08:00"),
    });
    expect(r.durationMinutes).toBe(1);
  });

  it("returns 0 minutes and finishAt = fireAt when targetCount is 0", () => {
    const fireAt = new Date("2026-05-13T09:00:00.000+08:00");
    const r = estimateRunDuration({ targetCount: 0, ratePerMinute: 40, fireAt });
    expect(r.durationMinutes).toBe(0);
    expect(r.estimatedFinishAt.toISOString()).toBe(fireAt.toISOString());
  });

  it("throws when ratePerMinute is 0 or negative", () => {
    expect(() =>
      estimateRunDuration({
        targetCount: 100,
        ratePerMinute: 0,
        fireAt: new Date(),
      }),
    ).toThrow();
    expect(() =>
      estimateRunDuration({
        targetCount: 100,
        ratePerMinute: -1,
        fireAt: new Date(),
      }),
    ).toThrow();
  });

  it("exports the configured default rate constant", () => {
    expect(typeof ASSUMED_RATE_PER_MINUTE).toBe("number");
    expect(ASSUMED_RATE_PER_MINUTE).toBeGreaterThan(0);
  });
});
```

- [ ] **Step 2: Run test to verify it fails**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run run-eta
```

Expected: FAIL with `Cannot find module './run-eta'`.

- [ ] **Step 3: Write the helper to pass**

Create `apps/web/src/lib/run-eta.ts`:

```ts
/**
 * Default per-account send rate, mirroring `BOT_MAX_SEND_PER_MINUTE`
 * in the bot env. The web bundle hardcodes this — operators who tune
 * the bot env are expected to redeploy web with the matching value.
 */
export const ASSUMED_RATE_PER_MINUTE = 40;

const ETA_BUFFER = 1.15;

export function estimateRunDuration(opts: {
  targetCount: number;
  ratePerMinute?: number;
  fireAt: Date;
}): { durationMinutes: number; estimatedFinishAt: Date } {
  const rate = opts.ratePerMinute ?? ASSUMED_RATE_PER_MINUTE;
  if (rate <= 0) throw new Error("ratePerMinute must be > 0");
  if (opts.targetCount <= 0) {
    return { durationMinutes: 0, estimatedFinishAt: new Date(opts.fireAt) };
  }
  const raw = (opts.targetCount / rate) * ETA_BUFFER;
  const durationMinutes = Math.max(1, Math.ceil(raw));
  const estimatedFinishAt = new Date(
    opts.fireAt.getTime() + durationMinutes * 60_000,
  );
  return { durationMinutes, estimatedFinishAt };
}
```

- [ ] **Step 4: Run test to verify it passes**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run run-eta
```

Expected: PASS (5/5).

- [ ] **Step 5: Write the failing pill component test**

Create `apps/web/src/components/reminder-wizard/run-eta-pill.test.tsx`:

```tsx
import { describe, it, expect } from "vitest";
import { renderToStaticMarkup } from "react-dom/server";
import { RunEtaPill } from "./run-eta-pill";

describe("RunEtaPill", () => {
  it("renders green 'Fits in window' when estimatedFinishAt <= windowEndAt", () => {
    const html = renderToStaticMarkup(
      <RunEtaPill
        targetCount={500}
        fireAt={new Date("2026-05-13T09:00:00.000+08:00")}
        windowEndAt={new Date("2026-05-13T18:00:00.000+08:00")}
        timezone="Asia/Kuala_Lumpur"
      />,
    );
    expect(html).toMatch(/Fits in window/);
    expect(html).toMatch(/min/);
    expect(html).not.toMatch(/Likely to pause/);
  });

  it("renders amber 'Likely to pause' when ETA exceeds window", () => {
    const html = renderToStaticMarkup(
      <RunEtaPill
        targetCount={5000}
        fireAt={new Date("2026-05-13T17:00:00.000+08:00")}
        windowEndAt={new Date("2026-05-13T18:00:00.000+08:00")}
        timezone="Asia/Kuala_Lumpur"
      />,
    );
    expect(html).toMatch(/Likely to pause/);
    expect(html).toMatch(/Widen the window/);
  });

  it("renders nothing for zero targets", () => {
    const html = renderToStaticMarkup(
      <RunEtaPill
        targetCount={0}
        fireAt={new Date("2026-05-13T09:00:00.000+08:00")}
        windowEndAt={new Date("2026-05-13T18:00:00.000+08:00")}
        timezone="Asia/Kuala_Lumpur"
      />,
    );
    expect(html).toBe("");
  });
});
```

- [ ] **Step 6: Implement the pill**

Create `apps/web/src/components/reminder-wizard/run-eta-pill.tsx`:

```tsx
import { ClockIcon, AlertTriangleIcon } from "lucide-react";
import { estimateRunDuration } from "@/lib/run-eta";

interface RunEtaPillProps {
  targetCount: number;
  fireAt: Date;
  windowEndAt: Date;
  timezone: string;
}

/**
 * Visible at the wizard's review step and on the per-section edit
 * pages that affect ETA (groups, when). Advisory only — does NOT
 * block submission. The operator can still schedule a run that
 * pauses; the pause-and-resume flow covers that case.
 */
export function RunEtaPill({
  targetCount,
  fireAt,
  windowEndAt,
  timezone,
}: RunEtaPillProps) {
  if (targetCount <= 0) return null;

  const { durationMinutes, estimatedFinishAt } = estimateRunDuration({
    targetCount,
    fireAt,
  });
  const fits = estimatedFinishAt.getTime() <= windowEndAt.getTime();

  const finishLocal = new Intl.DateTimeFormat("en-GB", {
    hour: "2-digit",
    minute: "2-digit",
    timeZone: timezone,
  }).format(estimatedFinishAt);

  if (fits) {
    return (
      <div className="flex items-center gap-2 rounded-lg bg-emerald-500/10 px-3 py-2 text-xs text-emerald-700 dark:text-emerald-400">
        <ClockIcon className="size-3.5" />
        <span>
          ~{durationMinutes} min · finishes ~{finishLocal} · Fits in window
        </span>
      </div>
    );
  }
  return (
    <div className="flex items-start gap-2 rounded-lg bg-amber-500/10 px-3 py-2 text-xs text-amber-700 dark:text-amber-400">
      <AlertTriangleIcon className="size-3.5 mt-0.5 shrink-0" />
      <div className="space-y-0.5">
        <div>
          ~{durationMinutes} min · finishes ~{finishLocal} · Likely to pause
        </div>
        <div className="text-[11px] opacity-80">
          Widen the window or split into smaller runs.
        </div>
      </div>
    </div>
  );
}
```

- [ ] **Step 7: Run pill tests**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run run-eta-pill
```

Expected: 3/3 PASS.

- [ ] **Step 8: Wire the pill into review-submit-client**

In `apps/web/src/components/reminder-wizard/review-submit-client.tsx`, add the pill above the Schedule button. The component already has access to `groupIds`, `scheduledAt`, `timezone`, and now `deliveryEndHour` (from Task 8). Compute the window end inline:

```tsx
import { RunEtaPill } from "./run-eta-pill";
import { windowEndAt as computeWindowEndAt } from "@cmbot/shared"; // see note below

// inside the component, just before the action button:
{groupIds && scheduledAt && (() => {
  const ids = groupIds.split(",").filter(Boolean);
  const fireAt = new Date(scheduledAt);
  const wEnd = computeWindowEndAt(timezone, deliveryEndHour ?? 18, fireAt);
  return (
    <RunEtaPill
      targetCount={ids.length}
      fireAt={fireAt}
      windowEndAt={wEnd}
      timezone={timezone}
    />
  );
})()}
```

`computeWindowEndAt` is a local alias for `windowEndAt`, the helper Task 5 created in `packages/shared/src/delivery-window.ts`; both bundles import it from `@cmbot/shared`. The alias avoids a name clash with the pill's `windowEndAt` prop.
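
For orientation only, here is one way a helper with that call shape, `(timezone, endHour, referenceDate)`, could be written using nothing but `Intl`. This is a hypothetical sketch under stated assumptions, not the Task 5 implementation: the name `windowEndAtSketch`, the choice to anchor on the local calendar day of the reference date, and the two-pass offset correction are all illustrative.

```ts
// Hypothetical sketch, NOT the Task 5 helper: returns the instant at which
// `endHour`:00 occurs in `timezone` on the local calendar day of `ref`.
function zonedParts(instant: Date, timezone: string): Record<string, number> {
  const parts = new Intl.DateTimeFormat("en-US", {
    timeZone: timezone,
    hour12: false,
    year: "numeric",
    month: "2-digit",
    day: "2-digit",
    hour: "2-digit",
    minute: "2-digit",
    second: "2-digit",
  }).formatToParts(instant);
  const out: Record<string, number> = {};
  for (const p of parts) {
    if (p.type !== "literal") out[p.type] = Number(p.value);
  }
  // Some engines print midnight as "24" when hour12 is false.
  out.hour = out.hour % 24;
  return out;
}

// Offset in ms between the zone's wall clock and UTC at `instant`.
function zoneOffsetMs(instant: Date, timezone: string): number {
  const p = zonedParts(instant, timezone);
  const wallAsUtc = Date.UTC(p.year, p.month - 1, p.day, p.hour, p.minute, p.second);
  // Drop milliseconds, which formatToParts does not report.
  return wallAsUtc - Math.floor(instant.getTime() / 1000) * 1000;
}

export function windowEndAtSketch(timezone: string, endHour: number, ref: Date): Date {
  const local = zonedParts(ref, timezone);
  // Wall-clock target written as if it were UTC; hour 24 rolls to the next day.
  const wallAsUtc = Date.UTC(local.year, local.month - 1, local.day, endHour);
  // Two passes so the offset is evaluated close to the target instant (DST).
  let result = wallAsUtc - zoneOffsetMs(new Date(wallAsUtc), timezone);
  result = wallAsUtc - zoneOffsetMs(new Date(result), timezone);
  return new Date(result);
}
```

Whatever the real helper does internally, the web bundle should keep importing it from `@cmbot/shared` rather than re-implementing it, so the pill and the bot's window gate can never disagree.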

- [ ] **Step 9: Wire the pill into edit-groups-form and edit-when-form**

Both forms know the reminder's `targetCount`, `fireAt`, `timezone`, and `deliveryWindowEndHour`. Add the pill above their Save button:

```tsx
<RunEtaPill
  targetCount={selectedGroupIds.length}
  fireAt={new Date(scheduledAtIso)}
  windowEndAt={computeWindowEndAt(
    timezone,
    deliveryEndHour,
    new Date(scheduledAtIso),
  )}
  timezone={timezone}
/>
```

- [ ] **Step 10: Run all web tests**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run
```

Expected: all green. Web ≈304 (this task adds 5 run-eta tests + 3 pill tests and removes none). Shared adds the windowEndAt tests that moved from bot.

- [ ] **Step 11: Commit**

```bash
git add apps/web packages/shared apps/bot
git commit -m "feat(web): ETA preview pill in wizard + edit-groups + edit-when

estimateRunDuration() computes a per-run ETA from BOT_MAX_SEND_PER_MINUTE
(hardcoded as ASSUMED_RATE_PER_MINUTE in the web bundle) plus a 15%
buffer. The RunEtaPill component shows a green 'Fits in window' or
amber 'Likely to pause' badge with a one-line suggestion. windowEndAt
moves from apps/bot/src/scheduler/delivery-window.ts to
packages/shared/src/delivery-window.ts so both bundles can import it."
```

---

## Task 10: Resume + cancel actions and PausedRunBanner

**Files:**
- Modify: `apps/web/src/actions/reminders.ts` (add `resumeReminderRunAction`, `cancelReminderRunAction`)
- Create: `apps/web/src/components/reminder-detail/paused-run-banner.tsx`
- Create: `apps/web/src/components/reminder-detail/paused-run-banner.test.tsx`
- Modify: `apps/web/src/app/reminders/[id]/page.tsx` (mount the banner)
- Modify: `apps/web/src/app/activity/page.tsx` (add Paused filter + Resume button per row)

- [ ] **Step 1: Write the failing banner test**

Create `apps/web/src/components/reminder-detail/paused-run-banner.test.tsx`:

```tsx
import { describe, it, expect, vi, beforeEach } from "vitest";
import { renderToStaticMarkup } from "react-dom/server";
import { PausedRunBanner } from "./paused-run-banner";

const resumeMock = vi.fn();
const cancelMock = vi.fn();
vi.mock("@/actions/reminders", () => ({
  resumeReminderRunAction: (...args: unknown[]) => resumeMock(...args),
  cancelReminderRunAction: (...args: unknown[]) => cancelMock(...args),
}));

describe("PausedRunBanner", () => {
  beforeEach(() => {
    resumeMock.mockReset();
    cancelMock.mockReset();
  });

  it("renders 'Resume' and 'Cancel run' buttons when latest run is paused", () => {
    const html = renderToStaticMarkup(
      <PausedRunBanner
        runId="run-1"
        sent={412}
        total={1000}
        windowEndHour={18}
        timezone="Asia/Kuala_Lumpur"
      />,
    );
    expect(html).toMatch(/Reminder paused/);
    expect(html).toMatch(/412 of 1000/);
    expect(html).toMatch(/Resume/);
    expect(html).toMatch(/Cancel run/);
  });
});
```

- [ ] **Step 2: Run to verify it fails**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run paused-run-banner
```

Expected: FAIL (module not found).

- [ ] **Step 3: Add resume + cancel actions to `apps/web/src/actions/reminders.ts`**

Append (near the other run-related actions):

```ts
const runIdSchema = z.object({ runId: z.string().uuid() });

export async function resumeReminderRunAction(input: { runId: string }) {
  const op = await getSeededOperator();
  const parsed = runIdSchema.safeParse(input);
  if (!parsed.success) {
    return { ok: false as const, error: "Invalid runId" };
  }

  const run = await db.query.reminderRuns.findFirst({
    where: (r, { eq }) => eq(r.id, parsed.data.runId),
    with: { reminder: { columns: { operatorId: true, id: true } } },
  });
  if (!run || run.reminder.operatorId !== op.id) {
    return { ok: false as const, error: "Run not found" };
  }
  if (run.status !== "paused") {
    return { ok: false as const, error: `Cannot resume a ${run.status} run` };
  }

  await getBoss().send("reminder.fire", {
    reminderId: run.reminder.id,
    runId: run.id,
  });
  await writeAudit(op.id, "reminder.run.resumed", { runId: run.id });

  revalidatePath(`/reminders/${run.reminder.id}`);
  revalidatePath(`/activity`);
  return { ok: true as const };
}

export async function cancelReminderRunAction(input: { runId: string }) {
  const op = await getSeededOperator();
  const parsed = runIdSchema.safeParse(input);
  if (!parsed.success) {
    return { ok: false as const, error: "Invalid runId" };
  }

  const run = await db.query.reminderRuns.findFirst({
    where: (r, { eq }) => eq(r.id, parsed.data.runId),
    with: { reminder: { columns: { operatorId: true, id: true } } },
  });
  if (!run || run.reminder.operatorId !== op.id) {
    return { ok: false as const, error: "Run not found" };
  }
  if (run.status !== "paused") {
    return { ok: false as const, error: `Cannot cancel a ${run.status} run` };
  }

  await db.transaction(async (tx) => {
    await tx
      .update(reminderRunTargets)
      .set({ status: "skipped", error: "canceled by operator" })
      .where(
        and(
          eq(reminderRunTargets.runId, run.id),
          eq(reminderRunTargets.status, "pending"),
        ),
      );
    await tx
      .update(reminderRuns)
      .set({
        status: "partial",
        endedAt: new Date(),
        errorSummary: "Canceled by operator before all groups received the message.",
      })
      .where(eq(reminderRuns.id, run.id));
  });
  await writeAudit(op.id, "reminder.run.canceled", { runId: run.id });

  revalidatePath(`/reminders/${run.reminder.id}`);
  revalidatePath(`/activity`);
  return { ok: true as const };
}
```

(Imports needed at top of file: `revalidatePath` from `next/cache`, `getBoss` from `@/lib/boss`, `writeAudit` from `@/lib/audit`, `reminderRuns` and `reminderRunTargets` from `@cmbot/db`, `and`/`eq` from `drizzle-orm`.)

- [ ] **Step 4: Implement PausedRunBanner**

Create `apps/web/src/components/reminder-detail/paused-run-banner.tsx`:

```tsx
"use client";

import { useState, useTransition } from "react";
import { AlertCircleIcon, PlayIcon, XIcon, Loader2Icon } from "lucide-react";
import { Button } from "@/components/ui/button";
import {
  resumeReminderRunAction,
  cancelReminderRunAction,
} from "@/actions/reminders";

interface PausedRunBannerProps {
  runId: string;
  sent: number;
  total: number;
  windowEndHour: number;
  timezone: string;
}

export function PausedRunBanner({
  runId,
  sent,
  total,
  windowEndHour,
  timezone,
}: PausedRunBannerProps) {
  const [pending, startTransition] = useTransition();
  const [error, setError] = useState<string | null>(null);

  const onResume = () =>
    startTransition(async () => {
      setError(null);
      const r = await resumeReminderRunAction({ runId });
      if (!r.ok) setError(r.error);
    });

  const onCancel = () =>
    startTransition(async () => {
      setError(null);
      const r = await cancelReminderRunAction({ runId });
      if (!r.ok) setError(r.error);
    });

  return (
    <div className="rounded-lg border border-amber-500/40 bg-amber-500/5 p-4 space-y-3">
      <div className="flex items-start gap-2">
        <AlertCircleIcon className="size-4 text-amber-600 dark:text-amber-400 mt-0.5 shrink-0" />
        <div className="space-y-1">
          <p className="text-sm font-medium">Reminder paused</p>
          <p className="text-xs text-muted-foreground">
            {sent} of {total} groups delivered. The delivery window
            closed at {windowEndHour}:00 ({timezone}). Resume to send
            the remaining {total - sent} groups, or cancel the run.
          </p>
        </div>
      </div>
      {error && (
        <div className="text-xs text-destructive">{error}</div>
      )}
      <div className="flex gap-2">
        <Button size="sm" onClick={onResume} disabled={pending} className="gap-2">
          {pending ? <Loader2Icon className="size-3.5 animate-spin" /> : <PlayIcon className="size-3.5" />}
          Resume
        </Button>
        <Button size="sm" variant="outline" onClick={onCancel} disabled={pending} className="gap-2">
          <XIcon className="size-3.5" />
          Cancel run
        </Button>
      </div>
    </div>
  );
}
```

- [ ] **Step 5: Run banner test**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run paused-run-banner
```

Expected: PASS.

- [ ] **Step 6: Mount the banner on the reminder detail page**

In `apps/web/src/app/reminders/[id]/page.tsx`, after loading `reminder` and `runs`, find the latest paused run and conditionally render the banner above the section list. `Array.prototype.find` returns the first match, so this only yields the latest paused run if `runs` is ordered newest first; confirm the query sorts that way:

```tsx
const latestPausedRun = runs.find((r) => r.status === "paused");

// in JSX, near the top of the page content:
{latestPausedRun && (
  <PausedRunBanner
    runId={latestPausedRun.id}
    sent={latestPausedRun.sentCount ?? 0}
    total={latestPausedRun.totalCount ?? 0}
    windowEndHour={reminder.deliveryWindowEndHour}
    timezone={reminder.timezone}
  />
)}
```

(`sentCount` and `totalCount` need to be derived in the query: adjust `getReminderWithRuns` to count run-target rows by status. If not present, add a count step there.)
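
As a rough illustration of that count step, the sketch below derives both numbers in memory from a run's target rows. It is a sketch under assumptions: the helper name `countRunTargets` is hypothetical, and the `"sent"` status string is assumed (this section only confirms `"pending"` and `"skipped"` as target statuses). A SQL aggregate inside `getReminderWithRuns` would avoid loading every row for large runs.

```ts
// Hypothetical helper: derives the banner's counts from a run's target rows.
// The "sent" status value is an assumption; adjust to the real enum.
export function countRunTargets(
  targets: ReadonlyArray<{ status: string }>,
): { sentCount: number; totalCount: number } {
  let sentCount = 0;
  for (const t of targets) {
    if (t.status === "sent") sentCount += 1;
  }
  return { sentCount, totalCount: targets.length };
}
```

For a paused run with 412 delivered targets and 588 still pending, this returns `{ sentCount: 412, totalCount: 1000 }`, which is what the banner copy expects.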

- [ ] **Step 7: Add Paused filter + Resume button on Activity page**

In `apps/web/src/app/activity/page.tsx`, extend the status filter pills to include `"paused"` (amber). For each row whose status is `paused`, render a small Resume button inline (use the same `resumeReminderRunAction`).

Create a small client component `ResumeRunButton` that wraps the action call (mirroring the existing inline buttons elsewhere). Place it in `apps/web/src/components/activity/resume-run-button.tsx`.

- [ ] **Step 8: Run all web tests**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
```

Expected: all green.

- [ ] **Step 9: Restart web container so SSR picks up the new client components**

```bash
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml restart web
```

- [ ] **Step 10: Commit**

```bash
git add apps/web
git commit -m "feat(web): paused-run banner with Resume / Cancel buttons

resumeReminderRunAction re-enqueues the existing run via pg-boss with
runId in the payload (Task 7's fire-reminder accepts that). Cancel
action flips remaining pending targets to skipped and resolves the
run to partial. Activity tab gets a Paused filter and inline Resume
button on each paused row."
```

---

## Acceptance check (manual)

After all 10 tasks land:

- [ ] **Smoke 1.** Create a reminder for 1 group with default delivery hours and a fire time 30 seconds in the future. Verify it lands; verify the run's `error_summary` is null and its status is `success`.

- [ ] **Smoke 2.** Create a reminder with end hour set to a value that's already past in the operator's timezone. Schedule it to fire NOW. Verify the run resolves `failed` and the `error_summary` reads "Delivery window closed at H:00 (TZ) before any group could be sent."

- [ ] **Smoke 3.** Create two reminders on TWO different paired accounts with the same `scheduledAt`. Watch bot logs:

```bash
NO_SUDO=1 docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs --tail=60 bot 2>&1 | grep -E "reminder|fire-reminder"
```

Expected: both reminders' `fire-reminder: done` entries land within seconds of each other (parallel), not sequentially separated by a full fan-out.

- [ ] **Smoke 4.** Create a reminder with a 5-MB JPEG and 3 groups. Fire it. Bot logs should show only ONE `prepareWAMessageMedia` / upload entry (look for the upload size in the Baileys debug log) followed by 3 relayMessage events.

- [ ] **Smoke 5.** Wizard ETA pill: pick 5 groups + default 6/18 hours with a fire time inside the window. Pill should be green ("Fits in window"). Pick 5000 groups (clone an existing one) and a fire time less than about 2.5 hours before the end hour (5000 groups at 40/min plus the 15% buffer is 144 minutes): pill should flip amber ("Likely to pause") with the "Widen the window" hint.

- [ ] **Smoke 6.** Trigger a paused run on purpose: set `BOT_MAX_SEND_PER_MINUTE=2` in `.env.development`, restart bot, fire a 10-group reminder with end hour set ~3 minutes from now. Verify:

  - The run resolves `paused` (~6 groups sent).
  - A "Reminder paused" notification appears (with sent/total in body).
  - The detail page shows the banner with **Resume** and **Cancel run**.
  - Click **Resume** → run continues from the unsent targets, eventually resolving to `success` (or `paused` again if the window closes again).
  - Reset `BOT_MAX_SEND_PER_MINUTE` after the test.

- [ ] **Smoke 7.** Cancel-run: same setup as Smoke 6, but click **Cancel run** instead of Resume. Verify remaining pending targets become `skipped`, run resolves `partial` with errorSummary "Canceled by operator before all groups received the message.", banner disappears.

- [ ] **Final test sweep:**

```bash
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/bot test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/web test -- --run
NO_SUDO=1 ./scripts/dev.sh exec pnpm --filter @cmbot/shared test -- --run
```

Expected: all green. ~395 total (web ≈315 with the new ETA + banner + notification tests; shared adds windowEndAt suite that moved from bot).