Queue adapter

Pick where your background jobs run (Postgres via pg-boss, Redis via BullMQ, or Cloudflare Queues) by passing one adapter to .build({ queue: { adapter } }). The same job code runs on any of them.

The queue adapter is the slot that decides where your background work runs. You write a job once; the adapter is the plug-in backend that stores, schedules, and hands jobs to a worker. Swap pgBossAdapter for bullMQAdapter and the exact same job code moves from Postgres to Redis, with no handler change and no dispatch change. QUESTPIE ships three adapters and an open QueueAdapter contract so you can write your own.

This page is the infrastructure reference: which adapter to choose, how each one is configured, the capability flags that gate listen / runOnce / schedule, and how to plug in a custom backend. For how to define and dispatch jobs (job(), queue.<name>.publish, the handler context), see Jobs.

Prerequisites

Read Jobs first; it owns job(), queue.<name>.publish/schedule, and the worker entrypoints. This page assumes you know how a job is defined and dispatched, and focuses only on the adapter that backs the queue. See also Configuration for the .build({ ... }) model.

What it does

  • Choose your backend with one line. Pass an adapter to .build({ queue: { adapter } }); everything above it (the typed app.queue client, dispatch, retries, scheduling) stays identical.
  • Stay portable. Job handlers and publish calls never name the backend. Moving from pg-boss in dev to BullMQ in production is an adapter swap, nothing else.
  • Run on Postgres, Redis, or Cloudflare. Reuse your existing database (pg-boss), a Redis instance (BullMQ), or a push-based serverless queue (Cloudflare Queues).
  • Advertise capabilities. Each adapter declares whether it supports long-running workers, one-batch draining, push delivery, cron scheduling, and singleton dedup, so the runtime can fail fast or branch.
  • Keep drivers out of your bundle. Each adapter lives at its own import subpath (questpie/adapters/pg-boss, …), so pg-boss / bullmq / Cloudflare bindings load only when you import them.
  • Extensible by contract. QueueAdapter is the same seam the three built-ins implement, so you can bring any broker.

Quick start

Pick one adapter and pass it to .build(). Codegen fills jobs from your jobs/ files; you only supply the adapter.

src/questpie/server/config/app.ts
import { pgBossAdapter } from "questpie/adapters/pg-boss";

export default questpie(/* modules */).build({
  // `jobs` is filled by codegen from your jobs/ files, you only supply the adapter.
  queue: { adapter: pgBossAdapter({ connectionString: env.DATABASE_URL }) },
});

That single line wires app.queue to Postgres. Define a job, run questpie generate, and dispatch it as shown in Jobs; the adapter does the rest.

The adapter is required once any job exists

If your app has at least one job but no queue: { adapter }, the queue service throws: "Queue adapter is required when jobs are defined. Provide adapter in .build({ queue: { adapter: ... } })." With no jobs at all, app.queue is an empty {} and no adapter is needed.

Choosing an adapter

| Adapter | Import | Backed by | Best for |
| --- | --- | --- | --- |
| pg-boss | questpie/adapters/pg-boss | Postgres | The default, reuse your app database, zero extra infrastructure. |
| BullMQ | questpie/adapters/bullmq | Redis | High-throughput queues when you already run Redis. |
| Cloudflare Queues | questpie/adapters/cloudflare-queues | CF Queues | Serverless on Cloudflare Workers (push delivery). |

Each adapter advertises what it can do through capability flags. These gate which worker methods you may call:

| Capability | pg-boss | BullMQ | Cloudflare Queues |
| --- | --- | --- | --- |
| longRunningConsumer (queue.listen()) | yes | yes | no |
| runOnceConsumer (queue.runOnce()) | yes | yes | no |
| pushConsumer (queue.createPushConsumer()) | no | no | yes |
| scheduling (options.cron, queue.<n>.schedule()) | yes | yes | no |
| singleton (singletonKey dedup) | yes | yes | no |

Detect capabilities at runtime with `app.queue.capabilities`

The resolved flags are exposed on app.queue.capabilities: longRunningConsumer, runOnceConsumer, pushConsumer, scheduling, and singleton. If one codebase targets multiple runtimes, check the relevant flag before calling listen / runOnce / schedule rather than catching the throw. When an adapter omits capabilities, the runtime infers them: longRunningConsumer / runOnceConsumer / pushConsumer default to "is the matching method implemented?", scheduling to "are both schedule and unschedule functions?", and singleton to false.
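
A minimal sketch of that check, assuming only the five flag names listed above. The QueueCapabilities type and the pickConsumerModel helper are hypothetical names written for this example, not QUESTPIE exports; in a real app you would pass app.queue.capabilities instead of the hand-written objects.

```typescript
// Local stand-in for the flags exposed on app.queue.capabilities.
// The type name is illustrative; only the five flag names come from this page.
type QueueCapabilities = {
  longRunningConsumer: boolean;
  runOnceConsumer: boolean;
  pushConsumer: boolean;
  scheduling: boolean;
  singleton: boolean;
};

type ConsumerModel = "listen" | "runOnce" | "push" | "none";

// Hypothetical helper: choose a worker entrypoint from the flags instead of
// calling a method and catching the throw.
function pickConsumerModel(
  caps: QueueCapabilities,
  prefer: "listen" | "runOnce" = "listen",
): ConsumerModel {
  if (caps.pushConsumer) return "push";
  if (prefer === "runOnce" && caps.runOnceConsumer) return "runOnce";
  if (caps.longRunningConsumer) return "listen";
  if (caps.runOnceConsumer) return "runOnce";
  return "none";
}

// Flag values copied from the capability table on this page.
const pgBossLike: QueueCapabilities = {
  longRunningConsumer: true,
  runOnceConsumer: true,
  pushConsumer: false,
  scheduling: true,
  singleton: true,
};
const cloudflareLike: QueueCapabilities = {
  longRunningConsumer: false,
  runOnceConsumer: false,
  pushConsumer: true,
  scheduling: false,
  singleton: false,
};

console.log(pickConsumerModel(pgBossLike)); // "listen"
console.log(pickConsumerModel(pgBossLike, "runOnce")); // "runOnce"
console.log(pickConsumerModel(cloudflareLike)); // "push"
```

The same pattern applies to scheduling: read the flag first and skip the schedule call (or fall back to a platform cron) when it is false.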

The consumer models

The adapter's capabilities decide how a worker drains the queue. The mechanics of starting a worker live in Jobs → Running a worker; here's how each model maps to the adapters:

  • Long-running (queue.listen()): a persistent process polls and runs jobs until stopped. pg-boss and BullMQ.
  • Run-once (queue.runOnce()): drain one bounded batch and exit, for a serverless function or scheduled tick. pg-boss and BullMQ.
  • Push (queue.createPushConsumer()): the platform delivers batches to your worker; you don't poll. Cloudflare Queues only.

There is no `questpie worker` CLI command

Workers start programmatically: you import your built app and call app.queue.listen() (long-running) or app.queue.runOnce() (serverless tick), or wire app.queue.createPushConsumer() to the platform's queue entrypoint. The questpie CLI has no queue command. The full entrypoint recipes are in Jobs → Running a worker.

pg-boss (Postgres)

The default and simplest production setup: jobs live in your existing Postgres database, so there's no extra service to run.

src/questpie/server/config/app.ts
import { pgBossAdapter } from "questpie/adapters/pg-boss";

.build({
  queue: { adapter: pgBossAdapter({ connectionString: env.DATABASE_URL }) },
});

PgBossAdapterOptions is pg-boss's own ConstructorOptions, a direct re-export, so every option passes straight through to new PgBoss(...):

import { pgBossAdapter } from "questpie/adapters/pg-boss";

pgBossAdapter({
  connectionString: env.DATABASE_URL, // or pass `db`, `host`/`port`/`user`/…
  schema: "pgboss",                    // dedicated schema for queue tables (pg-boss default)
  max: 10,                             // connection pool size
});

Behavior worth knowing:

  • Queues are created on demand. The adapter calls createQueue(jobName) the first time it touches a job and caches it; you never declare queues yourself.
  • PublishOptions maps ~1:1 to pg-boss send options (priority, startAfter, singletonKey, retryLimit, retryDelay, expireInSeconds), passed through directly.
  • Batches are processed serially. pg-boss always hands an array to its work() callback; the adapter iterates items one at a time and reports per-item failures via boss.fail(...) so a sibling failure doesn't sink the rest of the batch.
  • runOnce needs pg-boss fetch() support. It throws "PgBossAdapter.runOnce requires pg-boss fetch() support." if the installed pg-boss version doesn't expose it.
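
The serial batch behavior can be pictured with a short sketch. This is not the adapter's source: processBatchSerially is a hypothetical helper, and its fail callback is a simplified stand-in for pg-boss's boss.fail(...), whose real signature may differ.

```typescript
type JobRecord = { id: string; data: unknown };
type Handler = (job: JobRecord) => Promise<void>;

// Hypothetical helper: run a batch one item at a time and report each failure
// individually, so one bad job does not fail its siblings.
async function processBatchSerially(
  batch: JobRecord[],
  handler: Handler,
  fail: (jobId: string, error: Error) => Promise<void>, // stand-in for boss.fail(...)
): Promise<{ succeeded: string[]; failed: string[] }> {
  const succeeded: string[] = [];
  const failed: string[] = [];
  for (const job of batch) {
    try {
      await handler(job); // strictly sequential: no Promise.all here
      succeeded.push(job.id);
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err));
      await fail(job.id, error); // only this item is marked failed
      failed.push(job.id);
    }
  }
  return { succeeded, failed };
}
```

With a three-item batch where the second handler call throws, the first and third items still complete and only the second is passed to fail.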

Supports long-running workers, runOnce, cron scheduling, and native singletonKey.

BullMQ (Redis)

Use BullMQ when you already run Redis and want a dedicated, high-throughput queue.

src/questpie/server/config/app.ts
import { bullMQAdapter } from "questpie/adapters/bullmq";

.build({
  queue: {
    adapter: bullMQAdapter({
      connection: { host: env.REDIS_HOST, port: env.REDIS_PORT },
    }),
  },
});

BullMQAdapterOptions:

interface BullMQAdapterOptions {
  connection: ConnectionOptions;   // bullmq/ioredis connection (required)
  queuePrefix?: string;            // namespace key prefix for queues/workers
  workerOptions?: Omit<WorkerOptions, "connection" | "prefix">; // extra bullmq Worker opts
}

Behavior worth knowing:

  • Lazy connect. start() is a no-op; queues and workers connect when first used.
  • One Worker per job name. listen() creates a BullMQ Worker for each job; teamSize maps to the worker's concurrency.
  • Option mapping (PublishOptions → BullMQ JobsOptions): startAfter → delay (ms), singletonKey → jobId (dedup by id, not a true singleton lock), retryLimit → attempts (retryLimit + 1), retryDelay → backoff.delay in milliseconds (type: exponential when retryBackoff, else fixed), expireInSeconds → removeOnComplete: { age }, priority → priority.
  • Scheduling uses BullMQ's repeatable jobs (repeat: { pattern: cron, key: "questpie:<jobName>" }); unschedule() removes every repeatable tied to that name.

`retryDelay` is still in seconds, even on BullMQ

You always pass retryDelay in seconds, matching every other adapter. The BullMQ adapter multiplies it by 1000 internally to get BullMQ's millisecond backoff.delay. Don't pre-multiply.
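
The mapping and the seconds-to-milliseconds conversion can be sketched as a pure function. This illustrates the rules listed above and is not the adapter's actual code: toBullMQOptions and both type names are hypothetical, the option types are reduced to the fields this page names, and treating a numeric startAfter as seconds is an assumption.

```typescript
// Subset of PublishOptions named on this page; the real QUESTPIE type may differ.
type PublishOptionsSketch = {
  priority?: number;
  startAfter?: number | string | Date; // assumption: seconds, ISO string, or Date
  singletonKey?: string;
  retryLimit?: number;
  retryDelay?: number; // always seconds
  retryBackoff?: boolean;
  expireInSeconds?: number;
};

// Subset of BullMQ's JobsOptions that the mapping touches.
type BullJobsOptionsSketch = {
  priority?: number;
  delay?: number; // milliseconds
  jobId?: string;
  attempts?: number;
  backoff?: { type: "fixed" | "exponential"; delay: number };
  removeOnComplete?: { age: number };
};

function toDelayMs(startAfter: number | string | Date, now: number): number {
  if (typeof startAfter === "number") return Math.max(0, startAfter * 1000);
  const at = new Date(startAfter).getTime();
  if (Number.isNaN(at)) throw new Error(`Invalid startAfter: ${String(startAfter)}`);
  return Math.max(0, at - now);
}

function toBullMQOptions(
  options: PublishOptionsSketch,
  now: number = Date.now(),
): BullJobsOptionsSketch {
  const out: BullJobsOptionsSketch = {};
  if (options.priority !== undefined) out.priority = options.priority;
  if (options.startAfter !== undefined) out.delay = toDelayMs(options.startAfter, now);
  if (options.singletonKey !== undefined) out.jobId = options.singletonKey;
  // retryLimit counts retries; BullMQ's attempts counts total tries.
  if (options.retryLimit !== undefined) out.attempts = options.retryLimit + 1;
  if (options.retryDelay !== undefined) {
    out.backoff = {
      type: options.retryBackoff ? "exponential" : "fixed",
      delay: options.retryDelay * 1000, // seconds in, milliseconds out
    };
  }
  if (options.expireInSeconds !== undefined) {
    out.removeOnComplete = { age: options.expireInSeconds };
  }
  return out;
}

// retryLimit 3 with a 5-second retryDelay becomes 4 attempts and a 5000 ms fixed backoff.
console.log(toBullMQOptions({ retryLimit: 3, retryDelay: 5 }));
```

Pre-multiplying retryDelay by 1000 before publishing would yield a backoff of 5,000,000 ms in this sketch, which is the mistake the callout above warns against.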

Supports long-running workers, runOnce, cron scheduling, and singletonKey (id-dedup).

Cloudflare Queues (push)

For QUESTPIE apps deployed to Cloudflare Workers. Cloudflare pushes batches to your worker, so there's no long-running listener and no in-app scheduling; those come from the platform.

src/questpie/server/config/app.ts
import { cloudflareQueuesAdapter } from "questpie/adapters/cloudflare-queues";

.build({
  queue: { adapter: cloudflareQueuesAdapter({ queue: env.MY_QUEUE }) },
});

CloudflareQueuesAdapterOptions: pass either queue or enqueue (the constructor throws if you pass neither):

interface CloudflareQueuesAdapterOptions {
  // A Cloudflare Queues producer binding (or a function returning one).
  queue?: CloudflareQueueBinding | (() => MaybePromise<CloudflareQueueBinding>);
  // ...or your own enqueue function (returns a message id, or null).
  enqueue?: (
    message: CloudflareQueueEnvelope,
    opts?: { delaySeconds?: number },
  ) => Promise<string | null>;
  // Decode a raw pushed body into an envelope (defaults to requiring a `jobName` string).
  decode?: (body: unknown) => CloudflareQueueEnvelope | null;
}
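
As an illustration of the decode hook, here is a sketch of a decoder that enforces the same minimum the default is described as requiring, a jobName string. The EnvelopeSketch type and decodeEnvelope function are hypothetical; the real CloudflareQueueEnvelope may carry more fields, and accepting JSON-string bodies is an extra assumption of this example, not documented default behavior.

```typescript
// Only `jobName` is documented on this page; other fields are left open.
type EnvelopeSketch = { jobName: string } & Record<string, unknown>;

// Hypothetical decoder: returns null for anything that is not an object
// carrying a non-empty string `jobName`.
function decodeEnvelope(body: unknown): EnvelopeSketch | null {
  let value: unknown = body;
  if (typeof value === "string") {
    // Assumption: some producers send JSON text instead of structured bodies.
    try {
      value = JSON.parse(value);
    } catch {
      return null;
    }
  }
  if (typeof value !== "object" || value === null) return null;
  const jobName = (value as Record<string, unknown>).jobName;
  if (typeof jobName !== "string" || jobName.length === 0) return null;
  return value as EnvelopeSketch;
}
```

Returning null rather than throwing matches the decode signature above, which allows a null result for bodies that cannot be turned into an envelope.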

Wire the push consumer to your Worker's queue() entrypoint (full recipe in Jobs → Push model):

Cloudflare Worker entrypoint
import { app } from "#questpie";

const consume = app.queue.createPushConsumer();

export default {
  async queue(batch, _env, _ctx) {
    await consume(batch as never);
  },
};

Behavior worth knowing:

  • Push-only. schedule() / unschedule() throw: "…does not support cron scheduling. Use platform cron triggers." Use Cloudflare's Cron Triggers for recurring work, calling runOnce-style draining from a scheduled handler if needed.
  • publish() returns null with the queue binding form, because Cloudflare's send() exposes no stable message id.
  • Delays cap at 24 hours. startAfter → delaySeconds and retryDelay → retry delaySeconds; both throw if they exceed 24h (or if startAfter is an invalid ISO string).
  • Retry handling. On a handler error the consumer calls message.retry(...). The message is only acked (dropped) when a retryLimit was published and attempts >= retryLimit + 1; with no retryLimit set, a failing message always retries. Undecodable messages and messages with no matching handler are acked and skipped.
  • runtime: "cloudflare" marker. The adapter carries this so QUESTPIE's runtime-compatibility check can verify a Cloudflare Workers deploy uses a push-based queue. A custom adapter targeting Workers must set runtime: "cloudflare" and implement createPushConsumer().
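
The delay cap and the ack-or-retry rule above reduce to two small functions. This is a sketch under stated assumptions, not the adapter's code: toDelaySeconds and onHandlerError are hypothetical names, startAfter is limited to an ISO string or Date here, and the error messages are invented for the example.

```typescript
const MAX_DELAY_SECONDS = 24 * 60 * 60; // the 24-hour cap described above

// Convert a startAfter timestamp into delaySeconds, rejecting invalid dates
// and anything beyond the cap.
function toDelaySeconds(startAfter: string | Date, now: number = Date.now()): number {
  const at = new Date(startAfter).getTime();
  if (Number.isNaN(at)) throw new Error("startAfter is not a valid date");
  const seconds = Math.max(0, Math.ceil((at - now) / 1000));
  if (seconds > MAX_DELAY_SECONDS) throw new Error("delay exceeds the 24h cap");
  return seconds;
}

// What the consumer does after a handler throws. `attempts` is the delivery
// count of the message; `retryLimit` is whatever was published, if anything.
function onHandlerError(attempts: number, retryLimit?: number): "ack" | "retry" {
  if (retryLimit !== undefined && attempts >= retryLimit + 1) return "ack";
  return "retry"; // no retryLimit published: always retry
}

console.log(onHandlerError(2, 2)); // "retry" (third delivery still allowed)
console.log(onHandlerError(3, 2)); // "ack" (1 try + 2 retries used up)
console.log(onHandlerError(50)); // "retry"
```

Because a message published without retryLimit never reaches the ack branch here, it is worth setting retryLimit on jobs that can fail permanently.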

The QueueAdapter contract

Every adapter, built-in or custom, implements this interface. It's the single seam the queue client talks to:

interface QueueAdapter {
  capabilities?: Partial<QueueAdapterCapabilities>;

  // Required:
  start(): Promise<void>;
  stop(): Promise<void>;
  publish(jobName: string, payload: any, options?: PublishOptions): Promise<string | null>;
  schedule(jobName: string, cron: string, payload: any,
    options?: Omit<PublishOptions, "startAfter">): Promise<void>;
  unschedule(jobName: string): Promise<void>;
  on(event: "error", handler: (error: Error) => void): void;

  // Optional, presence drives capability fallbacks:
  listen?(handlers: QueueHandlerMap, options?: QueueListenOptions): Promise<void>;
  runOnce?(handlers: QueueHandlerMap, options?: QueueRunOnceOptions): Promise<QueueRunOnceResult>;
  createPushConsumer?(args: { handlers: QueueHandlerMap }): QueuePushConsumerHandler;
}

start / stop / publish / schedule / unschedule / on are required; listen / runOnce / createPushConsumer are optional. Whether you implement each one determines the matching capability flag when capabilities leaves it unset (see the capability callout above).
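
The fallback rules can be written out as a small resolver. This is a sketch of the documented behavior, not QUESTPIE's implementation: resolveCapabilities, Capabilities, and AdapterLike are hypothetical names local to the example.

```typescript
type Capabilities = {
  longRunningConsumer: boolean;
  runOnceConsumer: boolean;
  pushConsumer: boolean;
  scheduling: boolean;
  singleton: boolean;
};

// Just enough of an adapter's shape to inspect which members exist.
type AdapterLike = {
  capabilities?: Partial<Capabilities>;
  schedule?: unknown;
  unschedule?: unknown;
  listen?: unknown;
  runOnce?: unknown;
  createPushConsumer?: unknown;
};

// Declared flags win; each unset flag falls back to the rule described above.
function resolveCapabilities(adapter: AdapterLike): Capabilities {
  const declared = adapter.capabilities ?? {};
  const isFn = (value: unknown): boolean => typeof value === "function";
  return {
    longRunningConsumer: declared.longRunningConsumer ?? isFn(adapter.listen),
    runOnceConsumer: declared.runOnceConsumer ?? isFn(adapter.runOnce),
    pushConsumer: declared.pushConsumer ?? isFn(adapter.createPushConsumer),
    scheduling: declared.scheduling ?? (isFn(adapter.schedule) && isFn(adapter.unschedule)),
    singleton: declared.singleton ?? false,
  };
}
```

Note the consequence for scheduling: schedule and unschedule are required members, so an adapter that implements them only to throw should declare scheduling: false explicitly rather than rely on the fallback.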

Two contract details adapter authors rely on:

  • Handlers are keyed by the durable job name. The framework builds a QueueHandlerMap (Record<jobName, handler>) and passes it to listen / runOnce / createPushConsumer. Each handler receives a QueueJobRecord: { id: string; data: unknown }.
  • data is raw; the framework re-validates it. Adapters store and replay the raw payload; QUESTPIE re-parses data against the job's Zod schema before invoking the user handler. Your adapter never needs to validate. (See Jobs for the dispatch- and handler-time validation passes.)

Extending: a custom adapter

The queue backend is open. Implement QueueAdapter, declare your capabilities, and pass an instance to .build({ queue: { adapter } }). Your adapter plugs into the exact same seam the three built-ins use, with no privileged internal API.

import type { QueueAdapter } from "questpie/queue";

class MyAdapter implements QueueAdapter {
  capabilities = {
    longRunningConsumer: true,
    runOnceConsumer: false,
    pushConsumer: false,
    scheduling: false,
    singleton: false,
  };

  async start() { /* connect to broker */ }
  async stop() { /* disconnect */ }
  async publish(jobName, payload, options) { /* enqueue */ return "job-id"; }
  async schedule(jobName, cron, payload, options) { /* register cron */ }
  async unschedule(jobName) { /* cancel cron */ }
  on(event, handler) { /* register an "error" listener */ }

  // Optional, implement what your broker supports:
  async listen(handlers, options) { /* long-running consumer */ }
  async runOnce(handlers, options) { return { processed: 0 }; }
  createPushConsumer(args) { return async (batch) => { /* drain a pushed batch */ }; }
}

// .build({ queue: { adapter: new MyAdapter() } })

This is the QUESTPIE principle in action: core, modules, and your own code reach the queue through one contract. The full "build your own adapter" loop, across every adapter kind, lives in Building a plugin.
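
To make the skeleton concrete, here is a self-contained in-memory adapter that follows the same method shapes. It is a teaching sketch, not a shipped adapter: the types are declared locally instead of imported from questpie/queue, the { processed } result mirrors the skeleton above and may omit fields of the real QueueRunOnceResult, and nothing is persisted.

```typescript
type JobRecord = { id: string; data: unknown };
type HandlerMap = Record<string, (job: JobRecord) => Promise<void>>;

// Illustrative backend: a FIFO array in process memory.
class InMemoryAdapter {
  capabilities = {
    longRunningConsumer: false,
    runOnceConsumer: true,
    pushConsumer: false,
    scheduling: false, // declared explicitly because schedule() only throws
    singleton: false,
  };

  private pending: Array<{ jobName: string; record: JobRecord }> = [];
  private errorHandlers: Array<(error: Error) => void> = [];
  private nextId = 1;

  async start(): Promise<void> {}

  async stop(): Promise<void> {
    this.pending = [];
  }

  async publish(jobName: string, payload: unknown): Promise<string | null> {
    const id = String(this.nextId++);
    this.pending.push({ jobName, record: { id, data: payload } }); // raw payload, no validation
    return id;
  }

  async schedule(): Promise<void> {
    throw new Error("InMemoryAdapter does not support cron scheduling.");
  }

  async unschedule(): Promise<void> {
    throw new Error("InMemoryAdapter does not support cron scheduling.");
  }

  on(_event: "error", handler: (error: Error) => void): void {
    this.errorHandlers.push(handler);
  }

  // Drain everything queued at call time, one job after another.
  async runOnce(handlers: HandlerMap): Promise<{ processed: number }> {
    const batch = this.pending;
    this.pending = [];
    let processed = 0;
    for (const { jobName, record } of batch) {
      const handler = handlers[jobName];
      if (!handler) continue; // no handler for this job name: drop it
      try {
        await handler(record);
        processed++;
      } catch (err) {
        const error = err instanceof Error ? err : new Error(String(err));
        for (const notify of this.errorHandlers) notify(error);
      }
    }
    return { processed };
  }
}
```

Publishing two jobs and calling runOnce with a handler for only one of them returns { processed: 1 }; a handler that throws is reported through the "error" listeners instead of aborting the batch. A production adapter would add retries and persistence, which this sketch deliberately leaves out.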

TypeScript

The adapter contract types are public from questpie/queue; each adapter's option types come from its own subpath:

import type {
  QueueAdapter,
  QueueAdapterCapabilities,
  QueueHandlerMap,
  QueueJobRecord,
  QueuePushBatch,
} from "questpie/queue";

import type { PgBossAdapterOptions } from "questpie/adapters/pg-boss";
import type { BullMQAdapterOptions } from "questpie/adapters/bullmq";
import type {
  CloudflareQueuesAdapterOptions,
  CloudflareQueueEnvelope,
} from "questpie/adapters/cloudflare-queues";

questpie/queue re-exports the whole contract surface (QueueAdapter, the Queue* types, capabilities). The concrete adapters are deliberately not re-exported from questpie/queue; importing them from questpie/adapters/* keeps pg-boss / bullmq / Cloudflare bindings out of your bundle unless you use them. The job-facing types (JobDefinition, PublishOptions, QueueClient, InferJobPayload, …) are documented in Jobs → TypeScript.

  • Jobs: the feature this adapter serves (job(), dispatch, the handler context, and the worker entrypoints). Start here.
  • Configuration: the .build({ ... }) model and runtime-compatibility checks that read the adapter.
  • Building a plugin: implement QueueAdapter (and the other adapter contracts) for any backend.
  • Adapters: the full list of swappable backends (queue, search, realtime, storage, email, KV).
