QUESTPIE
Extend the PlatformCustom Adapters

Queue Adapter

Implement a custom job queue adapter for QUESTPIE.

Queue adapters power background jobs, scheduled/recurring work, and serverless push consumers. The built-in PgBossAdapter uses PostgreSQL for everything; you might want RabbitMQ, SQS, BullMQ, or Cloudflare Queues instead.

Interface

export interface QueueAdapter {
	capabilities?: Partial<QueueAdapterCapabilities>;

	start(): Promise<void>;
	stop(): Promise<void>;

	publish(
		jobName: string,
		payload: any,
		options?: PublishOptions,
	): Promise<string | null>;

	schedule(
		jobName: string,
		cron: string,
		payload: any,
		options?: Omit<PublishOptions, "startAfter">,
	): Promise<void>;

	unschedule(jobName: string): Promise<void>;

	listen?(
		handlers: QueueHandlerMap,
		options?: QueueListenOptions,
	): Promise<void>;

	runOnce?(
		handlers: QueueHandlerMap,
		options?: QueueRunOnceOptions,
	): Promise<QueueRunOnceResult>;

	createPushConsumer?(
		args: QueuePushConsumerFactoryArgs,
	): QueuePushConsumerHandler;

	on(event: "error", handler: (error: Error) => void): void;
}

Capabilities

Declare what your adapter supports so QUESTPIE knows which runtime paths are available:

export interface QueueAdapterCapabilities {
	longRunningConsumer: boolean; // supports listen()
	runOnceConsumer: boolean;     // supports runOnce()
	pushConsumer: boolean;        // supports createPushConsumer()
	scheduling: boolean;          // supports schedule/unschedule
	singleton: boolean;           // supports singleton jobs
}

Required methods

MethodDescription
start()Connect to the underlying provider. Called once at app startup.
stop()Disconnect cleanly. Called during graceful shutdown.
publish(jobName, payload, options?)Enqueue a job immediately. Return the job ID or null.
schedule(jobName, cron, payload, options?)Register a recurring job with a cron expression.
unschedule(jobName)Remove a previously scheduled recurring job.
on("error", handler)Surface provider errors to QUESTPIE's error handling.

Optional: consumer styles

Pick the consumer API that matches your runtime:

listen(handlers, options?) -- long-running worker

A persistent worker process that polls or subscribes for jobs. Use this for traditional server deployments.

listen?(
	handlers: QueueHandlerMap,
	options?: QueueListenOptions, // { teamSize?, batchSize? }
): Promise<void>;

runOnce(handlers, options?) -- bounded batch

Process one bounded batch of jobs and return. Perfect for serverless cron ticks or CI pipelines.

runOnce?(
	handlers: QueueHandlerMap,
	options?: QueueRunOnceOptions, // { batchSize?, jobs? }
): Promise<QueueRunOnceResult>;  // { processed: number }

createPushConsumer(args) -- push-based

The provider pushes batches to your runtime (e.g. Cloudflare Queues, AWS SQS with Lambda triggers). You return a handler function.

createPushConsumer?(
	args: QueuePushConsumerFactoryArgs,
): QueuePushConsumerHandler;

Supporting types

export type QueueHandlerMap = Record<string, QueueJobHandler>;
export type QueueJobHandler = (job: QueueJobRecord) => Promise<void>;

export interface QueueJobRecord {
	id: string;
	data: unknown;
}

Minimal example

my-queue-adapter.ts
import type {
	PublishOptions,
	QueueAdapter,
	QueueHandlerMap,
	QueueRunOnceOptions,
	QueueRunOnceResult,
} from "questpie/server";

export class InMemoryQueueAdapter implements QueueAdapter {
	capabilities = {
		longRunningConsumer: false,
		runOnceConsumer: true,
		pushConsumer: false,
		scheduling: true,
		singleton: false,
	} as const;

	private jobs: Array<{ id: string; name: string; payload: unknown }> = [];
	private errorHandlers = new Set<(error: Error) => void>();

	async start(): Promise<void> {}

	async stop(): Promise<void> {
		this.jobs = [];
	}

	async publish(
		jobName: string,
		payload: unknown,
		_options?: PublishOptions,
	): Promise<string> {
		const id = crypto.randomUUID();
		this.jobs.push({ id, name: jobName, payload });
		return id;
	}

	async schedule(): Promise<void> {
		// Wire up your provider's cron registration API here.
	}

	async unschedule(): Promise<void> {
		// Wire up your provider's cron removal API here.
	}

	async runOnce(
		handlers: QueueHandlerMap,
		options?: QueueRunOnceOptions,
	): Promise<QueueRunOnceResult> {
		const max = options?.batchSize ?? 10;
		let processed = 0;

		for (const job of this.jobs.slice(0, max)) {
			const handler = handlers[job.name];
			if (!handler) continue;

			try {
				await handler({ id: job.id, data: job.payload });
				processed += 1;
			} catch (error) {
				const normalized =
					error instanceof Error ? error : new Error(String(error));
				for (const onError of this.errorHandlers) onError(normalized);
			}
		}

		this.jobs = this.jobs.slice(processed);
		return { processed };
	}

	on(event: "error", handler: (error: Error) => void): void {
		if (event === "error") this.errorHandlers.add(handler);
	}
}

Registration

questpie.config.ts
import { config } from "questpie";
import { InMemoryQueueAdapter } from "./my-queue-adapter";

export default config({
	// ...
	queue: {
		adapter: new InMemoryQueueAdapter(),
		jobs: {
			// your job definitions...
		},
	},
});

The queue config requires both an adapter and a jobs map. Jobs are defined separately -- see the Queue guide for details.

Testing tips

  • Start by testing publish() + runOnce() before implementing scheduling.
  • Verify unknown job names are silently skipped or reported the way you expect.
  • Test error propagation through on("error").
  • If you support scheduling, confirm repeated deployments do not duplicate cron registrations.
  • Test stop() actually cleans up connections -- leaked connections cause flaky CI.

Reference implementations

On this page