diff --git a/apps/bot/src/scheduler/rate-limiter.test.ts b/apps/bot/src/scheduler/rate-limiter.test.ts new file mode 100644 index 0000000..cb04fe6 --- /dev/null +++ b/apps/bot/src/scheduler/rate-limiter.test.ts @@ -0,0 +1,104 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { TokenBucket, accountRateLimiter } from "./rate-limiter.js"; + +describe("TokenBucket", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-10T10:00:00Z")); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("starts full: first N=capacity acquires resolve immediately", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 5 }); + for (let i = 0; i < 5; i++) { + await b.acquire(); + } + }); + + it("blocks the (capacity+1)th acquire until a token regenerates", async () => { + // 60/min = 1 token per second. Capacity 2. + const b = new TokenBucket({ ratePerMinute: 60, capacity: 2 }); + await b.acquire(); + await b.acquire(); + + let resolved = false; + const pending = b.acquire().then(() => { + resolved = true; + }); + + await Promise.resolve(); + expect(resolved).toBe(false); + + await vi.advanceTimersByTimeAsync(1000); + await pending; + expect(resolved).toBe(true); + }); + + it("FIFO: pending acquires resolve in the order they arrived", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 1 }); + await b.acquire(); // bucket empty + + const order: number[] = []; + const a = b.acquire().then(() => order.push(1)); + const c = b.acquire().then(() => order.push(2)); + + await vi.advanceTimersByTimeAsync(2000); + await Promise.all([a, c]); + expect(order).toEqual([1, 2]); + }); + + it("does not over-fill past capacity even if the clock leaps forward", async () => { + const b = new TokenBucket({ ratePerMinute: 60, capacity: 3 }); + await b.acquire(); + await b.acquire(); + await b.acquire(); + // Leap an hour. Naive impl would credit 3600 tokens; we should cap at 3. + await vi.advanceTimersByTimeAsync(3_600_000); + await b.acquire(); + await b.acquire(); + await b.acquire(); + let resolved = false; + b.acquire().then(() => (resolved = true)); + await Promise.resolve(); + expect(resolved).toBe(false); + }); + + it("ratePerMinute=0 is rejected at construction", () => { + expect(() => new TokenBucket({ ratePerMinute: 0, capacity: 1 })).toThrow(); + }); +}); + +describe("accountRateLimiter (singleton)", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-10T10:00:00Z")); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it("returns the SAME bucket for repeated lookups of one accountId", () => { + const a1 = accountRateLimiter.get("acct-1"); + const a2 = accountRateLimiter.get("acct-1"); + expect(a1).toBe(a2); + }); + + it("returns DIFFERENT buckets for different accountIds (isolation)", () => { + const a = accountRateLimiter.get("acct-A"); + const b = accountRateLimiter.get("acct-B"); + expect(a).not.toBe(b); + }); + + it("a drained account A bucket does not block account B", async () => { + const a = accountRateLimiter.get("acct-A"); + const b = accountRateLimiter.get("acct-B"); + for (let i = 0; i < 40; i++) await a.acquire(); + + let bResolved = false; + b.acquire().then(() => (bResolved = true)); + await Promise.resolve(); + expect(bResolved).toBe(true); + }); +}); diff --git a/apps/bot/src/scheduler/rate-limiter.ts b/apps/bot/src/scheduler/rate-limiter.ts new file mode 100644 index 0000000..a5c708c --- /dev/null +++ b/apps/bot/src/scheduler/rate-limiter.ts @@ -0,0 +1,95 @@ +import { env } from "../env.js"; + +/** + * Token bucket for per-account send pacing. + * + * Tokens regenerate at `ratePerMinute / 60` per second. Capacity caps + * how many can accumulate during idle periods (so the operator can't + * burst 1000 messages just because the account was quiet for a day). + * + * `acquire()` resolves when a token is available, FIFO across waiters. + * Used by fire-reminder to gate every `socket.sendMessage` call. + */ +export interface TokenBucketOptions { + ratePerMinute: number; + /** Defaults to ratePerMinute (one minute's worth). */ + capacity?: number; +} + +export class TokenBucket { + private readonly ratePerMs: number; + private readonly capacity: number; + private tokens: number; + private lastRefillMs: number; + private waiters: Array<() => void> = []; + + constructor(opts: TokenBucketOptions) { + if (opts.ratePerMinute <= 0) { + throw new Error(`TokenBucket: ratePerMinute must be > 0, got ${opts.ratePerMinute}`); + } + this.ratePerMs = opts.ratePerMinute / 60_000; + this.capacity = opts.capacity ?? opts.ratePerMinute; + this.tokens = this.capacity; + this.lastRefillMs = Date.now(); + } + + /** Resolve when a token is available. FIFO across concurrent waiters. */ + async acquire(): Promise { + this.refill(); + if (this.tokens >= 1 && this.waiters.length === 0) { + this.tokens -= 1; + return; + } + return new Promise((resolve) => { + this.waiters.push(resolve); + this.scheduleNext(); + }); + } + + private refill(): void { + const now = Date.now(); + const elapsed = now - this.lastRefillMs; + if (elapsed <= 0) return; + const gained = elapsed * this.ratePerMs; + this.tokens = Math.min(this.capacity, this.tokens + gained); + this.lastRefillMs = now; + } + + private scheduleNext(): void { + this.refill(); + while (this.tokens >= 1 && this.waiters.length > 0) { + this.tokens -= 1; + const w = this.waiters.shift()!; + w(); + } + if (this.waiters.length === 0) return; + + const tokensShort = 1 - this.tokens; + const waitMs = Math.max(1, Math.ceil(tokensShort / this.ratePerMs)); + setTimeout(() => this.scheduleNext(), waitMs); + } +} + +/** + * Per-accountId TokenBucket registry. Each account gets its own + * pacing budget, so a slow account A never throttles account B. + */ +class AccountRateLimiter { + private buckets = new Map(); + private ratePerMinute: number; + + constructor(ratePerMinute: number) { + this.ratePerMinute = ratePerMinute; + } + + get(accountId: string): TokenBucket { + let b = this.buckets.get(accountId); + if (!b) { + b = new TokenBucket({ ratePerMinute: this.ratePerMinute }); + this.buckets.set(accountId, b); + } + return b; + } +} + +export const accountRateLimiter = new AccountRateLimiter(env.BOT_MAX_SEND_PER_MINUTE);