feat(bot): per-account token-bucket rate limiter

TokenBucket gates each socket.sendMessage call. Tokens regenerate at
ratePerMinute/60 per second, capped at one minute's worth so quiet
accounts can't burst. FIFO drain across concurrent waiters.

accountRateLimiter (singleton) hands out one bucket per accountId, so
account A's drain never throttles account B. Default rate is
BOT_MAX_SEND_PER_MINUTE (40) — the safe band for an established
WhatsApp account.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
yiekheng 2026-05-10 14:37:12 +08:00
parent 5913706ab9
commit bb58f5acf2
2 changed files with 199 additions and 0 deletions

View File

@ -0,0 +1,104 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { TokenBucket, accountRateLimiter } from "./rate-limiter.js";
describe("TokenBucket", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-10T10:00:00Z"));
});
afterEach(() => {
vi.useRealTimers();
});
it("starts full: first N=capacity acquires resolve immediately", async () => {
const b = new TokenBucket({ ratePerMinute: 60, capacity: 5 });
for (let i = 0; i < 5; i++) {
await b.acquire();
}
});
it("blocks the (capacity+1)th acquire until a token regenerates", async () => {
// 60/min = 1 token per second. Capacity 2.
const b = new TokenBucket({ ratePerMinute: 60, capacity: 2 });
await b.acquire();
await b.acquire();
let resolved = false;
const pending = b.acquire().then(() => {
resolved = true;
});
await Promise.resolve();
expect(resolved).toBe(false);
await vi.advanceTimersByTimeAsync(1000);
await pending;
expect(resolved).toBe(true);
});
it("FIFO: pending acquires resolve in the order they arrived", async () => {
const b = new TokenBucket({ ratePerMinute: 60, capacity: 1 });
await b.acquire(); // bucket empty
const order: number[] = [];
const a = b.acquire().then(() => order.push(1));
const c = b.acquire().then(() => order.push(2));
await vi.advanceTimersByTimeAsync(2000);
await Promise.all([a, c]);
expect(order).toEqual([1, 2]);
});
it("does not over-fill past capacity even if the clock leaps forward", async () => {
const b = new TokenBucket({ ratePerMinute: 60, capacity: 3 });
await b.acquire();
await b.acquire();
await b.acquire();
// Leap an hour. Naive impl would credit 3600 tokens; we should cap at 3.
await vi.advanceTimersByTimeAsync(3_600_000);
await b.acquire();
await b.acquire();
await b.acquire();
let resolved = false;
b.acquire().then(() => (resolved = true));
await Promise.resolve();
expect(resolved).toBe(false);
});
it("ratePerMinute=0 is rejected at construction", () => {
expect(() => new TokenBucket({ ratePerMinute: 0, capacity: 1 })).toThrow();
});
});
describe("accountRateLimiter (singleton)", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-10T10:00:00Z"));
});
afterEach(() => {
vi.useRealTimers();
});
it("returns the SAME bucket for repeated lookups of one accountId", () => {
const a1 = accountRateLimiter.get("acct-1");
const a2 = accountRateLimiter.get("acct-1");
expect(a1).toBe(a2);
});
it("returns DIFFERENT buckets for different accountIds (isolation)", () => {
const a = accountRateLimiter.get("acct-A");
const b = accountRateLimiter.get("acct-B");
expect(a).not.toBe(b);
});
it("a drained account A bucket does not block account B", async () => {
const a = accountRateLimiter.get("acct-A");
const b = accountRateLimiter.get("acct-B");
for (let i = 0; i < 40; i++) await a.acquire();
let bResolved = false;
b.acquire().then(() => (bResolved = true));
await Promise.resolve();
expect(bResolved).toBe(true);
});
});

View File

@ -0,0 +1,95 @@
import { env } from "../env.js";
/**
* Token bucket for per-account send pacing.
*
* Tokens regenerate at `ratePerMinute / 60` per second. Capacity caps
* how many can accumulate during idle periods (so the operator can't
* burst 1000 messages just because the account was quiet for a day).
*
* `acquire()` resolves when a token is available, FIFO across waiters.
* Used by fire-reminder to gate every `socket.sendMessage` call.
*/
export interface TokenBucketOptions {
ratePerMinute: number;
/** Defaults to ratePerMinute (one minute's worth). */
capacity?: number;
}
export class TokenBucket {
private readonly ratePerMs: number;
private readonly capacity: number;
private tokens: number;
private lastRefillMs: number;
private waiters: Array<() => void> = [];
constructor(opts: TokenBucketOptions) {
if (opts.ratePerMinute <= 0) {
throw new Error(`TokenBucket: ratePerMinute must be > 0, got ${opts.ratePerMinute}`);
}
this.ratePerMs = opts.ratePerMinute / 60_000;
this.capacity = opts.capacity ?? opts.ratePerMinute;
this.tokens = this.capacity;
this.lastRefillMs = Date.now();
}
/** Resolve when a token is available. FIFO across concurrent waiters. */
async acquire(): Promise<void> {
this.refill();
if (this.tokens >= 1 && this.waiters.length === 0) {
this.tokens -= 1;
return;
}
return new Promise<void>((resolve) => {
this.waiters.push(resolve);
this.scheduleNext();
});
}
private refill(): void {
const now = Date.now();
const elapsed = now - this.lastRefillMs;
if (elapsed <= 0) return;
const gained = elapsed * this.ratePerMs;
this.tokens = Math.min(this.capacity, this.tokens + gained);
this.lastRefillMs = now;
}
private scheduleNext(): void {
this.refill();
while (this.tokens >= 1 && this.waiters.length > 0) {
this.tokens -= 1;
const w = this.waiters.shift()!;
w();
}
if (this.waiters.length === 0) return;
const tokensShort = 1 - this.tokens;
const waitMs = Math.max(1, Math.ceil(tokensShort / this.ratePerMs));
setTimeout(() => this.scheduleNext(), waitMs);
}
}
/**
* Per-accountId TokenBucket registry. Each account gets its own
* pacing budget, so a slow account A never throttles account B.
*/
class AccountRateLimiter {
private buckets = new Map<string, TokenBucket>();
private ratePerMinute: number;
constructor(ratePerMinute: number) {
this.ratePerMinute = ratePerMinute;
}
get(accountId: string): TokenBucket {
let b = this.buckets.get(accountId);
if (!b) {
b = new TokenBucket({ ratePerMinute: this.ratePerMinute });
this.buckets.set(accountId, b);
}
return b;
}
}
export const accountRateLimiter = new AccountRateLimiter(env.BOT_MAX_SEND_PER_MINUTE);