diff --git a/apps/web/src/app/api/events/route.ts b/apps/web/src/app/api/events/route.ts new file mode 100644 index 0000000..0761156 --- /dev/null +++ b/apps/web/src/app/api/events/route.ts @@ -0,0 +1,85 @@ +import { NextRequest } from "next/server"; +import { Client } from "pg"; +import { env } from "@/env"; +import { logger } from "@/lib/logger"; + +export const runtime = "nodejs"; +export const dynamic = "force-dynamic"; + +export async function GET(_req: NextRequest) { + const stream = new ReadableStream({ + async start(controller) { + const encoder = new TextEncoder(); + const client = new Client({ connectionString: env.DATABASE_URL }); + let closed = false; + + const send = (event: string, data: unknown) => { + if (closed) return; + try { + controller.enqueue( + encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`), + ); + } catch (err) { + logger.warn({ err }, "sse: enqueue failed"); + } + }; + + try { + await client.connect(); + await client.query('LISTEN "web.event"'); + } catch (err) { + logger.error({ err }, "sse: failed to start listener"); + controller.close(); + return; + } + + client.on("notification", (msg) => { + if (msg.channel !== "web.event" || !msg.payload) return; + try { + const parsed = JSON.parse(msg.payload) as { type: string }; + send(parsed.type, parsed); + } catch (err) { + logger.warn({ err, payload: msg.payload }, "sse: bad payload"); + } + }); + + client.on("error", (err) => { + logger.error({ err }, "sse: pg client error"); + }); + + // Keep-alive ping every 25 seconds + const ping = setInterval(() => send("ping", { ts: Date.now() }), 25_000); + + // Initial hello + send("hello", { ts: Date.now() }); + + const cleanup = async () => { + if (closed) return; + closed = true; + clearInterval(ping); + try { + await client.query('UNLISTEN "web.event"'); + } catch { + // ignore + } + await client.end().catch(() => undefined); + try { + controller.close(); + } catch { + // ignore + } + }; + + _req.signal.addEventListener("abort", () => void cleanup()); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + "X-Accel-Buffering": "no", + }, + }); +} diff --git a/apps/web/src/hooks/use-events.ts b/apps/web/src/hooks/use-events.ts new file mode 100644 index 0000000..675fe5b --- /dev/null +++ b/apps/web/src/hooks/use-events.ts @@ -0,0 +1,44 @@ +"use client"; + +import { useEffect } from "react"; + +export type WebEventMap = { + hello: { ts: number }; + ping: { ts: number }; + "session.qr": { accountId: string; qrPng: string }; + "session.connected": { accountId: string; phoneNumber: string | null }; + "session.disconnected": { accountId: string }; + "session.timeout": { accountId: string }; + "groups.synced": { accountId: string; count: number }; + "reminder.fired": { reminderId: string; runId: string; status: string }; + "reminder.failed": { reminderId: string; error: string }; +}; + +type Handlers = { [K in keyof WebEventMap]?: (data: WebEventMap[K]) => void }; + +export function useEvents(handlers: Handlers): void { + useEffect(() => { + const es = new EventSource("/api/events"); + const wired: { type: string; fn: (e: MessageEvent) => void }[] = []; + for (const type of Object.keys(handlers) as (keyof WebEventMap)[]) { + const h = handlers[type]; + if (!h) continue; + const fn = (e: MessageEvent) => { + try { + (h as (data: unknown) => void)(JSON.parse(e.data)); + } catch { + // ignore malformed + } + }; + es.addEventListener(type, fn); + wired.push({ type, fn }); + } + return () => { + for (const { type, fn } of wired) { + es.removeEventListener(type, fn); + } + es.close(); + }; + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); +}