feat(scheduler): add pg-boss client + lifecycle

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
yiekheng 2026-05-09 17:19:01 +08:00
parent bf3586fe7b
commit 113adc7edf
5 changed files with 100 additions and 1 deletions

View File

@ -16,11 +16,12 @@
"@cmbot/db": "workspace:*",
"@cmbot/shared": "workspace:*",
"@whiskeysockets/baileys": "7.0.0-rc10",
"drizzle-orm": "^0.36.0",
"grammy": "^1.31.0",
"pg-boss": "^12.18.2",
"pino": "^9.5.0",
"pino-pretty": "^11.3.0",
"qrcode": "^1.5.4",
"drizzle-orm": "^0.36.0",
"zod": "^3.23.8"
},
"devDependencies": {

View File

@ -3,12 +3,17 @@ import { pool } from "./db.js";
import { startHealthServer, setSessionCountsProvider } from "./health.js";
import { createTelegramBot } from "./telegram/bot.js";
import { sessionManager } from "./whatsapp/session-manager.js";
import { startBoss, stopBoss } from "./scheduler/pgboss-client.js";
import { registerReminderJobs } from "./scheduler/reminder-jobs.js";
async function main(): Promise<void> {
logger.info("bot starting");
const health = startHealthServer();
setSessionCountsProvider(() => sessionManager.getCounts());
const boss = await startBoss();
await registerReminderJobs(boss);
const tg = createTelegramBot();
void tg.start({
onStart: (info) => logger.info({ username: info.username }, "telegram polling started"),
@ -21,6 +26,7 @@ async function main(): Promise<void> {
logger.info({ signal }, "shutting down");
await tg.stop();
await sessionManager.stopAll();
await stopBoss();
health.close();
await pool.end();
process.exit(0);

View File

@ -0,0 +1,32 @@
import { PgBoss } from "pg-boss";
import { env } from "../env.js";
import { logger } from "../logger.js";
let boss: PgBoss | null = null;
export async function startBoss(): Promise<PgBoss> {
if (boss) return boss;
const instance = new PgBoss({
connectionString: env.DATABASE_URL,
schema: "pgboss",
});
instance.on("error", (err: unknown) => logger.error({ err }, "pg-boss: error"));
await instance.start();
boss = instance;
logger.info("pg-boss started");
return instance;
}
export async function stopBoss(): Promise<void> {
if (!boss) return;
await boss.stop({ graceful: true, timeout: 5000 });
boss = null;
logger.info("pg-boss stopped");
}
export { PgBoss };
export function getBoss(): PgBoss {
if (!boss) throw new Error("pg-boss not started");
return boss;
}

View File

@ -0,0 +1,7 @@
import type { PgBoss } from "pg-boss";
import { logger } from "../logger.js";
// Wired up properly in Task 5. Placeholder so index.ts can import.
export async function registerReminderJobs(_boss: PgBoss): Promise<void> {
logger.debug("registerReminderJobs: placeholder (task 5 will fill in)");
}

53
pnpm-lock.yaml generated
View File

@ -32,6 +32,9 @@ importers:
grammy:
specifier: ^1.31.0
version: 1.42.0
pg-boss:
specifier: ^12.18.2
version: 12.18.2
pino:
specifier: ^9.5.0
version: 9.14.0
@ -1228,6 +1231,10 @@ packages:
resolution: {integrity: sha512-nTjqfcBFEipKdXCv4YDQWCfmcLZKm81ldF0pAopTvyrFGVbcR6P/VAAd5G7N+0tTr8QqiU0tFadD6FK4NtJwOA==}
engines: {node: '>= 0.6'}
cron-parser@5.5.0:
resolution: {integrity: sha512-oML4lKUXxizYswqmxuOCpgFS8BNUJpIu6k/2HVHyaL8Ynnf3wdf9tkns0yRdJLSIjkJ+b0DXHMZEHGpMwjnPww==}
engines: {node: '>=18'}
curve25519-js@0.0.4:
resolution: {integrity: sha512-axn2UMEnkhyDUPWOwVKBMVIzSQy2ejH2xRGy1wq81dqRwApXfIzfbE3hIX0ZRFBIihf/KDqK158DLwESu4AK1w==}
@ -1522,6 +1529,10 @@ packages:
resolution: {integrity: sha512-M6Rm/bbG6De/gKGxOpeOobx/dnGuP0dz40adqx38boqHhlWssBJZgLCPBNtb9NkrmnKYiV04xELq+R6PFOnoLA==}
engines: {node: '>=4.4.0'}
non-error@0.1.0:
resolution: {integrity: sha512-TMB1uHiGsHRGv1uYclfhivcnf0/PdFp2pNqRxXjncaAsjYMoisaQJI+SSZCqRq+VliwRTC8tsMQfmrWjDMhkPQ==}
engines: {node: '>=20'}
ogg-opus-decoder@1.7.3:
resolution: {integrity: sha512-w47tiZpkLgdkpa+34VzYD8mHUj8I9kfWVZa82mBbNwDvB1byfLXSSzW/HxA4fI3e9kVlICSpXGFwMLV1LPdjwg==}
@ -1566,6 +1577,11 @@ packages:
resolution: {integrity: sha512-//nshmD55c46FuFw26xV/xFAaB5HF9Xdap7HJBBnrKdAd6/GxDBaNA1870O79+9ueg61cZLSVc+OaFlfmObYVQ==}
engines: {node: '>= 14.16'}
pg-boss@12.18.2:
resolution: {integrity: sha512-06kXeWvVWY+BUNsOt2me1okg6NXx2DBnAQHTurA9jtrvbAO9qUOSE3/0ERERQDrokI+FREFM2Twha+JbrFT/8Q==}
engines: {node: '>=22.12.0'}
hasBin: true
pg-cloudflare@1.3.0:
resolution: {integrity: sha512-6lswVVSztmHiRtD6I8hw4qP/nDm1EJbKMRhf3HCYaqud7frGysPv7FYJ5noZQdhQtN2xJnimfMtvQq21pdbzyQ==}
@ -1715,6 +1731,10 @@ packages:
engines: {node: '>=10'}
hasBin: true
serialize-error@13.0.1:
resolution: {integrity: sha512-bBZaRwLH9PN5HbLCjPId4dP5bNGEtumcErgOX952IsvOhVPrm3/AeK1y0UHA/QaPG701eg0yEnOKsCOC6X/kaA==}
engines: {node: '>=20'}
set-blocking@2.0.0:
resolution: {integrity: sha512-KiKBS8AnWGEyLzofFfmvKwpdPzqiy16LvQfK3yv/fVH7Bj13/wl3JSR1J+rfgRE9q7xUJK4qvgS8raSOeLUehw==}
@ -1771,6 +1791,10 @@ packages:
resolution: {integrity: sha512-ki4hZQfh5rX0QDLLkOCj+h+CVNkqmp/CMf8v8kZpkNVK6jGQooMytqzLZYUVYIZcFZ6yDB70EfD8POcFXiF5oA==}
engines: {node: '>=18'}
tagged-tag@1.0.0:
resolution: {integrity: sha512-yEFYrVhod+hdNyx7g5Bnkkb0G6si8HJurOoOEgC8B/O0uXLHlaey/65KRv6cuWBNhBgHKAROVpc7QyYqE5gFng==}
engines: {node: '>=20'}
thread-stream@3.1.0:
resolution: {integrity: sha512-OqyPZ9u96VohAyMfJykzmivOrY2wfMSf3C5TtFJVgN+Hm6aj+voFhlK+kZEIv2FBh1X6Xp3DlnCOfEQ3B2J86A==}
@ -1811,6 +1835,10 @@ packages:
resolution: {integrity: sha512-lCPgus1NuTiBdaITWqzSH/Ff6HVL8HHGBtOXHg1dHRfcshN79XkygSdh0M6g8b0td91ILLG5MTkLOkp5UvyPJw==}
hasBin: true
type-fest@5.6.0:
resolution: {integrity: sha512-8ZiHFm91orbSAe2PSAiSVBVko18pbhbiB3U9GglSzF/zCGkR+rxpHx6sEMCUm4kxY4LjDIUGgCfUMtwfZfjfUA==}
engines: {node: '>=20'}
typescript@5.9.3:
resolution: {integrity: sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==}
engines: {node: '>=14.17'}
@ -2729,6 +2757,10 @@ snapshots:
content-type@1.0.5: {}
cron-parser@5.5.0:
dependencies:
luxon: 3.7.2
curve25519-js@0.0.4: {}
dateformat@4.6.3: {}
@ -3002,6 +3034,8 @@ snapshots:
node-wav@0.0.2:
optional: true
non-error@0.1.0: {}
ogg-opus-decoder@1.7.3:
dependencies:
'@wasm-audio-decoders/common': 9.0.7
@ -3044,6 +3078,14 @@ snapshots:
pathval@2.0.1: {}
pg-boss@12.18.2:
dependencies:
cron-parser: 5.5.0
pg: 8.20.0
serialize-error: 13.0.1
transitivePeerDependencies:
- pg-native
pg-cloudflare@1.3.0:
optional: true
@ -3252,6 +3294,11 @@ snapshots:
semver@7.8.0: {}
serialize-error@13.0.1:
dependencies:
non-error: 0.1.0
type-fest: 5.6.0
set-blocking@2.0.0: {}
sharp@0.34.5:
@ -3329,6 +3376,8 @@ snapshots:
dependencies:
'@tokenizer/token': 0.3.0
tagged-tag@1.0.0: {}
thread-stream@3.1.0:
dependencies:
real-require: 0.2.0
@ -3369,6 +3418,10 @@ snapshots:
'@turbo/windows-64': 2.9.12
'@turbo/windows-arm64': 2.9.12
type-fest@5.6.0:
dependencies:
tagged-tag: 1.0.0
typescript@5.9.3: {}
uint8array-extras@1.5.0: {}