feat(bot): PerKeyMutex for accountId-keyed serialisation
Same key serialises, different keys run in parallel. Used by fire-reminder to prevent two same-account fan-outs from doubling the effective send rate (which would risk a WhatsApp ban). Chains auto-clean empty entries so the Map doesn't leak. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
c5339abe1a
commit
5913706ab9
93
apps/bot/src/scheduler/per-key-mutex.test.ts
Normal file
93
apps/bot/src/scheduler/per-key-mutex.test.ts
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
import { describe, it, expect } from "vitest";
|
||||||
|
import { PerKeyMutex } from "./per-key-mutex.js";
|
||||||
|
|
||||||
|
/** Tiny clock-free helper: returns a Promise that resolves after
|
||||||
|
* `n` microtasks. Lets us check ordering without real timers. */
|
||||||
|
function tickN(n: number): Promise<void> {
|
||||||
|
let p: Promise<void> = Promise.resolve();
|
||||||
|
for (let i = 0; i < n; i++) p = p.then();
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("PerKeyMutex", () => {
|
||||||
|
it("allows a single call against one key to run immediately", async () => {
|
||||||
|
const m = new PerKeyMutex();
|
||||||
|
const result = await m.run("k1", async () => 42);
|
||||||
|
expect(result).toBe(42);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("serialises two calls against the same key", async () => {
|
||||||
|
const m = new PerKeyMutex();
|
||||||
|
const order: string[] = [];
|
||||||
|
|
||||||
|
const a = m.run("k1", async () => {
|
||||||
|
order.push("a-start");
|
||||||
|
await tickN(5);
|
||||||
|
order.push("a-end");
|
||||||
|
});
|
||||||
|
const b = m.run("k1", async () => {
|
||||||
|
order.push("b-start");
|
||||||
|
order.push("b-end");
|
||||||
|
});
|
||||||
|
|
||||||
|
await Promise.all([a, b]);
|
||||||
|
expect(order).toEqual(["a-start", "a-end", "b-start", "b-end"]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("runs different keys in parallel", async () => {
|
||||||
|
const m = new PerKeyMutex();
|
||||||
|
const order: string[] = [];
|
||||||
|
|
||||||
|
const a = m.run("k1", async () => {
|
||||||
|
order.push("a-start");
|
||||||
|
await tickN(5);
|
||||||
|
order.push("a-end");
|
||||||
|
});
|
||||||
|
const b = m.run("k2", async () => {
|
||||||
|
order.push("b-start");
|
||||||
|
order.push("b-end");
|
||||||
|
});
|
||||||
|
|
||||||
|
await Promise.all([a, b]);
|
||||||
|
expect(order[0]).toBe("a-start");
|
||||||
|
expect(order).toContain("b-start");
|
||||||
|
expect(order).toContain("b-end");
|
||||||
|
// b's pair lands before a's end (they run in parallel).
|
||||||
|
expect(order.indexOf("b-end")).toBeLessThan(order.indexOf("a-end"));
|
||||||
|
});
|
||||||
|
|
||||||
|
it("releases the lock when the handler throws", async () => {
|
||||||
|
const m = new PerKeyMutex();
|
||||||
|
await expect(
|
||||||
|
m.run("k1", async () => {
|
||||||
|
throw new Error("boom");
|
||||||
|
}),
|
||||||
|
).rejects.toThrow("boom");
|
||||||
|
const result = await m.run("k1", async () => "after");
|
||||||
|
expect(result).toBe("after");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("forwards the resolved value of the handler", async () => {
|
||||||
|
const m = new PerKeyMutex();
|
||||||
|
const out = await m.run("k1", async () => ({ ok: true, n: 7 }));
|
||||||
|
expect(out).toEqual({ ok: true, n: 7 });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("cleans up internal state for keys with no waiters", async () => {
|
||||||
|
const m = new PerKeyMutex();
|
||||||
|
await m.run("k1", async () => {});
|
||||||
|
expect(m.activeKeyCount()).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("retains a key while a chain is in flight, then drops it", async () => {
|
||||||
|
const m = new PerKeyMutex();
|
||||||
|
let release!: () => void;
|
||||||
|
const gate = new Promise<void>((r) => (release = r));
|
||||||
|
|
||||||
|
const inFlight = m.run("k1", () => gate);
|
||||||
|
expect(m.activeKeyCount()).toBe(1);
|
||||||
|
release();
|
||||||
|
await inFlight;
|
||||||
|
expect(m.activeKeyCount()).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
48
apps/bot/src/scheduler/per-key-mutex.ts
Normal file
48
apps/bot/src/scheduler/per-key-mutex.ts
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
/**
|
||||||
|
* Async mutex keyed by a string. Different keys run in parallel;
|
||||||
|
* same-key calls serialise.
|
||||||
|
*
|
||||||
|
* Used by fire-reminder so two reminders on the SAME WhatsApp account
|
||||||
|
* take turns (running them concurrently would double the effective
|
||||||
|
* send rate and risk a ban), while reminders on DIFFERENT accounts
|
||||||
|
* proceed in parallel.
|
||||||
|
*
|
||||||
|
* Implementation is a chain-per-key Promise: each call appends its
|
||||||
|
* work to the key's tail. Empty chains are cleaned up so the Map
|
||||||
|
* doesn't grow unbounded across the bot's lifetime.
|
||||||
|
*/
|
||||||
|
export class PerKeyMutex {
|
||||||
|
private chains = new Map<string, Promise<void>>();
|
||||||
|
|
||||||
|
async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
|
||||||
|
const prev = this.chains.get(key) ?? Promise.resolve();
|
||||||
|
|
||||||
|
let release!: () => void;
|
||||||
|
const completion = new Promise<void>((r) => (release = r));
|
||||||
|
const chained = prev.then(() => completion);
|
||||||
|
this.chains.set(key, chained);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await prev;
|
||||||
|
return await fn();
|
||||||
|
} finally {
|
||||||
|
release();
|
||||||
|
// Drop the entry only if no later caller has appended in the
|
||||||
|
// meantime — otherwise we'd evict the in-flight chain.
|
||||||
|
if (this.chains.get(key) === chained) {
|
||||||
|
this.chains.delete(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
activeKeyCount(): number {
|
||||||
|
return this.chains.size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Singleton mutex used by fire-reminder, keyed by accountId. Lives at
|
||||||
|
* module scope so multiple pg-boss workers in the same process share
|
||||||
|
* state.
|
||||||
|
*/
|
||||||
|
export const accountMutex = new PerKeyMutex();
|
||||||
Loading…
x
Reference in New Issue
Block a user