diff --git a/apps/bot/package.json b/apps/bot/package.json index 4c38790..957d2a4 100644 --- a/apps/bot/package.json +++ b/apps/bot/package.json @@ -19,6 +19,7 @@ "drizzle-orm": "^0.36.0", "grammy": "^1.31.0", "luxon": "^3.5.0", + "pg": "^8.13.0", "pg-boss": "^12.18.2", "pino": "^9.5.0", "pino-pretty": "^11.3.0", @@ -28,6 +29,7 @@ "devDependencies": { "@types/luxon": "^3.4.2", "@types/node": "^22.7.0", + "@types/pg": "^8.11.10", "@types/qrcode": "^1.5.5", "tsx": "^4.19.0", "typescript": "^5.5.0", diff --git a/apps/bot/src/ipc/command-consumer.ts b/apps/bot/src/ipc/command-consumer.ts new file mode 100644 index 0000000..e73acec --- /dev/null +++ b/apps/bot/src/ipc/command-consumer.ts @@ -0,0 +1,59 @@ +import { Client } from "pg"; +import type { Notification } from "pg"; +import { logger } from "../logger.js"; +import { env } from "../env.js"; + +export type BotCommand = + | { type: "account.start_pairing"; accountId: string } + | { type: "account.unpair"; accountId: string } + | { type: "account.sync_groups"; accountId: string } + | { type: "group.send_test"; groupId: string; text: string } + | { type: "reminder.schedule"; reminderId: string; scheduledAtIso: string }; + +type Handler = (cmd: BotCommand) => Promise; +const handlers: { [K in BotCommand["type"]]?: (cmd: Extract) => Promise } = {}; + +export function registerHandler( + type: T, + fn: (cmd: Extract) => Promise, +): void { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (handlers as any)[type] = fn; +} + +export async function startCommandConsumer(): Promise<() => Promise> { + const client = new Client({ connectionString: env.DATABASE_URL }); + await client.connect(); + await client.query('LISTEN "bot.command"'); + + client.on("notification", (msg: Notification) => { + if (msg.channel !== "bot.command" || !msg.payload) return; + let cmd: BotCommand; + try { + cmd = JSON.parse(msg.payload) as BotCommand; + } catch (err) { + logger.warn({ err, payload: msg.payload }, "ipc: bad command payload"); + return; + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const fn: Handler | undefined = (handlers as any)[cmd.type]; + if (!fn) { + logger.warn({ cmd }, "ipc: no handler for command type"); + return; + } + fn(cmd).catch((err) => logger.error({ err, cmd }, "ipc: handler failed")); + }); + + client.on("error", (err: Error) => logger.error({ err }, "ipc: consumer client error")); + logger.info("ipc: command consumer started"); + + return async () => { + try { + await client.query('UNLISTEN "bot.command"'); + } catch (err) { + logger.warn({ err }, "ipc: UNLISTEN failed (continuing shutdown)"); + } + await client.end(); + logger.info("ipc: command consumer stopped"); + }; +} diff --git a/apps/bot/src/ipc/notify.ts b/apps/bot/src/ipc/notify.ts new file mode 100644 index 0000000..75ae679 --- /dev/null +++ b/apps/bot/src/ipc/notify.ts @@ -0,0 +1,19 @@ +import { sql } from "drizzle-orm"; +import { db } from "../db.js"; +import { logger } from "../logger.js"; + +export type WebEvent = + | { type: "session.qr"; accountId: string; qrPng: string /* base64 */ } + | { type: "session.connected"; accountId: string; phoneNumber: string | null } + | { type: "session.disconnected"; accountId: string } + | { type: "session.timeout"; accountId: string } + | { type: "groups.synced"; accountId: string; count: number } + | { type: "reminder.fired"; reminderId: string; runId: string; status: string } + | { type: "reminder.failed"; reminderId: string; error: string }; + +export async function pgNotifyWeb(event: WebEvent): Promise { + const json = JSON.stringify(event); + // pg_notify takes a literal channel name as 1st arg. + await db.execute(sql`SELECT pg_notify('web.event', ${json})`); + logger.debug({ event: event.type }, "ipc: web.event published"); +}