From 97099bf28a57acb649f2508330018c26e46df8e9 Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sat, 9 May 2026 21:59:48 +0800 Subject: [PATCH] feat(bot): clean up stale pair flows + 5-min pair timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related fixes for abandoned pairings: - After /pair starts a Baileys session, arm a 5-minute timer. If the operator doesn't scan in time the bot stops the session, deletes the pending account row + session files, and pings them in Telegram. - On bot startup, sweep any 'pending' account rows older than 1 hour — catches the case where the bot was restarted mid-pair, leaving a stale row no in-memory state could clean up. --- apps/bot/src/index.ts | 2 + apps/bot/src/telegram/commands/pair.ts | 79 ++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/apps/bot/src/index.ts b/apps/bot/src/index.ts index 43e4376..e0e0f63 100644 --- a/apps/bot/src/index.ts +++ b/apps/bot/src/index.ts @@ -5,6 +5,7 @@ import { createTelegramBot } from "./telegram/bot.js"; import { sessionManager } from "./whatsapp/session-manager.js"; import { startBoss, stopBoss } from "./scheduler/pgboss-client.js"; import { registerReminderJobs } from "./scheduler/reminder-jobs.js"; +import { sweepStalePendingAccounts } from "./telegram/commands/pair.js"; async function main(): Promise { logger.info("bot starting"); @@ -20,6 +21,7 @@ async function main(): Promise { drop_pending_updates: true, }); + await sweepStalePendingAccounts(); await sessionManager.resumeFromDb(); const shutdown = async (signal: string): Promise => { diff --git a/apps/bot/src/telegram/commands/pair.ts b/apps/bot/src/telegram/commands/pair.ts index 4efa528..6882073 100644 --- a/apps/bot/src/telegram/commands/pair.ts +++ b/apps/bot/src/telegram/commands/pair.ts @@ -2,6 +2,7 @@ import type { Context } from "grammy"; import { InputFile } from "grammy"; import { rm } from "node:fs/promises"; import { join } from "node:path"; +import { eq, and, lt } from "drizzle-orm"; import { whatsappAccounts } from "@cmbot/db"; import { db } from "../../db.js"; import { env } from "../../env.js"; @@ -19,6 +20,9 @@ import { InlineKeyboard } from "grammy"; const qrMessageIdByAccount = new Map(); const lastQrPayloadByAccount = new Map(); const offByAccount = new Map void>(); +const pairTimeouts = new Map(); + +const PAIR_TIMEOUT_MS = 5 * 60 * 1000; async function cancelExistingFlow(accountId: string): Promise { const off = offByAccount.get(accountId); @@ -26,6 +30,11 @@ async function cancelExistingFlow(accountId: string): Promise { off(); offByAccount.delete(accountId); } + const t = pairTimeouts.get(accountId); + if (t) { + clearTimeout(t); + pairTimeouts.delete(accountId); + } qrMessageIdByAccount.delete(accountId); lastQrPayloadByAccount.delete(accountId); if (sessionManager.hasSession(accountId)) { @@ -35,6 +44,46 @@ async function cancelExistingFlow(accountId: string): Promise { await rm(join(env.SESSIONS_DIR, accountId), { recursive: true, force: true }); } +/** + * Tear down a pairing flow that the operator never completed. Removes the + * Baileys session, deletes session files, deletes the DB row, and clears + * in-memory tracking. + */ +async function abandonPair(accountId: string): Promise<{ existed: boolean; label: string | null }> { + const account = await db.query.whatsappAccounts.findFirst({ + where: (a, { eq }) => eq(a.id, accountId), + }); + // Only abandon if it's still pending (operator might have just succeeded + // or repaired in another flow). + if (!account || account.status !== "pending") { + return { existed: false, label: account?.label ?? null }; + } + await cancelExistingFlow(accountId); + await db.delete(whatsappAccounts).where(eq(whatsappAccounts.id, accountId)); + return { existed: true, label: account.label }; +} + +/** + * Sweep stale `pending` accounts on bot startup. If pairing was started + * before a restart, the in-memory state was lost — but the DB row remained. + * Anything older than ~1 hour is considered abandoned and removed. + */ +export async function sweepStalePendingAccounts(): Promise { + const cutoff = new Date(Date.now() - 60 * 60 * 1000); + const stale = await db + .select({ id: whatsappAccounts.id, label: whatsappAccounts.label }) + .from(whatsappAccounts) + .where(and(eq(whatsappAccounts.status, "pending"), lt(whatsappAccounts.createdAt, cutoff))); + for (const row of stale) { + await rm(join(env.SESSIONS_DIR, row.id), { recursive: true, force: true }); + await db.delete(whatsappAccounts).where(eq(whatsappAccounts.id, row.id)); + logger.info({ accountId: row.id, label: row.label }, "sweep: removed stale pending account"); + } + if (stale.length > 0) { + logger.info({ count: stale.length }, "sweep: stale pending accounts cleaned"); + } +} + export async function handlePair(ctx: Context): Promise { const text = ctx.message?.text ?? ""; const label = text @@ -129,6 +178,11 @@ export async function executePairFlow(ctx: Context, label: string): Promise { + void (async () => { + try { + const result = await abandonPair(accountId); + if (result.existed) { + await ctx.reply( + `⌛ Pairing for "${result.label ?? label}" timed out (no scan within ${PAIR_TIMEOUT_MS / 60000} min). Account removed.`, + { reply_markup: new InlineKeyboard().text("⬅ Main Menu", "m:main") }, + ); + } + } catch (err) { + logger.error({ err, accountId }, "pair: abandonment cleanup failed"); + } + })(); + }, PAIR_TIMEOUT_MS); + pairTimeouts.set(accountId, timeoutId); }