QUESTPIE
Client

Realtime

Subscribe to a live query and get a fresh, access-controlled snapshot every time a matching record changes, typed exactly like find(), pushed over one multiplexed SSE connection, with one server-side adapter to wire.

Realtime turns any read into a live subscription. Call client.collections.posts.live({ where: { published: true } }, onSnapshot) and your callback fires with the full find() result immediately, then again whenever a matching post is created, updated, or deleted. The snapshot is the exact same shape and type as find(), relations, pagination envelope, and all, re-run server-side through the collection's access rules. Many subscriptions share one SSE connection, and you turn the whole thing on with a single realtime adapter in your config.

What it does

  • Live queries, fully typed. live() / liveIter() push find()-shaped snapshots; the snapshot type is inferred from your query options, so snapshot.docs[0].title is type-checked end to end.
  • One snapshot now, then on every change. The first snapshot arrives immediately on connect (no separate initial fetch); later snapshots arrive whenever a record matching your where/with/orderBy changes.
  • Access-controlled at the source. Each snapshot is the result of re-running your query server-side under the subscriber's session, realtime never leaks rows the user can't read.
  • One connection for many topics. A single multiplexer holds one POST /realtime SSE stream and fans out to every active subscription, sidestepping the browser's 6-connections-per-domain limit.
  • { realtime: true } on TanStack Query. Opt a find / count / get query into live mode with one flag and the hook streams instead of one-shot fetching.
  • Pluggable transport. Default polling needs zero setup; add pgNotifyAdapter() or redisStreamsAdapter({ client }) for push-based, horizontally-scalable delivery.

Quick start

Realtime works on the client out of the box, createClient() always wires a realtime API. The one thing you must turn on is the server side: set realtime in your runtime config so the app records changes and serves the SSE stream.

src/questpie/server/questpie.config.ts
import { runtimeConfig } from "questpie/app";

import env from "./env"; // declared + validated in env.ts, see Environment

export default runtimeConfig({
  db: { url: env.DATABASE_URL },
  realtime: true,
});

Now subscribe from the client. live() takes your query options, a snapshot callback, and returns an unsubscribe function:

A client component / effect
import { client } from "@/lib/client.js";

// First snapshot fires immediately; then on every matching change.
const unsubscribe = client.collections.posts.live(
  { where: { published: true }, orderBy: { createdAt: "desc" }, limit: 10 },
  (snapshot) => {
    // snapshot is the SAME shape & type as client.collections.posts.find(...)
    console.log(snapshot.totalDocs, snapshot.docs[0]?.title);
  },
);

// Later, stop listening (e.g. on unmount):
unsubscribe();

That callback now re-fires with a fresh, access-filtered top-10 every time a published post changes.

The first snapshot is your initial data

You do not need a separate find() before subscribing. live() delivers a full snapshot the moment the connection opens, then keeps it fresh. Treat the first callback invocation as "data loaded".

Example → inferred snapshot

The snapshot type equals the find() result type for the same options, FindResult<Row, Relations, TQuery>. Narrow with columns? No: live() deliberately carries a subset of find() options (no columns), so a snapshot is always the full row plus any with relations you requested:

client.collections.posts.live(
  { where: { published: true }, with: { author: true }, limit: 5 },
  (snapshot) => {
    snapshot.docs[0].author.name;
    //         ^? string, `with: { author: true }` hydrated the relation
    snapshot.hasNextPage;
    //         ^? boolean, full PaginatedResult envelope, same as find()
  },
);

A snapshot is a PaginatedResult<T> ({ docs, totalDocs, totalPages, page, hasPrevPage, hasNextPage, prevPage, nextPage, ... }), or, if your query uses groupBy… it can't, because groupBy isn't a live option (see below).

Live query options

live() and liveIter() accept LiveQueryOptions, the slice of find() options that the realtime wire protocol carries as topic fields:

type LiveQueryOptions<TSelect, TRelations> = {
  where?: Where<TSelect, TRelations>;
  with?: With<TRelations>;
  limit?: number;
  offset?: number;
  orderBy?: OrderBy<TSelect>;
  locale?: string;
};

`columns`, `groupBy`, `search`, `includeDeleted`, `stage` are not live options

LiveQueryOptions intentionally excludes columns, groupBy, search, includeDeleted, and stage. Realtime snapshots are the full query re-run server-side from the topic fields, so column projection / grouping / soft-delete-included / stage-pinned reads aren't expressed over the wire. If you pass them they're simply not part of the subscription. For those, use a one-shot find().

LiveSubscribeOptions, the third argument

live() takes an optional third argument to control the subscription lifecycle:

type LiveSubscribeOptions = {
  signal?: AbortSignal;          // abortauto-unsubscribe
  onError?: (error: Error) => void; // SSE connection failed
};
const ac = new AbortController();
client.collections.posts.live(
  { where: { published: true } },
  (snapshot) => render(snapshot),
  {
    signal: ac.signal,
    onError: (err) => console.error("realtime dropped:", err),
  },
);
// ac.abort() unsubscribes, equivalent to calling the returned function.

signal unsubscribes on abort (so you can wire it to a component's lifetime). onError fires when the underlying SSE connection fails, wire it so a dropped connection surfaces instead of leaving your UI stuck on stale data.

liveIter(), the async-iterable form

liveIter() is live() as an AsyncGenerator. Same snapshots, consumed with for await. Built for workers, agents, scripts, and tests rather than React effects:

const ac = new AbortController();

for await (const snapshot of client.collections.posts.liveIter(
  { where: { published: true }, limit: 10 },
  { signal: ac.signal },
)) {
  console.log("posts now:", snapshot.totalDocs);
  // ac.abort() ends the loop.
}

It takes only { signal? }, no onError (unlike live()). A connection failure makes the generator throw, so wrap the loop in try/catch if you need to handle drops. Terminate by aborting the signal.

Globals

Globals are singletons, so their live API mirrors get() with a smaller option set. GlobalLiveQueryOptions is { with?, locale? } only, no where/limit/offset/orderBy:

const stop = client.globals.siteSettings.live(
  { with: { logo: true } },
  (snapshot) => {
    // snapshot is the SAME shape & type as client.globals.siteSettings.get(...)
    document.title = snapshot.title;
  },
);

live() and liveIter() carry the same LiveSubscribeOptions / { signal? } semantics as collections.

Realtime with TanStack Query

If you use the @questpie/tanstack-query bindings, you don't reach for live() directly, you pass { realtime: true } as the second argument to a query builder, and the hook streams instead of fetching once. This works on exactly three reads: collections.<name>.find, collections.<name>.count, and globals.<name>.get.

A component
import { useQuery } from "@tanstack/react-query";

import { q } from "@/lib/query.js";

function LivePosts() {
  const { data } = useQuery(
    // 1st arg: query options. 2nd arg: { realtime: true }.
    q.collections.posts.find({ where: { published: true }, limit: 10 }, { realtime: true }),
  );

  return <p>{data?.totalDocs ?? 0} published posts (live)</p>;
}

Under the hood the hook uses TanStack's experimental_streamedQuery with refetchMode: "append", seeding from find() and then replacing the result with each pushed snapshot. The query key is identical to the non-realtime version, so the same cache entry is reused.

A live count reads `totalDocs` off the snapshot

q.collections.posts.count(opts, { realtime: true }) streams the full collection snapshot and extracts totalDocs, so your count stays live without a dedicated count subscription.

`{ realtime: true }` only exists on find / count / get

findOne and every mutation reject a { realtime: true } argument at the type level, passing it is a compile error (the tests assert @ts-expect-error). There's no live findOne; subscribe with find({ where: { id } }) (or live()) and read docs[0].

If the server has no realtime, `{ realtime: true }` falls back to a stuck stream

The client always wires client.realtime, so the TanStack flag is honored client-side regardless. The real prerequisite is server-side realtime: set realtime in your config so POST /realtime exists. Without it the SSE connect fails, the stream errors out (and find()/get()/live()'s onError fires) rather than silently degrading to a poll. Turn realtime on in the server config before relying on live queries.

The TanStack realtime topic is derived purely from your read options via buildCollectionTopic / buildGlobalTopic, config-level locale/stage are not added to the topic, only options.locale is. Two subscriptions that differ only by where get distinct live streams.

Server-side: turning realtime on

Everything above is the client API. None of it works until you turn realtime on in the server config, that's the one server requirement, and you've already seen it in Quick start: set realtime in runtimeConfig and the app records every change to an outbox and serves the POST /realtime SSE stream. With no adapter the service polls that outbox (or auto-wires pg_notify on a Postgres app); add pgNotifyAdapter() or redisStreamsAdapter({ client }) to push changes the instant they happen, across every horizontally-scaled instance.

The full server surface, RealtimeConfig options, the outbox/poll model, every transport adapter (pgNotifyAdapter, redisStreamsAdapter, cloudflareRealtimeAdapter), and building your own, lives on one page: see Realtime adapter.

One connection, by design, don't open your own SSE per query

On the client side, every live() / liveIter() call (and the TanStack flag) shares one multiplexed POST /realtime SSE connection, 50 subscriptions cost one HTTP connection, sidestepping the browser's ~6-per-domain cap over HTTP/1.1. Always subscribe through the typed wrappers (or client.realtime) so they share that connection rather than opening your own stream.

Raw realtime, client.realtime

The typed wrappers are the recommended path. If you need to subscribe to a hand-built topic (e.g. a generic snapshot consumer in the admin), the raw RealtimeAPI is exposed as client.realtime:

import { buildCollectionTopic } from "questpie/client";

// Typed wrapper (preferred), TData inferred, topic built for you:
const stop = client.collections.posts.live({ where: { published: true } }, onSnap);

// Raw equivalent, you build the topic and supply TData yourself:
const stopRaw = client.realtime.subscribe<MySnapshot>(
  buildCollectionTopic("posts", { where: { published: true } }),
  (data) => onSnap(data),
  /* signal */ undefined,
  /* customId */ undefined,
  /* onError */ (err) => console.error(err),
);

RealtimeAPI is { subscribe<TData>(topic, callback, signal?, customId?, onError?) => unsubscribe; stream<TData>(topic, signal?, customId?) => AsyncGenerator; destroy(); topicCount; subscriberCount }. subscribe is the callback form, stream the async-iterable form. destroy() tears the multiplexer down entirely (it re-creates on the next subscribe). Prefer the typed wrappers, they infer TData from the query and can't drift from the topic shape.

buildCollectionTopic, buildGlobalTopic, and the RealtimeAPI / TopicConfig / TopicInput types are re-exported from questpie/client. The multiplexer, createRealtimeAPI, and sseSnapshotStream are internal (not part of the public questpie/client surface), you reach them only through client.realtime.

Localization

Pass locale in the live options to subscribe to a specific localized variant, it becomes part of the topic, so the en and de views of the same query are independent subscriptions:

client.collections.posts.live({ where: { published: true }, locale: "de" }, onSnap);

setLocale() on the client mutates request headers for find/create/etc., but does not retarget already-open realtime subscriptions, realtime carries locale per topic. To switch a subscription's locale, unsubscribe and re-subscribe with the new locale.

Pitfalls

  • Always unsubscribe. live() returns an unsubscribe function and accepts a signal; liveIter() ends on signal abort. Leaking subscriptions keeps the multiplexer's SSE connection alive and the server pushing snapshots. In React, call the returned function in your effect cleanup (or pass an AbortController signal).
  • No realtime config = no live mode. live()/liveIter()/{ realtime: true } need realtime set in the server config. Without it the SSE endpoint isn't served and subscriptions error (via onError / a thrown generator).
  • Snapshots are full re-runs, not deltas. Every push is the complete find() result for your query (re-evaluated under access rules), not a patch. For very large result sets, narrow with where/limit rather than streaming thousands of rows on every change.
  • liveIter() throws on disconnect; live() calls onError. Pick the form that matches your error handling, wrap for await in try/catch, or pass onError to live().

TypeScript

The realtime option and result types come from questpie/client:

import type {
  RealtimeAPI,
  TopicConfig,
  TopicInput,
  FindResult,
  PaginatedResult,
} from "questpie/client";

LiveQueryOptions, GlobalLiveQueryOptions, and LiveSubscribeOptions are structural option types on the client method signatures (inferred at the call site, you rarely import them by name). The snapshot type is FindResult<Row, Relations, TQuery> for collections and ApplyQuery<GlobalSelect, Relations, TQuery> for globals, identical to find() / get().

  • Realtime adapter, the server-side transport this client API rides: RealtimeConfig, the outbox/poll model, and the pg_notify / Redis / Cloudflare adapters.
  • Collections, find(), the paginated envelope, and access rules that every snapshot is re-run through.
  • Globals, the singleton get() whose shape globals.<name>.live() mirrors.
  • Relations, with hydration, which live() carries into the snapshot.
  • Configuration, where realtime lives in QuestpieConfig, alongside the other adapters.
  • Getting started, sets up client and the q query-options proxy used in the examples above.
  • Runnable example: examples/tanstack-barbershop, a TanStack Start app using the typed client and TanStack Query bindings.

On this page