Queue Adapter
Implement a custom job queue adapter for QUESTPIE.
Queue adapters power background jobs, scheduled/recurring work, and serverless push consumers. The built-in PgBossAdapter uses PostgreSQL for everything; you might want RabbitMQ, SQS, BullMQ, or Cloudflare Queues instead.
Interface
export interface QueueAdapter {
capabilities?: Partial<QueueAdapterCapabilities>;
start(): Promise<void>;
stop(): Promise<void>;
publish(
jobName: string,
payload: any,
options?: PublishOptions,
): Promise<string | null>;
schedule(
jobName: string,
cron: string,
payload: any,
options?: Omit<PublishOptions, "startAfter">,
): Promise<void>;
unschedule(jobName: string): Promise<void>;
listen?(
handlers: QueueHandlerMap,
options?: QueueListenOptions,
): Promise<void>;
runOnce?(
handlers: QueueHandlerMap,
options?: QueueRunOnceOptions,
): Promise<QueueRunOnceResult>;
createPushConsumer?(
args: QueuePushConsumerFactoryArgs,
): QueuePushConsumerHandler;
on(event: "error", handler: (error: Error) => void): void;
}Capabilities
Declare what your adapter supports so QUESTPIE knows which runtime paths are available:
export interface QueueAdapterCapabilities {
longRunningConsumer: boolean; // supports listen()
runOnceConsumer: boolean; // supports runOnce()
pushConsumer: boolean; // supports createPushConsumer()
scheduling: boolean; // supports schedule/unschedule
singleton: boolean; // supports singleton jobs
}Required methods
| Method | Description |
|---|---|
start() | Connect to the underlying provider. Called once at app startup. |
stop() | Disconnect cleanly. Called during graceful shutdown. |
publish(jobName, payload, options?) | Enqueue a job immediately. Return the job ID or null. |
schedule(jobName, cron, payload, options?) | Register a recurring job with a cron expression. |
unschedule(jobName) | Remove a previously scheduled recurring job. |
on("error", handler) | Surface provider errors to QUESTPIE's error handling. |
Optional: consumer styles
Pick the consumer API that matches your runtime:
listen(handlers, options?) -- long-running worker
A persistent worker process that polls or subscribes for jobs. Use this for traditional server deployments.
listen?(
handlers: QueueHandlerMap,
options?: QueueListenOptions, // { teamSize?, batchSize? }
): Promise<void>;runOnce(handlers, options?) -- bounded batch
Process one bounded batch of jobs and return. Perfect for serverless cron ticks or CI pipelines.
runOnce?(
handlers: QueueHandlerMap,
options?: QueueRunOnceOptions, // { batchSize?, jobs? }
): Promise<QueueRunOnceResult>; // { processed: number }createPushConsumer(args) -- push-based
The provider pushes batches to your runtime (e.g. Cloudflare Queues, AWS SQS with Lambda triggers). You return a handler function.
createPushConsumer?(
args: QueuePushConsumerFactoryArgs,
): QueuePushConsumerHandler;Supporting types
export type QueueHandlerMap = Record<string, QueueJobHandler>;
export type QueueJobHandler = (job: QueueJobRecord) => Promise<void>;
export interface QueueJobRecord {
id: string;
data: unknown;
}Minimal example
import type {
PublishOptions,
QueueAdapter,
QueueHandlerMap,
QueueRunOnceOptions,
QueueRunOnceResult,
} from "questpie/server";
export class InMemoryQueueAdapter implements QueueAdapter {
capabilities = {
longRunningConsumer: false,
runOnceConsumer: true,
pushConsumer: false,
scheduling: true,
singleton: false,
} as const;
private jobs: Array<{ id: string; name: string; payload: unknown }> = [];
private errorHandlers = new Set<(error: Error) => void>();
async start(): Promise<void> {}
async stop(): Promise<void> {
this.jobs = [];
}
async publish(
jobName: string,
payload: unknown,
_options?: PublishOptions,
): Promise<string> {
const id = crypto.randomUUID();
this.jobs.push({ id, name: jobName, payload });
return id;
}
async schedule(): Promise<void> {
// Wire up your provider's cron registration API here.
}
async unschedule(): Promise<void> {
// Wire up your provider's cron removal API here.
}
async runOnce(
handlers: QueueHandlerMap,
options?: QueueRunOnceOptions,
): Promise<QueueRunOnceResult> {
const max = options?.batchSize ?? 10;
let processed = 0;
for (const job of this.jobs.slice(0, max)) {
const handler = handlers[job.name];
if (!handler) continue;
try {
await handler({ id: job.id, data: job.payload });
processed += 1;
} catch (error) {
const normalized =
error instanceof Error ? error : new Error(String(error));
for (const onError of this.errorHandlers) onError(normalized);
}
}
this.jobs = this.jobs.slice(processed);
return { processed };
}
on(event: "error", handler: (error: Error) => void): void {
if (event === "error") this.errorHandlers.add(handler);
}
}Registration
import { config } from "questpie";
import { InMemoryQueueAdapter } from "./my-queue-adapter";
export default config({
// ...
queue: {
adapter: new InMemoryQueueAdapter(),
jobs: {
// your job definitions...
},
},
});The queue config requires both an adapter and a jobs map. Jobs are defined separately -- see the Queue guide for details.
Testing tips
- Start by testing
publish()+runOnce()before implementing scheduling. - Verify unknown job names are silently skipped or reported the way you expect.
- Test error propagation through
on("error"). - If you support scheduling, confirm repeated deployments do not duplicate cron registrations.
- Test
stop()actually cleans up connections -- leaked connections cause flaky CI.
Reference implementations
- PgBossAdapter -- PostgreSQL-based, supports long-running consumers, runOnce, scheduling, and singleton jobs
- CloudflareQueuesAdapter -- push-based queue for Cloudflare Workers
- QueueAdapter interface source