From bb58f5acf2a01fab8a6e7bf9cbc0eef847f69c62 Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sun, 10 May 2026 14:37:12 +0800 Subject: [PATCH] feat(bot): per-account token-bucket rate limiter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TokenBucket gates each socket.sendMessage call. Tokens regenerate at ratePerMinute/60 per second, capped at one minute's worth so quiet accounts can't burst. FIFO drain across concurrent waiters. accountRateLimiter (singleton) hands out one bucket per accountId, so account A's drain never throttles account B. Default rate is BOT_MAX_SEND_PER_MINUTE (40) — the safe band for an established WhatsApp account. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/bot/src/scheduler/rate-limiter.test.ts | 104 ++++++++++++++++++++ apps/bot/src/scheduler/rate-limiter.ts | 95 ++++++++++++++++++ 2 files changed, 199 insertions(+) create mode 100644 apps/bot/src/scheduler/rate-limiter.test.ts create mode 100644 apps/bot/src/scheduler/rate-limiter.ts diff --git a/apps/bot/src/scheduler/rate-limiter.test.ts b/apps/bot/src/scheduler/rate-limiter.test.ts new file mode 100644 index 0000000..cb04fe6 --- /dev/null +++ b/apps/bot/src/scheduler/rate-limiter.test.ts @@ -0,0 +1,104 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { TokenBucket, accountRateLimiter } from "./rate-limiter.js"; + +describe("TokenBucket", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-10T10:00:00Z")); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("starts full: first N=capacity acquires resolve immediately", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 5 }); + for (let i = 0; i < 5; i++) { + await b.acquire(); + } + }); + + it("blocks the (capacity+1)th acquire until a token regenerates", async () => { + // 60/min = 1 token per second. Capacity 2. + const b = new TokenBucket({ ratePerMinute: 60, capacity: 2 }); + await b.acquire(); + await b.acquire(); + + let resolved = false; + const pending = b.acquire().then(() => { + resolved = true; + }); + + await Promise.resolve(); + expect(resolved).toBe(false); + + await vi.advanceTimersByTimeAsync(1000); + await pending; + expect(resolved).toBe(true); + }); + + it("FIFO: pending acquires resolve in the order they arrived", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 1 }); + await b.acquire(); // bucket empty + + const order: number[] = []; + const a = b.acquire().then(() => order.push(1)); + const c = b.acquire().then(() => order.push(2)); + + await vi.advanceTimersByTimeAsync(2000); + await Promise.all([a, c]); + expect(order).toEqual([1, 2]); + }); + + it("does not over-fill past capacity even if the clock leaps forward", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 3 }); + await b.acquire(); + await b.acquire(); + await b.acquire(); + // Leap an hour. Naive impl would credit 3600 tokens; we should cap at 3. + await vi.advanceTimersByTimeAsync(3_600_000); + await b.acquire(); + await b.acquire(); + await b.acquire(); + let resolved = false; + b.acquire().then(() => (resolved = true)); + await Promise.resolve(); + expect(resolved).toBe(false); + }); + + it("ratePerMinute=0 is rejected at construction", () => { + expect(() => new TokenBucket({ ratePerMinute: 0, capacity: 1 })).toThrow(); + }); +}); + +describe("accountRateLimiter (singleton)", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-10T10:00:00Z")); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("returns the SAME bucket for repeated lookups of one accountId", () => { + const a1 = accountRateLimiter.get("acct-1"); + const a2 = accountRateLimiter.get("acct-1"); + expect(a1).toBe(a2); + }); + + it("returns DIFFERENT buckets for different accountIds (isolation)", () => { + const a = accountRateLimiter.get("acct-A"); + const b = accountRateLimiter.get("acct-B"); + expect(a).not.toBe(b); + }); + + it("a drained account A bucket does not block account B", async () => { + const a = accountRateLimiter.get("acct-A"); + const b = accountRateLimiter.get("acct-B"); + for (let i = 0; i < 40; i++) await a.acquire(); + + let bResolved = false; + b.acquire().then(() => (bResolved = true)); + await Promise.resolve(); + expect(bResolved).toBe(true); + }); +}); diff --git a/apps/bot/src/scheduler/rate-limiter.ts b/apps/bot/src/scheduler/rate-limiter.ts new file mode 100644 index 0000000..a5c708c --- /dev/null +++ b/apps/bot/src/scheduler/rate-limiter.ts @@ -0,0 +1,95 @@ +import { env } from "../env.js"; + +/** + * Token bucket for per-account send pacing. + * + * Tokens regenerate at `ratePerMinute / 60` per second. Capacity caps + * how many can accumulate during idle periods (so the operator can't + * burst 1000 messages just because the account was quiet for a day). + * + * `acquire()` resolves when a token is available, FIFO across waiters. + * Used by fire-reminder to gate every `socket.sendMessage` call. + */ +export interface TokenBucketOptions { + ratePerMinute: number; + /** Defaults to ratePerMinute (one minute's worth). */ + capacity?: number; +} + +export class TokenBucket { + private readonly ratePerMs: number; + private readonly capacity: number; + private tokens: number; + private lastRefillMs: number; + private waiters: Array<() => void> = []; + + constructor(opts: TokenBucketOptions) { + if (opts.ratePerMinute <= 0) { + throw new Error(`TokenBucket: ratePerMinute must be > 0, got ${opts.ratePerMinute}`); + } + this.ratePerMs = opts.ratePerMinute / 60_000; + this.capacity = opts.capacity ?? opts.ratePerMinute; + this.tokens = this.capacity; + this.lastRefillMs = Date.now(); + } + + /** Resolve when a token is available. FIFO across concurrent waiters. */ + async acquire(): Promise { + this.refill(); + if (this.tokens >= 1 && this.waiters.length === 0) { + this.tokens -= 1; + return; + } + return new Promise((resolve) => { + this.waiters.push(resolve); + this.scheduleNext(); + }); + } + + private refill(): void { + const now = Date.now(); + const elapsed = now - this.lastRefillMs; + if (elapsed <= 0) return; + const gained = elapsed * this.ratePerMs; + this.tokens = Math.min(this.capacity, this.tokens + gained); + this.lastRefillMs = now; + } + + private scheduleNext(): void { + this.refill(); + while (this.tokens >= 1 && this.waiters.length > 0) { + this.tokens -= 1; + const w = this.waiters.shift()!; + w(); + } + if (this.waiters.length === 0) return; + + const tokensShort = 1 - this.tokens; + const waitMs = Math.max(1, Math.ceil(tokensShort / this.ratePerMs)); + setTimeout(() => this.scheduleNext(), waitMs); + } +} + +/** + * Per-accountId TokenBucket registry. Each account gets its own + * pacing budget, so a slow account A never throttles account B. + */ +class AccountRateLimiter { + private buckets = new Map(); + private ratePerMinute: number; + + constructor(ratePerMinute: number) { + this.ratePerMinute = ratePerMinute; + } + + get(accountId: string): TokenBucket { + let b = this.buckets.get(accountId); + if (!b) { + b = new TokenBucket({ ratePerMinute: this.ratePerMinute }); + this.buckets.set(accountId, b); + } + return b; + } +} + +export const accountRateLimiter = new AccountRateLimiter(env.BOT_MAX_SEND_PER_MINUTE);