import { eq } from "drizzle-orm"; import { whatsappAccounts } from "@cmbot/db"; import { db } from "../db.js"; import { logger } from "../logger.js"; import { startSession, type Session, type SessionEvent } from "./session.js"; export type SessionState = | "pending" | "connecting" | "connected" | "disconnected" | "logged_out" | "banned"; export type StateEvent = | { kind: "starting" } | { kind: "open" } | { kind: "close"; loggedOut: boolean }; export function reduceState(current: SessionState, event: StateEvent): SessionState { if (event.kind === "starting" && current === "pending") return "connecting"; if (event.kind === "open" && (current === "connecting" || current === "disconnected")) { return "connected"; } if (event.kind === "close") { if (event.loggedOut) return "logged_out"; return "disconnected"; } return current; } export type SessionListener = ( accountId: string, state: SessionState, event: SessionEvent, ) => void | Promise; class SessionManager { private sessions = new Map(); private states = new Map(); private listeners = new Set(); private reconnectTimers = new Map(); on(listener: SessionListener): () => void { this.listeners.add(listener); return () => { this.listeners.delete(listener); }; } getState(accountId: string): SessionState { return this.states.get(accountId) ?? "pending"; } getCounts(): Record { const counts: Record = { pending: 0, connecting: 0, connected: 0, disconnected: 0, logged_out: 0, banned: 0, }; for (const state of this.states.values()) counts[state]++; return counts; } hasSession(accountId: string): boolean { return this.sessions.has(accountId); } getSession(accountId: string): Session | undefined { return this.sessions.get(accountId); } async start(accountId: string): Promise { if (this.sessions.has(accountId)) { logger.debug({ accountId }, "session-manager: already running, ignoring start"); return; } const existingTimer = this.reconnectTimers.get(accountId); if (existingTimer) { clearTimeout(existingTimer); this.reconnectTimers.delete(accountId); } this.transition(accountId, { kind: "starting" }); const session = await startSession({ accountId, onEvent: (event) => this.handleEvent(accountId, event), }); this.sessions.set(accountId, session); } async stop(accountId: string): Promise { const timer = this.reconnectTimers.get(accountId); if (timer) { clearTimeout(timer); this.reconnectTimers.delete(accountId); } const session = this.sessions.get(accountId); if (!session) return; await session.close(); this.sessions.delete(accountId); } async stopAll(): Promise { await Promise.all([...this.sessions.keys()].map((id) => this.stop(id))); } async resumeFromDb(): Promise { const rows = await db .select({ id: whatsappAccounts.id, status: whatsappAccounts.status }) .from(whatsappAccounts); for (const row of rows) { if (row.status === "connected" || row.status === "disconnected") { try { await this.start(row.id); } catch (err) { logger.warn({ err, accountId: row.id }, "resumeFromDb: failed to start"); } } } } private async handleEvent(accountId: string, event: SessionEvent): Promise { if (event.type === "open") { this.transition(accountId, { kind: "open" }); await db .update(whatsappAccounts) .set({ status: "connected", phoneNumber: event.phoneNumber ?? null, lastConnectedAt: new Date(), }) .where(eq(whatsappAccounts.id, accountId)); } else if (event.type === "close") { this.transition(accountId, { kind: "close", loggedOut: event.loggedOut }); await db .update(whatsappAccounts) .set({ status: event.loggedOut ? "logged_out" : "disconnected" }) .where(eq(whatsappAccounts.id, accountId)); if (event.loggedOut) { await this.stop(accountId); } else if (event.restartRequired) { // Status 515 — the post-pair-success reconnect. Always re-open // immediately (no 5 s back-off, no `lastConnectedAt` gate). If // we don't, the auth handshake never completes and the user // sees a spurious "Pairing timed out". const timer = setTimeout(() => { this.reconnectTimers.delete(accountId); void this.stop(accountId).then(() => this.start(accountId)); }, 250); this.reconnectTimers.set(accountId, timer); } else { // Other ephemeral closes (refs exhausted, network blip): only // auto-reconnect for accounts that have been linked at least // once. During an initial pair attempt this would otherwise // restart the pair dance and rotate the QR every few seconds. const account = await db.query.whatsappAccounts.findFirst({ where: (a, { eq }) => eq(a.id, accountId), columns: { lastConnectedAt: true }, }); if (account?.lastConnectedAt) { const timer = setTimeout(() => { this.reconnectTimers.delete(accountId); void this.stop(accountId).then(() => this.start(accountId)); }, 5000); this.reconnectTimers.set(accountId, timer); } else { // Brand-new account that hasn't authenticated yet — let the // pair-handler clean up via its timeout. await this.stop(accountId); } } } else if (event.type === "qr") { await db .update(whatsappAccounts) .set({ lastQrAt: new Date() }) .where(eq(whatsappAccounts.id, accountId)); } for (const listener of this.listeners) { try { await listener(accountId, this.getState(accountId), event); } catch (err) { logger.warn({ err, accountId }, "session-manager: listener error"); } } } private transition(accountId: string, event: StateEvent): void { const current = this.states.get(accountId) ?? "pending"; const next = reduceState(current, event); if (current !== next) { logger.info({ accountId, from: current, to: next }, "session-manager: state change"); } this.states.set(accountId, next); } } export const sessionManager = new SessionManager();