QUESTPIE

Queue

Background job processing with pg-boss, BullMQ, or Cloudflare Queues.

QUESTPIE queues use adapter implementations. pgBossAdapter stores jobs in PostgreSQL for long-running Node/Bun workers. bullMQAdapter stores jobs in Redis through BullMQ. cloudflareQueuesAdapter uses Cloudflare Queues push consumers for Workers.

Configuration

questpie.config.ts
import { runtimeConfig } from "questpie/app";
import { pgBossAdapter } from "questpie/adapters/pg-boss";

export default runtimeConfig({
	queue: {
		adapter: pgBossAdapter({
			connectionString: process.env.DATABASE_URL,
		}),
	},
});

pg-boss creates its own schema in your database (pgboss.* tables) for job storage, scheduling, and dead-letter queues.

BullMQ

Use BullMQ when Redis is already part of the deployment and you want Redis-backed queue storage for Node/Bun workers:

questpie.config.ts
import { runtimeConfig } from "questpie/app";
import { bullMQAdapter } from "questpie/adapters/bullmq";

export default runtimeConfig({
	queue: {
		adapter: bullMQAdapter({
			connection: { url: process.env.REDIS_URL! },
			queuePrefix: "my-app",
		}),
	},
});

The built-in BullMQ adapter targets open-source BullMQ. It does not expose per-group FIFO semantics. If a workload requires one active job per customer, tenant, or aggregate while allowing different groups to run in parallel, use a queue adapter that has native group semantics instead of emulating groups with job IDs or one queue per group.

Defining Jobs

Jobs are defined in jobs/ using the file convention:

src/questpie/server/jobs/send-email.ts
import { job } from "questpie/services";
import z from "zod";

export default job({
	name: "send-email",
	schema: z.object({
		to: z.string().email(),
		subject: z.string(),
		body: z.string(),
	}),
	handler: async ({ payload, email, logger }) => {
		await email.send({
			to: payload.to,
			subject: payload.subject,
			html: payload.body,
		});
		logger.info({ to: payload.to }, "Email sent");
	},
	options: {
		retryLimit: 3,
		retryDelay: 5,
		retryBackoff: true,
	},
});

See Jobs for the full job definition guide.

Publishing Jobs

From hooks, routes, or other jobs — use the typed queue context:

handler: async ({ queue }) => {
	await queue.sendEmail.publish({
		to: "user@example.com",
		subject: "Welcome",
		body: "<h1>Hello!</h1>",
	});
};

The queue object is fully typed — autocompletion shows all registered jobs and their payload schemas.

Running Queue Workers

Publishing enqueues the job in the configured queue backend. A job handler runs only when a worker consumes the queue:

src/worker.ts
import { app } from "#questpie";

await app.queue.listen({
	teamSize: 5,
	batchSize: 3,
});

In production, run at least two process types against the same generated app and environment:

ProcessResponsibility
Web/APIHandles HTTP requests and publishes jobs
WorkerCalls app.queue.listen() and executes job handlers
SchedulerOptional one-off process that registers cron jobs

teamSize controls concurrent work inside one worker process. Scale horizontally by running more worker processes. QUESTPIE installs graceful shutdown handlers for SIGINT and SIGTERM by default, so workers can drain cleanly during deploys.

For serverless or cron-style environments, process a bounded batch instead:

scripts/process-queue-once.ts
import { app } from "#questpie";

const result = await app.queue.runOnce({
	batchSize: 10,
	jobs: ["sendEmail"],
});

await app.queue.stop();

console.log(`Processed ${result.processed} jobs`);

Cloudflare Workers

Cloudflare uses push consumers. Configure cloudflareQueuesAdapter in runtimeConfig, then export a Worker with createCloudflareWorkerHandlers:

questpie.config.ts
import { runtimeConfig } from "questpie/app";
import {
	cloudflareQueuesAdapter,
	type CloudflareQueueBinding,
} from "questpie/adapters/cloudflare";

async function getQueue(): Promise<CloudflareQueueBinding> {
	const { env } = await import("cloudflare:workers");
	return env.QUESTPIE_QUEUE as CloudflareQueueBinding;
}

export default runtimeConfig({
	queue: {
		adapter: cloudflareQueuesAdapter({ queue: getQueue }),
	},
});
worker.ts
import { app } from "#questpie";
import { createCloudflareWorkerHandlers } from "questpie/adapters/cloudflare";

export default createCloudflareWorkerHandlers(app, { basePath: "/api" });

Cloudflare Queues call the exported queue() handler. You do not run app.queue.listen() on Workers, and pgBossAdapter is not a Cloudflare Worker adapter.

Job Options

OptionTypeDefaultDescription
retryLimitnumber0Max retry attempts on failure
retryDelaynumber0Seconds between retries
retryBackoffbooleanfalseExponential backoff on retries
expireInSecondsnumber900Job expires if not completed
startAfterDate | stringnowDelay job start
singletonKeystringPrevent duplicate jobs with same key
prioritynumber0Higher = processed first
cronstringCron expression for recurring jobs

Delayed Jobs

Use startAfter to schedule future execution:

await queue.sendReminder.publish(
	{ appointmentId: "abc" },
	{ startAfter: new Date("2026-05-01T09:00:00Z") },
);

Recurring Jobs

Use options.cron for recurring work. app.queue.listen() registers cron schedules automatically before it starts workers:

src/questpie/server/jobs/send-daily-digest.ts
import { job } from "questpie/services";
import z from "zod";

export default job({
	name: "send-daily-digest",
	schema: z.object({}),
	options: {
		cron: "0 8 * * *",
		retryLimit: 3,
	},
	handler: async ({ services }) => {
		await services.digest.sendDaily();
	},
});

Cron jobs must accept an empty payload because schedule registration validates the payload with schema.parse({}).

If you need a deploy step that only registers schedules, run:

scripts/register-schedules.ts
import { app } from "#questpie";

await app.queue.registerSchedules();
await app.queue.stop();

On Cloudflare, recurring jobs use Cron Triggers. Add matching cron expressions to wrangler.toml; the Cloudflare adapter's scheduled() handler publishes every QUESTPIE job whose options.cron equals the trigger cron.

Singleton Jobs

Prevent duplicate jobs with singletonKey:

await queue.syncInventory.publish(
	{ warehouseId: "wh-1" },
	{ singletonKey: "sync-wh-1" },
);

If a job with the same singleton key is already queued/active, the publish is a no-op.

Monitoring

pg-boss stores job status in PostgreSQL. Query the pgboss.job table for monitoring:

SELECT state, COUNT(*) FROM pgboss.job GROUP BY state;

States: createdactivecompleted / failed / expired

  • Jobs — Defining job handlers
  • Services — Reusable business logic used by jobs
  • Email — Sending emails from jobs
  • Config API — Queue adapter configuration

On this page