From 01eb5752eee542b21be78d28c9d0a76dddc98854 Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sat, 9 May 2026 17:29:21 +0800 Subject: [PATCH] feat(scheduler): add fire-reminder handler + job registration Also fix rrule default-import workaround so the shared package loads correctly under NodeNext ESM resolution (rrule@2.8.1 has no exports field). --- apps/bot/src/scheduler/fire-reminder.ts | 132 ++++++++++++++++++++++++ apps/bot/src/scheduler/reminder-jobs.ts | 43 +++++++- packages/shared/src/rrule.ts | 8 +- 3 files changed, 178 insertions(+), 5 deletions(-) create mode 100644 apps/bot/src/scheduler/fire-reminder.ts diff --git a/apps/bot/src/scheduler/fire-reminder.ts b/apps/bot/src/scheduler/fire-reminder.ts new file mode 100644 index 0000000..5fc1e79 --- /dev/null +++ b/apps/bot/src/scheduler/fire-reminder.ts @@ -0,0 +1,132 @@ +import { eq } from "drizzle-orm"; +import { reminderRuns, reminderRunTargets } from "@cmbot/db"; +import { db } from "../db.js"; +import { logger } from "../logger.js"; +import { sessionManager } from "../whatsapp/session-manager.js"; +import { sendTextToGroup, sendMediaToGroup } from "../whatsapp/sender.js"; +import { absoluteMediaPath } from "@cmbot/shared"; +import { env } from "../env.js"; +import { writeAuditLog } from "../audit.js"; +import { getReminderWithDetails } from "../reminders/crud.js"; + +export type FireReminderPayload = { reminderId: string }; + +export async function fireReminder(payload: FireReminderPayload): Promise { + const reminder = await getReminderWithDetails(payload.reminderId); + if (!reminder) { + logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found"); + return; + } + if (reminder.status !== "active") { + logger.info({ reminderId: reminder.id, status: reminder.status }, "fire-reminder: skipping (not active)"); + return; + } + + const [run] = await db + .insert(reminderRuns) + .values({ reminderId: reminder.id, status: "pending" }) + .returning({ id: reminderRuns.id }); + const runId = run!.id; + + const session = sessionManager.getSession(reminder.accountId); + if (!session) { + logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected"); + for (const target of reminder.targets) { + await db.insert(reminderRunTargets).values({ + runId, + groupId: target.groupId, + status: "skipped", + error: "account not connected", + }); + } + await db + .update(reminderRuns) + .set({ status: "skipped", errorSummary: "account not connected" }) + .where(eq(reminderRuns.id, runId)); + return; + } + + let allSent = true; + let anySent = false; + for (const target of reminder.targets) { + const group = await db.query.whatsappGroups.findFirst({ + where: (g, { eq }) => eq(g.id, target.groupId), + }); + if (!group) { + await db.insert(reminderRunTargets).values({ + runId, + groupId: target.groupId, + status: "skipped", + error: "group missing from db", + }); + allSent = false; + continue; + } + const start = Date.now(); + try { + let lastMessageId: string | undefined; + for (const part of reminder.messages) { + if (part.kind === "text" && part.textContent) { + const r = await sendTextToGroup(session.socket, group.waGroupJid, part.textContent); + lastMessageId = r.messageId; + } else if (part.mediaId) { + const media = await db.query.mediaFiles.findFirst({ + where: (m, { eq }) => eq(m.id, part.mediaId!), + }); + if (!media) throw new Error(`media row missing: ${part.mediaId}`); + const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR); + // Map our DB kind ('media' or 'image'/'video'/'document') to sender kind. + // For now we infer from mime type since createReminder stores 'media'. + const senderKind: "image" | "video" | "document" = + media.mimeType.startsWith("image/") + ? "image" + : media.mimeType.startsWith("video/") + ? "video" + : "document"; + const r = await sendMediaToGroup(session.socket, group.waGroupJid, senderKind, filePath, { + caption: part.textContent ?? undefined, + mimeType: media.mimeType, + filename: media.filenameOriginal, + }); + lastMessageId = r.messageId; + } + // 1.5s jitter between message parts to stay under WA's rate limit + await new Promise((r) => setTimeout(r, 1500)); + } + await db.insert(reminderRunTargets).values({ + runId, + groupId: target.groupId, + status: "sent", + waMessageId: lastMessageId ?? null, + latencyMs: Date.now() - start, + }); + anySent = true; + } catch (err) { + logger.error({ err, reminderId: reminder.id, groupId: target.groupId }, "fire-reminder: send failed"); + await db.insert(reminderRunTargets).values({ + runId, + groupId: target.groupId, + status: "failed", + error: (err as Error).message, + }); + allSent = false; + } + } + + const status = allSent ? "success" : anySent ? "partial" : "failed"; + await db + .update(reminderRuns) + .set({ status }) + .where(eq(reminderRuns.id, runId)); + + await writeAuditLog(db, { + operatorId: reminder.createdBy, + source: "system", + action: "reminder.fired", + targetType: "reminder", + targetId: reminder.id, + payload: { runId, status }, + }); + + logger.info({ reminderId: reminder.id, runId, status }, "fire-reminder: done"); +} diff --git a/apps/bot/src/scheduler/reminder-jobs.ts b/apps/bot/src/scheduler/reminder-jobs.ts index 2937954..61dbf98 100644 --- a/apps/bot/src/scheduler/reminder-jobs.ts +++ b/apps/bot/src/scheduler/reminder-jobs.ts @@ -1,7 +1,44 @@ import type { PgBoss } from "pg-boss"; import { logger } from "../logger.js"; +import { fireReminder, type FireReminderPayload } from "./fire-reminder.js"; -// Wired up properly in Task 5. Placeholder so index.ts can import. -export async function registerReminderJobs(_boss: PgBoss): Promise { - logger.debug("registerReminderJobs: placeholder (task 5 will fill in)"); +export const REMINDER_FIRE_QUEUE = "reminder.fire"; + +export async function registerReminderJobs(boss: PgBoss): Promise { + await boss.createQueue(REMINDER_FIRE_QUEUE); + await boss.work(REMINDER_FIRE_QUEUE, async (jobs) => { + const job = jobs[0]; + if (!job) return; + logger.debug({ jobId: job.id, payload: job.data }, "reminder.fire: handling"); + await fireReminder(job.data); + }); + logger.info("reminder.fire: handler registered"); +} + +export async function scheduleReminderFire( + boss: PgBoss, + reminderId: string, + scheduledAt: Date, +): Promise { + const id = await boss.send( + REMINDER_FIRE_QUEUE, + { reminderId }, + { + startAfter: scheduledAt, + retryLimit: 3, + retryDelay: 30, + retryBackoff: true, + // Use the reminderId as a singleton key so re-scheduling cancels the old job + singletonKey: `reminder:${reminderId}`, + }, + ); + logger.info({ reminderId, jobId: id, scheduledAt }, "reminder.fire: scheduled"); + return id; +} + +export async function cancelReminderFire(_boss: PgBoss, reminderId: string): Promise { + // Soft cancel: pg-boss doesn't expose a clean cancel-by-singleton API in v12. + // The scheduled job will still fire, but `fireReminder` exits early when the + // reminder row is gone. Hard cancel can be added later by storing the jobId. + logger.info({ reminderId }, "reminder.fire: cancel requested (soft, fizzles on fire)"); } diff --git a/packages/shared/src/rrule.ts b/packages/shared/src/rrule.ts index b10983d..1d53706 100644 --- a/packages/shared/src/rrule.ts +++ b/packages/shared/src/rrule.ts @@ -1,9 +1,13 @@ -import { RRule, rrulestr } from "rrule"; +// rrule@2.8.1 lacks a proper "exports" field, so named ESM imports fail at +// runtime with NodeNext resolution. Use the default import and destructure. +import rrulePkg from "rrule"; +import type { RRule as RRuleType } from "rrule"; +const { RRule, rrulestr } = rrulePkg as unknown as typeof import("rrule"); import { DateTime } from "luxon"; export const MIN_INTERVAL_MS = 5 * 60 * 1000; -export function parseRRule(rule: string): RRule { +export function parseRRule(rule: string): RRuleType { const parsed = rrulestr(rule); if (!(parsed instanceof RRule)) { throw new Error("Compound RRULE/RRSET not supported");