From af21bc5599855e39d919ed1bf719023b601b1816 Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sat, 9 May 2026 22:34:01 +0800 Subject: [PATCH] feat(bot): add IPC handlers for pair / unpair / sync / send-test / schedule --- apps/bot/src/ipc/command-consumer.ts | 23 +++ apps/bot/src/ipc/pair-handler.ts | 153 ++++++++++++++++++ apps/bot/src/ipc/schedule-reminder-handler.ts | 6 + apps/bot/src/ipc/send-test-handler.ts | 33 ++++ apps/bot/src/ipc/sync-groups-handler.ts | 15 ++ apps/bot/src/ipc/unpair-handler.ts | 35 ++++ 6 files changed, 265 insertions(+) create mode 100644 apps/bot/src/ipc/pair-handler.ts create mode 100644 apps/bot/src/ipc/schedule-reminder-handler.ts create mode 100644 apps/bot/src/ipc/send-test-handler.ts create mode 100644 apps/bot/src/ipc/sync-groups-handler.ts create mode 100644 apps/bot/src/ipc/unpair-handler.ts diff --git a/apps/bot/src/ipc/command-consumer.ts b/apps/bot/src/ipc/command-consumer.ts index e73acec..e03f681 100644 --- a/apps/bot/src/ipc/command-consumer.ts +++ b/apps/bot/src/ipc/command-consumer.ts @@ -2,6 +2,11 @@ import { Client } from "pg"; import type { Notification } from "pg"; import { logger } from "../logger.js"; import { env } from "../env.js"; +import { handleStartPairing } from "./pair-handler.js"; +import { handleUnpair } from "./unpair-handler.js"; +import { handleSyncGroups } from "./sync-groups-handler.js"; +import { handleSendTest } from "./send-test-handler.js"; +import { handleScheduleReminder } from "./schedule-reminder-handler.js"; export type BotCommand = | { type: "account.start_pairing"; accountId: string } @@ -57,3 +62,21 @@ export async function startCommandConsumer(): Promise<() => Promise> { logger.info("ipc: command consumer stopped"); }; } + +export function registerDefaultHandlers(): void { + registerHandler("account.start_pairing", async (cmd) => { + await handleStartPairing(cmd.accountId); + }); + registerHandler("account.unpair", async (cmd) => { + await handleUnpair(cmd.accountId); + }); + registerHandler("account.sync_groups", async (cmd) => { + await handleSyncGroups(cmd.accountId); + }); + registerHandler("group.send_test", async (cmd) => { + await handleSendTest(cmd.groupId, cmd.text); + }); + registerHandler("reminder.schedule", async (cmd) => { + await handleScheduleReminder(cmd.reminderId, cmd.scheduledAtIso); + }); +} diff --git a/apps/bot/src/ipc/pair-handler.ts b/apps/bot/src/ipc/pair-handler.ts new file mode 100644 index 0000000..50ca755 --- /dev/null +++ b/apps/bot/src/ipc/pair-handler.ts @@ -0,0 +1,153 @@ +import { eq, and, lt } from "drizzle-orm"; +import { rm } from "node:fs/promises"; +import { join } from "node:path"; +import { whatsappAccounts } from "@cmbot/db"; +import { db } from "../db.js"; +import { env } from "../env.js"; +import { logger } from "../logger.js"; +import { sessionManager } from "../whatsapp/session-manager.js"; +import { renderQrPng } from "../whatsapp/qr-renderer.js"; +import { syncGroupsForAccount } from "../whatsapp/group-sync.js"; +import { writeAuditLog } from "../audit.js"; +import { pgNotifyWeb } from "./notify.js"; + +const PAIR_TIMEOUT_MS = 5 * 60 * 1000; +const offByAccount = new Map void>(); +const lastQrPayload = new Map(); +const pairTimeouts = new Map(); + +async function abandonPair(accountId: string): Promise<{ existed: boolean; label: string | null }> { + const account = await db.query.whatsappAccounts.findFirst({ + where: (a, { eq }) => eq(a.id, accountId), + }); + if (!account || account.status !== "pending") { + return { existed: false, label: account?.label ?? null }; + } + const off = offByAccount.get(accountId); + if (off) { + off(); + offByAccount.delete(accountId); + } + const t = pairTimeouts.get(accountId); + if (t) { + clearTimeout(t); + pairTimeouts.delete(accountId); + } + lastQrPayload.delete(accountId); + if (sessionManager.hasSession(accountId)) { + await sessionManager.stop(accountId); + } + await rm(join(env.SESSIONS_DIR, accountId), { recursive: true, force: true }); + await db.delete(whatsappAccounts).where(eq(whatsappAccounts.id, accountId)); + return { existed: true, label: account.label }; +} + +export async function handleStartPairing(accountId: string): Promise { + const account = await db.query.whatsappAccounts.findFirst({ + where: (a, { eq }) => eq(a.id, accountId), + }); + if (!account) { + logger.warn({ accountId }, "pair: account row missing"); + return; + } + + const off = sessionManager.on(async (id, _state, event) => { + if (id !== accountId) return; + try { + if (event.type === "qr") { + if (lastQrPayload.get(id) === event.payload) return; + lastQrPayload.set(id, event.payload); + const png = await renderQrPng(event.payload); + await pgNotifyWeb({ + type: "session.qr", + accountId: id, + qrPng: png.toString("base64"), + }); + } else if (event.type === "open") { + const t = pairTimeouts.get(id); + if (t) { + clearTimeout(t); + pairTimeouts.delete(id); + } + lastQrPayload.delete(id); + offByAccount.delete(id); + const session = sessionManager.getSession(id); + let synced = 0; + if (session) { + const r = await syncGroupsForAccount(id, session.socket); + synced = r.synced; + } + await writeAuditLog(db, { + operatorId: account.operatorId, + source: "web", + action: "account.paired", + targetType: "whatsapp_account", + targetId: id, + payload: { label: account.label }, + }); + await pgNotifyWeb({ + type: "session.connected", + accountId: id, + phoneNumber: event.phoneNumber ?? null, + }); + await pgNotifyWeb({ + type: "groups.synced", + accountId: id, + count: synced, + }); + off(); + } else if (event.type === "close" && event.loggedOut) { + const t = pairTimeouts.get(id); + if (t) { + clearTimeout(t); + pairTimeouts.delete(id); + } + lastQrPayload.delete(id); + offByAccount.delete(id); + await pgNotifyWeb({ type: "session.timeout", accountId: id }); + off(); + } + } catch (err) { + logger.error({ err, accountId: id }, "pair: handler error"); + } + }); + offByAccount.set(accountId, off); + + try { + await sessionManager.start(accountId); + } catch (err) { + logger.error({ err, accountId }, "pair: start failed"); + off(); + offByAccount.delete(accountId); + await pgNotifyWeb({ type: "session.timeout", accountId }); + return; + } + + const timeoutId = setTimeout(() => { + void (async () => { + try { + const r = await abandonPair(accountId); + if (r.existed) { + await pgNotifyWeb({ type: "session.timeout", accountId }); + } + } catch (err) { + logger.error({ err, accountId }, "pair: timeout cleanup failed"); + } + })(); + }, PAIR_TIMEOUT_MS); + pairTimeouts.set(accountId, timeoutId); +} + +/** Sweep stale pending accounts on bot startup. */ +export async function sweepStalePendingAccounts(): Promise { + const cutoff = new Date(Date.now() - 60 * 60 * 1000); + const stale = await db + .select({ id: whatsappAccounts.id, label: whatsappAccounts.label }) + .from(whatsappAccounts) + .where(and(eq(whatsappAccounts.status, "pending"), lt(whatsappAccounts.createdAt, cutoff))); + for (const row of stale) { + await rm(join(env.SESSIONS_DIR, row.id), { recursive: true, force: true }); + await db.delete(whatsappAccounts).where(eq(whatsappAccounts.id, row.id)); + logger.info({ accountId: row.id, label: row.label }, "sweep: removed stale pending account"); + } +} diff --git a/apps/bot/src/ipc/schedule-reminder-handler.ts b/apps/bot/src/ipc/schedule-reminder-handler.ts new file mode 100644 index 0000000..d49ba21 --- /dev/null +++ b/apps/bot/src/ipc/schedule-reminder-handler.ts @@ -0,0 +1,6 @@ +import { getBoss } from "../scheduler/pgboss-client.js"; +import { scheduleReminderFire } from "../scheduler/reminder-jobs.js"; + +export async function handleScheduleReminder(reminderId: string, scheduledAtIso: string): Promise { + await scheduleReminderFire(getBoss(), reminderId, new Date(scheduledAtIso)); +} diff --git a/apps/bot/src/ipc/send-test-handler.ts b/apps/bot/src/ipc/send-test-handler.ts new file mode 100644 index 0000000..b21965b --- /dev/null +++ b/apps/bot/src/ipc/send-test-handler.ts @@ -0,0 +1,33 @@ +import { sessionManager } from "../whatsapp/session-manager.js"; +import { sendTextToGroup } from "../whatsapp/sender.js"; +import { writeAuditLog } from "../audit.js"; +import { db } from "../db.js"; +import { logger } from "../logger.js"; + +export async function handleSendTest(groupId: string, text: string): Promise { + const group = await db.query.whatsappGroups.findFirst({ + where: (g, { eq }) => eq(g.id, groupId), + }); + if (!group) { + logger.warn({ groupId }, "send-test: group missing"); + return; + } + const session = sessionManager.getSession(group.accountId); + if (!session) { + logger.warn({ groupId, accountId: group.accountId }, "send-test: account not connected"); + return; + } + try { + const result = await sendTextToGroup(session.socket, group.waGroupJid, text); + await writeAuditLog(db, { + operatorId: null, + source: "web", + action: "group.send_test", + targetType: "whatsapp_group", + targetId: groupId, + payload: { groupName: group.name, length: text.length, waMessageId: result.messageId ?? null }, + }); + } catch (err) { + logger.error({ err, groupId }, "send-test: failed"); + } +} diff --git a/apps/bot/src/ipc/sync-groups-handler.ts b/apps/bot/src/ipc/sync-groups-handler.ts new file mode 100644 index 0000000..f08de77 --- /dev/null +++ b/apps/bot/src/ipc/sync-groups-handler.ts @@ -0,0 +1,15 @@ +import { db } from "../db.js"; +import { sessionManager } from "../whatsapp/session-manager.js"; +import { syncGroupsForAccount } from "../whatsapp/group-sync.js"; +import { pgNotifyWeb } from "./notify.js"; +import { logger } from "../logger.js"; + +export async function handleSyncGroups(accountId: string): Promise { + const session = sessionManager.getSession(accountId); + if (!session) { + logger.warn({ accountId }, "sync-groups: account not connected"); + return; + } + const result = await syncGroupsForAccount(accountId, session.socket); + await pgNotifyWeb({ type: "groups.synced", accountId, count: result.synced }); +} diff --git a/apps/bot/src/ipc/unpair-handler.ts b/apps/bot/src/ipc/unpair-handler.ts new file mode 100644 index 0000000..964a5c0 --- /dev/null +++ b/apps/bot/src/ipc/unpair-handler.ts @@ -0,0 +1,35 @@ +import { eq } from "drizzle-orm"; +import { rm } from "node:fs/promises"; +import { join } from "node:path"; +import { whatsappAccounts } from "@cmbot/db"; +import { db } from "../db.js"; +import { env } from "../env.js"; +import { sessionManager } from "../whatsapp/session-manager.js"; +import { writeAuditLog } from "../audit.js"; +import { pgNotifyWeb } from "./notify.js"; +import { logger } from "../logger.js"; + +export async function handleUnpair(accountId: string): Promise { + const account = await db.query.whatsappAccounts.findFirst({ + where: (a, { eq }) => eq(a.id, accountId), + }); + if (!account) { + logger.warn({ accountId }, "unpair: account row missing"); + return; + } + await sessionManager.stop(accountId); + await rm(join(env.SESSIONS_DIR, accountId), { recursive: true, force: true }); + await db + .update(whatsappAccounts) + .set({ status: "logged_out", phoneNumber: null }) + .where(eq(whatsappAccounts.id, accountId)); + await writeAuditLog(db, { + operatorId: account.operatorId, + source: "web", + action: "account.unpaired", + targetType: "whatsapp_account", + targetId: accountId, + payload: { label: account.label }, + }); + await pgNotifyWeb({ type: "session.disconnected", accountId }); +}