diff --git a/apps/bot/src/index.ts b/apps/bot/src/index.ts index 7246579..3b748ee 100644 --- a/apps/bot/src/index.ts +++ b/apps/bot/src/index.ts @@ -1,21 +1,26 @@ import { logger } from "./logger.js"; import { pool } from "./db.js"; -import { startHealthServer } from "./health.js"; +import { startHealthServer, setSessionCountsProvider } from "./health.js"; import { createTelegramBot } from "./telegram/bot.js"; +import { sessionManager } from "./whatsapp/session-manager.js"; async function main(): Promise { logger.info("bot starting"); const health = startHealthServer(); - const tg = createTelegramBot(); + setSessionCountsProvider(() => sessionManager.getCounts()); + const tg = createTelegramBot(); void tg.start({ onStart: (info) => logger.info({ username: info.username }, "telegram polling started"), drop_pending_updates: true, }); + await sessionManager.resumeFromDb(); + const shutdown = async (signal: string): Promise => { logger.info({ signal }, "shutting down"); await tg.stop(); + await sessionManager.stopAll(); health.close(); await pool.end(); process.exit(0); diff --git a/apps/bot/src/whatsapp/session-manager.test.ts b/apps/bot/src/whatsapp/session-manager.test.ts new file mode 100644 index 0000000..a95ea73 --- /dev/null +++ b/apps/bot/src/whatsapp/session-manager.test.ts @@ -0,0 +1,27 @@ +import { describe, expect, it } from "vitest"; +import { reduceState, type SessionState } from "./session-manager.js"; + +describe("reduceState", () => { + it("pending → connecting on starting", () => { + expect(reduceState("pending", { kind: "starting" })).toBe("connecting"); + }); + it("connecting → connected on open", () => { + expect(reduceState("connecting", { kind: "open" })).toBe("connected"); + }); + it("connected → disconnected on close (not logged out)", () => { + expect(reduceState("connected", { kind: "close", loggedOut: false })).toBe( + "disconnected", + ); + }); + it("any → logged_out on logged-out close", () => { + expect(reduceState("connected", { kind: "close", loggedOut: true })).toBe( + "logged_out", + ); + expect(reduceState("connecting", { kind: "close", loggedOut: true })).toBe( + "logged_out", + ); + }); + it("ignores stray events that don't match transitions", () => { + expect(reduceState("connected", { kind: "starting" })).toBe("connected"); + }); +}); diff --git a/apps/bot/src/whatsapp/session-manager.ts b/apps/bot/src/whatsapp/session-manager.ts new file mode 100644 index 0000000..50ff71e --- /dev/null +++ b/apps/bot/src/whatsapp/session-manager.ts @@ -0,0 +1,179 @@ +import { eq } from "drizzle-orm"; +import { whatsappAccounts } from "@cmbot/db"; +import { db } from "../db.js"; +import { logger } from "../logger.js"; +import { startSession, type Session, type SessionEvent } from "./session.js"; + +export type SessionState = + | "pending" + | "connecting" + | "connected" + | "disconnected" + | "logged_out" + | "banned"; + +export type StateEvent = + | { kind: "starting" } + | { kind: "open" } + | { kind: "close"; loggedOut: boolean }; + +export function reduceState(current: SessionState, event: StateEvent): SessionState { + if (event.kind === "starting" && current === "pending") return "connecting"; + if (event.kind === "open" && (current === "connecting" || current === "disconnected")) { + return "connected"; + } + if (event.kind === "close") { + if (event.loggedOut) return "logged_out"; + return "disconnected"; + } + return current; +} + +export type SessionListener = ( + accountId: string, + state: SessionState, + event: SessionEvent, +) => void | Promise; + +class SessionManager { + private sessions = new Map(); + private states = new Map(); + private listeners = new Set(); + private reconnectTimers = new Map(); + + on(listener: SessionListener): () => void { + this.listeners.add(listener); + return () => { + this.listeners.delete(listener); + }; + } + + getState(accountId: string): SessionState { + return this.states.get(accountId) ?? "pending"; + } + + getCounts(): Record { + const counts: Record = { + pending: 0, + connecting: 0, + connected: 0, + disconnected: 0, + logged_out: 0, + banned: 0, + }; + for (const state of this.states.values()) counts[state]++; + return counts; + } + + hasSession(accountId: string): boolean { + return this.sessions.has(accountId); + } + + getSession(accountId: string): Session | undefined { + return this.sessions.get(accountId); + } + + async start(accountId: string): Promise { + if (this.sessions.has(accountId)) { + logger.debug({ accountId }, "session-manager: already running, ignoring start"); + return; + } + const existingTimer = this.reconnectTimers.get(accountId); + if (existingTimer) { + clearTimeout(existingTimer); + this.reconnectTimers.delete(accountId); + } + this.transition(accountId, { kind: "starting" }); + + const session = await startSession({ + accountId, + onEvent: (event) => this.handleEvent(accountId, event), + }); + this.sessions.set(accountId, session); + } + + async stop(accountId: string): Promise { + const timer = this.reconnectTimers.get(accountId); + if (timer) { + clearTimeout(timer); + this.reconnectTimers.delete(accountId); + } + const session = this.sessions.get(accountId); + if (!session) return; + await session.close(); + this.sessions.delete(accountId); + } + + async stopAll(): Promise { + await Promise.all([...this.sessions.keys()].map((id) => this.stop(id))); + } + + async resumeFromDb(): Promise { + const rows = await db + .select({ id: whatsappAccounts.id, status: whatsappAccounts.status }) + .from(whatsappAccounts); + for (const row of rows) { + if (row.status === "connected" || row.status === "disconnected") { + try { + await this.start(row.id); + } catch (err) { + logger.warn({ err, accountId: row.id }, "resumeFromDb: failed to start"); + } + } + } + } + + private async handleEvent(accountId: string, event: SessionEvent): Promise { + if (event.type === "open") { + this.transition(accountId, { kind: "open" }); + await db + .update(whatsappAccounts) + .set({ + status: "connected", + phoneNumber: event.phoneNumber ?? null, + lastConnectedAt: new Date(), + }) + .where(eq(whatsappAccounts.id, accountId)); + } else if (event.type === "close") { + this.transition(accountId, { kind: "close", loggedOut: event.loggedOut }); + await db + .update(whatsappAccounts) + .set({ status: event.loggedOut ? "logged_out" : "disconnected" }) + .where(eq(whatsappAccounts.id, accountId)); + + if (!event.loggedOut) { + const timer = setTimeout(() => { + this.reconnectTimers.delete(accountId); + void this.stop(accountId).then(() => this.start(accountId)); + }, 5000); + this.reconnectTimers.set(accountId, timer); + } else { + await this.stop(accountId); + } + } else if (event.type === "qr") { + await db + .update(whatsappAccounts) + .set({ lastQrAt: new Date() }) + .where(eq(whatsappAccounts.id, accountId)); + } + + for (const listener of this.listeners) { + try { + await listener(accountId, this.getState(accountId), event); + } catch (err) { + logger.warn({ err, accountId }, "session-manager: listener error"); + } + } + } + + private transition(accountId: string, event: StateEvent): void { + const current = this.states.get(accountId) ?? "pending"; + const next = reduceState(current, event); + if (current !== next) { + logger.info({ accountId, from: current, to: next }, "session-manager: state change"); + } + this.states.set(accountId, next); + } +} + +export const sessionManager = new SessionManager();