cm_whatsapp_bot_v1/apps/bot/src/ipc/command-consumer.ts

83 lines
3.0 KiB
TypeScript

import { Client } from "pg";
import type { Notification } from "pg";
import { logger } from "../logger.js";
import { env } from "../env.js";
import { handleStartPairing } from "./pair-handler.js";
import { handleUnpair } from "./unpair-handler.js";
import { handleSyncGroups } from "./sync-groups-handler.js";
import { handleSendTest } from "./send-test-handler.js";
import { handleScheduleReminder } from "./schedule-reminder-handler.js";
/**
 * Every command this consumer knows how to route, discriminated by `type`.
 *
 * Commands arrive as JSON text on the `bot.command` notification channel and
 * are matched to a handler by their `type` string.
 */
export type BotCommand =
  // Account-scoped commands: carry the id of the account to act on.
  | {
      type: "account.start_pairing";
      accountId: string;
    }
  | {
      type: "account.unpair";
      accountId: string;
    }
  | {
      type: "account.sync_groups";
      accountId: string;
    }
  // Group-scoped command: carries the target group and the text to send.
  | {
      type: "group.send_test";
      groupId: string;
      text: string;
    }
  // Reminder command: `scheduledAtIso` is a string; its exact format is not
  // validated anywhere in this file.
  | {
      type: "reminder.schedule";
      reminderId: string;
      scheduledAtIso: string;
    };
/** Type-erased handler shape used at dispatch time, after lookup by `type`. */
type Handler = (cmd: BotCommand) => Promise<void>;

/** Handler signature for one specific command type `K`. */
type HandlerFor<K extends BotCommand["type"]> = (
  cmd: Extract<BotCommand, { type: K }>,
) => Promise<void>;

/**
 * Module-level registry: at most one handler per command type.
 * Starts empty; entries are added through `registerHandler`.
 */
const handlers: { [K in BotCommand["type"]]?: HandlerFor<K> } = {};
/**
 * Installs `fn` as the handler for commands whose `type` equals `type`.
 *
 * The registry keeps a single entry per command type, so registering again
 * for the same type replaces the previous handler.
 *
 * @param type - Discriminant of the command variant to handle.
 * @param fn - Async handler; receives the command narrowed to that variant.
 */
export function registerHandler<T extends BotCommand["type"]>(
  type: T,
  fn: (cmd: Extract<BotCommand, { type: T }>) => Promise<void>,
): void {
  // The compiler cannot correlate the generic key `T` with the mapped value
  // type on write, so the registry is viewed as a plain string-keyed record
  // for this one assignment.
  const registry = handlers as Record<string, unknown>;
  registry[type] = fn;
}
/**
 * Decodes one notification payload and routes it to the registered handler.
 *
 * This runs inside an event-emitter listener, so it must never throw: every
 * failure mode (invalid JSON, wrong shape, unknown type, failing handler) is
 * logged and swallowed instead.
 */
function dispatchCommandPayload(payload: string): void {
  let parsed: unknown;
  try {
    parsed = JSON.parse(payload);
  } catch (err) {
    logger.warn({ err, payload }, "ipc: bad command payload");
    return;
  }

  // JSON.parse also yields null, numbers, strings and arrays. Only an object
  // carrying a string `type` can be dispatched; anything else would make the
  // property access below throw (null) or look up a nonsense key.
  if (
    typeof parsed !== "object" ||
    parsed === null ||
    typeof (parsed as { type?: unknown }).type !== "string"
  ) {
    logger.warn({ payload }, "ipc: bad command payload");
    return;
  }
  // Only the discriminant has been checked; the remaining fields are passed
  // through to the handler as received.
  const cmd = parsed as BotCommand;

  // Own-property lookup only: `type` comes from the payload, and names such as
  // "toString" or "constructor" would otherwise resolve to Object.prototype
  // members and be invoked as if they were handlers.
  const registry = handlers as Record<string, unknown>;
  const candidate = Object.prototype.hasOwnProperty.call(registry, cmd.type)
    ? registry[cmd.type]
    : undefined;
  if (typeof candidate !== "function") {
    logger.warn({ cmd }, "ipc: no handler for command type");
    return;
  }
  const fn = candidate as Handler;

  // The handler is still invoked synchronously (the async wrapper runs up to
  // its first await), but both synchronous throws and rejections now end up in
  // the same log line instead of escaping the listener.
  void (async () => {
    try {
      await fn(cmd);
    } catch (err) {
      logger.error({ err, cmd }, "ipc: handler failed");
    }
  })();
}

/**
 * Opens a dedicated database connection, subscribes to the `bot.command`
 * channel and dispatches every received command to its registered handler.
 *
 * Handlers are fire-and-forget: they are not awaited by the listener, and a
 * failing handler is logged without affecting later commands.
 *
 * NOTE(review): the `error` listener only logs. Nothing here reconnects or
 * re-issues LISTEN after a connection loss — confirm that is handled elsewhere.
 *
 * @returns A stop function that issues UNLISTEN (best effort) and closes the
 *   connection.
 * @throws Whatever `connect()` or the LISTEN query rejects with; the client is
 *   closed before the error is rethrown.
 */
export async function startCommandConsumer(): Promise<() => Promise<void>> {
  const client = new Client({ connectionString: env.DATABASE_URL });

  // Listeners go on before any await so that an `error` event emitted while
  // connecting or subscribing never finds the emitter without a listener.
  client.on("error", (err: Error) => logger.error({ err }, "ipc: consumer client error"));
  client.on("notification", (msg: Notification) => {
    if (msg.channel !== "bot.command" || !msg.payload) return;
    dispatchCommandPayload(msg.payload);
  });

  try {
    await client.connect();
    await client.query('LISTEN "bot.command"');
  } catch (err) {
    // Do not leak the connection when startup fails half-way. The close is not
    // awaited so that a stuck socket cannot delay reporting the original error.
    void client.end().catch((endErr: unknown) =>
      logger.warn({ err: endErr }, "ipc: failed to close consumer client after startup error"),
    );
    throw err;
  }

  logger.info("ipc: command consumer started");

  return async () => {
    try {
      await client.query('UNLISTEN "bot.command"');
    } catch (err) {
      logger.warn({ err }, "ipc: UNLISTEN failed (continuing shutdown)");
    }
    await client.end();
    logger.info("ipc: command consumer stopped");
  };
}
/**
 * Wires each known command type to its handler module.
 *
 * Every registration is a thin async adapter that unpacks the fields of the
 * command and forwards them as positional arguments; the adapter resolves
 * once the underlying handler has settled.
 */
export function registerDefaultHandlers(): void {
  // Account lifecycle.
  registerHandler("account.start_pairing", async ({ accountId }) => {
    await handleStartPairing(accountId);
  });
  registerHandler("account.unpair", async ({ accountId }) => {
    await handleUnpair(accountId);
  });
  registerHandler("account.sync_groups", async ({ accountId }) => {
    await handleSyncGroups(accountId);
  });

  // Group messaging.
  registerHandler("group.send_test", async ({ groupId, text }) => {
    await handleSendTest(groupId, text);
  });

  // Reminders.
  registerHandler("reminder.schedule", async ({ reminderId, scheduledAtIso }) => {
    await handleScheduleReminder(reminderId, scheduledAtIso);
  });
}