feat(bot): add IPC handlers for pair / unpair / sync / send-test / schedule

This commit is contained in:
yiekheng 2026-05-09 22:34:01 +08:00
parent abcf19b71a
commit af21bc5599
6 changed files with 265 additions and 0 deletions

View File

@ -2,6 +2,11 @@ import { Client } from "pg";
import type { Notification } from "pg"; import type { Notification } from "pg";
import { logger } from "../logger.js"; import { logger } from "../logger.js";
import { env } from "../env.js"; import { env } from "../env.js";
import { handleStartPairing } from "./pair-handler.js";
import { handleUnpair } from "./unpair-handler.js";
import { handleSyncGroups } from "./sync-groups-handler.js";
import { handleSendTest } from "./send-test-handler.js";
import { handleScheduleReminder } from "./schedule-reminder-handler.js";
export type BotCommand = export type BotCommand =
| { type: "account.start_pairing"; accountId: string } | { type: "account.start_pairing"; accountId: string }
@ -57,3 +62,21 @@ export async function startCommandConsumer(): Promise<() => Promise<void>> {
logger.info("ipc: command consumer stopped"); logger.info("ipc: command consumer stopped");
}; };
} }
export function registerDefaultHandlers(): void {
registerHandler("account.start_pairing", async (cmd) => {
await handleStartPairing(cmd.accountId);
});
registerHandler("account.unpair", async (cmd) => {
await handleUnpair(cmd.accountId);
});
registerHandler("account.sync_groups", async (cmd) => {
await handleSyncGroups(cmd.accountId);
});
registerHandler("group.send_test", async (cmd) => {
await handleSendTest(cmd.groupId, cmd.text);
});
registerHandler("reminder.schedule", async (cmd) => {
await handleScheduleReminder(cmd.reminderId, cmd.scheduledAtIso);
});
}

View File

@ -0,0 +1,153 @@
import { eq, and, lt } from "drizzle-orm";
import { rm } from "node:fs/promises";
import { join } from "node:path";
import { whatsappAccounts } from "@cmbot/db";
import { db } from "../db.js";
import { env } from "../env.js";
import { logger } from "../logger.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import { renderQrPng } from "../whatsapp/qr-renderer.js";
import { syncGroupsForAccount } from "../whatsapp/group-sync.js";
import { writeAuditLog } from "../audit.js";
import { pgNotifyWeb } from "./notify.js";
const PAIR_TIMEOUT_MS = 5 * 60 * 1000;
const offByAccount = new Map<string, () => void>();
const lastQrPayload = new Map<string, string>();
const pairTimeouts = new Map<string, NodeJS.Timeout>();
async function abandonPair(accountId: string): Promise<{ existed: boolean; label: string | null }> {
const account = await db.query.whatsappAccounts.findFirst({
where: (a, { eq }) => eq(a.id, accountId),
});
if (!account || account.status !== "pending") {
return { existed: false, label: account?.label ?? null };
}
const off = offByAccount.get(accountId);
if (off) {
off();
offByAccount.delete(accountId);
}
const t = pairTimeouts.get(accountId);
if (t) {
clearTimeout(t);
pairTimeouts.delete(accountId);
}
lastQrPayload.delete(accountId);
if (sessionManager.hasSession(accountId)) {
await sessionManager.stop(accountId);
}
await rm(join(env.SESSIONS_DIR, accountId), { recursive: true, force: true });
await db.delete(whatsappAccounts).where(eq(whatsappAccounts.id, accountId));
return { existed: true, label: account.label };
}
export async function handleStartPairing(accountId: string): Promise<void> {
const account = await db.query.whatsappAccounts.findFirst({
where: (a, { eq }) => eq(a.id, accountId),
});
if (!account) {
logger.warn({ accountId }, "pair: account row missing");
return;
}
const off = sessionManager.on(async (id, _state, event) => {
if (id !== accountId) return;
try {
if (event.type === "qr") {
if (lastQrPayload.get(id) === event.payload) return;
lastQrPayload.set(id, event.payload);
const png = await renderQrPng(event.payload);
await pgNotifyWeb({
type: "session.qr",
accountId: id,
qrPng: png.toString("base64"),
});
} else if (event.type === "open") {
const t = pairTimeouts.get(id);
if (t) {
clearTimeout(t);
pairTimeouts.delete(id);
}
lastQrPayload.delete(id);
offByAccount.delete(id);
const session = sessionManager.getSession(id);
let synced = 0;
if (session) {
const r = await syncGroupsForAccount(id, session.socket);
synced = r.synced;
}
await writeAuditLog(db, {
operatorId: account.operatorId,
source: "web",
action: "account.paired",
targetType: "whatsapp_account",
targetId: id,
payload: { label: account.label },
});
await pgNotifyWeb({
type: "session.connected",
accountId: id,
phoneNumber: event.phoneNumber ?? null,
});
await pgNotifyWeb({
type: "groups.synced",
accountId: id,
count: synced,
});
off();
} else if (event.type === "close" && event.loggedOut) {
const t = pairTimeouts.get(id);
if (t) {
clearTimeout(t);
pairTimeouts.delete(id);
}
lastQrPayload.delete(id);
offByAccount.delete(id);
await pgNotifyWeb({ type: "session.timeout", accountId: id });
off();
}
} catch (err) {
logger.error({ err, accountId: id }, "pair: handler error");
}
});
offByAccount.set(accountId, off);
try {
await sessionManager.start(accountId);
} catch (err) {
logger.error({ err, accountId }, "pair: start failed");
off();
offByAccount.delete(accountId);
await pgNotifyWeb({ type: "session.timeout", accountId });
return;
}
const timeoutId = setTimeout(() => {
void (async () => {
try {
const r = await abandonPair(accountId);
if (r.existed) {
await pgNotifyWeb({ type: "session.timeout", accountId });
}
} catch (err) {
logger.error({ err, accountId }, "pair: timeout cleanup failed");
}
})();
}, PAIR_TIMEOUT_MS);
pairTimeouts.set(accountId, timeoutId);
}
/** Sweep stale pending accounts on bot startup. */
export async function sweepStalePendingAccounts(): Promise<void> {
const cutoff = new Date(Date.now() - 60 * 60 * 1000);
const stale = await db
.select({ id: whatsappAccounts.id, label: whatsappAccounts.label })
.from(whatsappAccounts)
.where(and(eq(whatsappAccounts.status, "pending"), lt(whatsappAccounts.createdAt, cutoff)));
for (const row of stale) {
await rm(join(env.SESSIONS_DIR, row.id), { recursive: true, force: true });
await db.delete(whatsappAccounts).where(eq(whatsappAccounts.id, row.id));
logger.info({ accountId: row.id, label: row.label }, "sweep: removed stale pending account");
}
}

View File

@ -0,0 +1,6 @@
import { getBoss } from "../scheduler/pgboss-client.js";
import { scheduleReminderFire } from "../scheduler/reminder-jobs.js";
export async function handleScheduleReminder(reminderId: string, scheduledAtIso: string): Promise<void> {
await scheduleReminderFire(getBoss(), reminderId, new Date(scheduledAtIso));
}

View File

@ -0,0 +1,33 @@
import { sessionManager } from "../whatsapp/session-manager.js";
import { sendTextToGroup } from "../whatsapp/sender.js";
import { writeAuditLog } from "../audit.js";
import { db } from "../db.js";
import { logger } from "../logger.js";
export async function handleSendTest(groupId: string, text: string): Promise<void> {
const group = await db.query.whatsappGroups.findFirst({
where: (g, { eq }) => eq(g.id, groupId),
});
if (!group) {
logger.warn({ groupId }, "send-test: group missing");
return;
}
const session = sessionManager.getSession(group.accountId);
if (!session) {
logger.warn({ groupId, accountId: group.accountId }, "send-test: account not connected");
return;
}
try {
const result = await sendTextToGroup(session.socket, group.waGroupJid, text);
await writeAuditLog(db, {
operatorId: null,
source: "web",
action: "group.send_test",
targetType: "whatsapp_group",
targetId: groupId,
payload: { groupName: group.name, length: text.length, waMessageId: result.messageId ?? null },
});
} catch (err) {
logger.error({ err, groupId }, "send-test: failed");
}
}

View File

@ -0,0 +1,15 @@
import { db } from "../db.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import { syncGroupsForAccount } from "../whatsapp/group-sync.js";
import { pgNotifyWeb } from "./notify.js";
import { logger } from "../logger.js";
export async function handleSyncGroups(accountId: string): Promise<void> {
const session = sessionManager.getSession(accountId);
if (!session) {
logger.warn({ accountId }, "sync-groups: account not connected");
return;
}
const result = await syncGroupsForAccount(accountId, session.socket);
await pgNotifyWeb({ type: "groups.synced", accountId, count: result.synced });
}

View File

@ -0,0 +1,35 @@
import { eq } from "drizzle-orm";
import { rm } from "node:fs/promises";
import { join } from "node:path";
import { whatsappAccounts } from "@cmbot/db";
import { db } from "../db.js";
import { env } from "../env.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import { writeAuditLog } from "../audit.js";
import { pgNotifyWeb } from "./notify.js";
import { logger } from "../logger.js";
export async function handleUnpair(accountId: string): Promise<void> {
const account = await db.query.whatsappAccounts.findFirst({
where: (a, { eq }) => eq(a.id, accountId),
});
if (!account) {
logger.warn({ accountId }, "unpair: account row missing");
return;
}
await sessionManager.stop(accountId);
await rm(join(env.SESSIONS_DIR, accountId), { recursive: true, force: true });
await db
.update(whatsappAccounts)
.set({ status: "logged_out", phoneNumber: null })
.where(eq(whatsappAccounts.id, accountId));
await writeAuditLog(db, {
operatorId: account.operatorId,
source: "web",
action: "account.unpaired",
targetType: "whatsapp_account",
targetId: accountId,
payload: { label: account.label },
});
await pgNotifyWeb({ type: "session.disconnected", accountId });
}