feat(bot): clean up stale pair flows + 5-min pair timeout
Two related fixes for abandoned pairings: - After /pair starts a Baileys session, arm a 5-minute timer. If the operator doesn't scan in time the bot stops the session, deletes the pending account row + session files, and pings them in Telegram. - On bot startup, sweep any 'pending' account rows older than 1 hour — catches the case where the bot was restarted mid-pair, leaving a stale row no in-memory state could clean up.
This commit is contained in:
parent
5a775e076b
commit
97099bf28a
@ -5,6 +5,7 @@ import { createTelegramBot } from "./telegram/bot.js";
|
|||||||
import { sessionManager } from "./whatsapp/session-manager.js";
|
import { sessionManager } from "./whatsapp/session-manager.js";
|
||||||
import { startBoss, stopBoss } from "./scheduler/pgboss-client.js";
|
import { startBoss, stopBoss } from "./scheduler/pgboss-client.js";
|
||||||
import { registerReminderJobs } from "./scheduler/reminder-jobs.js";
|
import { registerReminderJobs } from "./scheduler/reminder-jobs.js";
|
||||||
|
import { sweepStalePendingAccounts } from "./telegram/commands/pair.js";
|
||||||
|
|
||||||
async function main(): Promise<void> {
|
async function main(): Promise<void> {
|
||||||
logger.info("bot starting");
|
logger.info("bot starting");
|
||||||
@ -20,6 +21,7 @@ async function main(): Promise<void> {
|
|||||||
drop_pending_updates: true,
|
drop_pending_updates: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
await sweepStalePendingAccounts();
|
||||||
await sessionManager.resumeFromDb();
|
await sessionManager.resumeFromDb();
|
||||||
|
|
||||||
const shutdown = async (signal: string): Promise<void> => {
|
const shutdown = async (signal: string): Promise<void> => {
|
||||||
|
|||||||
@ -2,6 +2,7 @@ import type { Context } from "grammy";
|
|||||||
import { InputFile } from "grammy";
|
import { InputFile } from "grammy";
|
||||||
import { rm } from "node:fs/promises";
|
import { rm } from "node:fs/promises";
|
||||||
import { join } from "node:path";
|
import { join } from "node:path";
|
||||||
|
import { eq, and, lt } from "drizzle-orm";
|
||||||
import { whatsappAccounts } from "@cmbot/db";
|
import { whatsappAccounts } from "@cmbot/db";
|
||||||
import { db } from "../../db.js";
|
import { db } from "../../db.js";
|
||||||
import { env } from "../../env.js";
|
import { env } from "../../env.js";
|
||||||
@ -19,6 +20,9 @@ import { InlineKeyboard } from "grammy";
|
|||||||
const qrMessageIdByAccount = new Map<string, number>();
|
const qrMessageIdByAccount = new Map<string, number>();
|
||||||
const lastQrPayloadByAccount = new Map<string, string>();
|
const lastQrPayloadByAccount = new Map<string, string>();
|
||||||
const offByAccount = new Map<string, () => void>();
|
const offByAccount = new Map<string, () => void>();
|
||||||
|
const pairTimeouts = new Map<string, NodeJS.Timeout>();
|
||||||
|
|
||||||
|
const PAIR_TIMEOUT_MS = 5 * 60 * 1000;
|
||||||
|
|
||||||
async function cancelExistingFlow(accountId: string): Promise<void> {
|
async function cancelExistingFlow(accountId: string): Promise<void> {
|
||||||
const off = offByAccount.get(accountId);
|
const off = offByAccount.get(accountId);
|
||||||
@ -26,6 +30,11 @@ async function cancelExistingFlow(accountId: string): Promise<void> {
|
|||||||
off();
|
off();
|
||||||
offByAccount.delete(accountId);
|
offByAccount.delete(accountId);
|
||||||
}
|
}
|
||||||
|
const t = pairTimeouts.get(accountId);
|
||||||
|
if (t) {
|
||||||
|
clearTimeout(t);
|
||||||
|
pairTimeouts.delete(accountId);
|
||||||
|
}
|
||||||
qrMessageIdByAccount.delete(accountId);
|
qrMessageIdByAccount.delete(accountId);
|
||||||
lastQrPayloadByAccount.delete(accountId);
|
lastQrPayloadByAccount.delete(accountId);
|
||||||
if (sessionManager.hasSession(accountId)) {
|
if (sessionManager.hasSession(accountId)) {
|
||||||
@ -35,6 +44,46 @@ async function cancelExistingFlow(accountId: string): Promise<void> {
|
|||||||
await rm(join(env.SESSIONS_DIR, accountId), { recursive: true, force: true });
|
await rm(join(env.SESSIONS_DIR, accountId), { recursive: true, force: true });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tear down a pairing flow that the operator never completed. Removes the
|
||||||
|
* Baileys session, deletes session files, deletes the DB row, and clears
|
||||||
|
* in-memory tracking.
|
||||||
|
*/
|
||||||
|
async function abandonPair(accountId: string): Promise<{ existed: boolean; label: string | null }> {
|
||||||
|
const account = await db.query.whatsappAccounts.findFirst({
|
||||||
|
where: (a, { eq }) => eq(a.id, accountId),
|
||||||
|
});
|
||||||
|
// Only abandon if it's still pending (operator might have just succeeded
|
||||||
|
// or repaired in another flow).
|
||||||
|
if (!account || account.status !== "pending") {
|
||||||
|
return { existed: false, label: account?.label ?? null };
|
||||||
|
}
|
||||||
|
await cancelExistingFlow(accountId);
|
||||||
|
await db.delete(whatsappAccounts).where(eq(whatsappAccounts.id, accountId));
|
||||||
|
return { existed: true, label: account.label };
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sweep stale `pending` accounts on bot startup. If pairing was started
|
||||||
|
* before a restart, the in-memory state was lost — but the DB row remained.
|
||||||
|
* Anything older than ~1 hour is considered abandoned and removed.
|
||||||
|
*/
|
||||||
|
export async function sweepStalePendingAccounts(): Promise<void> {
|
||||||
|
const cutoff = new Date(Date.now() - 60 * 60 * 1000);
|
||||||
|
const stale = await db
|
||||||
|
.select({ id: whatsappAccounts.id, label: whatsappAccounts.label })
|
||||||
|
.from(whatsappAccounts)
|
||||||
|
.where(and(eq(whatsappAccounts.status, "pending"), lt(whatsappAccounts.createdAt, cutoff)));
|
||||||
|
for (const row of stale) {
|
||||||
|
await rm(join(env.SESSIONS_DIR, row.id), { recursive: true, force: true });
|
||||||
|
await db.delete(whatsappAccounts).where(eq(whatsappAccounts.id, row.id));
|
||||||
|
logger.info({ accountId: row.id, label: row.label }, "sweep: removed stale pending account");
|
||||||
|
}
|
||||||
|
if (stale.length > 0) {
|
||||||
|
logger.info({ count: stale.length }, "sweep: stale pending accounts cleaned");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function handlePair(ctx: Context): Promise<void> {
|
export async function handlePair(ctx: Context): Promise<void> {
|
||||||
const text = ctx.message?.text ?? "";
|
const text = ctx.message?.text ?? "";
|
||||||
const label = text
|
const label = text
|
||||||
@ -129,6 +178,11 @@ export async function executePairFlow(ctx: Context, label: string): Promise<void
|
|||||||
qrMessageIdByAccount.delete(id);
|
qrMessageIdByAccount.delete(id);
|
||||||
lastQrPayloadByAccount.delete(id);
|
lastQrPayloadByAccount.delete(id);
|
||||||
offByAccount.delete(id);
|
offByAccount.delete(id);
|
||||||
|
const t = pairTimeouts.get(id);
|
||||||
|
if (t) {
|
||||||
|
clearTimeout(t);
|
||||||
|
pairTimeouts.delete(id);
|
||||||
|
}
|
||||||
await writeAuditLog(db, {
|
await writeAuditLog(db, {
|
||||||
operatorId: operatorRow.id,
|
operatorId: operatorRow.id,
|
||||||
source: "telegram",
|
source: "telegram",
|
||||||
@ -157,6 +211,11 @@ export async function executePairFlow(ctx: Context, label: string): Promise<void
|
|||||||
qrMessageIdByAccount.delete(id);
|
qrMessageIdByAccount.delete(id);
|
||||||
lastQrPayloadByAccount.delete(id);
|
lastQrPayloadByAccount.delete(id);
|
||||||
offByAccount.delete(id);
|
offByAccount.delete(id);
|
||||||
|
const t = pairTimeouts.get(id);
|
||||||
|
if (t) {
|
||||||
|
clearTimeout(t);
|
||||||
|
pairTimeouts.delete(id);
|
||||||
|
}
|
||||||
const kb = new InlineKeyboard().text("⬅ Main Menu", "m:main");
|
const kb = new InlineKeyboard().text("⬅ Main Menu", "m:main");
|
||||||
await ctx.reply(`⚠️ Pairing failed (logged out).`, { reply_markup: kb });
|
await ctx.reply(`⚠️ Pairing failed (logged out).`, { reply_markup: kb });
|
||||||
off();
|
off();
|
||||||
@ -174,5 +233,25 @@ export async function executePairFlow(ctx: Context, label: string): Promise<void
|
|||||||
await ctx.reply(`Pairing failed to start: ${(err as Error).message}`);
|
await ctx.reply(`Pairing failed to start: ${(err as Error).message}`);
|
||||||
off();
|
off();
|
||||||
offByAccount.delete(accountId);
|
offByAccount.delete(accountId);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Arm the abandonment timer. If the operator doesn't complete pairing
|
||||||
|
// within PAIR_TIMEOUT_MS, clean up the row + session and notify them.
|
||||||
|
const timeoutId = setTimeout(() => {
|
||||||
|
void (async () => {
|
||||||
|
try {
|
||||||
|
const result = await abandonPair(accountId);
|
||||||
|
if (result.existed) {
|
||||||
|
await ctx.reply(
|
||||||
|
`⌛ Pairing for "${result.label ?? label}" timed out (no scan within ${PAIR_TIMEOUT_MS / 60000} min). Account removed.`,
|
||||||
|
{ reply_markup: new InlineKeyboard().text("⬅ Main Menu", "m:main") },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
logger.error({ err, accountId }, "pair: abandonment cleanup failed");
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
}, PAIR_TIMEOUT_MS);
|
||||||
|
pairTimeouts.set(accountId, timeoutId);
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user