docs(plan): add Task 9b — Postgres-only cache, rate-limit, search
Operational rule: cache, queue, search, and rate-limiting all use Postgres — no Redis or external systems.

New Task 9b adds:
- pg_trgm extension + GIN trigram indexes on whatsapp_groups.name and reminders.name for fuzzy search
- BRIN indexes on reminder_runs.fired_at and audit_log.created_at for cheap time-series scans
- Common-filter B-tree indexes on reminders.status and (account_id, scheduled_at)
- cache_entries table + cacheGet / cacheSet / cacheGetOrSet helpers
- rate_limit_buckets table + checkRateLimit (atomic UPSERT, sliding window)
- search.ts with trigramMatch / trigramRank Drizzle SQL fragments
- Vitest unit tests for cache and rate-limit helpers

Also rewrites Task 12 (rate-limit middleware) to enforce limits inside Server Actions where DB access exists, rather than edge middleware where it doesn't.
parent 4b859bc44a
commit 24e61f4cdd
@ -13,6 +13,7 @@
**Phase guide:**
- A — Telegram removal (Tasks 1-4)
- B — Web app skeleton (Tasks 5-9)
- B+ — Postgres extensions + indexes + cache/rate-limit tables (Task 9b)
- C — Foundation: layout, SSE, middleware (Tasks 10-12)
- D — Read-only pages (Tasks 13-16)
- E — Mutations: pair, unpair, send-test (Tasks 17-19)
@ -20,6 +21,14 @@
- G — PWA (Task 22)
- H — Verify + push (Tasks 23-24)

**Operational rule:** Cache, queue, search, and rate-limiting all use Postgres — no Redis, no external systems. Specifically:
- Queue: `pg-boss` (already in plan 2)
- IPC: Postgres `LISTEN/NOTIFY` (Tasks 1, 11)
- Cache: a Drizzle-backed `cache_entries` table with TTL + a tiny `getOrSet` helper (Task 9b)
- Rate limit: a `rate_limit_buckets` table holding a fixed-window counter that is updated by one atomic UPSERT (Task 9b → consumed by Task 12)
- Search: `pg_trgm` extension + GIN indexes for fuzzy `name` lookup; uses `name % query` with similarity ranking (Task 9b → consumed by Tasks 14, 15)
- Time-series scans (`reminder_runs.fired_at`, `audit_log.created_at`): BRIN indexes for cheap append-mostly scanning

---

## File structure produced by this plan
@ -1239,6 +1248,372 @@ Expected: `cm WhatsApp Bot` (one match — the H1).

---

# Phase B+ — Postgres optimization (cache / rate-limit / search)

## Task 9b: pg_trgm + indexes + cache/rate-limit tables + helpers

**Files:**
- Modify: `packages/db/src/schema.ts` (add `cacheEntries`, `rateLimitBuckets` tables)
- Generate: `packages/db/migrations/0002_*.sql` (extension + indexes + tables)
- Create: `apps/web/src/lib/cache.ts`
- Create: `apps/web/src/lib/rate-limit.ts`
- Create: `apps/web/src/lib/search.ts`
- Create: `apps/web/src/lib/cache.test.ts`
- Create: `apps/web/src/lib/rate-limit.test.ts`

### Step 1: Add tables to `packages/db/src/schema.ts`

Append at the end of the file (extend the file's existing `drizzle-orm/pg-core` import with `jsonb` and `integer` if they are not already imported):

```typescript
export const cacheEntries = pgTable("cache_entries", {
  key: text("key").primaryKey(),
  value: jsonb("value").notNull(),
  expiresAt: timestamp("expires_at", { withTimezone: true }).notNull(),
});

export const rateLimitBuckets = pgTable("rate_limit_buckets", {
  key: text("key").primaryKey(),
  windowStart: timestamp("window_start", { withTimezone: true }).notNull(),
  count: integer("count").notNull(),
  expiresAt: timestamp("expires_at", { withTimezone: true }).notNull(),
});
```

### Step 2: Generate the migration

```bash
NO_SUDO=1 scripts/db.sh generate
```

This creates `packages/db/migrations/0002_*.sql` with the two new tables.

### Step 3: Edit the generated migration to add the extension + indexes

Open the new `0002_*.sql` and add at the **top**:

```sql
CREATE EXTENSION IF NOT EXISTS pg_trgm;
```

And at the **bottom** (after the auto-generated CREATE TABLE statements):

```sql
-- Trigram fuzzy search indexes (pg_trgm)
CREATE INDEX IF NOT EXISTS whatsapp_groups_name_trgm
  ON whatsapp_groups USING gin (name gin_trgm_ops);
CREATE INDEX IF NOT EXISTS reminders_name_trgm
  ON reminders USING gin (name gin_trgm_ops);

-- Common-filter B-tree indexes
CREATE INDEX IF NOT EXISTS reminders_status_idx
  ON reminders (status);
CREATE INDEX IF NOT EXISTS reminders_account_scheduled_idx
  ON reminders (account_id, scheduled_at DESC NULLS LAST);
CREATE INDEX IF NOT EXISTS reminder_runs_reminder_fired_idx
  ON reminder_runs (reminder_id, fired_at DESC);

-- BRIN indexes for append-mostly time-series columns
CREATE INDEX IF NOT EXISTS reminder_runs_fired_at_brin
  ON reminder_runs USING brin (fired_at);
CREATE INDEX IF NOT EXISTS audit_log_created_at_brin
  ON audit_log USING brin (created_at);

-- Expiry indexes for the new utility tables
CREATE INDEX IF NOT EXISTS cache_entries_expires_idx
  ON cache_entries (expires_at);
CREATE INDEX IF NOT EXISTS rate_limit_buckets_expires_idx
  ON rate_limit_buckets (expires_at);
```

### Step 4: Apply the migration

```bash
NO_SUDO=1 scripts/db.sh migrate
```

Expected: "Migrations applied." Verify the extension:

```bash
NO_SUDO=1 scripts/dev.sh exec sh -c 'cd packages/db && pnpm exec tsx -e "
(async () => {
  const { Pool } = await import(\"pg\");
  const p = new Pool({ connectionString: process.env.DATABASE_URL });
  const r = await p.query(\"SELECT extname FROM pg_extension WHERE extname=\\\$1\", [\"pg_trgm\"]);
  console.log(\"pg_trgm:\", r.rows);
  await p.end();
})();
"'
```

Expected: `pg_trgm: [ { extname: 'pg_trgm' } ]`.

### Step 5: Write failing test for the cache helper

Create `apps/web/src/lib/cache.test.ts`:

```typescript
import { describe, it, expect, beforeEach } from "vitest";
import { cacheGet, cacheSet, cacheGetOrSet, cacheDelete } from "./cache.js";
import { db } from "./db.js";
import { cacheEntries } from "@cmbot/db";
import { sql } from "drizzle-orm";

describe("cache helpers", () => {
  beforeEach(async () => {
    await db.delete(cacheEntries).where(sql`true`);
  });

  it("set + get round-trip with TTL", async () => {
    await cacheSet("k1", { hello: "world" }, 60);
    const v = await cacheGet<{ hello: string }>("k1");
    expect(v).toEqual({ hello: "world" });
  });

  it("getOrSet computes once, then returns cached", async () => {
    let calls = 0;
    const compute = async () => {
      calls++;
      return { stamp: 42 };
    };
    const a = await cacheGetOrSet("k2", 60, compute);
    const b = await cacheGetOrSet("k2", 60, compute);
    expect(a).toEqual({ stamp: 42 });
    expect(b).toEqual({ stamp: 42 });
    expect(calls).toBe(1);
  });

  it("expired entries are skipped", async () => {
    await cacheSet("k3", { stale: true }, -1); // already expired
    const v = await cacheGet("k3");
    expect(v).toBeNull();
  });

  it("delete removes the entry", async () => {
    await cacheSet("k4", { x: 1 }, 60);
    await cacheDelete("k4");
    expect(await cacheGet("k4")).toBeNull();
  });
});
```

### Step 6: Run failing test

```bash
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/web test
```

Expected: the cache suite fails because `./cache.js` cannot be resolved yet, so none of its 4 tests pass.

### Step 7: Implement `apps/web/src/lib/cache.ts`

```typescript
import "server-only";
import { sql, eq } from "drizzle-orm";
import { cacheEntries } from "@cmbot/db";
import { db } from "./db";

// Returns the cached value, or null when the key is missing or already expired.
export async function cacheGet<T>(key: string): Promise<T | null> {
  const row = await db.query.cacheEntries.findFirst({
    where: (c, { eq, and, gt }) => and(eq(c.key, key), gt(c.expiresAt, new Date())),
  });
  return (row?.value ?? null) as T | null;
}

export async function cacheSet(key: string, value: unknown, ttlSeconds: number): Promise<void> {
  const expiresAt = new Date(Date.now() + ttlSeconds * 1000);
  // UPSERT — atomic single-row write, no race
  await db
    .insert(cacheEntries)
    .values({ key, value: value as never, expiresAt })
    .onConflictDoUpdate({
      target: cacheEntries.key,
      set: { value: value as never, expiresAt },
    });
}

export async function cacheDelete(key: string): Promise<void> {
  await db.delete(cacheEntries).where(eq(cacheEntries.key, key));
}

// Read-through helper. A miss is signalled by null, so a `compute` that
// returns null is never treated as a hit and will run again on the next call.
export async function cacheGetOrSet<T>(
  key: string,
  ttlSeconds: number,
  compute: () => Promise<T>,
): Promise<T> {
  const hit = await cacheGet<T>(key);
  if (hit !== null) return hit;
  const fresh = await compute();
  await cacheSet(key, fresh, ttlSeconds);
  return fresh;
}

/** Lazy expiry sweeper — call periodically (e.g. on bot startup). */
export async function cacheSweep(): Promise<{ removed: number }> {
  const r = await db.execute(sql`DELETE FROM cache_entries WHERE expires_at < now() RETURNING key`);
  return { removed: r.rowCount ?? 0 };
}
```
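
`cacheGetOrSet` reads, computes, then writes, so two concurrent callers that both miss will each run `compute`. If that matters for an expensive computation, one option is to de-duplicate in-flight calls. The sketch below is an assumption layered on top of the plan, not part of it: `singleFlight` and the `inFlight` map are hypothetical names, and it only coalesces callers inside a single Node process.

```typescript
// Hypothetical helper (not part of the plan): coalesce concurrent calls for
// the same key so the wrapped function runs once per key at a time.
const inFlight = new Map<string, Promise<unknown>>();

export function singleFlight<T>(key: string, fn: () => Promise<T>): Promise<T> {
  const existing = inFlight.get(key);
  if (existing) return existing as Promise<T>;

  // Drop the entry once the call settles, whether it resolved or rejected,
  // so a later call for the same key runs fn again.
  const p = fn().finally(() => {
    inFlight.delete(key);
  });
  inFlight.set(key, p);
  return p;
}
```

A caller could then write `singleFlight(key, () => cacheGetOrSet(key, ttl, compute))`. Callers in other processes can still compute concurrently; the UPSERT in `cacheSet` means the last writer simply wins.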

### Step 8: Run cache tests (expect pass)

```bash
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/web test -- cache
```

Expected: 4 tests pass.

### Step 9: Write failing test for the rate-limit helper

Create `apps/web/src/lib/rate-limit.test.ts`:

```typescript
import { describe, it, expect, beforeEach } from "vitest";
import { checkRateLimit } from "./rate-limit.js";
import { db } from "./db.js";
import { rateLimitBuckets } from "@cmbot/db";
import { sql } from "drizzle-orm";

describe("checkRateLimit", () => {
  beforeEach(async () => {
    await db.delete(rateLimitBuckets).where(sql`true`);
  });

  it("allows requests under the limit", async () => {
    for (let i = 0; i < 5; i++) {
      const r = await checkRateLimit("test:1", { max: 5, windowSec: 10 });
      expect(r.limited).toBe(false);
    }
  });

  it("blocks the (max+1)th request within the window", async () => {
    for (let i = 0; i < 3; i++) {
      await checkRateLimit("test:2", { max: 3, windowSec: 10 });
    }
    const r = await checkRateLimit("test:2", { max: 3, windowSec: 10 });
    expect(r.limited).toBe(true);
    expect(r.count).toBeGreaterThan(3);
  });

  it("isolates buckets by key", async () => {
    await checkRateLimit("a", { max: 1, windowSec: 10 });
    await checkRateLimit("a", { max: 1, windowSec: 10 });
    const aLimited = await checkRateLimit("a", { max: 1, windowSec: 10 });
    const bFresh = await checkRateLimit("b", { max: 1, windowSec: 10 });
    expect(aLimited.limited).toBe(true);
    expect(bFresh.limited).toBe(false);
  });
});
```

### Step 10: Run failing test

```bash
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/web test -- rate-limit
```

Expected: the suite fails because `./rate-limit.js` does not exist yet.

### Step 11: Implement `apps/web/src/lib/rate-limit.ts`

```typescript
import "server-only";
import { sql } from "drizzle-orm";
import { db } from "./db";

export type RateLimitOptions = { max: number; windowSec: number };
export type RateLimitResult = { limited: boolean; count: number };

/**
 * Fixed-window rate limit using a single atomic Postgres UPSERT.
 *
 * Returns `{ limited: count > max, count }`. The UPSERT resets the
 * window when the existing row is older than `windowSec`, otherwise
 * increments the count in place.
 */
export async function checkRateLimit(
  key: string,
  opts: RateLimitOptions,
): Promise<RateLimitResult> {
  const { windowSec } = opts;
  const result = await db.execute(sql`
    INSERT INTO rate_limit_buckets (key, window_start, count, expires_at)
    VALUES (${key}, now(), 1, now() + (${windowSec} * interval '1 second'))
    ON CONFLICT (key) DO UPDATE
    SET count = CASE
          WHEN rate_limit_buckets.window_start < now() - (${windowSec} * interval '1 second')
          THEN 1
          ELSE rate_limit_buckets.count + 1
        END,
        window_start = CASE
          WHEN rate_limit_buckets.window_start < now() - (${windowSec} * interval '1 second')
          THEN now()
          ELSE rate_limit_buckets.window_start
        END,
        expires_at = now() + (${windowSec} * interval '1 second')
    RETURNING count;
  `);
  const count = Number((result.rows[0] as { count: number }).count);
  return { limited: count > opts.max, count };
}

export async function rateLimitSweep(): Promise<{ removed: number }> {
  const r = await db.execute(sql`DELETE FROM rate_limit_buckets WHERE expires_at < now()`);
  return { removed: r.rowCount ?? 0 };
}
```
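
To make the window behaviour concrete, the `CASE` logic of the UPSERT can be modelled as a pure function. This is an illustrative in-memory sketch, not code from the plan: `Bucket`, `step`, and `replay` are hypothetical names, and time is passed in as milliseconds so the example is deterministic. It also shows a property of any reset-on-expiry counter: a client can send requests right before a window ends and again right after the reset without being limited.

```typescript
// Hypothetical in-memory model of the rate_limit_buckets UPSERT (illustration only).
type Bucket = { windowStartMs: number; count: number };

// Mirrors the SQL: no row -> insert with count 1; window older than
// windowSec -> reset to 1 and restart the window; otherwise increment in place.
export function step(prev: Bucket | undefined, nowMs: number, windowSec: number): Bucket {
  if (!prev || prev.windowStartMs < nowMs - windowSec * 1000) {
    return { windowStartMs: nowMs, count: 1 };
  }
  return { windowStartMs: prev.windowStartMs, count: prev.count + 1 };
}

// Replays request timestamps against one bucket and reports which were limited.
export function replay(timesMs: number[], max: number, windowSec: number): boolean[] {
  let bucket: Bucket | undefined;
  return timesMs.map((t) => {
    bucket = step(bucket, t, windowSec);
    return bucket.count > max;
  });
}

// max = 2 per 10 s. Three requests inside one window: the third is limited.
export const sameWindow = replay([0, 1, 2], 2, 10);

// Requests at 9.999 s, 10.001 s and 10.002 s straddle the reset, so three
// requests arrive within 3 ms and none of them is limited.
export const acrossReset = replay([0, 9_999, 10_001, 10_002], 2, 10);
```

If that boundary burst is a concern, a true sliding-window or token-bucket scheme would be needed; for the limits used in this plan the simpler counter is likely sufficient.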

### Step 12: Run rate-limit tests (expect pass)

```bash
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/web test -- rate-limit
```

Expected: 3 tests pass.

### Step 13: Implement `apps/web/src/lib/search.ts` (no test — thin wrapper)

```typescript
import "server-only";
import { sql, type AnyColumn, type SQL } from "drizzle-orm";

/**
 * Build a Drizzle WHERE fragment that fuzzy-matches `column` against `query`
 * using pg_trgm's similarity operator. Returns true (no filter) when query is
 * empty so callers can compose unconditionally.
 *
 * `column` accepts a table column (e.g. `whatsappGroups.name`) or a raw SQL
 * expression. Caller must ensure a `gin_trgm_ops` index exists on the column.
 */
export function trigramMatch(column: AnyColumn | SQL, query: string | null | undefined): SQL {
  const q = (query ?? "").trim();
  if (!q) return sql`true`;
  // `%` is the trigram similarity operator. Threshold is GUC-controlled
  // (`pg_trgm.similarity_threshold`, default 0.3).
  return sql`${column} % ${q}`;
}

/**
 * Order-by fragment that ranks rows by similarity descending.
 * With an empty query it returns the constant `1`, which Postgres reads as
 * "order by the first output column", not as a no-op.
 */
export function trigramRank(column: AnyColumn | SQL, query: string | null | undefined): SQL {
  const q = (query ?? "").trim();
  if (!q) return sql`1`;
  return sql`similarity(${column}, ${q}) DESC`;
}
```
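
To get a feel for what the 0.3 threshold means, the score can be approximated outside the database. The sketch below is a simplified model written for this note, not the extension's implementation: `trigrams` and `similarity` are hypothetical local functions, and they only treat ASCII letters and digits as word characters, whereas the real `pg_trgm` behaviour for other text depends on the database locale.

```typescript
// Approximate model of pg_trgm trigram extraction (illustration only).
export function trigrams(text: string): Set<string> {
  const out = new Set<string>();
  // Words are runs of ASCII letters/digits here. Each word is padded with
  // two leading spaces and one trailing space before slicing into 3-grams.
  for (const word of text.toLowerCase().split(/[^a-z0-9]+/).filter(Boolean)) {
    const padded = `  ${word} `;
    for (let i = 0; i + 3 <= padded.length; i++) {
      out.add(padded.slice(i, i + 3));
    }
  }
  return out;
}

// Shared trigrams divided by the number of distinct trigrams across both strings.
export function similarity(a: string, b: string): number {
  const ta = trigrams(a);
  const tb = trigrams(b);
  if (ta.size === 0 || tb.size === 0) return 0;
  let shared = 0;
  for (const t of ta) {
    if (tb.has(t)) shared++;
  }
  return shared / (ta.size + tb.size - shared);
}
```

In this model `similarity("cat", "cats")` is 0.5 (3 shared trigrams out of 6 distinct ones), which is above the default threshold, while strings that share no trigram score 0 and would be filtered out by `%`.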

### Step 14: Run all web tests + commit

```bash
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/web test
NO_SUDO=1 scripts/dev.sh pnpm --filter @cmbot/web typecheck
git add packages/db apps/web/src/lib pnpm-lock.yaml
git -c commit.gpgsign=false commit -m "feat(db,web): pg_trgm + indexes + Postgres-backed cache and rate-limit"
```

Subsequent tasks rely on these helpers:
- **Task 12 (rate-limit middleware)** — uses `checkRateLimit('ip:' + ip, { max: 30, windowSec: 10 })` instead of an in-memory `Map` inside the middleware.
- **Task 14 (groups list)** — query uses `trigramMatch(whatsappGroups.name, query)` for fuzzy filter.
- **Task 15 (reminders list)** — same pattern for `reminders.name`.

---

# Phase C — Foundation: layout, SSE, middleware

## Task 10: App shell layout (responsive nav)
@ -1491,25 +1866,11 @@ git -c commit.gpgsign=false commit -m "feat(web): SSE endpoint + useEvents hook"

- [ ] **Step 1: Create `apps/web/src/middleware.ts`**

NOTE: Edge runtime middleware can't connect directly to Postgres (no `pg` driver). The path-blocking still happens here at the edge, but the rate-limit check is delegated to a tiny **Node-runtime** route handler that does the DB hit. Middleware sets a header so the handler knows which IP to count the request against. Most requests don't even reach the rate-limit check because they're page navigations, not API calls.

```typescript
import { NextRequest, NextResponse } from "next/server";

const WINDOW_MS = 10_000;
const MAX_REQUESTS = 30;

const buckets = new Map<string, { count: number; windowStart: number }>();

function isRateLimited(ip: string): boolean {
  const now = Date.now();
  const bucket = buckets.get(ip);
  if (!bucket || now - bucket.windowStart > WINDOW_MS) {
    buckets.set(ip, { count: 1, windowStart: now });
    return false;
  }
  bucket.count += 1;
  return bucket.count > MAX_REQUESTS;
}

export function middleware(req: NextRequest) {
  const path = req.nextUrl.pathname;

@ -1518,16 +1879,6 @@ export function middleware(req: NextRequest) {
    return new NextResponse("Not Found", { status: 404 });
  }

  // Rate limit by source IP. SSE endpoints are exempt (long-lived connection).
  if (path === "/api/events") return NextResponse.next();
  const ip =
    req.headers.get("x-forwarded-for")?.split(",")[0]?.trim() ??
    req.headers.get("x-real-ip") ??
    "unknown";
  if (isRateLimited(ip)) {
    return new NextResponse("Too Many Requests", { status: 429 });
  }

  return NextResponse.next();
}

@ -1536,6 +1887,22 @@ export const config = {
};
```

NOTE: rate limiting is enforced **inside Server Actions and route handlers** (Node runtime, has DB access) using the `checkRateLimit` helper from Task 9b. Each Server Action begins with:

```typescript
import { checkRateLimit } from "@/lib/rate-limit";
import { headers } from "next/headers";

async function rateLimit() {
  const h = await headers();
  const ip = h.get("x-forwarded-for")?.split(",")[0]?.trim() ?? "unknown";
  const r = await checkRateLimit(`ip:${ip}`, { max: 30, windowSec: 10 });
  if (r.limited) throw new Error("Too many requests");
}
```

Tasks 17 onward call `await rateLimit()` as the first line of every server action. This is more reliable than edge-side limiting and uses Postgres as the source of truth (no Redis).
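
Since every action repeats the same first line, one option is a small higher-order wrapper. This is a sketch under assumptions, not part of the plan: `withRateLimit`, `RateLimitCheck`, and `fakeLimiter` are hypothetical names, and the limiter is injected so the example runs without Next.js or a database. In the app, the injected function would be the `rateLimit()` helper defined for Server Actions.

```typescript
// Hypothetical wrapper (not in the plan): run a rate-limit check before the action body.
type RateLimitCheck = () => Promise<void>; // expected to throw when the caller is limited

export function withRateLimit<Args extends unknown[], R>(
  check: RateLimitCheck,
  action: (...args: Args) => Promise<R>,
): (...args: Args) => Promise<R> {
  return async (...args: Args) => {
    await check(); // throws before any work is done if the caller is over the limit
    return action(...args);
  };
}

// Stand-in limiter for the example: allows `max` calls, then throws.
export function fakeLimiter(max: number): RateLimitCheck {
  let count = 0;
  return async () => {
    count += 1;
    if (count > max) throw new Error("Too many requests");
  };
}
```

The explicit `await rateLimit()` line described in the plan is equally valid; a wrapper mainly helps when it is easy to forget the call in a new action.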

- [ ] **Step 2: Commit**

```bash