From 5913706ab92d3785a7a635833c840e7b2203336a Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sun, 10 May 2026 14:36:22 +0800 Subject: [PATCH] feat(bot): PerKeyMutex for accountId-keyed serialisation Same key serialises, different keys run in parallel. Used by fire-reminder to prevent two same-account fan-outs from doubling the effective send rate (which would risk a WhatsApp ban). Chains auto-clean empty entries so the Map doesn't leak. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/bot/src/scheduler/per-key-mutex.test.ts | 93 ++++++++++++++++++++ apps/bot/src/scheduler/per-key-mutex.ts | 48 ++++++++++ 2 files changed, 141 insertions(+) create mode 100644 apps/bot/src/scheduler/per-key-mutex.test.ts create mode 100644 apps/bot/src/scheduler/per-key-mutex.ts diff --git a/apps/bot/src/scheduler/per-key-mutex.test.ts b/apps/bot/src/scheduler/per-key-mutex.test.ts new file mode 100644 index 0000000..d1c46c2 --- /dev/null +++ b/apps/bot/src/scheduler/per-key-mutex.test.ts @@ -0,0 +1,93 @@ +import { describe, it, expect } from "vitest"; +import { PerKeyMutex } from "./per-key-mutex.js"; + +/** Tiny clock-free helper: returns a Promise that resolves after + * `n` microtasks. Lets us check ordering without real timers. */ +function tickN(n: number): Promise { + let p: Promise = Promise.resolve(); + for (let i = 0; i < n; i++) p = p.then(); + return p; +} + +describe("PerKeyMutex", () => { + it("allows a single call against one key to run immediately", async () => { + const m = new PerKeyMutex(); + const result = await m.run("k1", async () => 42); + expect(result).toBe(42); + }); + + it("serialises two calls against the same key", async () => { + const m = new PerKeyMutex(); + const order: string[] = []; + + const a = m.run("k1", async () => { + order.push("a-start"); + await tickN(5); + order.push("a-end"); + }); + const b = m.run("k1", async () => { + order.push("b-start"); + order.push("b-end"); + }); + + await Promise.all([a, b]); + expect(order).toEqual(["a-start", "a-end", "b-start", "b-end"]); + }); + + it("runs different keys in parallel", async () => { + const m = new PerKeyMutex(); + const order: string[] = []; + + const a = m.run("k1", async () => { + order.push("a-start"); + await tickN(5); + order.push("a-end"); + }); + const b = m.run("k2", async () => { + order.push("b-start"); + order.push("b-end"); + }); + + await Promise.all([a, b]); + expect(order[0]).toBe("a-start"); + expect(order).toContain("b-start"); + expect(order).toContain("b-end"); + // b's pair lands before a's end (they run in parallel). + expect(order.indexOf("b-end")).toBeLessThan(order.indexOf("a-end")); + }); + + it("releases the lock when the handler throws", async () => { + const m = new PerKeyMutex(); + await expect( + m.run("k1", async () => { + throw new Error("boom"); + }), + ).rejects.toThrow("boom"); + const result = await m.run("k1", async () => "after"); + expect(result).toBe("after"); + }); + + it("forwards the resolved value of the handler", async () => { + const m = new PerKeyMutex(); + const out = await m.run("k1", async () => ({ ok: true, n: 7 })); + expect(out).toEqual({ ok: true, n: 7 }); + }); + + it("cleans up internal state for keys with no waiters", async () => { + const m = new PerKeyMutex(); + await m.run("k1", async () => {}); + expect(m.activeKeyCount()).toBe(0); + }); + + it("retains a key while a chain is in flight, then drops it", async () => { + const m = new PerKeyMutex(); + let release!: () => void; + const gate = new Promise((r) => (release = r)); + + const inFlight = m.run("k1", () => gate); + expect(m.activeKeyCount()).toBe(1); + release(); + await inFlight; + expect(m.activeKeyCount()).toBe(0); + }); +}); diff --git a/apps/bot/src/scheduler/per-key-mutex.ts b/apps/bot/src/scheduler/per-key-mutex.ts new file mode 100644 index 0000000..ce45aa5 --- /dev/null +++ b/apps/bot/src/scheduler/per-key-mutex.ts @@ -0,0 +1,48 @@ +/** + * Async mutex keyed by a string. Different keys run in parallel; + * same-key calls serialise. + * + * Used by fire-reminder so two reminders on the SAME WhatsApp account + * take turns (running them concurrently would double the effective + * send rate and risk a ban), while reminders on DIFFERENT accounts + * proceed in parallel. + * + * Implementation is a chain-per-key Promise: each call appends its + * work to the key's tail. Empty chains are cleaned up so the Map + * doesn't grow unbounded across the bot's lifetime. + */ +export class PerKeyMutex { + private chains = new Map>(); + + async run(key: string, fn: () => Promise): Promise { + const prev = this.chains.get(key) ?? Promise.resolve(); + + let release!: () => void; + const completion = new Promise((r) => (release = r)); + const chained = prev.then(() => completion); + this.chains.set(key, chained); + + try { + await prev; + return await fn(); + } finally { + release(); + // Drop the entry only if no later caller has appended in the + // meantime — otherwise we'd evict the in-flight chain. + if (this.chains.get(key) === chained) { + this.chains.delete(key); + } + } + } + + activeKeyCount(): number { + return this.chains.size; + } +} + +/** + * Singleton mutex used by fire-reminder, keyed by accountId. Lives at + * module scope so multiple pg-boss workers in the same process share + * state. + */ +export const accountMutex = new PerKeyMutex();