Queue
Background job processing with pg-boss, BullMQ, or Cloudflare Queues.
QUESTPIE queues use adapter implementations. pgBossAdapter stores jobs in PostgreSQL for long-running Node/Bun workers. bullMQAdapter stores jobs in Redis through BullMQ. cloudflareQueuesAdapter uses Cloudflare Queues push consumers for Workers.
Configuration
import { runtimeConfig } from "questpie/app";
import { pgBossAdapter } from "questpie/adapters/pg-boss";
export default runtimeConfig({
queue: {
adapter: pgBossAdapter({
connectionString: process.env.DATABASE_URL,
}),
},
});
pg-boss creates its own schema in your database (pgboss.* tables) for job storage, scheduling, and dead-letter queues.
BullMQ
Use BullMQ when Redis is already part of the deployment and you want Redis-backed queue storage for Node/Bun workers:
import { runtimeConfig } from "questpie/app";
import { bullMQAdapter } from "questpie/adapters/bullmq";
export default runtimeConfig({
queue: {
adapter: bullMQAdapter({
connection: { url: process.env.REDIS_URL! },
queuePrefix: "my-app",
}),
},
});
The built-in BullMQ adapter targets open-source BullMQ. It does not expose per-group FIFO semantics. If a workload requires one active job per customer, tenant, or aggregate while allowing different groups to run in parallel, use a queue adapter that has native group semantics instead of emulating groups with job IDs or one queue per group.
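To make "per-group FIFO semantics" concrete, the following in-memory sketch models the selection rule described above: jobs are considered in arrival order, at most one job per group may be active, and different groups may start side by side. It only illustrates the semantics. `GroupedJob` and `pickRunnable` are hypothetical names, not part of QUESTPIE or BullMQ, and this is not a substitute for an adapter with native group support.

```typescript
// Illustrative model only: hypothetical types, not a QUESTPIE or BullMQ API.
interface GroupedJob {
  id: string;
  groupId: string; // for example a customer, tenant, or aggregate ID
}

/**
 * Walk the pending list in FIFO order and select jobs to start, allowing at
 * most one active job per group and at most `freeSlots` new jobs overall.
 */
function pickRunnable(
  pending: readonly GroupedJob[],
  activeGroups: ReadonlySet<string>,
  freeSlots: number,
): GroupedJob[] {
  const busy = new Set(activeGroups);
  const picked: GroupedJob[] = [];
  for (const job of pending) {
    if (picked.length >= freeSlots) break;
    // An earlier job from this group is active or was just picked: keep order.
    if (busy.has(job.groupId)) continue;
    busy.add(job.groupId);
    picked.push(job);
  }
  return picked;
}

const pending: GroupedJob[] = [
  { id: "a1", groupId: "tenant-a" },
  { id: "a2", groupId: "tenant-a" },
  { id: "b1", groupId: "tenant-b" },
];

// a2 waits behind a1, while b1 starts in parallel.
const started = pickRunnable(pending, new Set<string>(), 5);
console.log(started.map((job) => job.id)); // ["a1", "b1"]
```

A real backend has to enforce this rule atomically across worker processes, which is why the guidance above prefers an adapter with native group semantics.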
Defining Jobs
Jobs are defined in jobs/ using the file convention:
import { job } from "questpie/services";
import z from "zod";
export default job({
name: "send-email",
schema: z.object({
to: z.string().email(),
subject: z.string(),
body: z.string(),
}),
handler: async ({ payload, email, logger }) => {
await email.send({
to: payload.to,
subject: payload.subject,
html: payload.body,
});
logger.info({ to: payload.to }, "Email sent");
},
options: {
retryLimit: 3,
retryDelay: 5,
retryBackoff: true,
},
});
See Jobs for the full job definition guide.
Publishing Jobs
From hooks, routes, or other jobs, use the typed queue context:
handler: async ({ queue }) => {
await queue.sendEmail.publish({
to: "user@example.com",
subject: "Welcome",
body: "<h1>Hello!</h1>",
});
},
The queue object is fully typed: autocompletion shows all registered jobs and their payload schemas.
Running Queue Workers
Publishing enqueues the job in the configured queue backend. A job handler runs only when a worker consumes the queue:
import { app } from "#questpie";
await app.queue.listen({
teamSize: 5,
batchSize: 3,
});
In production, run at least two process types against the same generated app and environment:
| Process | Responsibility |
|---|---|
| Web/API | Handles HTTP requests and publishes jobs |
| Worker | Calls app.queue.listen() and executes job handlers |
| Scheduler | Optional one-off process that registers cron jobs |
teamSize controls concurrent work inside one worker process. Scale horizontally by running more worker processes. QUESTPIE installs graceful shutdown handlers for SIGINT and SIGTERM by default, so workers can drain cleanly during deploys.
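Because each worker process has its own teamSize, it can help to read the values from the environment so that different deployments tune them without code changes. The sketch below is one possible way to do that. The variable names `WORKER_TEAM_SIZE` and `WORKER_BATCH_SIZE` and the helper `listenOptionsFromEnv` are assumptions made for illustration, not QUESTPIE conventions. The fallbacks mirror the values used in the listen example above.

```typescript
// Shape of the options passed to app.queue.listen() in the example above.
interface ListenOptions {
  teamSize: number;
  batchSize: number;
}

// Hypothetical helper: WORKER_TEAM_SIZE and WORKER_BATCH_SIZE are assumed names.
function listenOptionsFromEnv(
  env: Record<string, string | undefined>,
): ListenOptions {
  // Accept only positive integers; anything else falls back to the default.
  const toPositiveInt = (raw: string | undefined, fallback: number): number => {
    const parsed = Number.parseInt(raw ?? "", 10);
    return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
  };
  return {
    teamSize: toPositiveInt(env.WORKER_TEAM_SIZE, 5),
    batchSize: toPositiveInt(env.WORKER_BATCH_SIZE, 3),
  };
}

const options = listenOptionsFromEnv({ WORKER_TEAM_SIZE: "8" });
console.log(options); // { teamSize: 8, batchSize: 3 }
```

In a worker entry point the result would be passed along as `await app.queue.listen(listenOptionsFromEnv(process.env))`.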
For serverless or cron-style environments, process a bounded batch instead:
import { app } from "#questpie";
const result = await app.queue.runOnce({
batchSize: 10,
jobs: ["sendEmail"],
});
await app.queue.stop();
console.log(`Processed ${result.processed} jobs`);
Cloudflare Workers
Cloudflare uses push consumers. Configure cloudflareQueuesAdapter in runtimeConfig, then export a Worker with createCloudflareWorkerHandlers:
import { runtimeConfig } from "questpie/app";
import {
cloudflareQueuesAdapter,
type CloudflareQueueBinding,
} from "questpie/adapters/cloudflare";
async function getQueue(): Promise<CloudflareQueueBinding> {
const { env } = await import("cloudflare:workers");
return env.QUESTPIE_QUEUE as CloudflareQueueBinding;
}
export default runtimeConfig({
queue: {
adapter: cloudflareQueuesAdapter({ queue: getQueue }),
},
});

import { app } from "#questpie";
import { createCloudflareWorkerHandlers } from "questpie/adapters/cloudflare";
export default createCloudflareWorkerHandlers(app, { basePath: "/api" });
Cloudflare Queues call the exported queue() handler. You do not run app.queue.listen() on Workers, and pgBossAdapter is not a Cloudflare Worker adapter.
Job Options
| Option | Type | Default | Description |
|---|---|---|---|
| retryLimit | number | 0 | Max retry attempts on failure |
| retryDelay | number | 0 | Seconds between retries |
| retryBackoff | boolean | false | Exponential backoff on retries |
| expireInSeconds | number | 900 | Job expires if not completed |
| startAfter | Date \| string | now | Delay job start |
| singletonKey | string | none | Prevent duplicate jobs with same key |
| priority | number | 0 | Higher = processed first |
| cron | string | none | Cron expression for recurring jobs |
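To see how retryLimit, retryDelay, and retryBackoff interact, the sketch below computes the wait before each retry. It assumes the simplest form of exponential backoff, where the delay doubles on every retry and no jitter is added. The formula each adapter actually uses is not stated here and may differ, so treat the numbers as an illustration. `retryDelays` is a hypothetical helper.

```typescript
interface RetryOptions {
  retryLimit: number; // max retry attempts
  retryDelay: number; // seconds between retries
  retryBackoff: boolean; // exponential backoff on retries
}

// Assumption: backoff doubles the delay on each retry, without jitter.
function retryDelays({ retryLimit, retryDelay, retryBackoff }: RetryOptions): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < retryLimit; attempt++) {
    delays.push(retryBackoff ? retryDelay * 2 ** attempt : retryDelay);
  }
  return delays;
}

// Options from the send-email job defined earlier.
console.log(retryDelays({ retryLimit: 3, retryDelay: 5, retryBackoff: true })); // [5, 10, 20]

// Without backoff every retry waits the same fixed delay.
console.log(retryDelays({ retryLimit: 3, retryDelay: 5, retryBackoff: false })); // [5, 5, 5]
```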
Delayed Jobs
Use startAfter to schedule future execution:
await queue.sendReminder.publish(
{ appointmentId: "abc" },
{ startAfter: new Date("2026-05-01T09:00:00Z") },
);
Recurring Jobs
Use options.cron for recurring work. app.queue.listen() registers cron schedules automatically before it starts workers:
import { job } from "questpie/services";
import z from "zod";
export default job({
name: "send-daily-digest",
schema: z.object({}),
options: {
cron: "0 8 * * *",
retryLimit: 3,
},
handler: async ({ services }) => {
await services.digest.sendDaily();
},
});
Cron jobs must accept an empty payload because schedule registration validates the payload with schema.parse({}).
If you need a deploy step that only registers schedules, run:
import { app } from "#questpie";
await app.queue.registerSchedules();
await app.queue.stop();
On Cloudflare, recurring jobs use Cron Triggers. Add matching cron expressions to wrangler.toml; the Cloudflare adapter's scheduled() handler publishes every QUESTPIE job whose options.cron equals the trigger cron.
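The matching rule for Cron Triggers can be pictured as a filter over the registered jobs. The sketch below assumes plain string equality between a job's options.cron and the trigger's cron expression, which is what "equals" suggests. Whether the adapter normalizes whitespace or equivalent expressions is not stated, so it is safest to keep the two strings identical. `ScheduledJob` and `jobsForTrigger` are hypothetical names.

```typescript
// Hypothetical shape: a job name plus its optional cron expression.
interface ScheduledJob {
  name: string;
  cron?: string;
}

// Assumption: a job fires when its cron string is exactly the trigger's cron string.
function jobsForTrigger(jobs: readonly ScheduledJob[], triggerCron: string): string[] {
  return jobs.filter((job) => job.cron === triggerCron).map((job) => job.name);
}

const registered: ScheduledJob[] = [
  { name: "send-daily-digest", cron: "0 8 * * *" },
  { name: "send-email" }, // no cron: never published by a trigger
];

console.log(jobsForTrigger(registered, "0 8 * * *")); // ["send-daily-digest"]

// A trigger with a different expression publishes nothing.
console.log(jobsForTrigger(registered, "0 9 * * *")); // []
```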
Singleton Jobs
Prevent duplicate jobs with singletonKey:
await queue.syncInventory.publish(
{ warehouseId: "wh-1" },
{ singletonKey: "sync-wh-1" },
);
If a job with the same singleton key is already queued/active, the publish is a no-op.
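The no-op behaviour can be modelled with a small in-memory queue. This is a sketch of the rule as described above, not of how any adapter stores or locks jobs. `InMemoryQueue` and its methods are hypothetical, and the boolean return value is added only to make the outcome visible.

```typescript
type JobState = "created" | "active" | "completed" | "failed";

// Hypothetical in-memory model of singletonKey deduplication.
class InMemoryQueue {
  private readonly jobs: { key: string | undefined; state: JobState }[] = [];

  /** Returns true if the job was enqueued, false if the publish was a no-op. */
  publish(options: { singletonKey?: string } = {}): boolean {
    const { singletonKey } = options;
    const duplicate =
      singletonKey !== undefined &&
      this.jobs.some(
        (job) =>
          job.key === singletonKey &&
          (job.state === "created" || job.state === "active"),
      );
    if (duplicate) return false;
    this.jobs.push({ key: singletonKey, state: "created" });
    return true;
  }

  /** Marks every job as finished so the same key can be published again. */
  completeAll(): void {
    for (const job of this.jobs) job.state = "completed";
  }
}

const queue = new InMemoryQueue();
const first = queue.publish({ singletonKey: "sync-wh-1" });
const second = queue.publish({ singletonKey: "sync-wh-1" });
queue.completeAll();
const third = queue.publish({ singletonKey: "sync-wh-1" });
console.log(first, second, third); // true false true
```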
Monitoring
pg-boss stores job status in PostgreSQL. Query the pgboss.job table for monitoring:
SELECT state, COUNT(*) FROM pgboss.job GROUP BY state;
States: created → active → completed / failed / expired
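The rows returned by that query can be reduced to a simple health signal. The sketch below assumes the rows have already been fetched with whatever PostgreSQL client the app uses. It accepts counts as strings as well as numbers because some drivers, such as node-postgres, return bigint values like COUNT(*) as strings. `StateRow` and `summarizeStates` are hypothetical names.

```typescript
// One row of: SELECT state, COUNT(*) FROM pgboss.job GROUP BY state;
interface StateRow {
  state: string;
  count: string | number; // bigint counts may arrive as strings
}

interface QueueSummary {
  total: number;
  failed: number;
  failedRatio: number; // failed / total, or 0 when there are no jobs
}

function summarizeStates(rows: readonly StateRow[]): QueueSummary {
  let total = 0;
  let failed = 0;
  for (const row of rows) {
    const n = Number(row.count);
    total += n;
    if (row.state === "failed") failed += n;
  }
  return { total, failed, failedRatio: total === 0 ? 0 : failed / total };
}

const summary = summarizeStates([
  { state: "completed", count: "75" },
  { state: "failed", count: "25" },
]);
console.log(summary); // { total: 100, failed: 25, failedRatio: 0.25 }
```

An alert could compare `failedRatio` against a threshold chosen for the workload.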
Related Pages
- Jobs — Defining job handlers
- Services — Reusable business logic used by jobs
- Email — Sending emails from jobs
- Config API — Queue adapter configuration