Realtime
Subscribe to a live query and get a fresh, access-controlled snapshot every time a matching record changes, typed exactly like find(), pushed over one multiplexed SSE connection, with one server-side adapter to wire.
Realtime turns any read into a live subscription. Call client.collections.posts.live({ where: { published: true } }, onSnapshot) and your callback fires with the full find() result immediately, then again whenever a matching post is created, updated, or deleted. The snapshot has exactly the same shape and type as a find() result (relations, pagination envelope, and all) and is re-run server-side through the collection's access rules. Many subscriptions share one SSE connection, and you turn the whole thing on with a single realtime setting in your server config.
What it does
- Live queries, fully typed. live() / liveIter() push find()-shaped snapshots; the snapshot type is inferred from your query options, so snapshot.docs[0].title is type-checked end to end.
- One snapshot now, then on every change. The first snapshot arrives immediately on connect (no separate initial fetch); later snapshots arrive whenever a record matching your where / with / orderBy changes.
- Access-controlled at the source. Each snapshot is the result of re-running your query server-side under the subscriber's session, so realtime never leaks rows the user can't read.
- One connection for many topics. A single multiplexer holds one POST /realtime SSE stream and fans out to every active subscription, sidestepping the browser's 6-connections-per-domain limit.
- { realtime: true } on TanStack Query. Opt a find / count / get query into live mode with one flag and the hook streams instead of one-shot fetching.
- Pluggable transport. Default polling needs zero setup; add pgNotifyAdapter() or redisStreamsAdapter({ client }) for push-based, horizontally-scalable delivery.
Quick start
Realtime works on the client out of the box: createClient() always wires a realtime API. The one thing you must turn on is the server side. Set realtime in your runtime config so the app records changes and serves the SSE stream.
import { runtimeConfig } from "questpie/app";
import env from "./env"; // declared + validated in env.ts, see Environment
export default runtimeConfig({
db: { url: env.DATABASE_URL },
realtime: true,
});
Now subscribe from the client. live() takes your query options and a snapshot callback, and returns an unsubscribe function:
import { client } from "@/lib/client.js";
// First snapshot fires immediately; then on every matching change.
const unsubscribe = client.collections.posts.live(
{ where: { published: true }, orderBy: { createdAt: "desc" }, limit: 10 },
(snapshot) => {
// snapshot is the SAME shape & type as client.collections.posts.find(...)
console.log(snapshot.totalDocs, snapshot.docs[0]?.title);
},
);
// Later, stop listening (e.g. on unmount):
unsubscribe();
That callback now re-fires with a fresh, access-filtered top-10 every time a published post changes.
The first snapshot is your initial data
You do not need a separate find() before subscribing. live() delivers a full snapshot the moment the connection opens, then keeps it fresh. Treat the first callback invocation as "data loaded".
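Because the first callback invocation is the initial data, a one-shot "give me the current state" helper can be built on top of any live()-shaped function. The sketch below is illustrative only: LiveFn and firstSnapshot are hypothetical names, not part of the client, and the code assumes nothing beyond the (options, callback) => unsubscribe shape described above.

```typescript
// Hypothetical structural type: any function shaped like live() as described
// above, i.e. (options, snapshot callback) => unsubscribe.
type LiveFn<TOptions, TSnapshot> = (
  options: TOptions,
  onSnapshot: (snapshot: TSnapshot) => void,
) => () => void;

// Resolve with the first snapshot, then stop listening.
function firstSnapshot<TOptions, TSnapshot>(
  live: LiveFn<TOptions, TSnapshot>,
  options: TOptions,
): Promise<TSnapshot> {
  return new Promise((resolve) => {
    const state = { done: false };
    let unsubscribe: (() => void) | undefined;
    unsubscribe = live(options, (snapshot) => {
      if (state.done) return;
      state.done = true;
      resolve(snapshot);
      // Only set if the callback ran after live() returned.
      unsubscribe?.();
    });
    // If the callback ran synchronously inside live(), clean up now.
    if (state.done) unsubscribe();
  });
}
```

Called as firstSnapshot((o, cb) => client.collections.posts.live(o, cb), { limit: 10 }), this behaves like a find() that happens to travel over the realtime connection.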
Example → inferred snapshot
The snapshot type equals the find() result type for the same options: FindResult<Row, Relations, TQuery>. You cannot narrow it with columns, because live() deliberately carries only a subset of find() options (no columns), so a snapshot is always the full row plus any with relations you requested:
client.collections.posts.live(
{ where: { published: true }, with: { author: true }, limit: 5 },
(snapshot) => {
snapshot.docs[0].author.name;
// ^? string, `with: { author: true }` hydrated the relation
snapshot.hasNextPage;
// ^? boolean, full PaginatedResult envelope, same as find()
},
);
A snapshot is always a PaginatedResult<T> ({ docs, totalDocs, totalPages, page, hasPrevPage, hasNextPage, prevPage, nextPage, ... }). There is no grouped variant, because groupBy isn't a live option (see below).
Live query options
live() and liveIter() accept LiveQueryOptions, the slice of find() options that the realtime wire protocol carries as topic fields:
type LiveQueryOptions<TSelect, TRelations> = {
where?: Where<TSelect, TRelations>;
with?: With<TRelations>;
limit?: number;
offset?: number;
orderBy?: OrderBy<TSelect>;
locale?: string;
};
`columns`, `groupBy`, `search`, `includeDeleted`, `stage` are not live options
LiveQueryOptions intentionally excludes columns, groupBy, search, includeDeleted, and stage. Realtime snapshots are the full query re-run server-side from the topic fields, so column projection / grouping / soft-delete-included / stage-pinned reads aren't expressed over the wire. If you pass them they're simply not part of the subscription. For those, use a one-shot find().
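If you reuse one options object for both find() and live(), it can help to make the dropped keys visible instead of letting them vanish silently. The helper below is a hypothetical sketch (toLiveOptions is not a client API); it only encodes the six keys listed in LiveQueryOptions above.

```typescript
// The six keys listed in LiveQueryOptions above.
const LIVE_KEYS = new Set<string>(["where", "with", "limit", "offset", "orderBy", "locale"]);

// Hypothetical helper: keep only live-capable keys and report what was dropped.
function toLiveOptions(findOptions: Record<string, unknown>): {
  live: Record<string, unknown>;
  dropped: string[];
} {
  const live: Record<string, unknown> = {};
  const dropped: string[] = [];
  for (const [key, value] of Object.entries(findOptions)) {
    if (LIVE_KEYS.has(key)) live[key] = value;
    else dropped.push(key);
  }
  return { live, dropped };
}

const { live, dropped } = toLiveOptions({
  where: { published: true },
  limit: 10,
  columns: { title: true }, // not a live option
  includeDeleted: true, // not a live option
});
console.log(live); // { where: { published: true }, limit: 10 }
console.log(dropped); // [ 'columns', 'includeDeleted' ]
```

Logging or asserting on dropped during development is a cheap way to notice that a query you expected to be projected or stage-pinned is actually returning full rows.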
LiveSubscribeOptions, the third argument
live() takes an optional third argument to control the subscription lifecycle:
type LiveSubscribeOptions = {
signal?: AbortSignal; // abort → auto-unsubscribe
onError?: (error: Error) => void; // SSE connection failed
};
const ac = new AbortController();
client.collections.posts.live(
{ where: { published: true } },
(snapshot) => render(snapshot),
{
signal: ac.signal,
onError: (err) => console.error("realtime dropped:", err),
},
);
// ac.abort() unsubscribes, equivalent to calling the returned function.
The signal option unsubscribes on abort (so you can wire it to a component's lifetime). onError fires when the underlying SSE connection fails; wire it so a dropped connection surfaces instead of leaving your UI stuck on stale data.
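One way to surface a dropped connection is to keep a small status next to the data. The following is a minimal sketch under stated assumptions: trackLive, Status, and LiveFn are hypothetical names, and the only thing taken from the API above is the three-argument live(options, callback, { signal, onError }) shape.

```typescript
type Status = "loading" | "live" | "stale";

// Hypothetical structural type mirroring the live() signature described above.
type LiveFn<TSnapshot> = (
  options: unknown,
  onSnapshot: (snapshot: TSnapshot) => void,
  subscribe?: { signal?: AbortSignal; onError?: (error: Error) => void },
) => () => void;

// Subscribe and mirror the subscription into a plain state object.
function trackLive<TSnapshot>(live: LiveFn<TSnapshot>, options: unknown, signal?: AbortSignal) {
  const state: { status: Status; data?: TSnapshot; error?: Error } = { status: "loading" };
  live(
    options,
    (snapshot) => {
      state.status = "live";
      state.data = snapshot;
      state.error = undefined;
    },
    {
      signal,
      // Keep the last data but flag it, so the UI can show a "connection lost" hint.
      onError: (error) => {
        state.status = "stale";
        state.error = error;
      },
    },
  );
  return state;
}
```

The design choice here is to keep the last snapshot on error rather than clearing it: stale data with a visible warning is usually more useful than an empty screen.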
liveIter(), the async-iterable form
liveIter() is live() as an AsyncGenerator. Same snapshots, consumed with for await. Built for workers, agents, scripts, and tests rather than React effects:
const ac = new AbortController();
for await (const snapshot of client.collections.posts.liveIter(
{ where: { published: true }, limit: 10 },
{ signal: ac.signal },
)) {
console.log("posts now:", snapshot.totalDocs);
// ac.abort() ends the loop.
}
It takes only { signal? } and has no onError (unlike live()). A connection failure makes the generator throw, so wrap the loop in try/catch if you need to handle drops. Terminate by aborting the signal.
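The try/catch and abort handling can be packaged once for scripts and tests. This is a sketch, not a client API: collectUntil is a hypothetical helper, and source stands in for a call such as (signal) => client.collections.posts.liveIter(options, { signal }).

```typescript
// Collect snapshots until `done` returns true, then release the subscription.
// `source` stands in for a liveIter() call that accepts the abort signal.
async function collectUntil<TSnapshot>(
  source: (signal: AbortSignal) => AsyncIterable<TSnapshot>,
  done: (snapshot: TSnapshot) => boolean,
): Promise<{ snapshots: TSnapshot[]; error?: Error }> {
  const ac = new AbortController();
  const snapshots: TSnapshot[] = [];
  try {
    for await (const snapshot of source(ac.signal)) {
      snapshots.push(snapshot);
      if (done(snapshot)) break;
    }
    return { snapshots };
  } catch (err) {
    // A connection failure surfaces here as a thrown error.
    return { snapshots, error: err instanceof Error ? err : new Error(String(err)) };
  } finally {
    ac.abort(); // always abort, whether the loop finished, broke, or threw
  }
}
```

In a test, this lets you write "wait until totalDocs reaches 3" as collectUntil(source, (s) => s.totalDocs >= 3) without leaking the subscription when the assertion fails.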
Globals
Globals are singletons, so their live API mirrors get() with a smaller option set. GlobalLiveQueryOptions is { with?, locale? } only, no where/limit/offset/orderBy:
const stop = client.globals.siteSettings.live(
{ with: { logo: true } },
(snapshot) => {
// snapshot is the SAME shape & type as client.globals.siteSettings.get(...)
document.title = snapshot.title;
},
);
live() and liveIter() carry the same LiveSubscribeOptions / { signal? } semantics as collections.
Realtime with TanStack Query
If you use the @questpie/tanstack-query bindings, you don't reach for live() directly. Instead, you pass { realtime: true } as the second argument to a query builder, and the hook streams instead of fetching once. This works on exactly three reads: collections.<name>.find, collections.<name>.count, and globals.<name>.get.
import { useQuery } from "@tanstack/react-query";
import { q } from "@/lib/query.js";
function LivePosts() {
const { data } = useQuery(
// 1st arg: query options. 2nd arg: { realtime: true }.
q.collections.posts.find({ where: { published: true }, limit: 10 }, { realtime: true }),
);
return <p>{data?.totalDocs ?? 0} published posts (live)</p>;
}
Under the hood the hook uses TanStack's experimental_streamedQuery with refetchMode: "append", seeding from find() and then replacing the result with each pushed snapshot. The query key is identical to the non-realtime version, so the same cache entry is reused.
A live count reads `totalDocs` off the snapshot
q.collections.posts.count(opts, { realtime: true }) streams the full collection snapshot and extracts totalDocs, so your count stays live without a dedicated count subscription.
`{ realtime: true }` only exists on find / count / get
findOne and every mutation reject a { realtime: true } argument at the type level: passing it is a compile error (the tests assert @ts-expect-error). There's no live findOne; subscribe with find({ where: { id } }) (or live()) and read docs[0].
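The "subscribe with where: { id } and read docs[0]" pattern can be wrapped once. The sketch below is hypothetical (liveOne, Page, and LiveFn are illustrative names, not client exports) and assumes only the live(options, callback) => unsubscribe shape and a docs array on the snapshot.

```typescript
type Page<TDoc> = { docs: TDoc[] };

// Hypothetical structural type for a collection's live() method.
type LiveFn<TDoc> = (
  options: { where?: Record<string, unknown>; limit?: number },
  onSnapshot: (snapshot: Page<TDoc>) => void,
) => () => void;

// Live "find one": subscribe by id and hand the caller a single doc or null.
function liveOne<TDoc>(
  live: LiveFn<TDoc>,
  id: string,
  onDoc: (doc: TDoc | null) => void,
): () => void {
  return live({ where: { id }, limit: 1 }, (snapshot) => onDoc(snapshot.docs[0] ?? null));
}
```

A null here means the record no longer matches the query for this subscriber, for example because it was deleted or access rules now hide it.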
If the server has no realtime, `{ realtime: true }` errors out; it does not fall back to polling
The client always wires client.realtime, so the TanStack flag is honored client-side regardless. The real prerequisite is server-side realtime: set realtime in your config so POST /realtime exists. Without it the SSE connect fails and the stream errors out (for find() / get() queries, and through live()'s onError) rather than silently degrading to a poll. Turn realtime on in the server config before relying on live queries.
The TanStack realtime topic is derived purely from your read options via buildCollectionTopic / buildGlobalTopic. Config-level locale/stage are not added to the topic; only options.locale is. Two subscriptions that differ only by where get distinct live streams.
Server-side: turning realtime on
Everything above is the client API. None of it works until you turn realtime on in the server config. That is the one server requirement, and you've already seen it in Quick start: set realtime in runtimeConfig and the app records every change to an outbox and serves the POST /realtime SSE stream. With no adapter the service polls that outbox (or auto-wires pg_notify on a Postgres app); add pgNotifyAdapter() or redisStreamsAdapter({ client }) to push changes the instant they happen, across every horizontally-scaled instance.
The full server surface, RealtimeConfig options, the outbox/poll model, every transport adapter (pgNotifyAdapter, redisStreamsAdapter, cloudflareRealtimeAdapter), and building your own, lives on one page: see Realtime adapter.
One connection, by design, don't open your own SSE per query
On the client side, every live() / liveIter() call (and the TanStack flag) shares one multiplexed POST /realtime SSE connection: 50 subscriptions cost one HTTP connection, sidestepping the browser's ~6-per-domain cap over HTTP/1.1. Always subscribe through the typed wrappers (or client.realtime) so they share that connection rather than opening your own stream.
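To make the fan-out idea concrete, here is a toy in-memory version. It is illustrative only and is not the library's multiplexer: TopicFanOut and its members are hypothetical, topics are plain strings, and "opening the connection" is reduced to a counter.

```typescript
type Listener = (data: unknown) => void;

// Illustrative only: an in-memory fan-out keyed by topic.
class TopicFanOut {
  private listeners = new Map<string, Set<Listener>>();
  connections = 0; // how many times the shared "connection" was opened

  subscribe(topic: string, listener: Listener): () => void {
    if (this.listeners.size === 0) this.connections++; // open the one shared stream
    const set = this.listeners.get(topic) ?? new Set<Listener>();
    this.listeners.set(topic, set);
    set.add(listener);
    return () => {
      set.delete(listener);
      // Drop the topic once its last listener is gone.
      if (set.size === 0 && this.listeners.get(topic) === set) this.listeners.delete(topic);
    };
  }

  // Called when a message for `topic` arrives on the shared stream.
  dispatch(topic: string, data: unknown): void {
    this.listeners.get(topic)?.forEach((listener) => listener(data));
  }

  get topicCount(): number {
    return this.listeners.size;
  }
}
```

Fifty subscribe() calls against one TopicFanOut leave connections at 1, which is the property the real multiplexer gives you over HTTP.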
Raw realtime, client.realtime
The typed wrappers are the recommended path. If you need to subscribe to a hand-built topic (e.g. a generic snapshot consumer in the admin), the raw RealtimeAPI is exposed as client.realtime:
import { buildCollectionTopic } from "questpie/client";
// Typed wrapper (preferred), TData inferred, topic built for you:
const stop = client.collections.posts.live({ where: { published: true } }, onSnap);
// Raw equivalent, you build the topic and supply TData yourself:
const stopRaw = client.realtime.subscribe<MySnapshot>(
buildCollectionTopic("posts", { where: { published: true } }),
(data) => onSnap(data),
/* signal */ undefined,
/* customId */ undefined,
/* onError */ (err) => console.error(err),
);
RealtimeAPI is { subscribe<TData>(topic, callback, signal?, customId?, onError?) => unsubscribe; stream<TData>(topic, signal?, customId?) => AsyncGenerator; destroy(); topicCount; subscriberCount }. subscribe is the callback form and stream is the async-iterable form. destroy() tears the multiplexer down entirely (it re-creates on the next subscribe). Prefer the typed wrappers: they infer TData from the query and can't drift from the topic shape.
buildCollectionTopic, buildGlobalTopic, and the RealtimeAPI / TopicConfig / TopicInput types are re-exported from questpie/client. The multiplexer, createRealtimeAPI, and sseSnapshotStream are internal (not part of the public questpie/client surface); you reach them only through client.realtime.
Localization
Pass locale in the live options to subscribe to a specific localized variant. It becomes part of the topic, so the en and de views of the same query are independent subscriptions:
client.collections.posts.live({ where: { published: true }, locale: "de" }, onSnap);
setLocale() on the client mutates request headers for find/create/etc., but does not retarget already-open realtime subscriptions, because realtime carries locale per topic. To switch a subscription's locale, unsubscribe and re-subscribe with the new locale.
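The unsubscribe-then-resubscribe step is easy to forget, so it can be worth wrapping. The following is a sketch under stated assumptions: liveWithLocale and LiveFn are hypothetical names, and the wrapper relies only on live(options, callback) returning an unsubscribe function.

```typescript
// Hypothetical structural type for live(): (options, callback) => unsubscribe.
type LiveFn<TSnapshot> = (
  options: { locale?: string; [key: string]: unknown },
  onSnapshot: (snapshot: TSnapshot) => void,
) => () => void;

// Hold at most one subscription and swap it whenever the locale changes.
function liveWithLocale<TSnapshot>(
  live: LiveFn<TSnapshot>,
  baseOptions: Record<string, unknown>,
  onSnapshot: (snapshot: TSnapshot) => void,
) {
  let unsubscribe: (() => void) | undefined;
  return {
    setLocale(locale: string): void {
      unsubscribe?.(); // drop the subscription for the old locale's topic
      unsubscribe = live({ ...baseOptions, locale }, onSnapshot);
    },
    stop(): void {
      unsubscribe?.();
      unsubscribe = undefined;
    },
  };
}
```

Call setLocale() from the same place you call the client's setLocale(), so one-shot requests and the live subscription change language together.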
Pitfalls
- Always unsubscribe. live() returns an unsubscribe function and accepts a signal; liveIter() ends on signal abort. Leaking subscriptions keeps the multiplexer's SSE connection alive and the server pushing snapshots. In React, call the returned function in your effect cleanup (or pass an AbortController signal).
- No realtime config = no live mode. live() / liveIter() / { realtime: true } need realtime set in the server config. Without it the SSE endpoint isn't served and subscriptions error (via onError / a thrown generator).
- Snapshots are full re-runs, not deltas. Every push is the complete find() result for your query (re-evaluated under access rules), not a patch. For very large result sets, narrow with where / limit rather than streaming thousands of rows on every change.
- liveIter() throws on disconnect; live() calls onError. Pick the form that matches your error handling: wrap for await in try/catch, or pass onError to live().
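Since every push is a full result rather than a patch, any "what changed?" logic lives on your side. A minimal sketch, assuming each doc carries a string id field (diffSnapshots is a hypothetical helper, not a client API):

```typescript
type Doc = { id: string };

// Compare two snapshots' docs arrays by id.
function diffSnapshots<T extends Doc>(prev: T[], next: T[]) {
  const prevIds = new Set(prev.map((d) => d.id));
  const nextIds = new Set(next.map((d) => d.id));
  return {
    added: next.filter((d) => !prevIds.has(d.id)).map((d) => d.id),
    removed: prev.filter((d) => !nextIds.has(d.id)).map((d) => d.id),
  };
}

const before = [{ id: "a" }, { id: "b" }];
const after = [{ id: "b" }, { id: "c" }];
console.log(diffSnapshots(before, after)); // { added: [ 'c' ], removed: [ 'a' ] }
```

Keep the previous snapshot.docs in a variable inside your callback and diff against it when you need enter/exit animations or notifications.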
TypeScript
The realtime option and result types come from questpie/client:
import type {
RealtimeAPI,
TopicConfig,
TopicInput,
FindResult,
PaginatedResult,
} from "questpie/client";
LiveQueryOptions, GlobalLiveQueryOptions, and LiveSubscribeOptions are structural option types on the client method signatures (inferred at the call site; you rarely import them by name). The snapshot type is FindResult<Row, Relations, TQuery> for collections and ApplyQuery<GlobalSelect, Relations, TQuery> for globals, identical to find() / get().
Related
- Realtime adapter: the server-side transport this client API rides (RealtimeConfig, the outbox/poll model, and the pg_notify / Redis / Cloudflare adapters).
- Collections: find(), the paginated envelope, and access rules that every snapshot is re-run through.
- Globals: the singleton get() whose shape globals.<name>.live() mirrors.
- Relations: with hydration, which live() carries into the snapshot.
- Configuration: where realtime lives in QuestpieConfig, alongside the other adapters.
- Getting started: sets up client and the q query-options proxy used in the examples above.
- Runnable example: examples/tanstack-barbershop, a TanStack Start app using the typed client and TanStack Query bindings.