End-state of plan 2: operator can schedule one-off reminders via the Telegram menu wizard, attach text + optional media (photo/video/doc), and the bot fires them on time to a chosen group. Failed sends retry with backoff. Run history captured in the DB. Out of scope (deferred to follow-ups): - Recurring reminders (RRULE) - Multi-group / multi-part messages beyond text+1 media - Run history view in menu - Web dashboard (plan 3) 9 tasks covering pg-boss client, reminder CRUD helpers, sender refactor (media), Telegram media ingest, fire-reminder handler, wizard state, menu views, callback wiring, and end-to-end verification.
# Reminder Scheduling & Sending — Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** End-state: the operator can create one-off scheduled reminders via the Telegram menu wizard, attach a text body and optional media (photo/video/document), and the bot fires them on time to the chosen group. Failed sends retry with backoff. Run history is recorded in the database; a run-history view in the menu is deferred to a follow-up.

**Architecture:** Reuse the existing `bot` service. Add `pg-boss` (Postgres-native job queue) for delayed scheduling. Extend `sender.ts` to send media. Extend the Telegram menu with a Reminders branch (list, new, detail). Media uploaded via Telegram is downloaded once, hashed, and stored on the shared volume at `/data/media/{yyyy/mm}/{uuid}.{ext}` with a `media_files` row.

**Tech stack additions:** `pg-boss` (Postgres job queue), `node-fetch` already pulled by Baileys (or use built-in `fetch`).

**Out of scope for plan 2** (deferred to plan 2.5 or plan 3):
- Recurring reminders (RRULE engine integration). Schema is already there; the wizard and pg-boss `repeatEvery` wiring will land in a follow-up.
- Multi-group targets per reminder (DB supports it; menu only allows one for now).
- Multi-part messages beyond text + one media (DB supports `position`-ordered parts; UI will only build single text or text+media for now).
- Web dashboard (plan 3).

**Spec reference:** `docs/superpowers/specs/2026-05-03-whatsapp-bot-design.md` §11 (reminder execution flow), §12 (error handling).

---

## File structure produced by this plan

```
apps/bot/src/
├── scheduler/
│   ├── pgboss-client.ts        init + graceful shutdown
│   ├── reminder-jobs.ts        register handler, schedule/upsert/cancel
│   └── fire-reminder.ts        job handler — load, send, record
├── reminders/
│   ├── crud.ts                 create/get/list/delete reminder helpers
│   └── time-parsing.ts         quickToDate + parseFreeText for quick-picks and free text
├── media/
│   └── ingest.ts               download Telegram media → /data/media + DB row
├── whatsapp/
│   └── sender.ts               (extended) sendTextToGroup, sendMediaToGroup
└── telegram/
    ├── menus.ts                (extended) reminders list, detail, wizard views
    ├── callbacks.ts            (extended) reminder callbacks
    ├── commands/
    │   └── reminders.ts        /reminders command
    └── state.ts                (extended) wizard state
```

---

## Task 1: Add pg-boss dependency, create client + lifecycle

**Files:**
- Modify: `apps/bot/package.json` — add `"pg-boss": "^10.1.5"`
- Create: `apps/bot/src/scheduler/pgboss-client.ts`
- Modify: `apps/bot/src/index.ts` — start/stop pg-boss

- [ ] **Step 1: Add pg-boss to apps/bot/package.json deps and run install**

```bash
NO_SUDO=1 scripts/dev.sh pnpm add pg-boss --filter @cmbot/bot
```

Expected: pg-boss 10.x added under apps/bot/package.json deps. pnpm-lock.yaml updated.

- [ ] **Step 2: Create `apps/bot/src/scheduler/pgboss-client.ts`**

```typescript
import PgBoss from "pg-boss";
import { env } from "../env.js";
import { logger } from "../logger.js";

let boss: PgBoss | null = null;

export async function startBoss(): Promise<PgBoss> {
  if (boss) return boss;
  const instance = new PgBoss({
    connectionString: env.DATABASE_URL,
    schema: "pgboss",
    retryLimit: 3,
    retryDelay: 30,
    retryBackoff: true,
  });
  instance.on("error", (err) => logger.error({ err }, "pg-boss: error"));
  await instance.start();
  boss = instance;
  logger.info("pg-boss started");
  return instance;
}

export async function stopBoss(): Promise<void> {
  if (!boss) return;
  await boss.stop({ graceful: true, timeout: 5000 });
  boss = null;
  logger.info("pg-boss stopped");
}

export function getBoss(): PgBoss {
  if (!boss) throw new Error("pg-boss not started");
  return boss;
}
```
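
To get a feel for what `retryLimit: 3`, `retryDelay: 30`, `retryBackoff: true` mean in wall-clock terms, here is a rough sketch. It assumes a plain doubling schedule; `backoffDelaysSeconds` is a hypothetical helper, not part of pg-boss, and the library's real schedule may add random jitter on top of these values, so treat the numbers as approximate lower bounds.

```typescript
// Hypothetical helper (not a pg-boss API): approximate delay before each retry,
// ASSUMING the delay simply doubles per attempt. Real delays may include jitter.
function backoffDelaysSeconds(retryLimit: number, retryDelay: number): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < retryLimit; attempt++) {
    delays.push(retryDelay * 2 ** attempt);
  }
  return delays;
}

// With the settings above: roughly 30 s, then 60 s, then 120 s.
const approxDelays = backoffDelaysSeconds(3, 30);
```

Under that assumption a reminder that keeps failing is given up on after about three and a half minutes, which is worth keeping in mind when judging whether a disconnected WhatsApp session has time to recover.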

- [ ] **Step 3: Modify `apps/bot/src/index.ts` to start/stop pg-boss**

Replace contents:

```typescript
import { logger } from "./logger.js";
import { pool } from "./db.js";
import { startHealthServer, setSessionCountsProvider } from "./health.js";
import { createTelegramBot } from "./telegram/bot.js";
import { sessionManager } from "./whatsapp/session-manager.js";
import { startBoss, stopBoss } from "./scheduler/pgboss-client.js";
import { registerReminderJobs } from "./scheduler/reminder-jobs.js";

async function main(): Promise<void> {
  logger.info("bot starting");
  const health = startHealthServer();
  setSessionCountsProvider(() => sessionManager.getCounts());

  const boss = await startBoss();
  await registerReminderJobs(boss);

  const tg = createTelegramBot();
  void tg.start({
    onStart: (info) => logger.info({ username: info.username }, "telegram polling started"),
    drop_pending_updates: true,
  });

  await sessionManager.resumeFromDb();

  const shutdown = async (signal: string): Promise<void> => {
    logger.info({ signal }, "shutting down");
    await tg.stop();
    await sessionManager.stopAll();
    await stopBoss();
    health.close();
    await pool.end();
    process.exit(0);
  };

  process.on("SIGINT", () => void shutdown("SIGINT"));
  process.on("SIGTERM", () => void shutdown("SIGTERM"));

  logger.info("bot ready");
}

main().catch((err) => {
  logger.fatal({ err }, "bot failed to start");
  process.exit(1);
});
```

NOTE: `registerReminderJobs` doesn't exist yet — Task 5 creates it. Either put a temporary stub at the import site for this commit, or commit Task 1 + Task 5 together. The simpler option: add a placeholder file `apps/bot/src/scheduler/reminder-jobs.ts` exporting a no-op `registerReminderJobs(_boss)` that Task 5 will fill in.

- [ ] **Step 4: Create placeholder `apps/bot/src/scheduler/reminder-jobs.ts`**

```typescript
import type PgBoss from "pg-boss";
import { logger } from "../logger.js";

// Wired up properly in Task 5. Placeholder so index.ts can import.
export async function registerReminderJobs(_boss: PgBoss): Promise<void> {
  logger.debug("registerReminderJobs: placeholder (task 5 will fill in)");
}
```

- [ ] **Step 5: Typecheck and restart**

```bash
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/bot typecheck
NO_SUDO=1 scripts/dev.sh restart-bot
```

Verify in logs: `pg-boss started`, no errors. The pgboss schema gets created on first run; verify with:

```bash
NO_SUDO=1 scripts/dev.sh exec sh -c 'cd packages/db && pnpm exec tsx -e "
(async () => {
  const { Pool } = await import(\"pg\");
  const p = new Pool({ connectionString: process.env.DATABASE_URL });
  const r = await p.query(\"SELECT count(*) FROM information_schema.tables WHERE table_schema=\\\$1\", [\"pgboss\"]);
  console.log(\"pgboss tables:\", r.rows[0].count);
  await p.end();
})();
"'
```

Expected: 5+ pgboss tables (job, schedule, archive, version, …).

- [ ] **Step 6: Commit**

```bash
git add apps/bot pnpm-lock.yaml
git -c commit.gpgsign=false commit -m "feat(scheduler): add pg-boss client + lifecycle"
```

---

## Task 2: Reminder CRUD helpers

**Files:**
- Create: `apps/bot/src/reminders/crud.ts`
- Create: `apps/bot/src/reminders/time-parsing.ts`
- Create: `apps/bot/src/reminders/time-parsing.test.ts`

- [ ] **Step 1: Create `apps/bot/src/reminders/time-parsing.ts`**

```typescript
import { DateTime } from "luxon";
import { DEFAULT_TIMEZONE } from "@cmbot/shared";

export type Quick = "now" | "in_1h" | "in_3h" | "tomorrow_9am" | "next_mon_9am";

export function quickToDate(quick: Quick, timezone: string = DEFAULT_TIMEZONE): Date {
  const now = DateTime.now().setZone(timezone);
  switch (quick) {
    case "now":
      return now.plus({ seconds: 30 }).toJSDate();
    case "in_1h":
      return now.plus({ hours: 1 }).toJSDate();
    case "in_3h":
      return now.plus({ hours: 3 }).toJSDate();
    case "tomorrow_9am":
      return now.plus({ days: 1 }).set({ hour: 9, minute: 0, second: 0, millisecond: 0 }).toJSDate();
    case "next_mon_9am": {
      const dow = now.weekday; // 1 = Mon
      const daysUntilMon = ((1 - dow + 7) % 7) || 7;
      return now.plus({ days: daysUntilMon }).set({ hour: 9, minute: 0, second: 0, millisecond: 0 }).toJSDate();
    }
  }
}

export type ParseResult = { ok: true; date: Date } | { ok: false; reason: string };

const FORMATS = [
  "yyyy-MM-dd HH:mm",
  "yyyy-MM-dd HH:mm:ss",
  "yyyy/MM/dd HH:mm",
  "dd/MM/yyyy HH:mm",
  "dd-MM-yyyy HH:mm",
];

export function parseFreeText(input: string, timezone: string = DEFAULT_TIMEZONE): ParseResult {
  const trimmed = input.trim();
  for (const fmt of FORMATS) {
    const dt = DateTime.fromFormat(trimmed, fmt, { zone: timezone });
    if (dt.isValid) {
      const jsDate = dt.toJSDate();
      if (jsDate.getTime() <= Date.now()) {
        return { ok: false, reason: "Time is in the past" };
      }
      return { ok: true, date: jsDate };
    }
  }
  return { ok: false, reason: "Couldn't parse — try YYYY-MM-DD HH:MM (e.g. 2026-05-15 09:00)" };
}
```
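
The `next_mon_9am` arithmetic is compact enough to deserve a worked check. The sketch below isolates the expression `((1 - dow + 7) % 7) || 7` in a standalone function (`daysUntilNextMonday` is an illustrative name, not part of the plan's files) using the same weekday numbering as the code above, where 1 is Monday and 7 is Sunday.

```typescript
// Illustrative helper: same expression as in quickToDate, weekday 1 = Mon ... 7 = Sun.
function daysUntilNextMonday(weekday: number): number {
  // (1 - weekday + 7) % 7 is 0 on a Monday; `|| 7` turns that into "a week from now"
  // so that "next Monday" never resolves to today.
  return ((1 - weekday + 7) % 7) || 7;
}

const fromMonday = daysUntilNextMonday(1);    // 7
const fromTuesday = daysUntilNextMonday(2);   // 6
const fromSunday = daysUntilNextMonday(7);    // 1
```

The `|| 7` fallback is the design choice to note: pressing the quick-pick on a Monday schedules the reminder for the following Monday rather than for 09:00 today, which may already have passed.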

- [ ] **Step 2: Create `apps/bot/src/reminders/time-parsing.test.ts`**

```typescript
import { describe, it, expect } from "vitest";
import { quickToDate, parseFreeText } from "./time-parsing.js";

describe("quickToDate", () => {
  it("in_1h returns ~1h ahead", () => {
    const d = quickToDate("in_1h");
    const diffMs = d.getTime() - Date.now();
    expect(diffMs).toBeGreaterThan(55 * 60 * 1000);
    expect(diffMs).toBeLessThan(65 * 60 * 1000);
  });
  it("now returns ~30s ahead", () => {
    const d = quickToDate("now");
    expect(d.getTime() - Date.now()).toBeGreaterThan(20 * 1000);
  });
});

describe("parseFreeText", () => {
  it("accepts YYYY-MM-DD HH:MM", () => {
    const r = parseFreeText("2099-12-31 23:59");
    expect(r.ok).toBe(true);
  });
  it("rejects past times", () => {
    const r = parseFreeText("2020-01-01 00:00");
    expect(r.ok).toBe(false);
    if (!r.ok) expect(r.reason).toMatch(/past/i);
  });
  it("rejects garbage", () => {
    const r = parseFreeText("not a date");
    expect(r.ok).toBe(false);
  });
});
```

- [ ] **Step 3: Run tests (fail → pass cycle)**

```bash
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/bot test
```

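Because the implementation from Step 1 already exists, the new tests should pass on the first run. The test file in Step 2 adds 5 tests (2 for `quickToDate`, 3 for `parseFreeText`), so with the 15 existing tests expect 20 passing in total.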

- [ ] **Step 4: Create `apps/bot/src/reminders/crud.ts`**

```typescript
import { eq, sql } from "drizzle-orm";
import {
  reminders,
  reminderMessages,
  reminderTargets,
  type Reminder,
} from "@cmbot/db";
import { db } from "../db.js";
import { DEFAULT_TIMEZONE } from "@cmbot/shared";

export type CreateReminderInput = {
  accountId: string;
  groupId: string;
  name: string;
  scheduledAt: Date;
  text?: string | null;
  mediaId?: string | null;
  caption?: string | null;
  createdBy: string;
  timezone?: string;
};

export type ReminderWithDetails = Reminder & {
  targets: { groupId: string }[];
  messages: { id: string; position: number; kind: string; textContent: string | null; mediaId: string | null }[];
};

export async function createReminder(input: CreateReminderInput): Promise<string> {
  return await db.transaction(async (tx) => {
    const [rem] = await tx
      .insert(reminders)
      .values({
        accountId: input.accountId,
        name: input.name,
        scheduleKind: "one_off",
        scheduledAt: input.scheduledAt,
        timezone: input.timezone ?? DEFAULT_TIMEZONE,
        status: "active",
        createdBy: input.createdBy,
      })
      .returning({ id: reminders.id });

    await tx.insert(reminderTargets).values({
      reminderId: rem!.id,
      groupId: input.groupId,
      position: 0,
    });

    // Exactly one message part: plain text, or media with an optional caption.
    let position = 0;
    if (input.text && !input.mediaId) {
      await tx.insert(reminderMessages).values({
        reminderId: rem!.id,
        position: position++,
        kind: "text",
        textContent: input.text,
        mediaId: null,
      });
    } else if (input.mediaId) {
      await tx.insert(reminderMessages).values({
        reminderId: rem!.id,
        position: position++,
        kind: "media",
        textContent: input.caption ?? input.text ?? null,
        mediaId: input.mediaId,
      });
    }

    return rem!.id;
  });
}

export async function getReminderWithDetails(id: string): Promise<ReminderWithDetails | null> {
  const rem = await db.query.reminders.findFirst({
    where: (r, { eq }) => eq(r.id, id),
  });
  if (!rem) return null;
  const targets = await db.query.reminderTargets.findMany({
    where: (t, { eq }) => eq(t.reminderId, id),
  });
  const messages = await db.query.reminderMessages.findMany({
    where: (m, { eq }) => eq(m.reminderId, id),
    orderBy: (m, { asc }) => [asc(m.position)],
  });
  return { ...rem, targets, messages };
}

export async function listRemindersForOperator(
  operatorId: string,
  limit = 50,
): Promise<(Reminder & { accountLabel: string; groupCount: number })[]> {
  // Hand-rolled SQL for the join: drizzle's relational query doesn't easily
  // express "all reminders for accounts owned by this operator". The `sql`
  // tag sends operatorId and limit as bound parameters, not inlined text.
  const rows = await db.execute(sql`
    SELECT
      r.*,
      wa.label as account_label,
      (SELECT count(*) FROM reminder_targets rt WHERE rt.reminder_id = r.id) as group_count
    FROM reminders r
    JOIN whatsapp_accounts wa ON wa.id = r.account_id
    WHERE wa.operator_id = ${operatorId}
    ORDER BY r.scheduled_at DESC NULLS LAST, r.created_at DESC
    LIMIT ${limit}
  `);
  return rows.rows as never;
}

export async function deleteReminder(id: string): Promise<void> {
  await db.delete(reminders).where(eq(reminders.id, id));
}
```

NOTE: `listRemindersForOperator` passes `operatorId` and `limit` through drizzle's `sql` template tag, so both are sent as bound parameters rather than spliced into the query text. Keep it that way even though `operatorId` is a UUID looked up server-side; never build this query by plain string interpolation. Also note that the raw rows come back with snake_case column names (`account_label`, `group_count`, `scheduled_at`, …), so the `as never` cast hides a mismatch with the camelCase return type; map the columns before relying on `accountLabel` or `groupCount`.
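
To make the difference between string interpolation and a parameterized query concrete, here is a minimal sketch of what a SQL template tag does. `sqlSketch` and `ParamQuery` are hypothetical stand-ins written for illustration; they are not drizzle's implementation, and the query is a simplified version of the one in `listRemindersForOperator`.

```typescript
type ParamQuery = { text: string; params: unknown[] };

// Hypothetical stand-in for a SQL template tag: every interpolated value becomes
// a numbered placeholder, and the value itself travels separately in `params`.
function sqlSketch(strings: TemplateStringsArray, ...values: unknown[]): ParamQuery {
  let text = strings[0] ?? "";
  values.forEach((_, i) => {
    text += `$${i + 1}` + (strings[i + 1] ?? "");
  });
  return { text, params: values };
}

// A hostile-looking value never reaches the query text.
const operatorId = "x' OR '1'='1";
const limit = 50;
const query = sqlSketch`SELECT r.* FROM reminders r JOIN whatsapp_accounts wa ON wa.id = r.account_id WHERE wa.operator_id = ${operatorId} LIMIT ${limit}`;
```

Here `query.text` ends in `WHERE wa.operator_id = $1 LIMIT $2`, and the quote characters in `operatorId` stay inside `query.params`, where the database driver treats them as data.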

- [ ] **Step 5: Typecheck and commit**

```bash
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/bot typecheck
git add apps/bot/src/reminders
git -c commit.gpgsign=false commit -m "feat(reminders): add time-parsing + CRUD helpers"
```

---

## Task 3: Sender refactor — add media support

**Files:**
- Modify: `apps/bot/src/whatsapp/sender.ts`

- [ ] **Step 1: Replace `apps/bot/src/whatsapp/sender.ts`** with the extended version

```typescript
import { readFile, stat } from "node:fs/promises";
import type { WASocket, AnyMessageContent } from "@whiskeysockets/baileys";
import pino from "pino";

const logger = pino({ name: "sender" });

type SocketWithAssertSessions = WASocket & {
  assertSessions?: (jids: string[], force: boolean) => Promise<boolean>;
};

const CHUNK_SIZE = 5;

async function chunked<T>(items: T[], size: number): Promise<T[][]> {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) out.push(items.slice(i, i + size));
  return out;
}

async function ensureSessionsForGroup(
  socket: WASocket,
  groupJid: string,
): Promise<{ ok: number; failed: number; total: number }> {
  const metadata = await socket.groupMetadata(groupJid);
  const participantJids = metadata.participants.map((p) => p.id);
  const internal = socket as SocketWithAssertSessions;
  if (typeof internal.assertSessions !== "function") {
    return { ok: 0, failed: 0, total: participantJids.length };
  }
  let ok = 0;
  let failed = 0;
  for (const chunk of await chunked(participantJids, CHUNK_SIZE)) {
    try {
      await internal.assertSessions(chunk, true);
      ok += chunk.length;
    } catch (err) {
      failed += chunk.length;
      logger.warn({ groupJid, err: (err as Error).message }, "assertSessions chunk failed");
    }
  }
  return { ok, failed, total: participantJids.length };
}

async function sendWithRetry(
  socket: WASocket,
  groupJid: string,
  content: AnyMessageContent,
): Promise<{ messageId: string | undefined }> {
  await ensureSessionsForGroup(socket, groupJid);
  try {
    const result = await socket.sendMessage(groupJid, content);
    return { messageId: result?.key?.id ?? undefined };
  } catch (err) {
    const message = (err as Error)?.message ?? "";
    if (message.includes("No sessions")) {
      await new Promise((r) => setTimeout(r, 2000));
      await ensureSessionsForGroup(socket, groupJid);
      const result = await socket.sendMessage(groupJid, content);
      return { messageId: result?.key?.id ?? undefined };
    }
    throw err;
  }
}

export async function sendTextToGroup(
  socket: WASocket,
  groupJid: string,
  text: string,
): Promise<{ messageId: string | undefined }> {
  return sendWithRetry(socket, groupJid, { text });
}

export type MediaKind = "image" | "video" | "document";

export async function sendMediaToGroup(
  socket: WASocket,
  groupJid: string,
  kind: MediaKind,
  filePath: string,
  options: { caption?: string; mimeType?: string; filename?: string } = {},
): Promise<{ messageId: string | undefined }> {
  // Validate the file exists and read into a buffer. For very large files
  // (>50MB) Baileys also accepts a stream, but for our reminder use case
  // files are typically <30MB which fits comfortably in memory.
  await stat(filePath);
  const buffer = await readFile(filePath);

  const content: AnyMessageContent =
    kind === "image"
      ? { image: buffer, caption: options.caption, mimetype: options.mimeType }
      : kind === "video"
        ? { video: buffer, caption: options.caption, mimetype: options.mimeType }
        : {
            document: buffer,
            caption: options.caption,
            fileName: options.filename ?? "file",
            mimetype: options.mimeType ?? "application/octet-stream",
          };

  return sendWithRetry(socket, groupJid, content);
}
```
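
`ensureSessionsForGroup` warms up sessions in batches of `CHUNK_SIZE` participants, so a failing batch only marks its own members as failed. A synchronous sketch of that batching follows; `chunk` is an illustrative name, and the plan's `chunked` is declared `async` but performs the same slicing.

```typescript
// Illustrative synchronous version of the batching used before each send.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Twelve participants with a batch size of 5 give batches of 5, 5 and 2.
const participants = Array.from({ length: 12 }, (_, i) => `user${i + 1}@s.whatsapp.net`);
const batchSizes = chunk(participants, 5).map((batch) => batch.length);
```

The last batch is simply shorter, so no padding or special-casing is needed when the group size is not a multiple of the batch size.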

- [ ] **Step 2: Typecheck and commit**

```bash
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/bot typecheck
git add apps/bot/src/whatsapp/sender.ts
git -c commit.gpgsign=false commit -m "feat(bot): extend sender with image/video/document support"
```

---

## Task 4: Media ingest from Telegram

**Files:**
- Create: `apps/bot/src/media/ingest.ts`

- [ ] **Step 1: Create `apps/bot/src/media/ingest.ts`**

```typescript
import { mkdir, writeFile } from "node:fs/promises";
import { dirname } from "node:path";
import { createHash } from "node:crypto";
import { mediaFiles } from "@cmbot/db";
import { newMediaPath, absoluteMediaPath } from "@cmbot/shared";
import { db } from "../db.js";
import { env } from "../env.js";
import { logger } from "../logger.js";

export type IngestInput = {
  operatorId: string;
  filenameOriginal: string;
  mimeType: string;
  buffer: Buffer;
};

export type IngestResult = {
  mediaId: string;
  storagePath: string;
};

export async function ingestMediaBuffer(input: IngestInput): Promise<IngestResult> {
  const sha256 = createHash("sha256").update(input.buffer).digest("hex");
  const storagePath = newMediaPath(input.filenameOriginal);
  const absolute = absoluteMediaPath(storagePath, env.MEDIA_DIR);
  await mkdir(dirname(absolute), { recursive: true });
  await writeFile(absolute, input.buffer);

  const [row] = await db
    .insert(mediaFiles)
    .values({
      operatorId: input.operatorId,
      filenameOriginal: input.filenameOriginal,
      mimeType: input.mimeType,
      sizeBytes: input.buffer.byteLength,
      sha256,
      storagePath,
    })
    .returning({ id: mediaFiles.id });

  logger.info(
    { mediaId: row!.id, sizeBytes: input.buffer.byteLength, sha256 },
    "media: ingested",
  );

  return { mediaId: row!.id, storagePath };
}

/**
 * Download a Telegram file by file_id and ingest it. Returns the new media row.
 */
export async function ingestTelegramFile(
  operatorId: string,
  apiBase: string,
  botToken: string,
  fileId: string,
  defaultFilename: string,
  mimeType: string,
): Promise<IngestResult> {
  // 1. getFile — Telegram returns a file_path
  const getFileUrl = `${apiBase}/bot${botToken}/getFile?file_id=${encodeURIComponent(fileId)}`;
  const getFileRes = await fetch(getFileUrl);
  if (!getFileRes.ok) {
    throw new Error(`Telegram getFile failed: ${getFileRes.status} ${getFileRes.statusText}`);
  }
  const getFileJson = (await getFileRes.json()) as {
    ok: boolean;
    result?: { file_path?: string };
  };
  if (!getFileJson.ok || !getFileJson.result?.file_path) {
    throw new Error("Telegram getFile: missing file_path in response");
  }
  // 2. Download bytes
  const downloadUrl = `${apiBase}/file/bot${botToken}/${getFileJson.result.file_path}`;
  const dl = await fetch(downloadUrl);
  if (!dl.ok) {
    throw new Error(`Telegram file download failed: ${dl.status} ${dl.statusText}`);
  }
  const buffer = Buffer.from(await dl.arrayBuffer());

  // The last path segment can be missing or empty; fall back to defaultFilename.
  // `||` (not `??`) so that an empty string also triggers the fallback.
  const filename = getFileJson.result.file_path.split("/").pop() || defaultFilename;
  return ingestMediaBuffer({
    operatorId,
    filenameOriginal: filename,
    mimeType,
    buffer,
  });
}
```
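
`newMediaPath` comes from `@cmbot/shared` and is not shown in this plan. As a rough idea of how a relative path in the `{yyyy/mm}/{uuid}.{ext}` layout could be derived, here is a pure sketch. `sketchMediaPath` is a hypothetical function, the `bin` fallback extension and the use of UTC are assumptions, and the real helper may differ on all of these.

```typescript
// Hypothetical sketch of a path builder in the {yyyy/mm}/{uuid}.{ext} layout.
// The date and id are passed in so the function stays pure and easy to test.
function sketchMediaPath(now: Date, id: string, filenameOriginal: string): string {
  const yyyy = String(now.getUTCFullYear());
  const mm = String(now.getUTCMonth() + 1).padStart(2, "0");
  const dot = filenameOriginal.lastIndexOf(".");
  // ASSUMPTION: files without an extension get "bin"; a leading dot is not an extension.
  const ext = dot > 0 ? filenameOriginal.slice(dot + 1).toLowerCase() : "bin";
  return `${yyyy}/${mm}/${id}.${ext}`;
}

const examplePath = sketchMediaPath(new Date(Date.UTC(2026, 4, 15)), "abc123", "photo.JPG");
// examplePath is "2026/05/abc123.jpg"
```

Storing only this relative path in `media_files` and joining it with `env.MEDIA_DIR` at read time, as `absoluteMediaPath` does above, keeps the rows valid if the volume is mounted somewhere else.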

- [ ] **Step 2: Typecheck and commit**

```bash
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/bot typecheck
git add apps/bot/src/media
git -c commit.gpgsign=false commit -m "feat(bot): add Telegram media ingest into /data/media"
```

---

## Task 5: Fire-reminder handler + reminder-jobs registration

**Files:**
- Modify: `apps/bot/src/scheduler/reminder-jobs.ts` (replace placeholder)
- Create: `apps/bot/src/scheduler/fire-reminder.ts`

- [ ] **Step 1: Create `apps/bot/src/scheduler/fire-reminder.ts`**

```typescript
import { reminderRuns, reminderRunTargets } from "@cmbot/db";
import { db } from "../db.js";
import { logger } from "../logger.js";
import { sessionManager } from "../whatsapp/session-manager.js";
import { sendTextToGroup, sendMediaToGroup } from "../whatsapp/sender.js";
import { absoluteMediaPath } from "@cmbot/shared";
import { env } from "../env.js";
import { writeAuditLog } from "../audit.js";
import { getReminderWithDetails } from "../reminders/crud.js";
import { eq } from "drizzle-orm";

export type FireReminderPayload = { reminderId: string };

export async function fireReminder(payload: FireReminderPayload): Promise<void> {
  const reminder = await getReminderWithDetails(payload.reminderId);
  if (!reminder) {
    logger.warn({ reminderId: payload.reminderId }, "fire-reminder: reminder not found");
    return;
  }
  if (reminder.status !== "active") {
    logger.info({ reminderId: reminder.id, status: reminder.status }, "fire-reminder: skipping (not active)");
    return;
  }

  const [run] = await db
    .insert(reminderRuns)
    .values({ reminderId: reminder.id, status: "pending" })
    .returning({ id: reminderRuns.id });
  const runId = run!.id;

  const session = sessionManager.getSession(reminder.accountId);
  if (!session) {
    logger.warn({ reminderId: reminder.id }, "fire-reminder: account not connected");
    for (const target of reminder.targets) {
      await db.insert(reminderRunTargets).values({
        runId,
        groupId: target.groupId,
        status: "skipped",
        error: "account not connected",
      });
    }
    await db
      .update(reminderRuns)
      .set({ status: "skipped", errorSummary: "account not connected" })
      .where(eq(reminderRuns.id, runId));
    return;
  }

  let allSent = true;
  let anySent = false;
  for (const target of reminder.targets) {
    const group = await db.query.whatsappGroups.findFirst({
      where: (g, { eq }) => eq(g.id, target.groupId),
    });
    if (!group) {
      await db.insert(reminderRunTargets).values({
        runId,
        groupId: target.groupId,
        status: "skipped",
        error: "group missing from db",
      });
      allSent = false;
      continue;
    }
    const start = Date.now();
    try {
      let lastMessageId: string | undefined;
      for (const part of reminder.messages) {
        if (part.kind === "text" && part.textContent) {
          const r = await sendTextToGroup(session.socket, group.waGroupJid, part.textContent);
          lastMessageId = r.messageId;
        } else if (part.mediaId) {
          const media = await db.query.mediaFiles.findFirst({
            where: (m, { eq }) => eq(m.id, part.mediaId!),
          });
          if (!media) throw new Error(`media row missing: ${part.mediaId}`);
          const filePath = absoluteMediaPath(media.storagePath, env.MEDIA_DIR);
          // crud.ts stores media parts with the generic kind "media", so fall
          // back to the MIME type; otherwise every photo or video would be
          // sent as a document.
          const kind: "image" | "video" | "document" =
            part.kind === "image" || media.mimeType.startsWith("image/")
              ? "image"
              : part.kind === "video" || media.mimeType.startsWith("video/")
                ? "video"
                : "document";
          const r = await sendMediaToGroup(session.socket, group.waGroupJid, kind, filePath, {
            caption: part.textContent ?? undefined,
            mimeType: media.mimeType,
            filename: media.filenameOriginal,
          });
          lastMessageId = r.messageId;
        }
        // Fixed 1.5s pause between message parts to stay under WA's rate limit
        await new Promise((r) => setTimeout(r, 1500));
      }
      await db.insert(reminderRunTargets).values({
        runId,
        groupId: target.groupId,
        status: "sent",
        waMessageId: lastMessageId ?? null,
        latencyMs: Date.now() - start,
      });
      anySent = true;
    } catch (err) {
      logger.error({ err, reminderId: reminder.id, groupId: target.groupId }, "fire-reminder: send failed");
      await db.insert(reminderRunTargets).values({
        runId,
        groupId: target.groupId,
        status: "failed",
        error: (err as Error).message,
      });
      allSent = false;
    }
  }

  const status = allSent ? "success" : anySent ? "partial" : "failed";
  await db
    .update(reminderRuns)
    .set({ status })
    .where(eq(reminderRuns.id, runId));

  await writeAuditLog(db, {
    operatorId: reminder.createdBy,
    source: "system",
    action: "reminder.fired",
    targetType: "reminder",
    targetId: reminder.id,
    payload: { runId, status },
  });

  logger.info({ reminderId: reminder.id, runId, status }, "fire-reminder: done");
}
```
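
The run status at the end of `fireReminder` is derived from two flags. The same rule can be written as a pure function over per-target outcomes, which makes the edge cases easier to see. `summarizeRun`, `TargetOutcome` and `RunStatus` are illustrative names for this sketch, not part of the plan's files.

```typescript
type TargetOutcome = "sent" | "failed" | "skipped";
type RunStatus = "success" | "partial" | "failed";

// Mirrors the allSent / anySent flags in fireReminder: any failed or skipped
// target clears allSent, and any sent target sets anySent.
function summarizeRun(outcomes: TargetOutcome[]): RunStatus {
  const allSent = outcomes.every((o) => o === "sent");
  const anySent = outcomes.some((o) => o === "sent");
  return allSent ? "success" : anySent ? "partial" : "failed";
}

const oneGroupSent = summarizeRun(["sent"]);            // "success"
const mixed = summarizeRun(["sent", "failed"]);         // "partial"
const allSkipped = summarizeRun(["skipped"]);           // "failed"
```

One consequence worth knowing: a reminder with no targets at all evaluates to `"success"`, both in this sketch and in the handler, because `allSent` is never cleared. With the single-group wizard in this plan that case should not arise.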

- [ ] **Step 2: Replace `apps/bot/src/scheduler/reminder-jobs.ts`** with the real implementation

```typescript
import type PgBoss from "pg-boss";
import { logger } from "../logger.js";
import { fireReminder, type FireReminderPayload } from "./fire-reminder.js";

export const REMINDER_FIRE_QUEUE = "reminder.fire";

export async function registerReminderJobs(boss: PgBoss): Promise<void> {
  await boss.createQueue(REMINDER_FIRE_QUEUE);
  await boss.work<FireReminderPayload>(REMINDER_FIRE_QUEUE, async ([job]) => {
    if (!job) return;
    logger.debug({ jobId: job.id, payload: job.data }, "reminder.fire: handling");
    await fireReminder(job.data);
  });
  logger.info("reminder.fire: handler registered");
}

export async function scheduleReminderFire(
  boss: PgBoss,
  reminderId: string,
  scheduledAt: Date,
): Promise<string | null> {
  const id = await boss.send(
    REMINDER_FIRE_QUEUE,
    { reminderId },
    {
      startAfter: scheduledAt,
      retryLimit: 3,
      retryDelay: 30,
      retryBackoff: true,
      // singletonKey is a de-duplication hint only: it does NOT cancel or
      // replace a job that is already queued. Depending on the pg-boss version
      // and queue policy, a duplicate send is either rejected (send resolves
      // to null) or queued alongside the first job.
      singletonKey: `reminder:${reminderId}`,
    },
  );
  logger.info({ reminderId, jobId: id, scheduledAt }, "reminder.fire: scheduled");
  return id;
}

export async function cancelReminderFire(boss: PgBoss, reminderId: string): Promise<void> {
  // pg-boss cancels by job id, and we don't store the job id per reminder, so
  // there is nothing to cancel here. For now we just log; the job will fire and
  // find the reminder gone (handled by fire-reminder's "reminder not found"
  // branch).
  logger.info({ reminderId }, "reminder.fire: cancel requested (will fizzle on fire)");
}
```

NOTE: `cancelReminderFire` is a soft-cancel — the job still fires, but `fireReminder` exits early when the reminder row is gone. Hard cancel via pg-boss requires storing the `jobId` per reminder; out of scope for plan 2 (added in plan 2.5 if it becomes a real problem).

- [ ] **Step 3: Typecheck, restart bot, commit**

```bash
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/bot typecheck
NO_SUDO=1 scripts/dev.sh restart-bot
git add apps/bot/src/scheduler
git -c commit.gpgsign=false commit -m "feat(scheduler): add fire-reminder handler + job registration"
```

Verify in logs: `reminder.fire: handler registered`, no errors.

---

## Task 6: Wizard state for reminder creation

**Files:**
- Modify: `apps/bot/src/telegram/state.ts`

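The wizard needs per-user state spanning multiple messages. The steps run in this order:
`pick_account → pick_group → compose → set_time → confirm`. A user with no stored wizard entry is idle, and `clearWizard` removes the entry when the wizard finishes.

State is stored as a single in-memory object keyed by Telegram user ID, with a 30-minute expiry that is refreshed on every update.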
|
||
|
||
- [ ] **Step 1: Append wizard helpers to `apps/bot/src/telegram/state.ts`**
|
||
|
||
```typescript
|
||
// Reminder creation wizard state.
|
||
export type WizardStep =
|
||
| "pick_account"
|
||
| "pick_group"
|
||
| "compose"
|
||
| "set_time"
|
||
| "confirm";
|
||
|
||
export type WizardState = {
|
||
step: WizardStep;
|
||
accountId?: string;
|
||
groupId?: string;
|
||
text?: string | null;
|
||
mediaId?: string | null;
|
||
caption?: string | null;
|
||
scheduledAt?: Date;
|
||
expiresAt: number;
|
||
};
|
||
|
||
const wizardState = new Map<number, WizardState>();
|
||
const WIZARD_TTL_MS = 30 * 60 * 1000;
|
||
|
||
export function getWizard(userId: number): WizardState | null {
|
||
const w = wizardState.get(userId);
|
||
if (!w) return null;
|
||
if (Date.now() >= w.expiresAt) {
|
||
wizardState.delete(userId);
|
||
return null;
|
||
}
|
||
return w;
|
||
}
|
||
|
||
export function startWizard(userId: number): WizardState {
|
||
const w: WizardState = { step: "pick_account", expiresAt: Date.now() + WIZARD_TTL_MS };
|
||
wizardState.set(userId, w);
|
||
return w;
|
||
}
|
||
|
||
export function updateWizard(userId: number, patch: Partial<WizardState>): WizardState | null {
|
||
const w = getWizard(userId);
|
||
if (!w) return null;
|
||
const next = { ...w, ...patch, expiresAt: Date.now() + WIZARD_TTL_MS };
|
||
wizardState.set(userId, next);
|
||
return next;
|
||
}
|
||
|
||
export function clearWizard(userId: number): void {
|
||
wizardState.delete(userId);
|
||
}
|
||
```
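
One consequence of the helpers above: `getWizard` only evicts an expired entry when that same user comes back, so abandoned wizards stay in the map until the bot restarts. If that ever matters, a periodic sweep is one option. The sketch below is self-contained and uses hypothetical names (`sweepExpired`, a trimmed `Entry` type); it is not part of the plan's `state.ts`.

```typescript
// Trimmed stand-in for WizardState: only the fields the sweep needs.
type Entry = { step: string; expiresAt: number };

// Deletes every entry whose expiry is at or before `now` and returns how many
// were removed. `now` is a parameter so the function can be tested without waiting.
function sweepExpired(state: Map<number, Entry>, now: number): number {
  let removed = 0;
  state.forEach((entry, userId) => {
    if (now >= entry.expiresAt) {
      state.delete(userId); // deleting the current key during forEach is safe
      removed++;
    }
  });
  return removed;
}

const demo = new Map<number, Entry>([
  [1, { step: "compose", expiresAt: 1000 }],
  [2, { step: "set_time", expiresAt: 5000 }],
]);
const removedCount = sweepExpired(demo, 2000);
console.log(removedCount, Array.from(demo.keys()));
```

In the bot this could run on a `setInterval` every few minutes with `Date.now()` as `now`.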
|
||
|
||
- [ ] **Step 2: Commit**
|
||
|
||
```bash
|
||
git add apps/bot/src/telegram/state.ts
|
||
git -c commit.gpgsign=false commit -m "feat(bot): add wizard state for reminder creation"
|
||
```
|
||
|
||
---
|
||
|
||
## Task 7: Reminder menu views (list, detail, wizard)
|
||
|
||
**Files:**
|
||
- Modify: `apps/bot/src/telegram/menus.ts`
|
||
|
||
Append the following render functions and update `mainMenu()` to include a 📅 Reminders button. This task makes one modification to existing code and many additions, so apply each step exactly as shown.
|
||
|
||
- [ ] **Step 1: Update `mainMenu()` in `apps/bot/src/telegram/menus.ts`** to add the Reminders button
|
||
|
||
Find:
|
||
```typescript
|
||
export function mainMenu(): MenuView {
|
||
const keyboard = new InlineKeyboard()
|
||
.text("📒 Accounts", "m:accounts")
|
||
.text("📡 Pair New", "m:pair")
|
||
.row()
|
||
.text("❓ Help", "m:help");
|
||
```
|
||
|
||
Replace with:
|
||
```typescript
|
||
export function mainMenu(): MenuView {
|
||
const keyboard = new InlineKeyboard()
|
||
.text("📒 Accounts", "m:accounts")
|
||
.text("📅 Reminders", "m:reminders")
|
||
.row()
|
||
.text("📡 Pair New", "m:pair")
|
||
.text("❓ Help", "m:help");
|
||
```
|
||
|
||
- [ ] **Step 2: Append reminder views at the end of `menus.ts`** (place the two `import` lines with the existing imports at the top of the file)
|
||
|
||
```typescript
|
||
import { listRemindersForOperator, getReminderWithDetails } from "../reminders/crud.js";
|
||
import { DateTime } from "luxon";
|
||
|
||
export async function remindersMenu(operatorId: string, operatorTimezone: string): Promise<MenuView> {
|
||
const list = await listRemindersForOperator(operatorId, 30);
|
||
const keyboard = new InlineKeyboard();
|
||
for (const r of list) {
|
||
const when = r.scheduledAt
|
||
? DateTime.fromJSDate(r.scheduledAt).setZone(operatorTimezone).toFormat("dd MMM HH:mm")
|
||
: "—";
|
||
const label = `${r.status === "active" ? "🟢" : r.status === "ended" ? "⚪" : "⏸"} ${r.name} · ${when}`;
|
||
keyboard.text(label.slice(0, 60), `rm:${r.id}`).row();
|
||
}
|
||
keyboard.text("➕ New Reminder", "rm:new").row().text("⬅ Main Menu", "m:main");
|
||
if (list.length === 0) {
|
||
return {
|
||
text: "📅 *Reminders*\n\nYou haven't created any reminders yet.",
|
||
keyboard,
|
||
};
|
||
}
|
||
return {
|
||
text: `📅 *Reminders* (${list.length})\n\nTap one to view, or *➕ New* to schedule a fresh one.`,
|
||
keyboard,
|
||
};
|
||
}
|
||
|
||
export async function reminderDetailMenu(
|
||
reminderId: string,
|
||
operatorTimezone: string,
|
||
): Promise<MenuView | null> {
|
||
const rem = await getReminderWithDetails(reminderId);
|
||
if (!rem) return null;
|
||
const when = rem.scheduledAt
|
||
? DateTime.fromJSDate(rem.scheduledAt).setZone(operatorTimezone).toFormat("yyyy-MM-dd HH:mm")
|
||
: "—";
|
||
const messagePreview = rem.messages
|
||
.map((m) => (m.kind === "text" ? m.textContent : `[${m.kind}]`))
|
||
.filter(Boolean)
|
||
.join("\n");
|
||
|
||
const keyboard = new InlineKeyboard()
|
||
.text("🗑 Delete", `rm_del:${reminderId}`)
|
||
.row()
|
||
.text("⬅ Reminders", "m:reminders")
|
||
.text("⬅ Main Menu", "m:main");
|
||
|
||
return {
|
||
text:
|
||
`📅 *${rem.name}*\n\n` +
|
||
`When: \`${when}\` (${rem.timezone})\n` +
|
||
`Status: \`${rem.status}\`\n` +
|
||
`Targets: ${rem.targets.length}\n\n` +
|
||
`*Body:*\n${messagePreview || "(empty)"}`,
|
||
keyboard,
|
||
};
|
||
}
|
||
|
||
export function reminderPickAccountMenu(
|
||
accounts: { id: string; label: string; phoneNumber: string | null }[],
|
||
): MenuView {
|
||
const keyboard = new InlineKeyboard();
|
||
for (const a of accounts) {
|
||
const phone = a.phoneNumber ? ` (+${a.phoneNumber})` : "";
|
||
keyboard.text(`📒 ${a.label}${phone}`, `rm_acc:${a.id}`).row();
|
||
}
|
||
keyboard.text("⬅ Cancel", "m:reminders");
|
||
return {
|
||
text: "➕ *New Reminder — Step 1 / 4*\n\nWhich WhatsApp account should send it?",
|
||
keyboard,
|
||
};
|
||
}
|
||
|
||
export function reminderPickGroupMenu(
|
||
groups: { id: string; name: string }[],
|
||
): MenuView {
|
||
const keyboard = new InlineKeyboard();
|
||
for (const g of groups.slice(0, 30)) {
|
||
const name = g.name.length > 32 ? `${g.name.slice(0, 31)}…` : g.name;
|
||
keyboard.text(`👥 ${name}`, `rm_grp:${g.id}`).row();
|
||
}
|
||
keyboard.text("⬅ Cancel", "m:reminders");
|
||
return {
|
||
text: "➕ *New Reminder — Step 2 / 4*\n\nWhich group?",
|
||
keyboard,
|
||
};
|
||
}
|
||
|
||
export function reminderComposeMenu(): MenuView {
|
||
const keyboard = new InlineKeyboard().text("⬅ Cancel", "m:reminders");
|
||
return {
|
||
text:
|
||
"➕ *New Reminder — Step 3 / 4*\n\n" +
|
||
"Send the message body now — text, photo, video, or document.\n\n" +
|
||
"Reply to *this* message with what you want sent. " +
|
||
"If you send media with a caption, the caption is included.",
|
||
keyboard,
|
||
};
|
||
}
|
||
|
||
export function reminderTimeMenu(): MenuView {
|
||
const keyboard = new InlineKeyboard()
|
||
.text("🕐 In 1 hour", "rm_t:in_1h")
|
||
.text("🕒 In 3 hours", "rm_t:in_3h")
|
||
.row()
|
||
.text("🌅 Tomorrow 9 AM", "rm_t:tomorrow_9am")
|
||
.text("📅 Next Mon 9 AM", "rm_t:next_mon_9am")
|
||
.row()
|
||
.text("⌨️ Custom date/time", "rm_t:custom")
|
||
.row()
|
||
.text("⬅ Cancel", "m:reminders");
|
||
return {
|
||
text:
|
||
"➕ *New Reminder — Step 4 / 4*\n\n" +
|
||
"When should it fire? Pick a quick option or type a date/time.",
|
||
keyboard,
|
||
};
|
||
}
|
||
|
||
export function reminderConfirmMenu(summary: {
|
||
accountLabel: string;
|
||
groupName: string;
|
||
body: string;
|
||
whenLocal: string;
|
||
}): MenuView {
|
||
const keyboard = new InlineKeyboard()
|
||
.text("✅ Schedule", "rm_save")
|
||
.text("⬅ Cancel", "m:reminders");
|
||
return {
|
||
text:
|
||
"*Review*\n\n" +
|
||
`Account: ${summary.accountLabel}\n` +
|
||
`Group: ${summary.groupName}\n` +
|
||
`When: ${summary.whenLocal}\n\n` +
|
||
`*Body:*\n${summary.body}`,
|
||
keyboard,
|
||
};
|
||
}
|
||
```
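
The views above interpolate operator-supplied strings (`rem.name`, `summary.body`, group names) into text that appears to be sent with `parse_mode: "Markdown"`. A stray `*` or `_` in a reminder name can then break the formatting or make Telegram reject the message. A minimal sketch of an escaping helper for the legacy Markdown mode, assuming a hypothetical name `escapeMarkdown` that the plan does not define:

```typescript
// Telegram's legacy "Markdown" parse mode treats _ * ` [ as entity markers.
// Prefixing each with a backslash makes it render literally.
function escapeMarkdown(input: string): string {
  return input.replace(/[_*`\[]/g, "\\$&");
}

const rawName = "Q3_review *draft* [v2]";
const safeName = escapeMarkdown(rawName);
console.log(`📅 *${safeName}*`);
```

This covers legacy Markdown only; `MarkdownV2` reserves a longer list of characters and would need a different helper.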
|
||
|
||
- [ ] **Step 3: Typecheck and commit**
|
||
|
||
```bash
|
||
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/bot typecheck
|
||
git add apps/bot/src/telegram/menus.ts
|
||
git -c commit.gpgsign=false commit -m "feat(bot): add reminder menu views (list, detail, wizard steps)"
|
||
```
|
||
|
||
---
|
||
|
||
## Task 8: Wire reminder callbacks into bot.ts and message:text handler
|
||
|
||
**Files:**
|
||
- Modify: `apps/bot/src/telegram/callbacks.ts`
|
||
- Modify: `apps/bot/src/telegram/bot.ts`
|
||
- Create: `apps/bot/src/telegram/commands/reminders.ts`
|
||
|
||
This is the biggest task. The reminder wizard touches several of the existing patterns. Read carefully.
|
||
|
||
- [ ] **Step 1: Append callback handlers to `apps/bot/src/telegram/callbacks.ts`**
|
||
|
||
```typescript
|
||
import {
|
||
remindersMenu,
|
||
reminderDetailMenu,
|
||
reminderPickAccountMenu,
|
||
reminderPickGroupMenu,
|
||
reminderComposeMenu,
|
||
reminderTimeMenu,
|
||
reminderConfirmMenu,
|
||
} from "./menus.js";
|
||
import {
|
||
startWizard,
|
||
getWizard,
|
||
updateWizard,
|
||
clearWizard,
|
||
} from "./state.js";
|
||
import { createReminder, deleteReminder, getReminderWithDetails } from "../reminders/crud.js";
|
||
import { quickToDate, parseFreeText, type Quick } from "../reminders/time-parsing.js";
|
||
import { ingestTelegramFile } from "../media/ingest.js";
|
||
import { scheduleReminderFire, cancelReminderFire } from "../scheduler/reminder-jobs.js";
|
||
import { getBoss } from "../scheduler/pgboss-client.js";
|
||
import { env } from "../env.js";
|
||
import { DateTime } from "luxon";
|
||
import { DEFAULT_TIMEZONE } from "@cmbot/shared";
|
||
|
||
export async function showRemindersMenu(ctx: Context): Promise<void> {
  // Also called from the /reminders command, where there is no callback query to answer.
  if (ctx.callbackQuery) await ctx.answerCallbackQuery();
  const op = await findOperator(ctx);
  if (!op) return;
  const view = await remindersMenu(op.id, op.defaultTimezone ?? DEFAULT_TIMEZONE);
  await showMenu(ctx, view);
}

export async function showReminderDetail(ctx: Context, reminderId: string): Promise<void> {
  const op = await findOperator(ctx);
  if (!op) {
    await ctx.answerCallbackQuery();
    return;
  }
  const view = await reminderDetailMenu(reminderId, op.defaultTimezone ?? DEFAULT_TIMEZONE);
  if (!view) {
    // Answer the query exactly once, so the alert is what the operator sees.
    await ctx.answerCallbackQuery({ text: "Reminder not found.", show_alert: true });
    return;
  }
  await ctx.answerCallbackQuery();
  await showMenu(ctx, view);
}
|
||
|
||
export async function deleteReminderCallback(ctx: Context, reminderId: string): Promise<void> {
|
||
const op = await findOperator(ctx);
|
||
if (!op) {
|
||
await ctx.answerCallbackQuery();
|
||
return;
|
||
}
|
||
const rem = await getReminderWithDetails(reminderId);
|
||
if (!rem) {
|
||
await ctx.answerCallbackQuery({ text: "Reminder not found.", show_alert: true });
|
||
return;
|
||
}
|
||
await deleteReminder(reminderId);
|
||
await cancelReminderFire(getBoss(), reminderId);
|
||
await writeAuditLog(db, {
|
||
operatorId: op.id,
|
||
source: "telegram",
|
||
action: "reminder.deleted",
|
||
targetType: "reminder",
|
||
targetId: reminderId,
|
||
payload: { name: rem.name },
|
||
});
|
||
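  await ctx.answerCallbackQuery({ text: "Deleted." });
  // Render the list directly; showRemindersMenu would answer the query a second time.
  await showMenu(ctx, await remindersMenu(op.id, op.defaultTimezone ?? DEFAULT_TIMEZONE));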
|
||
}
|
||
|
||
export async function startReminderWizard(ctx: Context): Promise<void> {
|
||
await ctx.answerCallbackQuery();
|
||
const op = await findOperator(ctx);
|
||
if (!op) return;
|
||
const userId = ctx.from?.id;
|
||
if (!userId) return;
|
||
startWizard(userId);
|
||
|
||
const accounts = await db.query.whatsappAccounts.findMany({
|
||
where: (a, { eq }) => eq(a.operatorId, op.id),
|
||
orderBy: (a, { asc }) => [asc(a.label)],
|
||
});
|
||
if (accounts.length === 0) {
|
||
await showMenu(ctx, {
|
||
text: "You need to pair an account before scheduling a reminder.",
|
||
keyboard: new InlineKeyboard().text("⬅ Reminders", "m:reminders"),
|
||
});
|
||
return;
|
||
}
|
||
await showMenu(ctx, reminderPickAccountMenu(accounts));
|
||
}
|
||
|
||
export async function wizardPickAccount(ctx: Context, accountId: string): Promise<void> {
  await ctx.answerCallbackQuery();
  const userId = ctx.from?.id;
  if (!userId) return;
  // updateWizard returns null when the wizard has expired or was never started.
  if (!updateWizard(userId, { step: "pick_group", accountId })) {
    await ctx.reply("Wizard expired. Tap /menu to start again.");
    return;
  }
  const groups = await db.query.whatsappGroups.findMany({
    where: (g, { eq }) => eq(g.accountId, accountId),
    orderBy: (g, { asc }) => [asc(g.name)],
  });
  await showMenu(ctx, reminderPickGroupMenu(groups));
}

export async function wizardPickGroup(ctx: Context, groupId: string): Promise<void> {
  await ctx.answerCallbackQuery();
  const userId = ctx.from?.id;
  if (!userId) return;
  if (!updateWizard(userId, { step: "compose", groupId })) {
    await ctx.reply("Wizard expired. Tap /menu to start again.");
    return;
  }
  await showMenu(ctx, reminderComposeMenu());
}
|
||
|
||
export async function wizardSetTimeQuick(ctx: Context, quick: Quick): Promise<void> {
|
||
await ctx.answerCallbackQuery();
|
||
const userId = ctx.from?.id;
|
||
if (!userId) return;
|
||
const w = getWizard(userId);
|
||
if (!w) {
|
||
await ctx.reply("Wizard expired. Tap /menu to start again.");
|
||
return;
|
||
}
|
||
const op = await findOperator(ctx);
|
||
const tz = op?.defaultTimezone ?? DEFAULT_TIMEZONE;
|
||
const date = quickToDate(quick, tz);
|
||
updateWizard(userId, { step: "confirm", scheduledAt: date });
|
||
await showWizardConfirm(ctx);
|
||
}
|
||
|
||
export async function wizardSetTimeCustomPrompt(ctx: Context): Promise<void> {
|
||
await ctx.answerCallbackQuery();
|
||
const userId = ctx.from?.id;
|
||
if (!userId) return;
|
||
  if (!updateWizard(userId, { step: "set_time" })) {
    await ctx.reply("Wizard expired. Tap /menu to start again.");
    return;
  }
|
||
await showMenu(ctx, {
|
||
text: "⌨️ Reply with date/time as `YYYY-MM-DD HH:MM`, e.g. `2026-05-15 09:00`.",
|
||
keyboard: new InlineKeyboard().text("⬅ Cancel", "m:reminders"),
|
||
});
|
||
}
|
||
|
||
export async function showWizardConfirm(ctx: Context): Promise<void> {
|
||
const userId = ctx.from?.id;
|
||
if (!userId) return;
|
||
const w = getWizard(userId);
|
||
if (!w || !w.accountId || !w.groupId || !w.scheduledAt) {
|
||
await ctx.reply("Wizard incomplete. Tap /menu and try again.");
|
||
return;
|
||
}
|
||
const op = await findOperator(ctx);
|
||
if (!op) return;
|
||
const account = await db.query.whatsappAccounts.findFirst({
|
||
where: (a, { eq, and }) => and(eq(a.id, w.accountId!), eq(a.operatorId, op.id)),
|
||
});
|
||
const group = await db.query.whatsappGroups.findFirst({
|
||
where: (g, { eq }) => eq(g.id, w.groupId!),
|
||
});
|
||
if (!account || !group) {
|
||
await ctx.reply("Account or group missing. Tap /menu and try again.");
|
||
return;
|
||
}
|
||
const tz = op.defaultTimezone ?? DEFAULT_TIMEZONE;
|
||
const whenLocal = DateTime.fromJSDate(w.scheduledAt).setZone(tz).toFormat("yyyy-MM-dd HH:mm");
|
||
  // mediaId is an opaque id, so it cannot tell us the media type; show a generic marker.
  const body = w.mediaId
    ? `[media]${w.caption ? ` ${w.caption}` : ""}`
    : w.text || "(empty)";
|
||
await showMenu(ctx, reminderConfirmMenu({
|
||
accountLabel: account.label,
|
||
groupName: group.name,
|
||
body,
|
||
whenLocal: `${whenLocal} (${tz})`,
|
||
}));
|
||
}
|
||
|
||
export async function wizardSave(ctx: Context): Promise<void> {
|
||
await ctx.answerCallbackQuery();
|
||
const userId = ctx.from?.id;
|
||
if (!userId) return;
|
||
const w = getWizard(userId);
|
||
if (!w || !w.accountId || !w.groupId || !w.scheduledAt) {
|
||
await ctx.reply("Wizard incomplete. Tap /menu and try again.");
|
||
return;
|
||
}
|
||
const op = await findOperator(ctx);
|
||
if (!op) return;
|
||
const reminderId = await createReminder({
|
||
accountId: w.accountId,
|
||
groupId: w.groupId,
|
||
name: w.text?.slice(0, 50) ?? "Reminder",
|
||
scheduledAt: w.scheduledAt,
|
||
text: w.text ?? null,
|
||
mediaId: w.mediaId ?? null,
|
||
caption: w.caption ?? null,
|
||
createdBy: op.id,
|
||
timezone: op.defaultTimezone ?? DEFAULT_TIMEZONE,
|
||
});
|
||
await scheduleReminderFire(getBoss(), reminderId, w.scheduledAt);
|
||
await writeAuditLog(db, {
|
||
operatorId: op.id,
|
||
source: "telegram",
|
||
action: "reminder.created",
|
||
targetType: "reminder",
|
||
targetId: reminderId,
|
||
payload: { scheduledAt: w.scheduledAt.toISOString() },
|
||
});
|
||
clearWizard(userId);
|
||
await ctx.reply(
|
||
`✅ Scheduled. Tap /menu → 📅 Reminders to view.`,
|
||
);
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 2: Create `apps/bot/src/telegram/commands/reminders.ts`**
|
||
|
||
```typescript
|
||
import type { Context } from "grammy";
|
||
import { showRemindersMenu } from "../callbacks.js";
|
||
|
||
export async function handleReminders(ctx: Context): Promise<void> {
|
||
await showRemindersMenu(ctx);
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 3: Wire callbacks + commands in `apps/bot/src/telegram/bot.ts`**
|
||
|
||
Add new imports:
|
||
```typescript
|
||
import { handleReminders } from "./commands/reminders.js";
|
||
import {
|
||
showRemindersMenu,
|
||
showReminderDetail,
|
||
deleteReminderCallback,
|
||
startReminderWizard,
|
||
wizardPickAccount,
|
||
wizardPickGroup,
|
||
wizardSetTimeQuick,
|
||
wizardSetTimeCustomPrompt,
|
||
wizardSave,
|
||
showWizardConfirm,
|
||
} from "./callbacks.js";
|
||
import { getWizard, updateWizard, clearWizard } from "./state.js";
|
||
import { ingestTelegramFile } from "../media/ingest.js";
|
||
import { parseFreeText, type Quick } from "../reminders/time-parsing.js";
|
||
import { DEFAULT_TIMEZONE } from "@cmbot/shared";
|
||
```
|
||
|
||
Add new commands and callbacks (place after the existing ones):
|
||
```typescript
|
||
bot.command("reminders", handleReminders);
|
||
|
||
bot.callbackQuery("m:reminders", showRemindersMenu);
|
||
bot.callbackQuery("rm:new", startReminderWizard);
|
||
bot.callbackQuery(/^rm_acc:(.+)$/, async (ctx) => {
|
||
await wizardPickAccount(ctx, ctx.match[1]!);
|
||
});
|
||
bot.callbackQuery(/^rm_grp:(.+)$/, async (ctx) => {
|
||
await wizardPickGroup(ctx, ctx.match[1]!);
|
||
});
|
||
bot.callbackQuery(/^rm_t:(.+)$/, async (ctx) => {
|
||
const quick = ctx.match[1]!;
|
||
if (quick === "custom") {
|
||
await wizardSetTimeCustomPrompt(ctx);
|
||
} else {
|
||
await wizardSetTimeQuick(ctx, quick as Quick);
|
||
}
|
||
});
|
||
bot.callbackQuery("rm_save", wizardSave);
|
||
bot.callbackQuery(/^rm_del:(.+)$/, async (ctx) => {
|
||
await deleteReminderCallback(ctx, ctx.match[1]!);
|
||
});
|
||
bot.callbackQuery(/^rm:(.+)$/, async (ctx) => {
|
||
await showReminderDetail(ctx, ctx.match[1]!);
|
||
});
|
||
```
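
Registration order matters in the block above: `"rm:new"` also matches `/^rm:(.+)$/`, and it only reaches `startReminderWizard` because that handler is registered first and does not pass control on. The prefixed patterns (`rm_acc:`, `rm_grp:`, `rm_t:`, `rm_del:`) cannot collide with `rm:` because the character after `rm` is `_`, not `:`. The self-contained sketch below imitates first-match-wins dispatch with a hypothetical `route` function; it is an illustration, not grammY's implementation.

```typescript
type Route = { pattern: string | RegExp; name: string };

// Same order as the registrations in bot.ts.
const routes: Route[] = [
  { pattern: "m:reminders", name: "showRemindersMenu" },
  { pattern: "rm:new", name: "startReminderWizard" },
  { pattern: /^rm_acc:(.+)$/, name: "wizardPickAccount" },
  { pattern: /^rm_grp:(.+)$/, name: "wizardPickGroup" },
  { pattern: /^rm_t:(.+)$/, name: "wizardSetTime" },
  { pattern: "rm_save", name: "wizardSave" },
  { pattern: /^rm_del:(.+)$/, name: "deleteReminderCallback" },
  { pattern: /^rm:(.+)$/, name: "showReminderDetail" },
];

// Returns the first route whose pattern matches, like a handler chain in which
// a matching handler never calls next().
function route(data: string): string | null {
  for (const r of routes) {
    const hit = typeof r.pattern === "string" ? r.pattern === data : r.pattern.test(data);
    if (hit) return r.name;
  }
  return null;
}

console.log(route("rm:new"), route("rm:3f2a"), route("rm_del:3f2a"));
```

If the catch-all `/^rm:(.+)$/` were moved above `"rm:new"`, the wizard button would open a "reminder not found" detail view instead, which is why it is registered last.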
|
||
|
||
- [ ] **Step 4: Update the message:text handler in `bot.ts`**
|
||
|
||
The handler needs two new branches:
|
||
1. Wizard step `compose` + plain text → set `text`, advance to `set_time`, show time menu.
|
||
2. Wizard step `set_time` (custom path) → parse date, advance to `confirm`, show confirm.
|
||
|
||
Find the existing `bot.on("message:text", ...)` block and add wizard handling before the "Tap /menu" fallback. The full updated handler:
|
||
|
||
```typescript
|
||
bot.on("message:text", async (ctx) => {
|
||
const text = ctx.message?.text ?? "";
|
||
if (text.startsWith("/")) return;
|
||
const tgId = ctx.from?.id;
|
||
if (tgId === undefined) return;
|
||
|
||
// Pending "Pair New" label
|
||
if (consumePendingPairLabel(tgId)) {
|
||
const label = text.trim().replace(/^["'“”‘’]|["'“”‘’]$/g, "");
|
||
if (!label) {
|
||
await ctx.reply("That label is empty. Tap /menu and try again.");
|
||
return;
|
||
}
|
||
await executePairFlow(ctx, label);
|
||
return;
|
||
}
|
||
|
||
// Pending "Send Test" message body
|
||
const pendingGroupId = consumePendingSendToGroup(tgId);
|
||
if (pendingGroupId) {
|
||
const body = text.trim();
|
||
if (!body) {
|
||
await ctx.reply("Empty message. Tap /menu and try again.");
|
||
return;
|
||
}
|
||
await executeSendTest(ctx, pendingGroupId, body);
|
||
return;
|
||
}
|
||
|
||
// Reminder wizard
|
||
const w = getWizard(tgId);
|
||
if (w) {
|
||
if (w.step === "compose") {
|
||
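      // Store the body and advance to set_time, so a typed date/time is parsed
      // as a time instead of overwriting the body.
      updateWizard(tgId, { step: "set_time", text: text.trim() });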
|
||
const { reminderTimeMenu } = await import("./menus.js");
|
||
const view = reminderTimeMenu();
|
||
await ctx.reply(view.text, { reply_markup: view.keyboard, parse_mode: "Markdown" });
|
||
return;
|
||
}
|
||
if (w.step === "set_time") {
|
||
const op = await db.query.operators.findFirst({
|
||
where: (o, { eq }) => eq(o.telegramUserId, tgId),
|
||
});
|
||
const tz = op?.defaultTimezone ?? DEFAULT_TIMEZONE;
|
||
const parsed = parseFreeText(text, tz);
|
||
if (!parsed.ok) {
|
||
await ctx.reply(`❌ ${parsed.reason}\n\nTry again or tap /menu to cancel.`);
|
||
return;
|
||
}
|
||
updateWizard(tgId, { step: "confirm", scheduledAt: parsed.date });
|
||
await showWizardConfirm(ctx);
|
||
return;
|
||
}
|
||
}
|
||
|
||
await ctx.reply("Tap /menu to see what I can do.");
|
||
});
|
||
```
|
||
|
||
- [ ] **Step 5: Add a media handler for the wizard's compose step**
|
||
|
||
Below the `message:text` handler in `bot.ts`, add:
|
||
|
||
```typescript
|
||
bot.on(["message:photo", "message:video", "message:document"], async (ctx) => {
|
||
const tgId = ctx.from?.id;
|
||
if (tgId === undefined) return;
|
||
const w = getWizard(tgId);
|
||
if (!w || w.step !== "compose") return;
|
||
const op = await db.query.operators.findFirst({
|
||
where: (o, { eq }) => eq(o.telegramUserId, tgId),
|
||
});
|
||
if (!op) return;
|
||
// Identify the file
|
||
const photo = ctx.message?.photo;
|
||
const video = ctx.message?.video;
|
||
const doc = ctx.message?.document;
|
||
let fileId: string | null = null;
|
||
let mimeType = "application/octet-stream";
|
||
let filename = "media";
|
||
let kind: "image" | "video" | "document" = "document";
|
||
if (photo && photo.length > 0) {
|
||
fileId = photo[photo.length - 1]!.file_id;
|
||
mimeType = "image/jpeg";
|
||
filename = "photo.jpg";
|
||
kind = "image";
|
||
} else if (video) {
|
||
fileId = video.file_id;
|
||
mimeType = video.mime_type ?? "video/mp4";
|
||
filename = video.file_name ?? "video.mp4";
|
||
kind = "video";
|
||
} else if (doc) {
|
||
fileId = doc.file_id;
|
||
mimeType = doc.mime_type ?? "application/octet-stream";
|
||
filename = doc.file_name ?? "document";
|
||
kind = "document";
|
||
}
|
||
if (!fileId) return;
|
||
|
||
await ctx.reply("📥 Downloading…");
|
||
try {
|
||
const result = await ingestTelegramFile(
|
||
op.id,
|
||
"https://api.telegram.org",
|
||
env.TELEGRAM_BOT_TOKEN,
|
||
fileId,
|
||
filename,
|
||
mimeType,
|
||
);
|
||
const caption = ctx.message?.caption ?? null;
|
||
    // Store the media and advance to set_time, so a typed date/time is parsed as a time.
    updateWizard(tgId, { step: "set_time", mediaId: result.mediaId, caption, text: caption });
|
||
const { reminderTimeMenu } = await import("./menus.js");
|
||
const view = reminderTimeMenu();
|
||
await ctx.reply(`✅ ${kind} stored. Now pick a time.`, {
|
||
reply_markup: view.keyboard,
|
||
parse_mode: "Markdown",
|
||
});
|
||
} catch (err) {
|
||
logger.error({ err }, "wizard media ingest failed");
|
||
await ctx.reply(`❌ Couldn't download/store the file: ${(err as Error).message}`);
|
||
}
|
||
});
|
||
```
|
||
|
||
NOTE: this requires `db`, `logger`, and `env` to be imported in `bot.ts`. Check the existing imports and add whichever are missing.
|
||
|
||
- [ ] **Step 6: Update setMyCommands** to include `/reminders`
|
||
|
||
Find the `setMyCommands` block and add a `reminders` entry between `accounts` and `pair`:
|
||
```typescript
|
||
{ command: "reminders", description: "List and schedule reminders" },
|
||
```
|
||
|
||
- [ ] **Step 7: Typecheck, restart, commit**
|
||
|
||
```bash
|
||
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/bot typecheck
|
||
NO_SUDO=1 scripts/dev.sh restart-bot
|
||
git add apps/bot
|
||
git -c commit.gpgsign=false commit -m "feat(bot): wire reminder wizard + list/detail callbacks"
|
||
```
|
||
|
||
---
|
||
|
||
## Task 9: VERIFY — end-to-end one-off reminder
|
||
|
||
This is a manual verification step.
|
||
|
||
- [ ] **Step 1: Schedule a near-future text reminder**
|
||
|
||
In Telegram:
|
||
1. `/menu` → 📅 Reminders → ➕ New Reminder
|
||
2. Pick your paired account.
|
||
3. Pick a small test group.
|
||
4. Reply with a short text, e.g. `Test reminder from bot`.
|
||
5. Skip the quick options (the shortest is an hour away). Tap ⌨️ Custom date/time and reply with a time about 1 minute ahead in your local timezone, e.g. `2026-05-09 16:30`.
|
||
6. ✅ Schedule.
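
To avoid mental arithmetic in step 5, the value to type can be computed. A small sketch using only the built-in `Intl` API; `inMinutes` is a hypothetical helper, and `Asia/Singapore` is a placeholder for whatever timezone your operator row uses.

```typescript
// Formats `from + minutes` as "YYYY-MM-DD HH:MM" in the given IANA timezone.
function inMinutes(from: Date, minutes: number, timeZone: string): string {
  const target = new Date(from.getTime() + minutes * 60000);
  // en-GB defaults to a 24-hour clock, which is what the wizard's format expects.
  const parts = new Intl.DateTimeFormat("en-GB", {
    timeZone,
    year: "numeric",
    month: "2-digit",
    day: "2-digit",
    hour: "2-digit",
    minute: "2-digit",
  }).formatToParts(target);
  const get = (type: string): string => parts.find((p) => p.type === type)?.value ?? "";
  return `${get("year")}-${get("month")}-${get("day")} ${get("hour")}:${get("minute")}`;
}

// Prints the string to paste into the wizard for "one minute from now".
console.log(inMinutes(new Date(), 1, "Asia/Singapore"));
```

Seconds are dropped rather than rounded, so the result can be anywhere from just over 0 to 60 seconds away; use `2` instead of `1` if the wizard rejects times that have already passed.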
|
||
|
||
- [ ] **Step 2: Watch logs**
|
||
|
||
```bash
|
||
docker compose --env-file .env.development -f docker-compose.base.yml -f docker-compose.dev.yml logs -f bot
|
||
```
|
||
|
||
`reminder.fire: scheduled` should appear as soon as you tap ✅ Schedule; the other two lines follow about a minute later:
|
||
```
|
||
reminder.fire: scheduled
|
||
... (delay) ...
|
||
reminder.fire: handling
|
||
fire-reminder: done status=success
|
||
```
|
||
|
||
And a message arriving in your test WhatsApp group.
|
||
|
||
- [ ] **Step 3: Verify in DB**
|
||
|
||
```bash
|
||
NO_SUDO=1 scripts/dev.sh exec sh -c 'cd packages/db && pnpm exec tsx -e "
|
||
(async () => {
|
||
const { Pool } = await import(\"pg\");
|
||
const p = new Pool({ connectionString: process.env.DATABASE_URL });
|
||
console.log(\"reminders:\", (await p.query(\"SELECT id, name, status, scheduled_at FROM reminders\")).rows);
|
||
console.log(\"runs:\", (await p.query(\"SELECT reminder_id, status, fired_at FROM reminder_runs ORDER BY fired_at DESC LIMIT 5\")).rows);
|
||
console.log(\"run_targets:\", (await p.query(\"SELECT run_id, status, latency_ms, error FROM reminder_run_targets ORDER BY run_id DESC LIMIT 5\")).rows);
|
||
await p.end();
|
||
})();
|
||
"'
|
||
```
|
||
|
||
Expected: reminder row exists, one reminder_runs row with `status='success'`, one reminder_run_targets row with `status='sent'` and a non-null `latency_ms`.
|
||
|
||
- [ ] **Step 4: Try a media reminder**
|
||
|
||
Repeat steps 1-6 but in step 4 send a small photo with a caption instead of typing text. Verify the photo arrives in the WhatsApp group.
|
||
|
||
- [ ] **Step 5: Try a delete**
|
||
|
||
Schedule a 5-minute-out reminder, then immediately go to /menu → 📅 Reminders → tap it → 🗑 Delete. The pg-boss job will still fire but `fireReminder` will skip with "reminder not found" (soft cancel).
|
||
|
||
- [ ] **Step 6: Sign-off (no commit)**
|
||
|
||
If all the above works, the headline reminder feature is solid. If anything fails, capture the error and we'll patch.
|
||
|
||
---
|
||
|
||
## Plan 2 done — what's working
|
||
|
||
After all 9 tasks:
|
||
|
||
- pg-boss running against the same Postgres, with retry-with-backoff job semantics.
|
||
- Reminder CRUD via Telegram menu wizard: pick account → pick group → compose text/media → pick time (quick or custom) → confirm.
|
||
- Media upload from Telegram (photo, video, document) — downloaded once, hashed, stored on the shared volume, referenced by reminder.
|
||
- Fire-reminder handler sends each message part in order, throttled with 1.5s jitter, records per-target results.
|
||
- Run history visible (via DB queries; UI improvements can come later).
|
||
- Soft cancel on delete — fired job skips when reminder is gone.
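
For intuition about the retry-with-backoff behaviour listed above, here is a generic exponential backoff calculation. It is an illustration only: pg-boss computes its own retry delays, and the function name, the 30-second base, and the 5-minute cap are all assumptions made for this sketch.

```typescript
// Generic exponential backoff: base, 2x base, 4x base, ... capped at maxSeconds.
function backoffSeconds(attempt: number, baseSeconds: number, maxSeconds: number): number {
  return Math.min(maxSeconds, baseSeconds * Math.pow(2, attempt));
}

// Delays before retries 0..4 with a 30 s base and a 300 s cap.
const schedule = [0, 1, 2, 3, 4].map((n) => backoffSeconds(n, 30, 300));
console.log(schedule);
```

The cap keeps a long outage from pushing a retry hours past the reminder's intended time.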
|
||
|
||
## Deferred / next plan
|
||
|
||
- **Recurring reminders (RRULE):** schema is ready; needs wizard quick-picks (every day at 9am, every Monday, etc.) and pg-boss recurring-job wiring (`schedule()` with a cron expression).
|
||
- **Multi-group targets** per reminder.
|
||
- **Multi-part messages** (text + image + caption stack).
|
||
- **Run history view in menu** — currently you have to query Postgres directly to see past sends.
|
||
- **Web dashboard (plan 3)** — replaces the Telegram wizard with a richer UI for media upload and bulk scheduling.
|