yiekheng 57786f9d09 feat(bot,web): window-end gate + paused/resume run lifecycle
fire-reminder.ts now:

* Computes windowEnd via @cmbot/shared/windowEndAt(timezone, endHour,
  now). Per-target loop trips the gate before sending; pending rows
  are LEFT pending (not flipped to skipped) so the run is resumable.
* Accepts an optional runId on the FireReminderPayload. When set,
  the handler ATTACHES to that existing run instead of creating a
  new one and only re-tries pending targets. Resume is allowed even
  when the reminder.status is 'paused' (otherwise we couldn't drag
  it back into delivery).
* Final-status logic adds a 'paused' branch (window closed mid-run
  with at least one row still pending AND something delivered);
  failed when window closed before any send; partial / success
  otherwise.
* Lifecycle: a paused run flips the reminder row to status='paused'
  and skips the recurring re-arm. Resuming or completing later
  flips it back to 'active'.
* SSE event payload gains optional sent/total counts.

reminderFiredToNotification picks up:
* New 'paused' headline + 'X of Y groups delivered. Tap to resume
  or cancel.' body.
* 'partial' body uses sent/total when present.

WebEventMap and the bot's WebEvent union match the new shape.

Tests:
* fire-reminder.test.ts gains a "resume against paused reminder
  acquires mutex" case.
* notifications.test.ts gains 3 paused/partial-sent body cases.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 15:48:52 +08:00

480 lines
17 KiB
TypeScript

import { and, eq, inArray } from "drizzle-orm";
import { reminderRuns, reminderRunTargets, reminders } from "@cmbot/db";
import {
generateWAMessageContent,
generateMessageID,
type AnyMessageContent,
type proto,
type WASocket,
} from "@whiskeysockets/baileys";
import pLimit from "p-limit";
import { readFile } from "node:fs/promises";
import { db } from "../db.js";
import { logger } from "../logger.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import {
absoluteMediaPath,
nextOccurrence,
resolveDeliveryKind,
windowEndAt,
} from "@cmbot/shared";
import { env } from "../env.js";
import { writeAuditLog } from "../audit.js";
import { getReminderWithDetails } from "../reminders/crud.js";
import { getBoss } from "./pgboss-client.js";
import { scheduleReminderFire } from "./reminder-jobs.js";
import { pgNotifyWeb } from "../ipc/notify.js";
import { accountMutex } from "./per-key-mutex.js";
import { accountRateLimiter } from "./rate-limiter.js";
import { MediaUploadCache } from "./media-upload-cache.js";
/** Input accepted by `fireReminder`. */
export type FireReminderPayload = {
/** Id of the reminder to deliver; `fireReminder` looks it up with
* `getReminderWithDetails` and returns early if no such reminder exists. */
reminderId: string;
/** Optional resume hook. When present, fire-reminder ATTACHES to
* the existing run instead of creating a new one and only re-tries
* targets in `pending` status. Set by the resume server action.
* Its presence also lifts the `status === "active"` requirement that
* applies to fresh fires. */
runId?: string;
};
/**
 * Picks the pause (in milliseconds) inserted after each message part sent
 * to the same group, so parts land in a visible order in the chat.
 *
 * @returns A whole number of milliseconds in the inclusive range 200..499.
 */
function partJitterMs(): number {
  const floorMs = 200;
  const spreadMs = 300;
  const extraMs = Math.floor(Math.random() * spreadMs); // 0..299
  return floorMs + extraMs;
}
/** `WASocket` widened with an optional `assertSessions` method.
* Per the original author's note, Baileys exposes `assertSessions` on its
* internal interface but not on the public type. It is declared optional
* here so `ensureGroupSessions` can probe for it at runtime before calling
* it once per group ahead of the first send. */
type SocketWithAssertSessions = WASocket & {
/** Invoked by `ensureGroupSessions` with a chunk of participant JIDs and `force = true`. */
assertSessions?: (jids: string[], force: boolean) => Promise<boolean>;
};
/**
 * Best-effort session warm-up for every participant of a group.
 *
 * Does nothing when the socket has no `assertSessions` function. Otherwise it
 * fetches the group's metadata and asserts sessions for the participants in
 * small batches; a batch that throws is logged as a warning and skipped so
 * the remaining batches still run.
 *
 * @param socket   Connected Baileys socket for the sending account.
 * @param groupJid JID of the group whose participants should be asserted.
 * @throws Whatever `socket.groupMetadata` throws (only the per-batch
 *         `assertSessions` calls are caught here).
 */
async function ensureGroupSessions(socket: WASocket, groupJid: string): Promise<void> {
  const extended = socket as SocketWithAssertSessions;
  if (typeof extended.assertSessions !== "function") return;

  const { participants } = await socket.groupMetadata(groupJid);
  const jids = participants.map((participant) => participant.id);

  // Small batches so a single bad participant doesn't fail the whole group.
  const batchSize = 5;
  let offset = 0;
  while (offset < jids.length) {
    const batch = jids.slice(offset, offset + batchSize);
    offset += batchSize;
    try {
      await extended.assertSessions(batch, true);
    } catch (err) {
      logger.warn(
        { groupJid, err: (err as Error).message },
        "fire-reminder: assertSessions chunk failed",
      );
    }
  }
}
/**
 * Entry point for delivering a reminder.
 *
 * Loads the reminder, applies the lifecycle gate, then runs the actual
 * delivery under the per-account mutex.
 *
 * @param payload `reminderId` to deliver, plus an optional `runId` when
 *                resuming an existing run.
 * @returns Resolves once the delivery round has finished, or immediately
 *          when the reminder is missing or not eligible to fire.
 */
export async function fireReminder(payload: FireReminderPayload): Promise<void> {
  const { reminderId, runId } = payload;

  const reminder = await getReminderWithDetails(reminderId);
  if (!reminder) {
    logger.warn({ reminderId }, "fire-reminder: reminder not found");
    return;
  }

  // A resume may proceed while the reminder's lifecycle status is 'paused'
  // (that is exactly the state it is meant to pull the reminder out of).
  // Only fresh fires insist on status='active'.
  const isResume = Boolean(runId);
  if (!isResume && reminder.status !== "active") {
    logger.info(
      { reminderId: reminder.id, status: reminder.status },
      "fire-reminder: skipping (not active)",
    );
    return;
  }

  // Serialize per account: two reminders on the SAME account take turns
  // (concurrent runs would double the effective send rate and risk a ban),
  // while different accounts proceed in parallel.
  await accountMutex.run(reminder.accountId, () => fireReminderInner(reminder, runId));
}
/**
 * Runs one delivery round for `reminder`. Called by `fireReminder` from
 * inside the per-account mutex.
 *
 * Flow:
 *  1. Attach to the run named by `resumeRunId`, or insert a fresh run row.
 *  2. If the account has no live session, mark the run skipped and stop.
 *  3. Bulk-load groups and media, pre-create per-target rows (fresh path).
 *  4. Send every message part to each still-pending target, honoring the
 *     rate limiter, the group concurrency limit and the window-end gate.
 *  5. Settle the run's final status, notify the web side, update the
 *     reminder's lifecycle (pause / end / re-arm) and write an audit entry.
 *
 * @param reminder    Reminder with its targets and messages, as returned by
 *                    `getReminderWithDetails`.
 * @param resumeRunId When set, the existing run to continue; only its
 *                    `pending` targets are processed.
 * @throws Error if inserting the fresh run row returns no row; database and
 *         notification errors outside the per-target try/catch propagate.
 */
async function fireReminderInner(
  reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
  resumeRunId?: string,
): Promise<void> {
  // Resume path attaches to the existing run row; fresh path inserts a new one.
  let runId: string;
  if (resumeRunId) {
    const existing = await db.query.reminderRuns.findFirst({
      where: (r, { eq: dEq }) => dEq(r.id, resumeRunId),
    });
    if (!existing) {
      logger.warn(
        { reminderId: reminder.id, resumeRunId },
        "fire-reminder: resume target run missing",
      );
      return;
    }
    // NOTE(review): nothing here verifies that the run belongs to this
    // reminder — confirm the resume server action guarantees the pairing.
    runId = existing.id;
    // Flip the run back to in-flight so the UI stops showing it as paused.
    await db
      .update(reminderRuns)
      .set({ status: "pending", errorSummary: null })
      .where(eq(reminderRuns.id, runId));
  } else {
    const [run] = await db
      .insert(reminderRuns)
      .values({
        reminderId: reminder.id,
        reminderName: reminder.name,
        status: "pending",
      })
      .returning({ id: reminderRuns.id });
    if (!run) {
      throw new Error("fire-reminder: inserting the run row returned no row");
    }
    runId = run.id;
  }

  const session = sessionManager.getSession(reminder.accountId);
  if (!session) {
    logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected");
    // Fresh path: no target rows exist yet, so record them all as skipped.
    // Resume path: the existing rows are left untouched.
    if (!resumeRunId) {
      await markAllSkipped(runId, reminder, "account not connected");
    }
    // NOTE(review): on resume this overwrites the run status with "skipped"
    // while its target rows stay pending — confirm that is intended.
    await db
      .update(reminderRuns)
      .set({ status: "skipped", errorSummary: "account not connected" })
      .where(eq(reminderRuns.id, runId));
    await pgNotifyWeb({
      type: "reminder.fired",
      reminderId: reminder.id,
      runId,
      status: "skipped",
    });
    return;
  }

  // Up-front bulk loads. Drops ~3000 round-trips to ~3 for a 1000-group run.
  const groupIds = reminder.targets.map((t) => t.groupId);
  const groupRows = groupIds.length
    ? await db.query.whatsappGroups.findMany({ where: (g) => inArray(g.id, groupIds) })
    : [];
  const groupById = new Map(groupRows.map((g) => [g.id, g]));

  const mediaIds = Array.from(
    new Set(reminder.messages.map((m) => m.mediaId).filter((id): id is string => Boolean(id))),
  );
  const mediaRows = mediaIds.length
    ? await db.query.mediaFiles.findMany({ where: (m) => inArray(m.id, mediaIds) })
    : [];
  const mediaById = new Map(mediaRows.map((m) => [m.id, m]));

  // Pre-create run_target rows on the fresh path so the Activity tab
  // shows progress mid-run. Resume reuses the existing rows.
  if (!resumeRunId && reminder.targets.length > 0) {
    await db.insert(reminderRunTargets).values(
      reminder.targets.map((t) => ({
        runId,
        groupId: t.groupId,
        groupLabel: groupById.get(t.groupId)?.name ?? null,
        status: "pending" as const,
      })),
    );
  }

  // On resume, only the still-pending rows are processed. On a fresh
  // fire that's every row since we just inserted them all as pending.
  const pendingRows = await db.query.reminderRunTargets.findMany({
    where: (t, { eq: dEq, and: dAnd }) => dAnd(dEq(t.runId, runId), dEq(t.status, "pending")),
  });
  const pendingGroupIds = new Set(pendingRows.map((r) => r.groupId));
  const targetsToProcess = reminder.targets.filter((t) => pendingGroupIds.has(t.groupId));

  // Counts this run's target rows currently in the given status.
  const countRunTargets = async (rowStatus: "pending" | "sent" | "failed"): Promise<number> => {
    const rows = await db.query.reminderRunTargets.findMany({
      where: (t, { eq: dEq, and: dAnd }) => dAnd(dEq(t.runId, runId), dEq(t.status, rowStatus)),
    });
    return rows.length;
  };

  // Already-sent / already-failed counts from prior run rounds (resume
  // case). The final tally adds these to what THIS round produces.
  const priorSentCount = resumeRunId ? await countRunTargets("sent") : 0;
  const priorFailedCount = resumeRunId ? await countRunTargets("failed") : 0;

  // Window-end timestamp. If the reminder fires AFTER today's deadline
  // hour (cron miss-fired late, or it's already 7pm) this is in the
  // past and the FIRST gate check trips immediately, ending the run
  // as failed without sending anything.
  const windowEnd = windowEndAt(
    reminder.timezone,
    reminder.deliveryWindowEndHour,
    new Date(),
  );

  // Per-run media upload cache (one prepare call per unique mediaId).
  const uploadCache = new MediaUploadCache<proto.IMessage>(async (mediaId) => {
    const media = mediaById.get(mediaId);
    if (!media) throw new Error(`media row missing: ${mediaId}`);
    const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR);
    const buffer = await readFile(filePath);
    // Only the first bytes are handed to resolveDeliveryKind alongside the MIME type.
    const head = buffer.subarray(0, 12);
    const resolved = resolveDeliveryKind(media.mimeType, head);
    // Anything that is not an image or a video is delivered as a document.
    const senderKind: "image" | "video" | "document" =
      resolved === "image" || resolved === "video" ? resolved : "document";
    const content: AnyMessageContent =
      senderKind === "image"
        ? { image: buffer, mimetype: media.mimeType }
        : senderKind === "video"
          ? { video: buffer, mimetype: media.mimeType }
          : {
              document: buffer,
              fileName: media.filenameOriginal,
              mimetype: media.mimeType,
            };
    return generateWAMessageContent(content, {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      upload: (session.socket as any).waUploadToServer,
    });
  });

  // Per-account rate limiter — gates each socket send.
  const rateLimiter = accountRateLimiter.get(reminder.accountId);

  let sentCount = 0;
  let failedCount = 0;
  let skippedCount = 0;
  let windowClosed = false;

  const groupConcurrency = pLimit(env.BOT_GROUP_CONCURRENCY);
  await Promise.all(
    targetsToProcess.map((target) =>
      groupConcurrency(async () => {
        // Window-end gate. CRITICAL: leave the row as `pending` (NOT
        // `skipped`) so the run can be resumed later.
        if (Date.now() >= windowEnd.getTime()) {
          windowClosed = true;
          return;
        }

        const group = groupById.get(target.groupId);
        if (!group) {
          await db
            .update(reminderRunTargets)
            .set({ status: "skipped", error: "group missing from db" })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          skippedCount++;
          return;
        }

        const start = Date.now();
        try {
          await ensureGroupSessions(session.socket, group.waGroupJid);
          let lastMessageId: string | undefined;
          for (const part of reminder.messages) {
            await rateLimiter.acquire();
            if (part.kind === "text" && part.textContent) {
              const r = await session.socket.sendMessage(group.waGroupJid, {
                text: part.textContent,
              });
              lastMessageId = r?.key?.id ?? undefined;
            } else if (part.mediaId) {
              const prebuilt = await uploadCache.get(part.mediaId);
              if (part.textContent) injectCaption(prebuilt, part.textContent);
              const messageId = generateMessageID();
              await session.socket.relayMessage(group.waGroupJid, prebuilt, { messageId });
              lastMessageId = messageId;
            }
            // Short pause between parts so they show up in order in the chat.
            await new Promise((r) => setTimeout(r, partJitterMs()));
          }
          await db
            .update(reminderRunTargets)
            .set({
              status: "sent",
              waMessageId: lastMessageId ?? null,
              latencyMs: Date.now() - start,
            })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          sentCount++;
        } catch (err) {
          logger.error(
            { err, reminderId: reminder.id, groupId: target.groupId },
            "fire-reminder: send failed",
          );
          await db
            .update(reminderRunTargets)
            .set({ status: "failed", error: (err as Error).message })
            .where(
              and(
                eq(reminderRunTargets.runId, runId),
                eq(reminderRunTargets.groupId, target.groupId),
              ),
            );
          failedCount++;
        }
      }),
    ),
  );

  // Tally across all rounds of this run, then settle the final status.
  const total = reminder.targets.length;
  const totalSent = priorSentCount + sentCount;
  const totalFailed = priorFailedCount + failedCount;
  const remainingPending = await countRunTargets("pending");

  const { status, errorSummary } = composeFinalStatus({
    total,
    totalSent,
    totalFailed,
    skippedCount,
    remainingPending,
    windowClosed,
    windowLabel: `${reminder.deliveryWindowEndHour}:00 (${reminder.timezone})`,
  });

  await db
    .update(reminderRuns)
    .set({ status, errorSummary })
    .where(eq(reminderRuns.id, runId));

  await pgNotifyWeb({
    type: "reminder.fired",
    reminderId: reminder.id,
    runId,
    status,
    sent: totalSent,
    total,
  });

  // Lifecycle bookkeeping. Skip when the run is paused — the reminder
  // shouldn't end or re-arm while a resume is still possible. We also
  // flip the reminder row itself to status='paused' so dashboards and
  // the list view can reflect it.
  if (status === "paused") {
    await db
      .update(reminders)
      .set({ status: "paused", updatedAt: new Date() })
      .where(eq(reminders.id, reminder.id));
    logger.info(
      { reminderId: reminder.id, runId, totalSent, remainingPending },
      "fire-reminder: paused — leaving lifecycle alone for resume",
    );
  } else {
    if (reminder.scheduleKind === "one_off") {
      await db
        .update(reminders)
        .set({ status: "ended", updatedAt: new Date() })
        .where(eq(reminders.id, reminder.id));
    } else if (reminder.scheduleKind === "recurring" && reminder.rrule) {
      const next = nextOccurrence(reminder.rrule, reminder.timezone, new Date());
      await db
        .update(reminders)
        .set({
          // If we're resuming a previously-paused reminder, lift it
          // back to active so the next cron occurrence fires normally.
          status: "active",
          lastFiredAt: new Date(),
          updatedAt: new Date(),
        })
        .where(eq(reminders.id, reminder.id));
      if (next) {
        try {
          await scheduleReminderFire(getBoss(), reminder.id, next);
          logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence");
        } catch (err) {
          // A re-arm failure is logged, not rethrown: the run itself already completed.
          logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence");
        }
      } else {
        logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending");
        await db.update(reminders).set({ status: "ended" }).where(eq(reminders.id, reminder.id));
      }
    }
  }

  // Audit and log entries carry THIS round's counters (not the cross-round totals).
  await writeAuditLog(db, {
    operatorId: reminder.createdBy,
    source: "system",
    action: "reminder.fired",
    targetType: "reminder",
    targetId: reminder.id,
    payload: { runId, status, sent: sentCount, failed: failedCount, skipped: skippedCount },
  });

  logger.info(
    {
      reminderId: reminder.id,
      runId,
      status,
      sent: sentCount,
      failed: failedCount,
      skipped: skippedCount,
      windowClosed,
    },
    "fire-reminder: done",
  );
}

/**
 * Pure decision table for a run's final status and operator-facing summary.
 *
 *  paused  : window closed with at least one row still pending AND at least
 *            one delivery in this or a prior round. Resumable.
 *  failed  : window closed before anything was sent, nothing was sent at
 *            all, or the reminder has no targets.
 *  success : there is at least one target and every target was sent.
 *  partial : some sent, the rest failed / skipped.
 *
 * A reminder with zero targets is reported as failed ("No targets attached
 * to reminder."); `0 === 0` must not count as "every target sent".
 *
 * @param tally Cross-round counters plus `windowLabel`, the pre-formatted
 *              "<hour>:00 (<timezone>)" text used in window-closed summaries.
 * @returns The status to store on the run and its summary (`null` on success).
 */
function composeFinalStatus(tally: {
  total: number;
  totalSent: number;
  totalFailed: number;
  skippedCount: number;
  remainingPending: number;
  windowClosed: boolean;
  windowLabel: string;
}): { status: "success" | "partial" | "failed" | "paused"; errorSummary: string | null } {
  const { total, totalSent, totalFailed, skippedCount, remainingPending, windowClosed, windowLabel } =
    tally;

  if (windowClosed && remainingPending > 0 && totalSent > 0) {
    return {
      status: "paused",
      errorSummary: `Delivery window closed at ${windowLabel}. ${totalSent} of ${total} groups delivered, ${remainingPending} still pending. Resume from the Activity tab.`,
    };
  }
  if (windowClosed && totalSent === 0) {
    return {
      status: "failed",
      errorSummary: `Delivery window closed at ${windowLabel} before any group could be sent. The reminder fired too late in the day.`,
    };
  }
  if (total > 0 && totalSent === total) {
    return { status: "success", errorSummary: null };
  }
  if (totalSent > 0) {
    return {
      status: "partial",
      errorSummary: `${totalSent} of ${total} groups delivered (${totalFailed} failed, ${skippedCount} skipped).`,
    };
  }
  return {
    status: "failed",
    errorSummary: total === 0 ? "No targets attached to reminder." : `All ${total} sends failed.`,
  };
}
/**
 * Inserts one `skipped` run-target row per reminder target, all carrying
 * the same error text. Used when a run cannot start at all.
 *
 * @param runId    Run the rows are attached to.
 * @param reminder Reminder whose targets are being recorded.
 * @param error    Reason stored on every inserted row.
 * @returns Resolves after the insert; does nothing when the reminder has no targets.
 */
async function markAllSkipped(
  runId: string,
  reminder: NonNullable<Awaited<ReturnType<typeof getReminderWithDetails>>>,
  error: string,
): Promise<void> {
  const { targets } = reminder;
  if (targets.length === 0) return;

  // Look up group names so each row gets a label (null when the group is unknown).
  const targetGroupIds = targets.map((target) => target.groupId);
  const groups = await db.query.whatsappGroups.findMany({
    where: (g) => inArray(g.id, targetGroupIds),
    columns: { id: true, name: true },
  });
  const nameByGroupId = new Map(groups.map((group) => [group.id, group.name]));

  const skippedRows = targets.map((target) => ({
    runId,
    groupId: target.groupId,
    groupLabel: nameByGroupId.get(target.groupId) ?? null,
    status: "skipped" as const,
    error,
  }));
  await db.insert(reminderRunTargets).values(skippedRows);
}
/**
 * Writes `caption` onto a prebuilt media message, mutating it in place.
 * relayMessage takes no separate caption argument, so the caption is set on
 * the message's own media sub-message right before relaying.
 *
 * The first present slot wins, in the order image, video, document; a
 * message with none of them is left untouched.
 *
 * @param msg     Prebuilt message to mutate.
 * @param caption Caption text to set.
 */
function injectCaption(msg: proto.IMessage, caption: string): void {
  const mediaSlot = msg.imageMessage || msg.videoMessage || msg.documentMessage;
  if (mediaSlot) {
    mediaSlot.caption = caption;
  }
}