Jobs
A job is one typed background task: define it with job(), validate its payload with Zod, dispatch it from anywhere with queue.<name>.publish(payload), and run it on Postgres, Redis, or Cloudflare Queues without changing a line of job code.
A job moves slow or unreliable work (sending email, indexing search, calling a third-party API) off the request path and into a queue that a worker drains in the background. You declare a job once with job({ name, schema, handler }), drop the file in jobs/, and codegen wires it onto app.queue. From then on, any collection hook, route, service, or even another job dispatches it with one typed call, queue.sendWelcomeEmail.publish(payload), and the payload is Zod-validated before the handler ever runs. Swap the adapter in .build() and the exact same job code runs on Postgres, Redis, or Cloudflare Queues.
What it does
- Run work in the background. Hand off a task and return immediately; a worker picks it up and runs the handler.
- Validate every payload. Each job carries a required Zod schema; bad payloads are rejected at dispatch time, before they reach the handler.
- Dispatch with full type safety. queue.<name>.publish(payload) infers the payload type from the job's schema; a wrong shape is a compile error, not a runtime surprise.
- Retry on failure. Set retryLimit, retryDelay, and retryBackoff per job; the adapter re-runs failed jobs for you.
- Schedule recurring work. Add options.cron for a declarative recurring job, or call .schedule(payload, cron) to register one at runtime.
- Reach the full app context. Handlers receive db, collections, email, search, queue, and every other service; a job can do anything a route can, including enqueue other jobs.
Quick start
A job is a file in jobs/ whose default export is a job({...}). Declare a durable name, a Zod schema for the payload, and a handler:
import { job } from "questpie";
import { z } from "zod";
export default job({
  name: "send-welcome-email",
  schema: z.object({
    userId: z.string(),
  }),
  // `payload` is z.infer<typeof schema>; everything else is your full AppContext.
  handler: async ({ payload, collections, email }) => {
    const user = await collections.user.findOne({ where: { id: payload.userId } });
    if (!user) return;
    await email.send({
      to: user.email,
      subject: "Welcome!",
      html: `<p>Hi ${user.name}, thanks for signing up.</p>`,
    });
  },
  // Optional; the full surface is under "Full API" below.
  options: {
    retryLimit: 3,
    retryDelay: 30, // seconds
    retryBackoff: true,
  },
});
Provide a queue adapter once in .build(). Codegen fills jobs from your jobs/ files; you only supply the adapter:
import { pgBossAdapter } from "questpie/adapters/pg-boss";
export default questpie(/* modules */).build({
  // `jobs` is filled by codegen from your jobs/ files; you only supply the adapter.
  queue: { adapter: pgBossAdapter({ connectionString: env.DATABASE_URL }) },
});
Run codegen so every discovered job becomes a typed method on app.queue:
questpie generate # registers the job, adds queue.sendWelcomeEmail
Now dispatch it from any hook, route, or service; the call is fully typed off the job's schema:
collection("user").hooks({
  afterChange: ({ data, operation, onAfterCommit, queue }) => {
    if (operation !== "create") return;
    // Defer the dispatch until the row is committed (see the callout below).
    onAfterCommit(async () => {
      await queue.sendWelcomeEmail.publish({ userId: data.id });
    });
  },
});
That's the whole loop: define → wire an adapter → publish. A worker drains the queue (see Running a worker), and the full options surface follows below.
Import `job` from `questpie`, not `#questpie/factories`
Unlike collection(), job() takes no generated field types, so there's no #questpie/factories variant. job is exported from the questpie package root and, equivalently, from questpie/queue and questpie/services; import { job } from "questpie" is correct everywhere.
Don't dispatch directly from an after* hook
afterChange runs inside the CRUD transaction. If you call queue.x.publish(...) directly and the transaction later rolls back, you've enqueued work for a row that never landed, and on a single-connection database (PGlite in dev) it can deadlock against the open transaction. Wrap the dispatch in onAfterCommit(...) so it fires only after the write is durable. See Hooks.
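To make the deferral concrete, here is a minimal, framework-independent sketch of the pattern behind onAfterCommit. The FakeTransaction class and its methods are hypothetical stand-ins, not QUESTPIE's implementation: callbacks registered during a transaction run only if it commits, and are dropped if it rolls back.

```typescript
// Hypothetical stand-in for a transaction that supports after-commit callbacks.
// This is NOT QUESTPIE's implementation, only an illustration of the pattern.
class FakeTransaction {
  private afterCommit: Array<() => void> = [];

  // Register work that should only happen once the write is durable.
  onAfterCommit(callback: () => void): void {
    this.afterCommit.push(callback);
  }

  // Commit: run the deferred callbacks in registration order.
  commit(): void {
    const callbacks = this.afterCommit;
    this.afterCommit = [];
    for (const callback of callbacks) callback();
  }

  // Rollback: drop the deferred callbacks without running them.
  rollback(): void {
    this.afterCommit = [];
  }
}

// A pretend queue: each entry is a user id that was "published".
const published: string[] = [];

// Committed transaction: the dispatch fires after commit.
const committed = new FakeTransaction();
committed.onAfterCommit(() => { published.push("usr_1"); });
committed.commit();

// Rolled-back transaction: the dispatch never fires.
const rolledBack = new FakeTransaction();
rolledBack.onAfterCommit(() => { published.push("usr_2"); });
rolledBack.rollback();

console.log(published); // only the committed user's dispatch is present
```

In this sketch, a direct push inside the transaction body would have enqueued "usr_2" even though its write was rolled back, which is the failure mode described above.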
Example → what you get
The job file above is the single source of truth. From it you get a typed dispatch surface and a fully typed handler argument:
// Typed dispatch: the payload type is inferred from `schema`.
const jobId = await app.queue.sendWelcomeEmail.publish({ userId: "usr_123" });
//    ^? string | null  ← backend job id, or null when the adapter has no stable id
await app.queue.sendWelcomeEmail.publish({ userId: 123 });
//                                                 ^ Type error: number is not assignable to string
// Inside the handler, `payload` is already schema-parsed:
// handler: async ({ payload }) => { payload.userId // string }
publish() returns the backend job id (a string), or null when the adapter can't supply a stable id (the Cloudflare Queues binding form). It does not return the handler's result; jobs are fire-and-forget.
How the name maps to the dispatch key
A job declares a durable name (the string the queue backend stores), but you dispatch it through a camelCase registration key that codegen derives from the file/name:
job({ name: "send-welcome-email", /* ... */ });
//          └── durable name (what the adapter stores)
queue.sendWelcomeEmail.publish({ userId });
//    └── camelCase registration key (how you dispatch)
When name is a string literal, the job is also reachable by its durable name as a bracket alias. This is handy in module code that only knows the queue name, or when the name has characters that can't be a JS identifier:
// Both refer to the same job:
await queue.sendWelcomeEmail.publish({ userId });
await queue["send-welcome-email"].publish({ userId });
The bracket alias only exists when name is a literal (not widened to string). Internally every dispatch uses the durable name for the adapter operation, regardless of which key you access it through.
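The exact rules codegen uses to derive the registration key are not spelled out on this page. As a rough sketch, assuming a plain kebab-case to camelCase conversion, the mapping and the bracket alias could look like this (toRegistrationKey and the registry object are hypothetical names used only for illustration):

```typescript
// Hypothetical helper: converts a kebab-case durable name into a camelCase key.
// The real codegen rules may differ (for example, for underscores or file paths).
function toRegistrationKey(durableName: string): string {
  return durableName.replace(/-([a-z0-9])/g, (_match, char: string) => char.toUpperCase());
}

// A tiny registry that exposes one job under both keys, mirroring the bracket alias.
type JobClient = { durableName: string };
const registry: { [key: string]: JobClient } = {};

function register(durableName: string): void {
  const client: JobClient = { durableName };
  registry[toRegistrationKey(durableName)] = client; // camelCase registration key
  registry[durableName] = client; // durable-name alias
}

register("send-welcome-email");

console.log(toRegistrationKey("send-welcome-email")); // "sendWelcomeEmail"
console.log(registry["sendWelcomeEmail"] === registry["send-welcome-email"]); // true
```

Both keys resolve to the same client object, so whichever one you use, the durable name is what reaches the adapter.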
The handler
The handler receives one argument: the validated payload plus your full app context, flattened. Destructure what you need.
handler: async ({
  payload, // schema-validated, Zod-parsed payload
  locale, // locale resolved for this run, when set
  db, collections, globals, queue, email, search, // ...the rest is your AppContext,
  realtime, kv, storage, services, logger, t, app, // the same services a route or hook gets
}) => {
  // ...
};
- payload is the schema's output type, already parsed; by the time your handler runs, the input has passed schema.parse().
- locale is the locale resolved for this run (when one was set on the dispatching context).
- Everything else is the flat AppContext: db, collections, globals, queue, email, search, realtime, kv, storage, services, logger, t, app, session. Because the handler gets the same queue client, a job can enqueue other jobs (fan out, chain, or re-queue).
`schema` is required, and the payload is validated twice
Every job must carry a Zod schema; it isn't optional. The payload is parsed at dispatch time (publish and schedule throw if it fails), and the framework validates the raw job data a second time before invoking your handler. So inside the handler, payload is always well-formed.
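The two validation points can be sketched without any framework code. The snippet below is only an illustration: the Schema interface is a hand-rolled stand-in for a Zod schema (so the example has no dependencies), and publish / drain are hypothetical local functions, not the QUESTPIE queue client.

```typescript
// Minimal stand-in for a Zod schema: parse() returns the value or throws.
interface Schema<T> {
  parse(input: unknown): T;
}

const welcomeSchema: Schema<{ userId: string }> = {
  parse(input: unknown): { userId: string } {
    const candidate = input as { userId?: unknown } | null;
    if (!candidate || typeof candidate.userId !== "string") {
      throw new Error("Invalid payload: userId must be a string");
    }
    return { userId: candidate.userId };
  },
};

// Raw job data as it would sit in the queue backend.
const stored: unknown[] = [];
const handled: string[] = [];

// First validation: at dispatch time, before anything is enqueued.
function publish(payload: unknown): void {
  stored.push(welcomeSchema.parse(payload));
}

// Second validation: on the raw stored data, right before the handler runs.
function drain(handler: (payload: { userId: string }) => void): void {
  while (stored.length > 0) {
    handler(welcomeSchema.parse(stored.shift()));
  }
}

publish({ userId: "usr_123" });

let rejected = false;
try {
  publish({ userId: 123 }); // wrong shape: rejected before it is enqueued
} catch {
  rejected = true;
}

drain((payload) => { handled.push(payload.userId); });
console.log(rejected, handled); // rejected is true; handled holds only "usr_123"
```

The second parse looks redundant here, but it protects the handler when the stored data was written by an older deploy or by something other than publish.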
Dispatching
app.queue.<name> exposes three methods. All of them lazily start the adapter on first use; publish and schedule also validate the payload against the job's schema.
publish(payload, options?)
Enqueue one run of the job. Returns the backend job id (a string), or null when the adapter has no stable id.
const jobId = await queue.sendWelcomeEmail.publish(
  { userId: "usr_123" },
  { priority: 10, singletonKey: "welcome:usr_123" }, // optional PublishOptions
);
The optional second argument is PublishOptions, spread-merged over the job's own options ({ ...jobDef.options, ...publishOptions }, so call-time wins). Throws if the payload fails the schema.
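The merge rule is ordinary object spread, so it can be checked in isolation. This sketch uses a local, trimmed-down copy of the option shape; mergeOptions is a hypothetical helper name, not a QUESTPIE export.

```typescript
// Local, trimmed-down copy of the option shape described on this page.
interface PublishOptionsSketch {
  priority?: number;
  retryLimit?: number;
  retryDelay?: number; // seconds
  singletonKey?: string;
}

// Per-job defaults, as they would appear under `options` in a job definition.
const jobDefaults: PublishOptionsSketch = { retryLimit: 3, retryDelay: 30, priority: 0 };

// Call-time options are spread last, so they override the defaults key by key.
function mergeOptions(
  defaults: PublishOptionsSketch,
  callTime: PublishOptionsSketch = {},
): PublishOptionsSketch {
  return { ...defaults, ...callTime };
}

const merged = mergeOptions(jobDefaults, { priority: 10, singletonKey: "welcome:usr_123" });
console.log(merged);
// priority comes from the call (10); retryLimit and retryDelay keep the job defaults.
```

One property of plain spread worth knowing: a call-time key that is explicitly set to undefined still overrides the default, so omit a key rather than passing undefined when you want the job's default to apply.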
schedule(payload, cron, options?)
Register a recurring run on a cron expression. Note cron is the second positional argument, not a field in options:
await queue.generateReport.schedule(
  { kind: "daily" },
  "0 6 * * *", // every day at 06:00
);
options here is Omit<PublishOptions, "startAfter">; startAfter is meaningless for a recurring schedule. This is the imperative scheduling path; the declarative one is options.cron on the job itself. Throws on adapters without scheduling support (Cloudflare Queues).
unschedule()
Cancel all scheduled occurrences for this job. Takes no arguments:
await queue.generateReport.unschedule();
Throws on adapters without scheduling support.
Full API
job(definition): the definition shape
interface JobDefinition<TPayload, TResult = void, TName extends string = string> {
  name: TName; // durable job name (use a string literal)
  schema: z.ZodSchema<TPayload>; // required, validates the payload
  handler: (args: JobHandlerArgs<TPayload>) => Promise<TResult>;
  options?: {
    priority?: number; // higher = more important
    retryLimit?: number; // number of retry attempts
    retryDelay?: number; // seconds between retries
    retryBackoff?: boolean; // exponential backoff
    expireInSeconds?: number; // job expiration (seconds)
    startAfter?: number | string | Date; // delay the first run
    cron?: string; // recurring schedule (declarative)
  };
}
job() itself is an identity function at runtime; it returns the definition object unchanged. All typing and validation live in the JobDefinition type and in the queue client at dispatch time.
options: per-job defaults
These are the defaults baked into the job. Each is optional. At dispatch time they're merged with PublishOptions via { ...jobDef.options, ...publishOptions }, so call-time options win.
| Option | Type | Effect |
|---|---|---|
| priority | number | Higher values run before lower ones. |
| retryLimit | number | How many times to retry a failed run. |
| retryDelay | number | Seconds to wait between retries. |
| retryBackoff | boolean | Use exponential backoff between retries. |
| expireInSeconds | number | Drop the job if it hasn't run within this window. |
| startAfter | number \| string \| Date | Delay the first run (seconds / ISO string / Date). |
| cron | string | Register the job as recurring; see Recurring jobs. |
Adapter support varies; see the adapter comparison.
`retryDelay` is in seconds, not milliseconds
retryDelay, expireInSeconds, and the numeric form of startAfter are all in seconds. retryDelay: 30 means thirty seconds between retries, not thirty milliseconds. The BullMQ adapter multiplies retryDelay by 1000 internally; you still pass seconds.
PublishOptions
The per-call override surface for publish(). It's the job's options minus cron, plus singletonKey:
interface PublishOptions {
  priority?: number;
  startAfter?: number | string | Date;
  singletonKey?: string; // dedup, only one queued job may carry this key
  retryLimit?: number;
  retryDelay?: number; // seconds
  retryBackoff?: boolean;
  expireInSeconds?: number;
}
singletonKey deduplicates: while a job with a given key is queued, a second publish with the same key is suppressed. It's native on pg-boss, maps to the BullMQ job id (dedup by id, not a true singleton lock), and is ignored by Cloudflare Queues. schedule() accepts Omit<PublishOptions, "startAfter">.
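The deduplication rule can be illustrated with a small in-memory model. This is a sketch of the behavior described above, not of any adapter's internals; DedupQueue is a hypothetical class, and returning null for a suppressed publish is a choice made for this example only.

```typescript
// In-memory illustration of singleton-key deduplication (not a real adapter).
type QueuedJob = { id: string; singletonKey: string | undefined };

class DedupQueue {
  private queuedKeys = new Set<string>();
  private nextId = 1;
  readonly jobs: QueuedJob[] = [];

  // Returns the new job id, or null when the publish was suppressed.
  publish(singletonKey?: string): string | null {
    if (singletonKey !== undefined) {
      if (this.queuedKeys.has(singletonKey)) return null;
      this.queuedKeys.add(singletonKey);
    }
    const id = "job-" + this.nextId++;
    this.jobs.push({ id, singletonKey });
    return id;
  }

  // Taking a job off the queue frees its key for future publishes.
  take(): QueuedJob | undefined {
    const job = this.jobs.shift();
    if (job && job.singletonKey !== undefined) this.queuedKeys.delete(job.singletonKey);
    return job;
  }
}

const dedup = new DedupQueue();
const first = dedup.publish("welcome:usr_123"); // enqueued
const second = dedup.publish("welcome:usr_123"); // suppressed while the first is queued
dedup.take();
const third = dedup.publish("welcome:usr_123"); // the key is free again
console.log(first, second, third); // "job-1" null "job-2"
```

The key only guards against duplicates among queued jobs. Once the first job leaves the queue, the same key can be published again.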
Recurring jobs
For a job that should run on a schedule, the recommended path is declarative: add options.cron to the definition. A worker registers it automatically on boot.
import { job } from "questpie";
import { z } from "zod";
export default job({
  name: "purge-expired-sessions",
  // A cron run is invoked with an EMPTY payload, so the schema must accept {}.
  schema: z.object({}),
  handler: async ({ db, logger }) => {
    // ...delete expired sessions...
    logger.info("Purged expired sessions");
  },
  options: {
    cron: "0 3 * * *", // every day at 03:00
  },
});
When a worker starts (queue.listen()), it calls registerSchedules(), which reads every job's options.cron and registers it as a recurring schedule.
A cron job's schema must accept an empty payload
registerSchedules() registers cron jobs by calling schema.parse({}), because recurring runs carry no payload. If your schema requires fields, it throws "Job <name> has cron schedule but schema does not accept an empty payload." Use z.object({}), or make every field optional or defaulted.
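A rough sketch of that registration check follows. It only models the behavior described in this callout: the Schema interface is a hand-rolled stand-in for Zod, registerSchedulesSketch is a hypothetical function, and the real registerSchedules() also talks to the adapter.

```typescript
// Minimal stand-in for a Zod schema: parse() returns the value or throws.
interface Schema<T> {
  parse(input: unknown): T;
}

interface CronJobSketch {
  jobName: string;
  schema: Schema<unknown>;
  cron?: string;
}

// Stand-in for z.object({}): accepts any non-null object.
const emptyObjectSchema: Schema<object> = {
  parse(input: unknown): object {
    if (typeof input !== "object" || input === null) throw new Error("Expected an object");
    return input;
  },
};

// Stand-in for z.object({ userId: z.string() }): requires a field.
const requiresUserId: Schema<{ userId: string }> = {
  parse(input: unknown): { userId: string } {
    const candidate = input as { userId?: unknown } | null;
    if (!candidate || typeof candidate.userId !== "string") throw new Error("userId is required");
    return { userId: candidate.userId };
  },
};

// Sketch of the registration pass: every cron job must accept an empty payload.
function registerSchedulesSketch(jobs: CronJobSketch[]): string[] {
  const registered: string[] = [];
  for (const job of jobs) {
    if (!job.cron) continue; // on-demand jobs are skipped
    try {
      job.schema.parse({});
    } catch {
      throw new Error(`Job ${job.jobName} has cron schedule but schema does not accept an empty payload.`);
    }
    registered.push(job.jobName + " @ " + job.cron);
  }
  return registered;
}

const registered = registerSchedulesSketch([
  { jobName: "purge-expired-sessions", schema: emptyObjectSchema, cron: "0 3 * * *" },
  { jobName: "send-welcome-email", schema: requiresUserId }, // no cron: ignored
]);

let cronError = "";
try {
  registerSchedulesSketch([{ jobName: "send-welcome-email", schema: requiresUserId, cron: "0 6 * * *" }]);
} catch (error) {
  cronError = (error as Error).message;
}
console.log(registered, cronError);
```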
`startAfter` is ignored for cron jobs
When a job is registered via options.cron, its cron and startAfter are stripped from the options handed to the adapter, because startAfter has no meaning for a recurring schedule.
Choose options.cron (declarative, auto-registered by the worker) for fixed schedules baked into your app. Reach for queue.<name>.schedule(payload, cron) only when you need to register a schedule with a payload or set one up dynamically at runtime.
Running a worker
Defining a job and dispatching it only enqueues work; something has to drain the queue. QUESTPIE picks the runtime model from the adapter's capabilities, and you start the worker programmatically by importing your built app.
There is no `questpie worker` CLI command
Workers run programmatically, not via the CLI. The questpie CLI has commands for codegen, migrations, seeding, and deploy, but nothing for queues. Write a small entrypoint script (below) and run it with your runtime.
Long-running worker (Node / Bun)
For a persistent worker process, write an entrypoint that calls app.queue.listen() and run it as its own process:
import { app } from "#questpie";
await app.queue.listen({
  teamSize: 5, // max concurrent jobs (pg-boss teamSize / BullMQ concurrency)
  batchSize: 3, // jobs fetched per poll
});
console.log("Worker listening for jobs...");
Run it, for example, with bun run src/worker.ts. listen():
- Calls registerSchedules() first, so cron jobs get registered when the worker boots.
- Installs SIGINT / SIGTERM graceful-shutdown handlers by default (shutdownTimeoutMs defaults to 10000 ms; on timeout the process is force-exited).
- Returns a handle whose stop() tears down the signal handlers and the adapter.
It throws on push-only adapters (Cloudflare Queues). The helper startJobWorker(app.queue, options) is a thin wrapper over app.queue.listen(options) if you prefer that name.
Serverless / cron-tick (process one batch)
In a serverless function or a scheduled tick, you don't want a long-lived listener; you want to drain one bounded batch and exit. Use runOnce():
import { app } from "#questpie";
export async function handler() {
  const { processed } = await app.queue.runOnce({ batchSize: 25 });
  return { processed };
}
runOnce() processes one batch and returns { processed } (the count). Pass jobs: [...] to restrict it to specific jobs, by registration key or durable name (it normalizes either). Default batchSize is 10. It throws on push-only adapters. The helper runJobWorkerOnce(app.queue, options) is the equivalent thin wrapper.
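The bounded-batch semantics can be modeled with a plain array. The sketch below is an assumption-laden illustration, not the adapter code: runOnceSketch is a hypothetical function, it matches jobs by durable name only (the real runOnce() also normalizes registration keys), and handlers are synchronous for brevity.

```typescript
// One queued job in this in-memory model.
interface PendingJob {
  jobName: string;
  data: unknown;
}

type HandlerMap = { [jobName: string]: (data: unknown) => void };

// Processes at most `batchSize` matching jobs, then returns how many ran.
function runOnceSketch(
  pending: PendingJob[],
  handlers: HandlerMap,
  options: { batchSize?: number; jobs?: string[] } = {},
): { processed: number } {
  const batchSize = options.batchSize ?? 10; // default stated on this page
  let processed = 0;
  let index = 0;
  while (index < pending.length && processed < batchSize) {
    const job = pending[index];
    const allowed = !options.jobs || options.jobs.indexOf(job.jobName) !== -1;
    const handler = handlers[job.jobName];
    if (allowed && handler) {
      pending.splice(index, 1); // remove the job from the queue, then run it
      handler(job.data);
      processed++;
    } else {
      index++; // leave non-matching jobs queued
    }
  }
  return { processed };
}

// Twelve queued jobs, drained by two ticks.
const pendingJobs: PendingJob[] = [];
for (let i = 0; i < 12; i++) {
  pendingJobs.push({ jobName: "send-welcome-email", data: { userId: "usr_" + i } });
}
let handledCount = 0;
const handlerMap: HandlerMap = { "send-welcome-email": () => { handledCount++; } };

const firstTick = runOnceSketch(pendingJobs, handlerMap); // default batch of 10
const filteredTick = runOnceSketch(pendingJobs, handlerMap, { jobs: ["index-records"] }); // nothing matches
const secondTick = runOnceSketch(pendingJobs, handlerMap, { batchSize: 25 }); // the remaining 2
console.log(firstTick.processed, filteredTick.processed, secondTick.processed); // 10 0 2
```

Because each tick is bounded, a scheduled function can call it repeatedly without risking an unbounded execution time.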
Push model (Cloudflare Queues)
Cloudflare Queues pushes batches to your worker rather than letting you poll. Wire createPushConsumer() to the platform's queue() entrypoint:
import { app } from "#questpie";
const consume = app.queue.createPushConsumer();
export default {
  async queue(batch, _env, _ctx) {
    await consume(batch as never);
  },
};
createPushConsumer() throws on pg-boss / BullMQ (they're poll-based, not push).
Detect capabilities before calling listen / runOnce / schedule
Each adapter advertises what it supports via app.queue.capabilities: longRunningConsumer, runOnceConsumer, pushConsumer, scheduling, singleton. If you target multiple runtimes from one codebase, check the relevant flag before calling a method; that's cheaper than catching the throw.
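One way to act on those flags is a small selection helper. The sketch below declares the capability shape locally from the flag names listed above; pickRuntimeModel and the two sample objects are hypothetical, with flag values taken from the adapter comparison table on this page.

```typescript
// Local copy of the capability flags named on this page.
interface CapabilitiesSketch {
  longRunningConsumer: boolean;
  runOnceConsumer: boolean;
  pushConsumer: boolean;
  scheduling: boolean;
  singleton: boolean;
}

type RuntimeModel = "listen" | "runOnce" | "push" | "none";

// Hypothetical helper: choose how to drain the queue for the current runtime.
function pickRuntimeModel(capabilities: CapabilitiesSketch, serverless: boolean): RuntimeModel {
  if (capabilities.pushConsumer) return "push";
  if (serverless) return capabilities.runOnceConsumer ? "runOnce" : "none";
  if (capabilities.longRunningConsumer) return "listen";
  return capabilities.runOnceConsumer ? "runOnce" : "none";
}

// Flag values as listed in the adapter comparison table.
const pgBossLike: CapabilitiesSketch = {
  longRunningConsumer: true,
  runOnceConsumer: true,
  pushConsumer: false,
  scheduling: true,
  singleton: true,
};
const cloudflareLike: CapabilitiesSketch = {
  longRunningConsumer: false,
  runOnceConsumer: false,
  pushConsumer: true,
  scheduling: false,
  singleton: false,
};

console.log(pickRuntimeModel(pgBossLike, false)); // "listen"
console.log(pickRuntimeModel(pgBossLike, true)); // "runOnce"
console.log(pickRuntimeModel(cloudflareLike, true)); // "push"
```

In an app you would pass app.queue.capabilities instead of the sample objects, then call listen(), runOnce(), or createPushConsumer() based on the result.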
Adapters
You pick the queue backend by passing an adapter to .build({ queue: { adapter } }). Each adapter lives in its own subpath so its driver stays out of your bundle unless you import it.
| Adapter | Import | Backed by | Long-running | Run-once | Push | Scheduling | Singleton |
|---|---|---|---|---|---|---|---|
| pg-boss | questpie/adapters/pg-boss | Postgres | yes | yes | no | yes | yes |
| BullMQ | questpie/adapters/bullmq | Redis | yes | yes | no | yes | yes |
| Cloudflare Queues | questpie/adapters/cloudflare-queues | CF Queues | no | no | yes | no | no |
pg-boss (Postgres)
The simplest production setup: reuse your existing Postgres database, with no extra infrastructure.
import { pgBossAdapter } from "questpie/adapters/pg-boss";
.build({
  queue: { adapter: pgBossAdapter({ connectionString: env.DATABASE_URL }) },
});
PgBossAdapterOptions is pg-boss's own ConstructorOptions, so options pass straight through to new PgBoss(...) (connectionString, db, schema, max, …). Queues are created on demand. Supports long-running workers, runOnce, scheduling, and native singletonKey.
BullMQ (Redis)
import { bullMQAdapter } from "questpie/adapters/bullmq";
.build({
  queue: {
    adapter: bullMQAdapter({
      connection: { host: env.REDIS_HOST, port: env.REDIS_PORT },
    }),
  },
});
Takes { connection, queuePrefix?, workerOptions? }. One BullMQ Worker per job name; queues and workers connect lazily (start() is a no-op). Option mapping: teamSize → concurrency, singletonKey → job id (dedup by id, not a true lock), retryLimit → attempts (retryLimit + 1), retryDelay → backoff delay in milliseconds, expireInSeconds → removeOnComplete: { age }.
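The option mapping above can be written out as a pure function. This is a sketch under assumptions, not the adapter's source: toBullLikeOptions and both interfaces are local, hypothetical names, and using a "fixed" backoff type when retryBackoff is off is a guess made for this example.

```typescript
// Subset of the per-job / per-call options described on this page.
interface RetryOptionsSketch {
  singletonKey?: string;
  retryLimit?: number;
  retryDelay?: number; // seconds
  retryBackoff?: boolean;
  expireInSeconds?: number;
}

// Simplified local shape of the BullMQ-side job options named in the mapping.
interface BullLikeJobOptions {
  jobId?: string;
  attempts?: number;
  backoff?: { type: "fixed" | "exponential"; delay: number }; // delay in milliseconds
  removeOnComplete?: { age: number };
}

function toBullLikeOptions(options: RetryOptionsSketch): BullLikeJobOptions {
  const mapped: BullLikeJobOptions = {};
  // singletonKey becomes the job id, so a second job with the same id is a duplicate.
  if (options.singletonKey !== undefined) mapped.jobId = options.singletonKey;
  // attempts counts the first run too, hence retryLimit + 1.
  if (options.retryLimit !== undefined) mapped.attempts = options.retryLimit + 1;
  // You pass seconds; the backoff delay is expressed in milliseconds.
  if (options.retryDelay !== undefined) {
    mapped.backoff = {
      type: options.retryBackoff ? "exponential" : "fixed", // "fixed" is an assumption
      delay: options.retryDelay * 1000,
    };
  }
  if (options.expireInSeconds !== undefined) mapped.removeOnComplete = { age: options.expireInSeconds };
  return mapped;
}

const mappedOptions = toBullLikeOptions({
  singletonKey: "welcome:usr_123",
  retryLimit: 3,
  retryDelay: 30,
  retryBackoff: true,
  expireInSeconds: 3600,
});
console.log(mappedOptions);
// attempts is 4 (one run plus three retries) and backoff.delay is 30000 ms.
```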
Cloudflare Queues (push)
import { cloudflareQueuesAdapter } from "questpie/adapters/cloudflare-queues";
.build({
  queue: { adapter: cloudflareQueuesAdapter({ queue: env.MY_QUEUE }) },
});
Pass either a queue binding or an enqueue function (the constructor throws if you pass neither). Push-only: schedule() / unschedule() throw (use Cloudflare's platform cron triggers instead), and publish() returns null when using the binding form (send() has no stable id). startAfter and retryDelay map to a delaySeconds, capped at 24 hours.
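As an illustration of the delay mapping, the sketch below converts the three startAfter forms into a capped number of seconds. toDelaySeconds is a hypothetical helper; rounding up and clamping past dates to 0 are assumptions made for this example, and only the 24-hour cap comes from the text above.

```typescript
const MAX_DELAY_SECONDS = 24 * 60 * 60; // the 24-hour cap described above

// Converts startAfter (seconds, ISO string, or Date) into a capped delay in seconds.
function toDelaySeconds(startAfter: number | string | Date, now: Date = new Date()): number {
  let seconds: number;
  if (typeof startAfter === "number") {
    seconds = startAfter; // the numeric form is already in seconds
  } else {
    const target = typeof startAfter === "string" ? new Date(startAfter) : startAfter;
    seconds = Math.ceil((target.getTime() - now.getTime()) / 1000);
  }
  // Assumption: past dates become 0 (run immediately); long delays hit the cap.
  return Math.min(Math.max(seconds, 0), MAX_DELAY_SECONDS);
}

const fixedNow = new Date("2024-01-01T00:00:00Z");
console.log(toDelaySeconds(90, fixedNow)); // 90
console.log(toDelaySeconds("2024-01-01T00:05:00Z", fixedNow)); // 300
console.log(toDelaySeconds(new Date("2024-01-03T00:00:00Z"), fixedNow)); // 86400 (capped)
```

The cap means a job that should start more than a day out cannot rely on startAfter alone on this adapter.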
An adapter is required once any job exists
If your app has at least one job but you don't supply queue: { adapter }, the queue service throws: "Queue adapter is required when jobs are defined. Provide adapter in .build({ queue: { adapter: ... } })." With no jobs at all, app.queue is an empty {}.
Extending: a custom adapter
The queue backend is open: QueueAdapter is the contract every adapter (built-in or yours) implements, so you can wire in any broker. Implement the required methods, declare your capabilities, and pass an instance to .build({ queue: { adapter } }):
import type { QueueAdapter } from "questpie/queue";
class MyAdapter implements QueueAdapter {
  capabilities = {
    longRunningConsumer: true,
    runOnceConsumer: false,
    pushConsumer: false,
    scheduling: false,
    singleton: false,
  };
  async start() { /* connect */ }
  async stop() { /* disconnect */ }
  async publish(jobName, payload, options) { /* enqueue */ return "job-id"; }
  async schedule(jobName, cron, payload, options) { /* register cron */ }
  async unschedule(jobName) { /* cancel cron */ }
  on(event, handler) { /* "error" listener */ }
  // Optional, presence drives capability fallbacks:
  async listen(handlers, options) { /* long-running consumer */ }
  async runOnce(handlers, options) { return { processed: 0 }; }
  createPushConsumer(args) { return async (batch) => {}; }
}
start / stop / publish / schedule / unschedule / on are required; listen / runOnce / createPushConsumer are optional. Whether you implement each optional method drives the corresponding capability flag when capabilities leaves it unset. Each handler in the QueueHandlerMap is keyed by the durable job name and receives { id, data }; the framework re-validates data against the job's Zod schema before your user handler runs. This is the QUESTPIE principle in action: the three built-in adapters use the exact same QueueAdapter seam your custom adapter does, and there is no privileged internal queue API.
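For a feel of what a working adapter involves, here is a minimal in-memory sketch that supports only the run-once model. It does not import the real QueueAdapter type: the handler and option shapes are simplified local assumptions based on the skeleton above, and InMemoryAdapter is a hypothetical class meant for tests, not production.

```typescript
// Simplified local shapes; the real types are exported from questpie/queue
// and may differ in detail.
type JobHandler = (job: { id: string; data: unknown }) => Promise<void>;
type HandlerMapSketch = { [durableName: string]: JobHandler };
type StoredJob = { id: string; jobName: string; data: unknown };

class InMemoryAdapter {
  capabilities = {
    longRunningConsumer: false,
    runOnceConsumer: true,
    pushConsumer: false,
    scheduling: false,
    singleton: false,
  };
  private pending: StoredJob[] = [];
  private nextId = 1;

  async start(): Promise<void> { /* nothing to connect to */ }
  async stop(): Promise<void> { this.pending = []; }

  // Enqueue under the durable job name and return a stable id.
  async publish(jobName: string, payload: unknown): Promise<string | null> {
    const id = "mem-" + this.nextId++;
    this.pending.push({ id, jobName, data: payload });
    return id;
  }

  // Scheduling is declared unsupported, so both methods throw.
  async schedule(): Promise<void> { throw new Error("InMemoryAdapter does not support scheduling"); }
  async unschedule(): Promise<void> { throw new Error("InMemoryAdapter does not support scheduling"); }

  on(_event: "error", _handler: (error: Error) => void): void { /* no error source here */ }

  // Drain one bounded batch; jobs without a handler go back on the queue.
  async runOnce(handlers: HandlerMapSketch, options: { batchSize?: number } = {}): Promise<{ processed: number }> {
    const batch = this.pending.splice(0, options.batchSize ?? 10);
    let processed = 0;
    for (const job of batch) {
      const handler = handlers[job.jobName];
      if (handler) {
        await handler({ id: job.id, data: job.data });
        processed++;
      } else {
        this.pending.push(job);
      }
    }
    return { processed };
  }
}

async function demo(): Promise<{ processed: number; seen: string[] }> {
  const adapter = new InMemoryAdapter();
  const seen: string[] = [];
  await adapter.start();
  await adapter.publish("send-welcome-email", { userId: "usr_1" });
  await adapter.publish("send-welcome-email", { userId: "usr_2" });
  const result = await adapter.runOnce({
    "send-welcome-email": async (job) => { seen.push(job.id); },
  });
  await adapter.stop();
  return { processed: result.processed, seen };
}

demo().then((result) => console.log(result));
```

Because listen and createPushConsumer are absent and the capability flags say so, calls to those entry points would be expected to fail fast rather than hang.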
Built-in job: index-records
QUESTPIE ships one real job out of the box, the canonical job() example. When a searchable collection changes, the search service enqueues index-records so indexing happens off the request path:
// Reachable like any other job, by its durable name (it has a literal name):
await app.queue["index-records"].publish({
  items: [
    { collection: "posts", recordId: "123" },
    { collection: "posts", recordId: "456" },
  ],
});
It's defined exactly like a job you'd write: job({ name: "index-records", schema, options: { retryLimit: 3, retryDelay: 30, retryBackoff: true }, handler }). The search service auto-wires itself to queue["index-records"].publish when the job exists, so you rarely call it yourself. It's an on-demand job (no options.cron).
TypeScript
Job definitions are fully typed inline: payload is inferred from schema, and publish / schedule accept exactly that shape. You rarely need manual annotations.
To reference a job's payload type elsewhere (say, in a helper that builds payloads), use InferJobPayload:
import type { InferJobPayload } from "questpie";
import type sendWelcomeEmail from "../jobs/send-welcome-email";
type WelcomePayload = InferJobPayload<typeof sendWelcomeEmail>;
//   ^? { userId: string }
InferJobPayload<T> extracts the payload from a JobDefinition (or the lighter QueueJobType marker) and resolves to never for anything else.
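A helper like this is typically a conditional type with infer. The sketch below shows the general technique with simplified local shapes; InferPayloadSketch and JobDefinitionLike are hypothetical names, and the real InferJobPayload may be defined differently (it also handles the QueueJobType marker).

```typescript
// Simplified local shape of a job definition; the real type lives in questpie/queue.
interface JobDefinitionLike<TPayload> {
  name: string;
  schema: { parse(input: unknown): TPayload };
}

// Resolves to the payload type of a job definition, or never for anything else.
type InferPayloadSketch<T> = T extends JobDefinitionLike<infer TPayload> ? TPayload : never;

// A job-shaped object with a hand-rolled schema (stand-in for Zod).
const sendWelcomeEmailJob = {
  name: "send-welcome-email",
  schema: {
    parse(input: unknown): { userId: string } {
      const candidate = input as { userId?: unknown } | null;
      if (!candidate || typeof candidate.userId !== "string") throw new Error("userId is required");
      return { userId: candidate.userId };
    },
  },
};

type WelcomePayloadSketch = InferPayloadSketch<typeof sendWelcomeEmailJob>; // { userId: string }
type NotAJob = InferPayloadSketch<number>; // never

// A payload builder typed from the job, with no manual annotation of the shape.
function buildWelcomePayload(userId: string): WelcomePayloadSketch {
  return { userId };
}

console.log(buildWelcomePayload("usr_123")); // an object whose userId is "usr_123"
```

If the job's schema changes, buildWelcomePayload stops compiling until it returns the new shape, which is the point of deriving the type instead of restating it.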
`publish` returns an id, not the handler's result
There's an InferJobResult<T> helper, but the handler's return value is not surfaced through the queue client: publish() resolves to the backend job id (string) or null, never TResult. Jobs are fire-and-forget; if you need a result, write it to the database from the handler and read it back.
To replace the handler context type globally (instead of the default flat AppContext), augment the Questpie.JobHandlerContext interface:
declare global {
  namespace Questpie {
    interface JobHandlerContext {
      // Whatever you put here REPLACES (does not merge with) AppContext
      // for every job handler.
    }
  }
}
Leaving it empty keeps the default: the full AppContext (db, queue, email, storage, kv, collections, globals, services, t, session, …).
The public queue types (JobDefinition, JobHandlerArgs, PublishOptions, QueueClient, QueueJobClient, QueueAdapter, QueueAdapterCapabilities, InferJobPayload, WorkerOptions, and the rest) are all exported from questpie/queue.
Related
- Hooks: where most jobs are dispatched (afterChange + onAfterCommit), and why deferring matters.
- Collections: the data your jobs read and write via ctx.collections.
- Validation: the same Zod you use for a job schema powers collection validation.
- Emails: the email service a job uses to send mail off the request path.
- Getting started: wire an adapter and run codegen in a fresh app.
- Runnable example: examples/tanstack-barbershop, with appointment reminders and a src/worker.ts long-running worker.