import type { PgBoss } from "pg-boss";
import { sql } from "drizzle-orm";
import { logger } from "../logger.js";
import { env } from "../env.js";
import { db } from "../db.js";
import { fireReminder, type FireReminderPayload } from "./fire-reminder.js";

/** Name of the pg-boss queue that carries reminder fire (and resume) jobs. */
export const REMINDER_FIRE_QUEUE = "reminder.fire";

/**
 * Create the reminder fire queue, force its policy to 'standard', and
 * register the worker that hands each job to `fireReminder`.
 *
 * @param boss - pg-boss instance used to create the queue and register the worker.
 */
export async function registerReminderJobs(boss: PgBoss): Promise<void> {
  // 'standard' (the default) lets us enqueue a new fire even when an
  // older one for the same singletonKey is still 'created'. We need
  // that for the recurring/edit path: when a reminder is rescheduled,
  // scheduleReminderFire() first cancels the stale 'created' job for
  // this reminder and then sends a new one — under 'stately' the
  // SECOND send returns null (it dedupes against the first across
  // states), so a reschedule silently dropped the new fire and the
  // reminder never fired at the new time. Duplicate-fire safety is
  // covered at the handler level by the inner-mutex recent-run check
  // in fire-reminder.ts (see DUPLICATE_FIRE_WINDOW_MS), which catches
  // the microsecond-spaced send case 'stately' was supposed to guard.
  await boss.createQueue(REMINDER_FIRE_QUEUE, { policy: "standard" });

  // pg-boss v12's createQueue is idempotent and DOES NOT update the
  // policy on an existing queue row. Earlier deployments forced
  // policy='stately' here, which broke reschedules. Force-flip back to
  // 'standard' on every boot so an old queue row doesn't strand us.
  try {
    await db.execute(
      sql`UPDATE pgboss.queue
          SET policy = 'standard'
          WHERE name = ${REMINDER_FIRE_QUEUE}
            AND policy <> 'standard'`,
    );
  } catch (err) {
    // Best-effort: a failure here must not prevent the worker from
    // being registered below.
    logger.warn(
      { err },
      "reminder.fire: failed to force queue policy=standard (handler-level dedupe still applies)",
    );
  }

  await boss.work<FireReminderPayload>(
    REMINDER_FIRE_QUEUE,
    {
      // Up to BOT_FIRE_CONCURRENCY workers per node, each polling and
      // processing independently. Combined with the per-account mutex
      // inside fireReminder, this lets reminders on DIFFERENT accounts
      // run in parallel while same-account reminders take turns.
      localConcurrency: env.BOT_FIRE_CONCURRENCY,
    },
    async (jobs) => {
      // Only the first job of the delivered batch is handled; an empty
      // batch is a no-op.
      const job = jobs[0];
      if (!job) return;
      logger.debug({ jobId: job.id, payload: job.data }, "reminder.fire: handling");
      await fireReminder(job.data);
    },
  );

  logger.info(
    { localConcurrency: env.BOT_FIRE_CONCURRENCY },
    "reminder.fire: handler registered",
  );
}

/**
 * Schedule (or reschedule) the next fire for a reminder.
 *
 * Any not-yet-started ('created') job carrying this reminder's singleton
 * key is cancelled first, then a new job is sent with `startAfter` set to
 * `scheduledAt`.
 *
 * @param boss - pg-boss instance used to send the job.
 * @param reminderId - Reminder to fire; also used to build the singleton key.
 * @param scheduledAt - Earliest time at which the job may start.
 * @returns The id returned by `boss.send`, or `null` if pg-boss did not create a job.
 */
export async function scheduleReminderFire(
  boss: PgBoss,
  reminderId: string,
  scheduledAt: Date,
): Promise<string | null> {
  const singletonKey = `reminder:${reminderId}`;

  // Replace-then-send. Any 'created' (i.e. not yet started) job for
  // this reminder is the stale next-fire from the previous schedule
  // attempt; nuke it so the new schedule wins. Active/completed jobs
  // are left alone — those represent in-flight or already-fired runs
  // and the handler-level dedupe handles overlap.
  try {
    const cancelled = await db.execute(
      sql`UPDATE pgboss.job
          SET state = 'cancelled', completed_on = now()
          WHERE name = ${REMINDER_FIRE_QUEUE}
            AND singleton_key = ${singletonKey}
            AND state = 'created'
          RETURNING id`,
    );
    if (cancelled.rows.length > 0) {
      logger.info(
        { reminderId, cancelled: cancelled.rows.length },
        "reminder.fire: cancelled stale created jobs before reschedule",
      );
    }
  } catch (err) {
    // If the cancellation step fails, log but still try to send. Worst
    // case we end up with two created jobs and the handler-level
    // recent-run dedupe drops the duplicate fire.
    logger.warn({ err, reminderId }, "reminder.fire: pre-send cancel failed");
  }

  const id = await boss.send(
    REMINDER_FIRE_QUEUE,
    { reminderId },
    {
      startAfter: scheduledAt,
      retryLimit: 3,
      retryDelay: 30,
      retryBackoff: true,
      // Singleton key kept on the job row for diagnostics + the
      // pre-send cancel above, even though 'standard' policy doesn't
      // dedupe by it.
      singletonKey,
    },
  );

  if (id === null) {
    // A null id means no job row was created. Surface it loudly rather
    // than logging it as a successful schedule.
    logger.warn(
      { reminderId, scheduledAt },
      "reminder.fire: send returned no job id; fire was NOT scheduled",
    );
    return id;
  }

  logger.info({ reminderId, jobId: id, scheduledAt }, "reminder.fire: scheduled");
  return id;
}

/**
 * Re-enqueue a paused run so fire-reminder picks up the still-pending
 * targets. Different singleton key from scheduleReminderFire so the
 * resume doesn't clobber the next-occurrence scheduled job and vice
 * versa.
 *
 * @param boss - pg-boss instance used to send the job.
 * @param reminderId - Reminder the paused run belongs to.
 * @param runId - Run to resume; also used to build the singleton key.
 * @returns The id returned by `boss.send`, or `null` if pg-boss did not create a job.
 */
export async function enqueueReminderResume(
  boss: PgBoss,
  reminderId: string,
  runId: string,
): Promise<string | null> {
  const id = await boss.send(
    REMINDER_FIRE_QUEUE,
    { reminderId, runId },
    {
      retryLimit: 3,
      retryDelay: 30,
      retryBackoff: true,
      singletonKey: `reminder:resume:${runId}`,
    },
  );

  if (id === null) {
    // Same reasoning as in scheduleReminderFire: don't report a resume
    // as enqueued when no job row was created.
    logger.warn(
      { reminderId, runId },
      "reminder.fire: send returned no job id; resume was NOT enqueued",
    );
    return id;
  }

  logger.info({ reminderId, runId, jobId: id }, "reminder.fire: resume enqueued");
  return id;
}

/**
 * Request cancellation of a reminder's scheduled fire.
 *
 * This is a soft cancel: it only logs the request and touches neither
 * pg-boss nor the database.
 *
 * @param _boss - Unused; kept so the signature mirrors the other queue helpers.
 * @param reminderId - Reminder whose fire should be cancelled.
 */
export async function cancelReminderFire(_boss: PgBoss, reminderId: string): Promise<void> {
  // Soft cancel: pg-boss doesn't expose a clean cancel-by-singleton API in v12.
  // The scheduled job will still fire, but `fireReminder` exits early when the
  // reminder row is gone. Hard cancel can be added later by storing the jobId.
  logger.info({ reminderId }, "reminder.fire: cancel requested (soft, fizzles on fire)");
}