feat(bot): add session manager with state machine + reconnect
This commit is contained in:
parent
fc05a8b459
commit
c2ee793ae6
@ -1,21 +1,26 @@
|
||||
import { logger } from "./logger.js";
|
||||
import { pool } from "./db.js";
|
||||
import { startHealthServer } from "./health.js";
|
||||
import { startHealthServer, setSessionCountsProvider } from "./health.js";
|
||||
import { createTelegramBot } from "./telegram/bot.js";
|
||||
import { sessionManager } from "./whatsapp/session-manager.js";
|
||||
|
||||
async function main(): Promise<void> {
|
||||
logger.info("bot starting");
|
||||
const health = startHealthServer();
|
||||
const tg = createTelegramBot();
|
||||
setSessionCountsProvider(() => sessionManager.getCounts());
|
||||
|
||||
const tg = createTelegramBot();
|
||||
void tg.start({
|
||||
onStart: (info) => logger.info({ username: info.username }, "telegram polling started"),
|
||||
drop_pending_updates: true,
|
||||
});
|
||||
|
||||
await sessionManager.resumeFromDb();
|
||||
|
||||
const shutdown = async (signal: string): Promise<void> => {
|
||||
logger.info({ signal }, "shutting down");
|
||||
await tg.stop();
|
||||
await sessionManager.stopAll();
|
||||
health.close();
|
||||
await pool.end();
|
||||
process.exit(0);
|
||||
|
||||
27
apps/bot/src/whatsapp/session-manager.test.ts
Normal file
27
apps/bot/src/whatsapp/session-manager.test.ts
Normal file
@ -0,0 +1,27 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { reduceState, type SessionState } from "./session-manager.js";
|
||||
|
||||
describe("reduceState", () => {
|
||||
it("pending → connecting on starting", () => {
|
||||
expect(reduceState("pending", { kind: "starting" })).toBe<SessionState>("connecting");
|
||||
});
|
||||
it("connecting → connected on open", () => {
|
||||
expect(reduceState("connecting", { kind: "open" })).toBe<SessionState>("connected");
|
||||
});
|
||||
it("connected → disconnected on close (not logged out)", () => {
|
||||
expect(reduceState("connected", { kind: "close", loggedOut: false })).toBe<SessionState>(
|
||||
"disconnected",
|
||||
);
|
||||
});
|
||||
it("any → logged_out on logged-out close", () => {
|
||||
expect(reduceState("connected", { kind: "close", loggedOut: true })).toBe<SessionState>(
|
||||
"logged_out",
|
||||
);
|
||||
expect(reduceState("connecting", { kind: "close", loggedOut: true })).toBe<SessionState>(
|
||||
"logged_out",
|
||||
);
|
||||
});
|
||||
it("ignores stray events that don't match transitions", () => {
|
||||
expect(reduceState("connected", { kind: "starting" })).toBe<SessionState>("connected");
|
||||
});
|
||||
});
|
||||
179
apps/bot/src/whatsapp/session-manager.ts
Normal file
179
apps/bot/src/whatsapp/session-manager.ts
Normal file
@ -0,0 +1,179 @@
|
||||
import { eq } from "drizzle-orm";
|
||||
import { whatsappAccounts } from "@cmbot/db";
|
||||
import { db } from "../db.js";
|
||||
import { logger } from "../logger.js";
|
||||
import { startSession, type Session, type SessionEvent } from "./session.js";
|
||||
|
||||
export type SessionState =
|
||||
| "pending"
|
||||
| "connecting"
|
||||
| "connected"
|
||||
| "disconnected"
|
||||
| "logged_out"
|
||||
| "banned";
|
||||
|
||||
export type StateEvent =
|
||||
| { kind: "starting" }
|
||||
| { kind: "open" }
|
||||
| { kind: "close"; loggedOut: boolean };
|
||||
|
||||
export function reduceState(current: SessionState, event: StateEvent): SessionState {
|
||||
if (event.kind === "starting" && current === "pending") return "connecting";
|
||||
if (event.kind === "open" && (current === "connecting" || current === "disconnected")) {
|
||||
return "connected";
|
||||
}
|
||||
if (event.kind === "close") {
|
||||
if (event.loggedOut) return "logged_out";
|
||||
return "disconnected";
|
||||
}
|
||||
return current;
|
||||
}
|
||||
|
||||
export type SessionListener = (
|
||||
accountId: string,
|
||||
state: SessionState,
|
||||
event: SessionEvent,
|
||||
) => void | Promise<void>;
|
||||
|
||||
class SessionManager {
|
||||
private sessions = new Map<string, Session>();
|
||||
private states = new Map<string, SessionState>();
|
||||
private listeners = new Set<SessionListener>();
|
||||
private reconnectTimers = new Map<string, NodeJS.Timeout>();
|
||||
|
||||
on(listener: SessionListener): () => void {
|
||||
this.listeners.add(listener);
|
||||
return () => {
|
||||
this.listeners.delete(listener);
|
||||
};
|
||||
}
|
||||
|
||||
getState(accountId: string): SessionState {
|
||||
return this.states.get(accountId) ?? "pending";
|
||||
}
|
||||
|
||||
getCounts(): Record<SessionState, number> {
|
||||
const counts: Record<SessionState, number> = {
|
||||
pending: 0,
|
||||
connecting: 0,
|
||||
connected: 0,
|
||||
disconnected: 0,
|
||||
logged_out: 0,
|
||||
banned: 0,
|
||||
};
|
||||
for (const state of this.states.values()) counts[state]++;
|
||||
return counts;
|
||||
}
|
||||
|
||||
hasSession(accountId: string): boolean {
|
||||
return this.sessions.has(accountId);
|
||||
}
|
||||
|
||||
getSession(accountId: string): Session | undefined {
|
||||
return this.sessions.get(accountId);
|
||||
}
|
||||
|
||||
async start(accountId: string): Promise<void> {
|
||||
if (this.sessions.has(accountId)) {
|
||||
logger.debug({ accountId }, "session-manager: already running, ignoring start");
|
||||
return;
|
||||
}
|
||||
const existingTimer = this.reconnectTimers.get(accountId);
|
||||
if (existingTimer) {
|
||||
clearTimeout(existingTimer);
|
||||
this.reconnectTimers.delete(accountId);
|
||||
}
|
||||
this.transition(accountId, { kind: "starting" });
|
||||
|
||||
const session = await startSession({
|
||||
accountId,
|
||||
onEvent: (event) => this.handleEvent(accountId, event),
|
||||
});
|
||||
this.sessions.set(accountId, session);
|
||||
}
|
||||
|
||||
async stop(accountId: string): Promise<void> {
|
||||
const timer = this.reconnectTimers.get(accountId);
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
this.reconnectTimers.delete(accountId);
|
||||
}
|
||||
const session = this.sessions.get(accountId);
|
||||
if (!session) return;
|
||||
await session.close();
|
||||
this.sessions.delete(accountId);
|
||||
}
|
||||
|
||||
async stopAll(): Promise<void> {
|
||||
await Promise.all([...this.sessions.keys()].map((id) => this.stop(id)));
|
||||
}
|
||||
|
||||
async resumeFromDb(): Promise<void> {
|
||||
const rows = await db
|
||||
.select({ id: whatsappAccounts.id, status: whatsappAccounts.status })
|
||||
.from(whatsappAccounts);
|
||||
for (const row of rows) {
|
||||
if (row.status === "connected" || row.status === "disconnected") {
|
||||
try {
|
||||
await this.start(row.id);
|
||||
} catch (err) {
|
||||
logger.warn({ err, accountId: row.id }, "resumeFromDb: failed to start");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async handleEvent(accountId: string, event: SessionEvent): Promise<void> {
|
||||
if (event.type === "open") {
|
||||
this.transition(accountId, { kind: "open" });
|
||||
await db
|
||||
.update(whatsappAccounts)
|
||||
.set({
|
||||
status: "connected",
|
||||
phoneNumber: event.phoneNumber ?? null,
|
||||
lastConnectedAt: new Date(),
|
||||
})
|
||||
.where(eq(whatsappAccounts.id, accountId));
|
||||
} else if (event.type === "close") {
|
||||
this.transition(accountId, { kind: "close", loggedOut: event.loggedOut });
|
||||
await db
|
||||
.update(whatsappAccounts)
|
||||
.set({ status: event.loggedOut ? "logged_out" : "disconnected" })
|
||||
.where(eq(whatsappAccounts.id, accountId));
|
||||
|
||||
if (!event.loggedOut) {
|
||||
const timer = setTimeout(() => {
|
||||
this.reconnectTimers.delete(accountId);
|
||||
void this.stop(accountId).then(() => this.start(accountId));
|
||||
}, 5000);
|
||||
this.reconnectTimers.set(accountId, timer);
|
||||
} else {
|
||||
await this.stop(accountId);
|
||||
}
|
||||
} else if (event.type === "qr") {
|
||||
await db
|
||||
.update(whatsappAccounts)
|
||||
.set({ lastQrAt: new Date() })
|
||||
.where(eq(whatsappAccounts.id, accountId));
|
||||
}
|
||||
|
||||
for (const listener of this.listeners) {
|
||||
try {
|
||||
await listener(accountId, this.getState(accountId), event);
|
||||
} catch (err) {
|
||||
logger.warn({ err, accountId }, "session-manager: listener error");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private transition(accountId: string, event: StateEvent): void {
|
||||
const current = this.states.get(accountId) ?? "pending";
|
||||
const next = reduceState(current, event);
|
||||
if (current !== next) {
|
||||
logger.info({ accountId, from: current, to: next }, "session-manager: state change");
|
||||
}
|
||||
this.states.set(accountId, next);
|
||||
}
|
||||
}
|
||||
|
||||
export const sessionManager = new SessionManager();
|
||||
Loading…
x
Reference in New Issue
Block a user