feat(bot): MediaUploadCache for once-per-run media prepare
One cache instance per fire-reminder run. Each unique mediaId gets prepared (uploaded to WA CDN) exactly once, and subsequent group sends within the run reuse the prepared message via relayMessage. Concurrent gets coalesce into a single prepare. Failed prepares don't poison the cache — next caller retries. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
bb58f5acf2
commit
7da872eb5f
70
apps/bot/src/scheduler/media-upload-cache.test.ts
Normal file
70
apps/bot/src/scheduler/media-upload-cache.test.ts
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
import { describe, it, expect, vi } from "vitest";
|
||||||
|
import { MediaUploadCache } from "./media-upload-cache.js";
|
||||||
|
|
||||||
|
describe("MediaUploadCache", () => {
|
||||||
|
it("uploads each unique mediaId exactly once across N gets", async () => {
|
||||||
|
const prepare = vi.fn(async (mediaId: string) => ({
|
||||||
|
kind: "prepared",
|
||||||
|
mediaId,
|
||||||
|
}));
|
||||||
|
const cache = new MediaUploadCache(prepare);
|
||||||
|
|
||||||
|
const a1 = await cache.get("media-A");
|
||||||
|
const a2 = await cache.get("media-A");
|
||||||
|
const b1 = await cache.get("media-B");
|
||||||
|
|
||||||
|
expect(prepare).toHaveBeenCalledTimes(2);
|
||||||
|
expect(prepare).toHaveBeenCalledWith("media-A");
|
||||||
|
expect(prepare).toHaveBeenCalledWith("media-B");
|
||||||
|
expect(a1).toBe(a2);
|
||||||
|
expect(a1).not.toBe(b1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("coalesces concurrent gets of the same mediaId into ONE prepare call", async () => {
|
||||||
|
let resolveA: (v: unknown) => void = () => {};
|
||||||
|
const aPromise = new Promise((r) => (resolveA = r));
|
||||||
|
const prepare = vi.fn(async (mediaId: string) => {
|
||||||
|
if (mediaId === "media-A") return aPromise;
|
||||||
|
return { kind: "prepared", mediaId };
|
||||||
|
});
|
||||||
|
const cache = new MediaUploadCache(prepare);
|
||||||
|
|
||||||
|
const p1 = cache.get("media-A");
|
||||||
|
const p2 = cache.get("media-A");
|
||||||
|
const p3 = cache.get("media-A");
|
||||||
|
|
||||||
|
resolveA({ kind: "prepared", mediaId: "media-A" });
|
||||||
|
|
||||||
|
const [r1, r2, r3] = await Promise.all([p1, p2, p3]);
|
||||||
|
expect(prepare).toHaveBeenCalledTimes(1);
|
||||||
|
expect(r1).toBe(r2);
|
||||||
|
expect(r2).toBe(r3);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("a thrown prepare is NOT cached — next get retries", async () => {
|
||||||
|
let attempt = 0;
|
||||||
|
const prepare = vi.fn(async (_mediaId: string) => {
|
||||||
|
attempt++;
|
||||||
|
if (attempt === 1) throw new Error("upload network blip");
|
||||||
|
return { kind: "prepared", attempt };
|
||||||
|
});
|
||||||
|
const cache = new MediaUploadCache(prepare);
|
||||||
|
|
||||||
|
await expect(cache.get("media-A")).rejects.toThrow("upload network blip");
|
||||||
|
const r = await cache.get("media-A");
|
||||||
|
expect(prepare).toHaveBeenCalledTimes(2);
|
||||||
|
expect(r).toEqual({ kind: "prepared", attempt: 2 });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("size() reflects the number of cached unique mediaIds", async () => {
|
||||||
|
const prepare = async (mediaId: string) => ({ mediaId });
|
||||||
|
const cache = new MediaUploadCache(prepare);
|
||||||
|
expect(cache.size()).toBe(0);
|
||||||
|
await cache.get("a");
|
||||||
|
expect(cache.size()).toBe(1);
|
||||||
|
await cache.get("b");
|
||||||
|
expect(cache.size()).toBe(2);
|
||||||
|
await cache.get("a"); // already cached
|
||||||
|
expect(cache.size()).toBe(2);
|
||||||
|
});
|
||||||
|
});
|
||||||
44
apps/bot/src/scheduler/media-upload-cache.ts
Normal file
44
apps/bot/src/scheduler/media-upload-cache.ts
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
/**
|
||||||
|
* Per-run cache of `prepareWAMessageMedia` results, keyed by
|
||||||
|
* `mediaId`. The point: when a reminder fans out to 1000 groups with
|
||||||
|
* one image, we want to upload that image to WhatsApp's CDN ONCE, not
|
||||||
|
* 1000 times. Subsequent group sends reuse the prepared message
|
||||||
|
* (with embedded directPath / mediaKey) via socket.relayMessage.
|
||||||
|
*
|
||||||
|
* Lifecycle: one cache instance per fire-reminder run. After the run
|
||||||
|
* completes, the cache is dropped — we don't share uploads across
|
||||||
|
* runs because WA media tokens are short-lived.
|
||||||
|
*
|
||||||
|
* Concurrent gets of the same mediaId are coalesced into a single
|
||||||
|
* prepare call. Failed prepares are NOT cached so the next attempt
|
||||||
|
* retries (network blips at upload time shouldn't poison the cache).
|
||||||
|
*/
|
||||||
|
export class MediaUploadCache<T> {
|
||||||
|
private readonly prepare: (mediaId: string) => Promise<T>;
|
||||||
|
private readonly entries = new Map<string, Promise<T>>();
|
||||||
|
|
||||||
|
constructor(prepare: (mediaId: string) => Promise<T>) {
|
||||||
|
this.prepare = prepare;
|
||||||
|
}
|
||||||
|
|
||||||
|
async get(mediaId: string): Promise<T> {
|
||||||
|
const existing = this.entries.get(mediaId);
|
||||||
|
if (existing) return existing;
|
||||||
|
|
||||||
|
const inflight = this.prepare(mediaId);
|
||||||
|
// Insert eagerly so concurrent gets dedupe.
|
||||||
|
this.entries.set(mediaId, inflight);
|
||||||
|
|
||||||
|
try {
|
||||||
|
return await inflight;
|
||||||
|
} catch (err) {
|
||||||
|
// Don't cache failures — the next caller should retry.
|
||||||
|
this.entries.delete(mediaId);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size(): number {
|
||||||
|
return this.entries.size;
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user