feat(web): SSE endpoint + useEvents hook
This commit is contained in:
parent
63d41c4389
commit
1fe674c70e
85
apps/web/src/app/api/events/route.ts
Normal file
85
apps/web/src/app/api/events/route.ts
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
import { NextRequest } from "next/server";
|
||||||
|
import { Client } from "pg";
|
||||||
|
import { env } from "@/env";
|
||||||
|
import { logger } from "@/lib/logger";
|
||||||
|
|
||||||
|
export const runtime = "nodejs";
|
||||||
|
export const dynamic = "force-dynamic";
|
||||||
|
|
||||||
|
export async function GET(_req: NextRequest) {
|
||||||
|
const stream = new ReadableStream({
|
||||||
|
async start(controller) {
|
||||||
|
const encoder = new TextEncoder();
|
||||||
|
const client = new Client({ connectionString: env.DATABASE_URL });
|
||||||
|
let closed = false;
|
||||||
|
|
||||||
|
const send = (event: string, data: unknown) => {
|
||||||
|
if (closed) return;
|
||||||
|
try {
|
||||||
|
controller.enqueue(
|
||||||
|
encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`),
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
logger.warn({ err }, "sse: enqueue failed");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
await client.connect();
|
||||||
|
await client.query('LISTEN "web.event"');
|
||||||
|
} catch (err) {
|
||||||
|
logger.error({ err }, "sse: failed to start listener");
|
||||||
|
controller.close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
client.on("notification", (msg) => {
|
||||||
|
if (msg.channel !== "web.event" || !msg.payload) return;
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(msg.payload) as { type: string };
|
||||||
|
send(parsed.type, parsed);
|
||||||
|
} catch (err) {
|
||||||
|
logger.warn({ err, payload: msg.payload }, "sse: bad payload");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
client.on("error", (err) => {
|
||||||
|
logger.error({ err }, "sse: pg client error");
|
||||||
|
});
|
||||||
|
|
||||||
|
// Keep-alive ping every 25 seconds
|
||||||
|
const ping = setInterval(() => send("ping", { ts: Date.now() }), 25_000);
|
||||||
|
|
||||||
|
// Initial hello
|
||||||
|
send("hello", { ts: Date.now() });
|
||||||
|
|
||||||
|
const cleanup = async () => {
|
||||||
|
if (closed) return;
|
||||||
|
closed = true;
|
||||||
|
clearInterval(ping);
|
||||||
|
try {
|
||||||
|
await client.query('UNLISTEN "web.event"');
|
||||||
|
} catch {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
await client.end().catch(() => undefined);
|
||||||
|
try {
|
||||||
|
controller.close();
|
||||||
|
} catch {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
_req.signal.addEventListener("abort", () => void cleanup());
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
return new Response(stream, {
|
||||||
|
headers: {
|
||||||
|
"Content-Type": "text/event-stream",
|
||||||
|
"Cache-Control": "no-cache, no-transform",
|
||||||
|
Connection: "keep-alive",
|
||||||
|
"X-Accel-Buffering": "no",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
44
apps/web/src/hooks/use-events.ts
Normal file
44
apps/web/src/hooks/use-events.ts
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
"use client";
|
||||||
|
|
||||||
|
import { useEffect } from "react";
|
||||||
|
|
||||||
|
export type WebEventMap = {
|
||||||
|
hello: { ts: number };
|
||||||
|
ping: { ts: number };
|
||||||
|
"session.qr": { accountId: string; qrPng: string };
|
||||||
|
"session.connected": { accountId: string; phoneNumber: string | null };
|
||||||
|
"session.disconnected": { accountId: string };
|
||||||
|
"session.timeout": { accountId: string };
|
||||||
|
"groups.synced": { accountId: string; count: number };
|
||||||
|
"reminder.fired": { reminderId: string; runId: string; status: string };
|
||||||
|
"reminder.failed": { reminderId: string; error: string };
|
||||||
|
};
|
||||||
|
|
||||||
|
type Handlers = { [K in keyof WebEventMap]?: (data: WebEventMap[K]) => void };
|
||||||
|
|
||||||
|
export function useEvents(handlers: Handlers): void {
|
||||||
|
useEffect(() => {
|
||||||
|
const es = new EventSource("/api/events");
|
||||||
|
const wired: { type: string; fn: (e: MessageEvent) => void }[] = [];
|
||||||
|
for (const type of Object.keys(handlers) as (keyof WebEventMap)[]) {
|
||||||
|
const h = handlers[type];
|
||||||
|
if (!h) continue;
|
||||||
|
const fn = (e: MessageEvent) => {
|
||||||
|
try {
|
||||||
|
(h as (data: unknown) => void)(JSON.parse(e.data));
|
||||||
|
} catch {
|
||||||
|
// ignore malformed
|
||||||
|
}
|
||||||
|
};
|
||||||
|
es.addEventListener(type, fn);
|
||||||
|
wired.push({ type, fn });
|
||||||
|
}
|
||||||
|
return () => {
|
||||||
|
for (const { type, fn } of wired) {
|
||||||
|
es.removeEventListener(type, fn);
|
||||||
|
}
|
||||||
|
es.close();
|
||||||
|
};
|
||||||
|
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||||
|
}, []);
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user