import { eq } from "drizzle-orm";
import { reminderRuns, reminderRunTargets, reminders } from "@cmbot/db";
import { db } from "../db.js";
import { logger } from "../logger.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import { sendTextToGroup, sendMediaToGroup } from "../whatsapp/sender.js";
import { absoluteMediaPath, nextOccurrence } from "@cmbot/shared";
import { env } from "../env.js";
import { writeAuditLog } from "../audit.js";
import { getReminderWithDetails } from "../reminders/crud.js";
import { getBoss } from "./pgboss-client.js";
import { scheduleReminderFire } from "./reminder-jobs.js";

/** Payload of a reminder-fire job: the id of the reminder to send. */
export type FireReminderPayload = { reminderId: string };

/**
 * Fixed pause (milliseconds) awaited after every message part, to stay under
 * WA's rate limit. This is a constant delay, not a randomized jitter.
 */
const MESSAGE_PART_DELAY_MS = 1500;

/** Media kinds passed to `sendMediaToGroup`. */
type SenderKind = "image" | "video" | "document";

/**
 * Maps a MIME type to the sender kind: `image/*` -> "image",
 * `video/*` -> "video", anything else -> "document".
 */
function senderKindForMime(mimeType: string): SenderKind {
  if (mimeType.startsWith("image/")) return "image";
  if (mimeType.startsWith("video/")) return "video";
  return "document";
}

/** Extracts a printable message from a caught value without assuming it is an `Error`. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Fires one reminder: sends every message part to every target group and
 * records the outcome.
 *
 * Flow:
 * 1. Loads the reminder; returns early (log only) when it is missing or its
 *    status is not "active".
 * 2. Inserts a `reminderRuns` row with status "pending".
 * 3. If the reminder's account has no session, records every target and the
 *    run as "skipped" and returns.
 * 4. Otherwise sends all parts to each target group, inserting one
 *    `reminderRunTargets` row per target ("sent", "failed" or "skipped").
 * 5. Sets the run status to "success" (no target failed), "partial" (some
 *    sent) or "failed" (none sent).
 * 6. Ends one-off reminders; for recurring reminders with an RRULE, stamps
 *    `lastFiredAt` and either re-arms the next occurrence or ends the
 *    reminder when there is none.
 * 7. Writes a "reminder.fired" audit log entry.
 *
 * @param payload - Identifies the reminder to fire.
 * @throws Error when the run row cannot be created; database errors outside
 *   the per-target send `try` also propagate to the caller.
 */
export async function fireReminder(payload: FireReminderPayload): Promise<void> {
  const reminder = await getReminderWithDetails(payload.reminderId);
  if (!reminder) {
    logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found");
    return;
  }
  if (reminder.status !== "active") {
    logger.info(
      { reminderId: reminder.id, status: reminder.status },
      "fire-reminder: skipping (not active)",
    );
    return;
  }

  const [run] = await db
    .insert(reminderRuns)
    .values({
      reminderId: reminder.id,
      // Snapshot the name so the run row stays readable in history even
      // after the reminder is deleted (FK is ON DELETE SET NULL).
      reminderName: reminder.name,
      status: "pending",
    })
    .returning({ id: reminderRuns.id });
  if (!run) {
    // Guard instead of a non-null assertion: fail with a clear message
    // rather than a TypeError on `undefined.id`.
    throw new Error(`fire-reminder: failed to create run row for reminder ${reminder.id}`);
  }
  const runId = run.id;

  const session = sessionManager.getSession(reminder.accountId);
  if (!session) {
    logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected");
    for (const target of reminder.targets) {
      await db.insert(reminderRunTargets).values({
        runId,
        groupId: target.groupId,
        status: "skipped",
        error: "account not connected",
      });
    }
    await db
      .update(reminderRuns)
      .set({ status: "skipped", errorSummary: "account not connected" })
      .where(eq(reminderRuns.id, runId));
    // NOTE(review): this early return bypasses the re-arm step below, so a
    // recurring reminder skipped here is not rescheduled by this function.
    // Confirm that something else re-arms it once the account reconnects.
    return;
  }

  // NOTE(review): with an empty target list `allSent` stays true and the run
  // is recorded as "success" — confirm that is the intended outcome.
  let allSent = true;
  let anySent = false;

  for (const target of reminder.targets) {
    const group = await db.query.whatsappGroups.findFirst({
      where: (g, { eq }) => eq(g.id, target.groupId),
    });
    if (!group) {
      await db.insert(reminderRunTargets).values({
        runId,
        groupId: target.groupId,
        status: "skipped",
        error: "group missing from db",
      });
      allSent = false;
      continue;
    }

    const start = Date.now();
    try {
      let lastMessageId: string | undefined;
      for (const part of reminder.messages) {
        if (part.kind === "text" && part.textContent) {
          const sent = await sendTextToGroup(session.socket, group.waGroupJid, part.textContent);
          lastMessageId = sent.messageId;
        } else if (part.mediaId) {
          // Capture the narrowed id so the query callback needs no `!`.
          const mediaId = part.mediaId;
          const media = await db.query.mediaFiles.findFirst({
            where: (m, { eq }) => eq(m.id, mediaId),
          });
          if (!media) throw new Error(`media row missing: ${mediaId}`);
          const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR);
          // Map our DB kind ('media' or 'image'/'video'/'document') to sender kind.
          // For now we infer from mime type since createReminder stores 'media'.
          const senderKind = senderKindForMime(media.mimeType);
          const sent = await sendMediaToGroup(
            session.socket,
            group.waGroupJid,
            senderKind,
            filePath,
            {
              caption: part.textContent ?? undefined,
              mimeType: media.mimeType,
              filename: media.filenameOriginal,
            },
          );
          lastMessageId = sent.messageId;
        }
        // Pause after every part (including the last one, so the pause is
        // also counted in `latencyMs`) to stay under WA's rate limit.
        await new Promise<void>((resolve) => setTimeout(resolve, MESSAGE_PART_DELAY_MS));
      }
      await db.insert(reminderRunTargets).values({
        runId,
        groupId: target.groupId,
        status: "sent",
        waMessageId: lastMessageId ?? null,
        latencyMs: Date.now() - start,
      });
      anySent = true;
    } catch (err) {
      logger.error(
        { err, reminderId: reminder.id, groupId: target.groupId },
        "fire-reminder: send failed",
      );
      await db.insert(reminderRunTargets).values({
        runId,
        groupId: target.groupId,
        status: "failed",
        error: describeError(err),
      });
      allSent = false;
    }
  }

  const status = allSent ? "success" : anySent ? "partial" : "failed";
  await db.update(reminderRuns).set({ status }).where(eq(reminderRuns.id, runId));

  // One-off reminders end after firing. Recurring reminders compute the
  // next occurrence from the RRULE and re-arm the pg-boss job; only the
  // last fire timestamp + updatedAt move forward.
  if (reminder.scheduleKind === "one_off") {
    await db
      .update(reminders)
      .set({ status: "ended", updatedAt: new Date() })
      .where(eq(reminders.id, reminder.id));
  } else if (reminder.scheduleKind === "recurring" && reminder.rrule) {
    // One timestamp for the whole step so `lastFiredAt`, `updatedAt` and the
    // next-occurrence reference point agree.
    const now = new Date();
    const next = nextOccurrence(reminder.rrule, reminder.timezone, now);
    await db
      .update(reminders)
      .set({ lastFiredAt: now, updatedAt: now })
      .where(eq(reminders.id, reminder.id));
    if (next) {
      try {
        await scheduleReminderFire(getBoss(), reminder.id, next);
        logger.info({ reminderId: reminder.id, next }, "fire-reminder: re-armed for next occurrence");
      } catch (err) {
        // Best-effort: a scheduling failure is logged, not rethrown, so the
        // audit entry below is still written.
        logger.error({ err, reminderId: reminder.id }, "fire-reminder: failed to re-arm next occurrence");
      }
    } else {
      logger.info({ reminderId: reminder.id }, "fire-reminder: no further occurrences, ending");
      // Also bump updatedAt, consistent with the one-off ending branch.
      await db
        .update(reminders)
        .set({ status: "ended", updatedAt: new Date() })
        .where(eq(reminders.id, reminder.id));
    }
  }

  await writeAuditLog(db, {
    operatorId: reminder.createdBy,
    source: "system",
    action: "reminder.fired",
    targetType: "reminder",
    targetId: reminder.id,
    payload: { runId, status },
  });
  logger.info({ reminderId: reminder.id, runId, status }, "fire-reminder: done");
}