Extract the pair-handler's close-event decision into a pure helper decidePairListenerOnClose(warmingUp, restartRequired) returning one of ignore-leaked-close / post-pair-restart / treat-as-timeout. Refactor pair-handler to call the helper instead of the inline if-chain. New tests in pair-state.test.ts: - warmingUp=true → ignore-leaked-close (regression: prior session's close racing the new listener) - warmingUp=true + restartRequired=true → still ignore (defense in depth — a stale 515 must not hand control to the reconnect path) - warmingUp=false + restartRequired=true → post-pair-restart - warmingUp=false → treat-as-timeout Bot suite goes from 60 → 64 tests, all green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
247 lines
9.2 KiB
TypeScript
247 lines
9.2 KiB
TypeScript
import { eq, and, lt } from "drizzle-orm";
|
|
import { rm } from "node:fs/promises";
|
|
import { join } from "node:path";
|
|
import { whatsappAccounts } from "@cmbot/db";
|
|
import { db } from "../db.js";
|
|
import { env } from "../env.js";
|
|
import { logger } from "../logger.js";
|
|
import { sessionManager } from "../whatsapp/session-manager.js";
|
|
import { renderQrPng } from "../whatsapp/qr-renderer.js";
|
|
import { syncGroupsForAccount } from "../whatsapp/group-sync.js";
|
|
import { writeAuditLog } from "../audit.js";
|
|
import { pgNotifyWeb } from "./notify.js";
|
|
import { decidePairListenerOnClose } from "./pair-state.js";
|
|
|
|
/** Maximum lifetime of a single pairing attempt: five minutes, in milliseconds. */
const PAIR_TIMEOUT_MS = 1000 * 60 * 5;

/** Per-account detach callback for the pairing listener registered on sessionManager. */
const offByAccount: Map<string, () => void> = new Map();

/** Most recent QR payload seen per account; used to drop duplicate QR events. */
const lastQrPayload: Map<string, string> = new Map();

/** Per-account handle of the timer that abandons an attempt after PAIR_TIMEOUT_MS. */
const pairTimeouts: Map<string, NodeJS.Timeout> = new Map();

// Accounts whose freshly attached listener is still "warming up". While an
// account id is in this set, close events seen by the listener are ignored;
// the id is removed as soon as the first qr/open event arrives. The reason:
// sessionManager broadcasts the OLD session's close asynchronously, after
// `await sessionManager.stop()` has already returned, so without this guard
// that stale close was mis-read as the NEW session timing out. The visible
// symptom was: get QR → go back → click Pair again → instantly see
// "Pairing timed out".
const pairingWarmingUp: Set<string> = new Set();
|
|
|
|
/**
 * Abandon an in-progress pairing attempt for `accountId`.
 *
 * Does nothing (beyond the lookup) unless the account row exists and its
 * status is "pending". Otherwise it detaches the pairing listener, cancels
 * the pairing timer, forgets the cached QR payload and warming flag, stops
 * the session if one is registered, removes the session directory and parks
 * the row as "unpaired".
 *
 * @param accountId - Id of the WhatsApp account row.
 * @returns `existed: true` plus the account label when a pending attempt was
 *          torn down; `existed: false` (label may be null) otherwise.
 */
async function abandonPair(accountId: string): Promise<{ existed: boolean; label: string | null }> {
  const row = await db.query.whatsappAccounts.findFirst({
    where: (a, { eq }) => eq(a.id, accountId),
  });
  if (!row || row.status !== "pending") {
    return { existed: false, label: row?.label ?? null };
  }

  // Drop all in-memory bookkeeping for this attempt. Deleting a missing key
  // and clearing an undefined timer are both no-ops.
  offByAccount.get(accountId)?.();
  offByAccount.delete(accountId);
  clearTimeout(pairTimeouts.get(accountId));
  pairTimeouts.delete(accountId);
  lastQrPayload.delete(accountId);
  pairingWarmingUp.delete(accountId);

  const sessionIsRegistered = sessionManager.hasSession(accountId);
  if (sessionIsRegistered) {
    await sessionManager.stop(accountId);
  }

  // Discard the half-written Baileys session files so a later attempt
  // starts from scratch. The account row itself is kept, so the operator
  // still sees it in the list with a "Re-pair" affordance.
  const sessionDir = join(env.SESSIONS_DIR, accountId);
  await rm(sessionDir, { force: true, recursive: true });

  await db
    .update(whatsappAccounts)
    .set({ status: "unpaired", lastQrPng: null })
    .where(eq(whatsappAccounts.id, accountId));

  return { existed: true, label: row.label };
}
|
|
|
|
/**
 * Start (or restart) a QR pairing attempt for one WhatsApp account.
 *
 * Steps, in order:
 *  1. Load the account row; bail out with a warning if it is missing.
 *  2. Tear down leftovers of a prior attempt: listener, timeout timer,
 *     live session and cached QR.
 *  3. Attach a sessionManager listener that reacts to `qr`, `open` and
 *     `close` events for this account.
 *  4. Start the session and arm a PAIR_TIMEOUT_MS timer that abandons the
 *     attempt if it never completes.
 *
 * Errors inside the listener are logged, not rethrown. A failure of
 * `sessionManager.start` is logged and reported to the web side as
 * `session.timeout`.
 *
 * @param accountId - Id of the WhatsApp account row to pair.
 */
export async function handleStartPairing(accountId: string): Promise<void> {
  const account = await db.query.whatsappAccounts.findFirst({
    where: (a, { eq }) => eq(a.id, accountId),
  });
  if (!account) {
    logger.warn({ accountId }, "pair: account row missing");
    return;
  }

  // Detach any listener still subscribed from a prior pairing attempt for
  // this account. Without this, repeated Re-pair clicks accumulate
  // listeners and each one writes a fresh QR to the DB on every Baileys
  // event — the UI then flashes through new QRs constantly.
  const prevOff = offByAccount.get(accountId);
  if (prevOff) {
    prevOff();
    offByAccount.delete(accountId);
  }

  // Cancel the prior attempt's timeout timer as well. If it were left
  // armed it would fire part-way through THIS attempt and call
  // abandonPair(), wiping the new session and pushing a bogus
  // session.timeout to the UI.
  const prevTimeout = pairTimeouts.get(accountId);
  if (prevTimeout) {
    clearTimeout(prevTimeout);
    pairTimeouts.delete(accountId);
  }

  // For Re-pair, an old session may still be alive. Stop it so
  // sessionManager.start() actually opens a fresh socket and Baileys emits
  // a new QR. (start() is a no-op when a session is already registered.)
  if (sessionManager.hasSession(accountId)) {
    await sessionManager.stop(accountId);
  }
  // Clear any stale QR lingering from a prior attempt.
  lastQrPayload.delete(accountId);
  await db
    .update(whatsappAccounts)
    .set({ lastQrPng: null })
    .where(eq(whatsappAccounts.id, accountId));

  // Mark the new attempt as warming up. Cleared by the first qr/open we
  // observe; while set, any close event is treated as the leaked tail of
  // the previous session being torn down (see comment near
  // `pairingWarmingUp` declaration).
  pairingWarmingUp.add(accountId);

  const off = sessionManager.on(async (id, _state, event) => {
    if (id !== accountId) return;
    try {
      if (event.type === "qr") {
        pairingWarmingUp.delete(id);
        // Dedupe by payload — Baileys can re-emit the same QR string in a
        // burst. Different strings (a fresh QR) always pass through, so
        // the user gets a new QR as soon as Baileys generates one.
        if (lastQrPayload.get(id) === event.payload) return;
        lastQrPayload.set(id, event.payload);
        const png = await renderQrPng(event.payload);
        // PNG is too large (~5-10KB) for pg_notify (8000 byte limit).
        // Persist on the account row; web fetches via /api/qr/[id].
        await db
          .update(whatsappAccounts)
          .set({ lastQrPng: png.toString("base64"), lastQrAt: new Date() })
          .where(eq(whatsappAccounts.id, id));
        await pgNotifyWeb({
          type: "session.qr",
          accountId: id,
          ts: Date.now(),
        });
      } else if (event.type === "open") {
        // Terminal event: pairing succeeded. Drop all attempt state first.
        pairingWarmingUp.delete(id);
        const t = pairTimeouts.get(id);
        if (t) {
          clearTimeout(t);
          pairTimeouts.delete(id);
        }
        lastQrPayload.delete(id);
        offByAccount.delete(id);
        try {
          const session = sessionManager.getSession(id);
          let synced = 0;
          if (session) {
            const r = await syncGroupsForAccount(id, session.socket);
            synced = r.synced;
          }
          await writeAuditLog(db, {
            operatorId: account.operatorId,
            source: "web",
            action: "account.paired",
            targetType: "whatsapp_account",
            targetId: id,
            payload: { label: account.label },
          });
          await pgNotifyWeb({
            type: "session.connected",
            accountId: id,
            phoneNumber: event.phoneNumber ?? null,
          });
          await pgNotifyWeb({
            type: "groups.synced",
            accountId: id,
            count: synced,
          });
        } finally {
          // Always detach, even if sync/audit/notify threw. A listener
          // left attached after a successful pairing would treat any later
          // ordinary close as a pairing timeout and park the account as
          // `unpaired`.
          off();
        }
      } else if (event.type === "close") {
        const decision = decidePairListenerOnClose({
          warmingUp: pairingWarmingUp.has(id),
          restartRequired: event.restartRequired,
        });
        if (decision === "ignore-leaked-close") {
          logger.info(
            { accountId: id },
            "pair: ignoring close from previous attempt while warming up",
          );
          return;
        }
        if (decision === "post-pair-restart") {
          // After the user scans, WhatsApp tells Baileys to "restart"
          // the connection. The socket closes with status 515 and the
          // session-manager will reopen it with the new credentials —
          // the next `open` event finishes the pairing. Keep the
          // listener attached and don't surface a timeout to the UI.
          logger.info(
            { accountId: id },
            "pair: restart-required close (post-pair reconnect) — keeping listener alive",
          );
          return;
        }
        // decision === "treat-as-timeout": ephemeral close on a live
        // attempt. Park the row as `unpaired` and push session.timeout
        // so the operator sees the "Re-pair" affordance.
        const t = pairTimeouts.get(id);
        if (t) {
          clearTimeout(t);
          pairTimeouts.delete(id);
        }
        lastQrPayload.delete(id);
        offByAccount.delete(id);
        try {
          await db
            .update(whatsappAccounts)
            .set({ status: "unpaired", lastQrPng: null })
            .where(eq(whatsappAccounts.id, id));
          await pgNotifyWeb({ type: "session.timeout", accountId: id });
        } finally {
          // The attempt is over either way; never leave the listener
          // subscribed after its bookkeeping has been dropped.
          off();
        }
      }
    } catch (err) {
      logger.error({ err, accountId: id }, "pair: handler error");
    }
  });
  offByAccount.set(accountId, off);

  try {
    await sessionManager.start(accountId);
  } catch (err) {
    logger.error({ err, accountId }, "pair: start failed");
    off();
    offByAccount.delete(accountId);
    // No attempt is running any more, so the warming flag must not linger.
    pairingWarmingUp.delete(accountId);
    await pgNotifyWeb({ type: "session.timeout", accountId });
    return;
  }

  // Give the operator PAIR_TIMEOUT_MS to scan; after that abandon the
  // attempt and tell the web side, but only if the account was still
  // pending (abandonPair reports that via `existed`).
  const timeoutId = setTimeout(() => {
    void (async () => {
      try {
        const r = await abandonPair(accountId);
        if (r.existed) {
          await pgNotifyWeb({ type: "session.timeout", accountId });
        }
      } catch (err) {
        logger.error({ err, accountId }, "pair: timeout cleanup failed");
      }
    })();
  }, PAIR_TIMEOUT_MS);
  pairTimeouts.set(accountId, timeoutId);
}
|
|
|
|
/**
 * Startup sweep for accounts stuck in `pending`.
 *
 * Selects every account whose status is "pending" and whose `createdAt` is
 * more than one hour in the past — most likely the bot restarted mid-pair,
 * or the operator never finished scanning. For each one the session
 * directory is removed and the row is parked as `unpaired`, so it stays on
 * the operator's list with a Re-pair option instead of silently
 * disappearing. Rows are processed one at a time, in query order.
 */
export async function sweepStalePendingAccounts(): Promise<void> {
  const oneHourMs = 60 * 60 * 1000;
  const cutoff = new Date(Date.now() - oneHourMs);
  const isStalePending = and(
    eq(whatsappAccounts.status, "pending"),
    lt(whatsappAccounts.createdAt, cutoff),
  );

  const staleRows = await db
    .select({ id: whatsappAccounts.id, label: whatsappAccounts.label })
    .from(whatsappAccounts)
    .where(isStalePending);

  for (const { id, label } of staleRows) {
    const sessionDir = join(env.SESSIONS_DIR, id);
    await rm(sessionDir, { force: true, recursive: true });

    await db
      .update(whatsappAccounts)
      .set({ status: "unpaired", lastQrPng: null })
      .where(eq(whatsappAccounts.id, id));

    logger.info({ accountId: id, label }, "sweep: parked stale pending account as unpaired");
  }
}
|