feat(scheduler): add fire-reminder handler + job registration

Also fix rrule default-import workaround so the shared package loads
correctly under NodeNext ESM resolution (rrule@2.8.1 has no exports field).
This commit is contained in:
yiekheng 2026-05-09 17:29:21 +08:00
parent 2ed436ef0e
commit 01eb5752ee
3 changed files with 178 additions and 5 deletions

View File

@ -0,0 +1,132 @@
import { eq } from "drizzle-orm";
import { reminderRuns, reminderRunTargets } from "@cmbot/db";
import { db } from "../db.js";
import { logger } from "../logger.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import { sendTextToGroup, sendMediaToGroup } from "../whatsapp/sender.js";
import { absoluteMediaPath } from "@cmbot/shared";
import { env } from "../env.js";
import { writeAuditLog } from "../audit.js";
import { getReminderWithDetails } from "../reminders/crud.js";
export type FireReminderPayload = { reminderId: string };
export async function fireReminder(payload: FireReminderPayload): Promise<void> {
const reminder = await getReminderWithDetails(payload.reminderId);
if (!reminder) {
logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found");
return;
}
if (reminder.status !== "active") {
logger.info({ reminderId: reminder.id, status: reminder.status }, "fire-reminder: skipping (not active)");
return;
}
const [run] = await db
.insert(reminderRuns)
.values({ reminderId: reminder.id, status: "pending" })
.returning({ id: reminderRuns.id });
const runId = run!.id;
const session = sessionManager.getSession(reminder.accountId);
if (!session) {
logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected");
for (const target of reminder.targets) {
await db.insert(reminderRunTargets).values({
runId,
groupId: target.groupId,
status: "skipped",
error: "account not connected",
});
}
await db
.update(reminderRuns)
.set({ status: "skipped", errorSummary: "account not connected" })
.where(eq(reminderRuns.id, runId));
return;
}
let allSent = true;
let anySent = false;
for (const target of reminder.targets) {
const group = await db.query.whatsappGroups.findFirst({
where: (g, { eq }) => eq(g.id, target.groupId),
});
if (!group) {
await db.insert(reminderRunTargets).values({
runId,
groupId: target.groupId,
status: "skipped",
error: "group missing from db",
});
allSent = false;
continue;
}
const start = Date.now();
try {
let lastMessageId: string | undefined;
for (const part of reminder.messages) {
if (part.kind === "text" && part.textContent) {
const r = await sendTextToGroup(session.socket, group.waGroupJid, part.textContent);
lastMessageId = r.messageId;
} else if (part.mediaId) {
const media = await db.query.mediaFiles.findFirst({
where: (m, { eq }) => eq(m.id, part.mediaId!),
});
if (!media) throw new Error(`media row missing: ${part.mediaId}`);
const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR);
// Map our DB kind ('media' or 'image'/'video'/'document') to sender kind.
// For now we infer from mime type since createReminder stores 'media'.
const senderKind: "image" | "video" | "document" =
media.mimeType.startsWith("image/")
? "image"
: media.mimeType.startsWith("video/")
? "video"
: "document";
const r = await sendMediaToGroup(session.socket, group.waGroupJid, senderKind, filePath, {
caption: part.textContent ?? undefined,
mimeType: media.mimeType,
filename: media.filenameOriginal,
});
lastMessageId = r.messageId;
}
// 1.5s jitter between message parts to stay under WA's rate limit
await new Promise((r) => setTimeout(r, 1500));
}
await db.insert(reminderRunTargets).values({
runId,
groupId: target.groupId,
status: "sent",
waMessageId: lastMessageId ?? null,
latencyMs: Date.now() - start,
});
anySent = true;
} catch (err) {
logger.error({ err, reminderId: reminder.id, groupId: target.groupId }, "fire-reminder: send failed");
await db.insert(reminderRunTargets).values({
runId,
groupId: target.groupId,
status: "failed",
error: (err as Error).message,
});
allSent = false;
}
}
const status = allSent ? "success" : anySent ? "partial" : "failed";
await db
.update(reminderRuns)
.set({ status })
.where(eq(reminderRuns.id, runId));
await writeAuditLog(db, {
operatorId: reminder.createdBy,
source: "system",
action: "reminder.fired",
targetType: "reminder",
targetId: reminder.id,
payload: { runId, status },
});
logger.info({ reminderId: reminder.id, runId, status }, "fire-reminder: done");
}

View File

@ -1,7 +1,44 @@
import type { PgBoss } from "pg-boss";
import { logger } from "../logger.js";
import { fireReminder, type FireReminderPayload } from "./fire-reminder.js";
// Wired up properly in Task 5. Placeholder so index.ts can import.
export async function registerReminderJobs(_boss: PgBoss): Promise<void> {
logger.debug("registerReminderJobs: placeholder (task 5 will fill in)");
export const REMINDER_FIRE_QUEUE = "reminder.fire";
export async function registerReminderJobs(boss: PgBoss): Promise<void> {
await boss.createQueue(REMINDER_FIRE_QUEUE);
await boss.work<FireReminderPayload>(REMINDER_FIRE_QUEUE, async (jobs) => {
const job = jobs[0];
if (!job) return;
logger.debug({ jobId: job.id, payload: job.data }, "reminder.fire: handling");
await fireReminder(job.data);
});
logger.info("reminder.fire: handler registered");
}
export async function scheduleReminderFire(
boss: PgBoss,
reminderId: string,
scheduledAt: Date,
): Promise<string | null> {
const id = await boss.send(
REMINDER_FIRE_QUEUE,
{ reminderId },
{
startAfter: scheduledAt,
retryLimit: 3,
retryDelay: 30,
retryBackoff: true,
// Use the reminderId as a singleton key so re-scheduling cancels the old job
singletonKey: `reminder:${reminderId}`,
},
);
logger.info({ reminderId, jobId: id, scheduledAt }, "reminder.fire: scheduled");
return id;
}
export async function cancelReminderFire(_boss: PgBoss, reminderId: string): Promise<void> {
// Soft cancel: pg-boss doesn't expose a clean cancel-by-singleton API in v12.
// The scheduled job will still fire, but `fireReminder` exits early when the
// reminder row is gone. Hard cancel can be added later by storing the jobId.
logger.info({ reminderId }, "reminder.fire: cancel requested (soft, fizzles on fire)");
}

View File

@ -1,9 +1,13 @@
import { RRule, rrulestr } from "rrule";
// rrule@2.8.1 lacks a proper "exports" field, so named ESM imports fail at
// runtime with NodeNext resolution. Use the default import and destructure.
import rrulePkg from "rrule";
import type { RRule as RRuleType } from "rrule";
const { RRule, rrulestr } = rrulePkg as unknown as typeof import("rrule");
import { DateTime } from "luxon";
export const MIN_INTERVAL_MS = 5 * 60 * 1000;
export function parseRRule(rule: string): RRule {
export function parseRRule(rule: string): RRuleType {
const parsed = rrulestr(rule);
if (!(parsed instanceof RRule)) {
throw new Error("Compound RRULE/RRSET not supported");