DVM Operators
This document describes the Differential View Maintenance (DVM) operators implemented by pgstream. Each operator transforms a stream of row-level changes (deltas) propagated from source tables through the operator tree.
Prior Art
- Budiu, M. et al. (2023). "DBSP: Automatic Incremental View Maintenance." VLDB 2023. (comparison)
- Gupta, A. & Mumick, I.S. (1999). Materialized Views: Techniques, Implementations, and Applications. MIT Press.
- Koch, C. et al. (2014). "DBToaster: Higher-order Delta Processing for Dynamic, Frequently Fresh Views." VLDB Journal.
- PostgreSQL 9.4+ — Materialized views with
REFRESH MATERIALIZED VIEW CONCURRENTLY.
Overview
When a stream table is created, the defining SQL query is parsed into a tree of DVM operators. During an differential refresh, changes flow bottom-up through this tree:
Aggregate
│
Project
│
Filter
│
┌───────┴───────┐
Join │
┌─┴─┐ │
Scan(A) Scan(B) Scan(C)
Each operator implements a differentiation rule: given the delta (Δ) to its input(s), it produces the corresponding delta to its output. This is conceptually similar to automatic differentiation in calculus.
The general contract:
- Input: a set of
('+', row)and('-', row)tuples (inserts and deletes) - Output: a set of
('+', row)and('-', row)tuples
Updates are modeled as a delete of the old row followed by an insert of the new row.
Operators
Scan
Module: src/dvm/operators/scan.rs
The leaf operator. Reads CDC changes from a source table's change buffer.
Delta Rule:
$$\Delta(\text{Scan}(R)) = \Delta R$$
The scan operator is a direct passthrough — inserts in the source become inserts in the output, deletes become deletes.
SQL Generation:
SELECT op, row_data FROM pgstream_changes.changes_<oid>
WHERE xid >= <last_consumed_xid>
Notes:
- Each source table has a dedicated change buffer table created by the CDC module.
- Row data is stored as JSONB with column names as keys.
- The
__pgs_row_idcolumn (xxHash of primary key) is included for deduplication.
Filter
Module: src/dvm/operators/filter.rs
Applies a WHERE clause predicate to the delta stream.
Delta Rule:
$$\Delta(\sigma_p(R)) = \sigma_p(\Delta R)$$
Filtering is applied to the deltas in the same way as to the base data — only rows satisfying the predicate pass through.
SQL Generation:
SELECT * FROM (<input_delta>) AS d
WHERE <predicate>
Example:
If the defining query is:
SELECT * FROM orders WHERE status = 'shipped'
And a new row (id=5, status='pending') is inserted, it does not appear in the delta output. If (id=3, status='shipped') is inserted, it passes through.
Edge Cases:
- For updates that change the predicate column (e.g.,
statusfrom'pending'to'shipped'), the CDC produces a delete of the old row and insert of the new row. The filter passes the insert (matches) and blocks the delete (doesn't match the old row against the predicate), correctly resulting in a net insert.
Project
Module: src/dvm/operators/project.rs
Applies column projection from the target list.
Delta Rule:
$$\Delta(\pi_L(R)) = \pi_L(\Delta R)$$
Projects the same columns from the delta that the query projects from the base data.
SQL Generation:
SELECT <target_columns> FROM (<input_delta>) AS d
Notes:
- Projection is applied after filtering for efficiency.
- Computed expressions in the target list (e.g.,
price * quantity AS total) are evaluated on the delta rows.
Join (Inner)
Module: src/dvm/operators/join.rs
Implements inner join between two inputs.
Delta Rule:
For $R \bowtie S$:
$$\Delta(R \bowtie S) = (\Delta R \bowtie S) \cup (R' \bowtie \Delta S)$$
Where $R' = R \cup \Delta R$ (the new state of R after applying deltas).
In practice, when only one side has changes (common case), the delta join simplifies to joining the changed rows against the current state of the other side.
SQL Generation:
-- Changes to left side joined with current right side
SELECT '+' AS op, l.*, r.*
FROM (<left_delta> WHERE op = '+') AS l
JOIN <right_table> AS r ON <join_condition>
UNION ALL
-- Current left side joined with changes to right side
SELECT '+' AS op, l.*, r.*
FROM <left_table> AS l
JOIN (<right_delta> WHERE op = '+') AS r ON <join_condition>
(And corresponding DELETE queries for op = '-'.)
Notes:
- The join uses the current state of the non-changed side, not the change buffer.
- For equi-joins, this is efficient — the join key narrows the scan.
- Non-equi joins (theta joins) may require broader scans.
Outer Join
Module: src/dvm/operators/outer_join.rs (LEFT JOIN), src/dvm/operators/full_join.rs (FULL JOIN)
Implements LEFT, RIGHT, and FULL OUTER JOIN.
RIGHT JOIN Handling:
RIGHT JOIN is automatically converted to a LEFT JOIN with swapped left/right operands during query parsing. This normalization happens transparently — the user can write RIGHT JOIN and the parser rewrites it to an equivalent LEFT JOIN before the operator tree is constructed.
Delta Rule:
Similar to inner join, but additionally handles NULL-padded rows:
$$\Delta(R \text{ LEFT JOIN } S) = (\Delta R \bowtie_L S) \cup (R' \bowtie_L \Delta S)$$
With special handling for:
- Rows in ΔR that have no match in S → emit
('+', row, NULLs) - Rows in ΔS that create a first match for an R row → emit
('-', row, NULLs)and('+', row, s_data) - Rows in ΔS that remove the last match for an R row → emit
('-', row, s_data)and('+', row, NULLs)
SQL Generation (LEFT JOIN):
Uses anti-join detection (via NOT EXISTS) to correctly handle the NULL padding transitions.
FULL OUTER JOIN Delta Rule:
FULL OUTER JOIN extends the LEFT JOIN delta with symmetric right-side handling. The delta is computed as an 8-part UNION ALL:
- Parts 1–5: Same as LEFT JOIN delta (inserted/deleted rows from both sides, with NULL-padding transitions)
- Parts 6–7: Symmetric anti-join transitions for the right side (rows in ΔL that remove/create the last/first match for an S row)
- Part 8: Right-side insertions that have no match in the left side → emit
('+', NULLs, s_data)
Each part uses pre-computed delta flags (__has_ins_*, __has_del_*) to efficiently detect first-match/last-match transitions without redundant subqueries.
Nested Join Support:
Module: src/dvm/operators/join_common.rs
All join operators (inner, left, full) support nested children — i.e., a join whose left or right operand is itself another join. The join_common module provides shared helpers:
build_snapshot_sql()— returns the table reference for simple (Scan) operands, or a parenthesized subquery with disambiguated columns for nested join operandsrewrite_join_condition()— rewrites column references in ON conditions to use the correct alias prefixes for nested children (e.g.,o.cust_id→dl.o__cust_id)
This enables queries with 3 or more joined tables, e.g.:
SELECT o.id, c.name, p.title
FROM orders o
JOIN customers c ON o.cust_id = c.id
JOIN products p ON o.prod_id = p.id
Limitations:
- FULL OUTER JOIN delta computation can be expensive due to dual-side NULL tracking (8 UNION ALL parts).
- Performance degrades with high-cardinality join keys.
NATURAL JOINis supported — common columns are resolved automatically and synthesized into an explicit equi-join condition.
Aggregate
Module: src/dvm/operators/aggregate.rs
Handles GROUP BY with aggregate functions (COUNT, SUM, AVG, MIN, MAX, BOOL_AND, BOOL_OR, STRING_AGG, ARRAY_AGG, JSON_AGG, JSONB_AGG, BIT_AND, BIT_OR, BIT_XOR, JSON_OBJECT_AGG, JSONB_OBJECT_AGG, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP, MODE, PERCENTILE_CONT, PERCENTILE_DISC, JSON_ARRAYAGG, JSON_OBJECTAGG) and the FILTER (WHERE …) and WITHIN GROUP (ORDER BY …) clauses.
Delta Rule:
$$\Delta(\gamma_{G, \text{agg}}(R)) = \gamma_{G, \text{agg}}(R' \text{ WHERE } G \in \text{affected_keys}) - \gamma_{G, \text{agg}}(R \text{ WHERE } G \in \text{affected_keys})$$
Where:
- $G$ = grouping columns
affected_keys= the set of group key values that appear in ΔR- $R'$ = $R \cup \Delta R$ (the new state)
Strategy:
- Identify affected groups — Collect all group key values that appear in the delta (either inserted or deleted rows).
- Recompute old values — Query the storage table for current aggregate values of affected groups.
- Recompute new values — Query the updated source for new aggregate values of affected groups.
- Diff — For each affected group:
- If old exists and new differs → emit
('-', old)and('+', new) - If old exists and new is gone → emit
('-', old)(group eliminated) - If no old and new exists → emit
('+', new)(new group appeared)
- If old exists and new differs → emit
Supported Aggregate Functions:
| Function | DVM Strategy | Notes |
|---|---|---|
COUNT(*) | Algebraic | Fully differential |
COUNT(expr) | Algebraic | Fully differential |
SUM(expr) | Algebraic | Fully differential |
AVG(expr) | Algebraic | Decomposed to SUM/COUNT internally |
MIN(expr) | Semi-algebraic | Uses LEAST merge; falls back to per-group rescan when min row is deleted |
MAX(expr) | Semi-algebraic | Uses GREATEST merge; falls back to per-group rescan when max row is deleted |
BOOL_AND(expr) | Group-rescan | Affected groups are re-aggregated from source data |
BOOL_OR(expr) | Group-rescan | Affected groups are re-aggregated from source data |
STRING_AGG(expr, sep) | Group-rescan | Affected groups are re-aggregated from source data |
ARRAY_AGG(expr) | Group-rescan | Affected groups are re-aggregated from source data |
JSON_AGG(expr) | Group-rescan | Affected groups are re-aggregated from source data |
JSONB_AGG(expr) | Group-rescan | Affected groups are re-aggregated from source data |
BIT_AND(expr) | Group-rescan | Affected groups are re-aggregated from source data |
BIT_OR(expr) | Group-rescan | Affected groups are re-aggregated from source data |
BIT_XOR(expr) | Group-rescan | Affected groups are re-aggregated from source data |
JSON_OBJECT_AGG(key, value) | Group-rescan | Affected groups are re-aggregated from source data |
JSONB_OBJECT_AGG(key, value) | Group-rescan | Affected groups are re-aggregated from source data |
STDDEV_POP(expr) / STDDEV(expr) | Group-rescan | Affected groups are re-aggregated from source data |
STDDEV_SAMP(expr) | Group-rescan | Affected groups are re-aggregated from source data |
VAR_POP(expr) | Group-rescan | Affected groups are re-aggregated from source data |
VAR_SAMP(expr) / VARIANCE(expr) | Group-rescan | Affected groups are re-aggregated from source data |
MODE() WITHIN GROUP (ORDER BY expr) | Group-rescan | Ordered-set aggregate; affected groups re-aggregated |
PERCENTILE_CONT(frac) WITHIN GROUP (ORDER BY expr) | Group-rescan | Ordered-set aggregate; affected groups re-aggregated |
PERCENTILE_DISC(frac) WITHIN GROUP (ORDER BY expr) | Group-rescan | Ordered-set aggregate; affected groups re-aggregated |
CORR(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
COVAR_POP(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
COVAR_SAMP(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
REGR_AVGX(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
REGR_AVGY(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
REGR_COUNT(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
REGR_INTERCEPT(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
REGR_R2(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
REGR_SLOPE(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
REGR_SXX(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
REGR_SXY(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
REGR_SYY(Y, X) | Group-rescan | Regression aggregate; affected groups re-aggregated |
ANY_VALUE(expr) | Group-rescan | PostgreSQL 16+; affected groups re-aggregated |
JSON_ARRAYAGG(expr ...) | Group-rescan | SQL-standard JSON aggregation (PostgreSQL 16+); full deparsed SQL preserved |
JSON_OBJECTAGG(key: value ...) | Group-rescan | SQL-standard JSON aggregation (PostgreSQL 16+); full deparsed SQL preserved |
FILTER Clause:
All aggregate functions support the FILTER (WHERE …) clause:
SELECT COUNT(*) FILTER (WHERE status = 'active') AS active_count FROM orders GROUP BY region
The filter predicate is applied within the delta computation — only rows matching the filter contribute to the aggregate delta. Filtered aggregates are excluded from the P5 direct-bypass optimization.
SQL Generation:
The aggregate operator uses a 3-CTE pipeline:
- Merge CTE — Joins affected group keys against old (storage) and new (source) aggregate values, producing
__pgs_meta_action('I' for new-only groups, 'D' for disappeared groups, 'U' for changed groups). - LATERAL VALUES expansion — A single-pass
LATERAL (VALUES ...)clause expands each merge row into insert and delete actions, avoiding a 4-branch UNION ALL:
FROM merge_cte m,
LATERAL (VALUES
('I', m.new_count, m.new_total),
('D', m.old_count, m.old_total)
) v(action, count_val, val_total)
WHERE (m.__pgs_meta_action = 'I' AND v.action = 'I')
OR (m.__pgs_meta_action = 'D' AND v.action = 'D')
OR (m.__pgs_meta_action = 'U')
- Final projection — Emits
('+', row)and('-', row)tuples for the refresh engine.
MIN/MAX Merge Strategy:
MIN and MAX use a semi-algebraic strategy with two cases:
-
Non-extremum deletion — When the deleted row is NOT the current minimum (or maximum), the merge uses
LEAST(old_value, new_inserts)for MIN orGREATEST(old_value, new_inserts)for MAX. This is fully algebraic and requires no rescan. -
Extremum deletion — When the row holding the current minimum (or maximum) IS deleted, the new value cannot be computed from the delta alone. The merge expression returns
NULLas a sentinel, which triggers the change-detection guard (IS DISTINCT FROM) to emit the group for re-aggregation. The MERGE layer treats this as a DELETE + INSERT pair, recomputing the group from source data. This is still more efficient than a full table refresh since only affected groups are rescanned.
Distinct
Module: src/dvm/operators/distinct.rs
Implements SELECT DISTINCT using reference counting.
Delta Rule:
$$\Delta(\delta(R)) = { r \in \Delta R : \text{count}(r, R) = 0 \land \text{count}(r, R') > 0 } - { r \in \Delta R : \text{count}(r, R) > 0 \land \text{count}(r, R') = 0 }$$
In other words:
- A row enters the output when its count transitions from 0 to ≥1
- A row leaves the output when its count transitions from ≥1 to 0
Strategy:
Maintains a hidden __pgs_dup_count column in the storage table to track how many times each distinct row appears in the pre-distinct input.
- On insert: increment count. If count was 0, emit
('+', row). - On delete: decrement count. If count becomes 0, emit
('-', row).
Notes:
- The duplicate count is not visible in user queries against the storage table (projected away by the view layer).
- Duplicate counting uses
__pgs_row_id(xxHash) for efficient lookups.
Union All
Module: src/dvm/operators/union_all.rs
Merges deltas from two branches.
Delta Rule:
$$\Delta(R \cup_{\text{all}} S) = \Delta R \cup_{\text{all}} \Delta S$$
Simply concatenates the delta streams from both branches.
SQL Generation:
SELECT * FROM (<left_delta>)
UNION ALL
SELECT * FROM (<right_delta>)
Notes:
- Column count and types must match between branches.
- Each branch is independently processed through its own operator sub-tree.
- This is the simplest operator since
UNION ALLpreserves all duplicates.
Intersect
Module: src/dvm/operators/intersect.rs
Implements INTERSECT and INTERSECT ALL using dual-count per-branch multiplicity tracking.
Delta Rule:
$$\Delta(R \cap S): \text{emit rows where } \min(\text{count}_L, \text{count}_R) \text{ crosses the 0 boundary}$$
- INTERSECT (set): a row is present when both branches contain it.
- INTERSECT ALL (bag): a row appears $\min(\text{count}_L, \text{count}_R)$ times.
SQL Generation (3-CTE chain):
- Delta CTE — tags rows from left/right child deltas with branch indicator (
'L'/'R') and computes per-row net_count. - Merge CTE — joins with the storage table to compute old and new per-branch counts (
__pgs_count_l,__pgs_count_r). - Final CTE — detects boundary crossings using
LEAST(old_count_l, old_count_r)vsLEAST(new_count_l, new_count_r).
Notes:
- Storage table requires hidden columns
__pgs_count_land__pgs_count_rfor multiplicity tracking. - Both set and bag variants use the same 3-CTE structure; only the boundary logic stays the same (both use LEAST).
Except
Module: src/dvm/operators/except.rs
Implements EXCEPT and EXCEPT ALL using dual-count per-branch multiplicity tracking.
Delta Rule:
$$\Delta(R - S): \text{emit rows where } \max(0, \text{count}_L - \text{count}_R) \text{ crosses the 0 boundary}$$
- EXCEPT (set): a row is present when it exists in the left but not the right branch.
- EXCEPT ALL (bag): a row appears $\max(0, \text{count}_L - \text{count}_R)$ times.
SQL Generation (3-CTE chain):
- Delta CTE — same as Intersect: tags rows from both child deltas with branch indicator.
- Merge CTE — joins with storage table for old/new per-branch counts.
- Final CTE — detects boundary crossings using
GREATEST(0, old_count_l - old_count_r)vsGREATEST(0, new_count_l - new_count_r).
Notes:
- EXCEPT is not commutative — left branch is the positive input, right is subtracted.
- Storage table requires hidden columns
__pgs_count_land__pgs_count_r. - Same 3-CTE structure as Intersect with different effective-count function.
Subquery
Module: src/dvm/operators/subquery.rs
Handles both inlined CTEs and explicit subqueries in FROM ((SELECT ...) AS alias).
Delta Rule:
$$\Delta(\rho_{\text{alias}}(Q)) = \rho_{\text{alias}}(\Delta Q)$$
A subquery wrapper is transparent for differentiation — it delegates to its child's delta and optionally renames output columns to match the subquery's column aliases.
SQL Generation:
-- If column aliases differ from child output columns:
SELECT __pgs_row_id, __pgs_action, child_col1 AS alias_col1, child_col2 AS alias_col2
FROM (<child_delta>)
If the child columns already match the aliases, the subquery is a pure passthrough — no additional CTE is emitted.
Notes:
- This operator enables both CTE support (Tier 1) and standalone subqueries in FROM.
- Column aliases on subqueries (
FROM (...) AS x(a, b)) are handled by emitting a thin renaming CTE. - The subquery body is fully differentiated as a normal operator sub-tree.
CTE Scan (Shared Delta)
Module: src/dvm/operators/cte_scan.rs
Handles multi-reference CTEs by computing the CTE body's delta once and reusing it across all references (Tier 2).
Delta Rule:
$$\Delta(\text{CteScan}(\text{id}, Q)) = \text{cache}[\text{id}] \quad \text{(computed once, reused)}$$
When a CTE is referenced multiple times in a query, each reference produces a CteScan node with the same cte_id. The diff engine differentiates the CTE body once and caches the result. Subsequent CteScan nodes for the same CTE reuse the cached delta.
SQL Generation:
-- First reference: differentiates the CTE body and stores result in cache
-- Subsequent references: point to the same system CTE name
SELECT __pgs_row_id, __pgs_action, <columns>
FROM __pgs_cte_<cte_name>_delta -- shared across all references
If column aliases are present, a thin renaming CTE is added on top of the cached delta.
Notes:
- Without CteScan (Tier 1), multi-reference CTEs are inlined: each reference duplicates the full operator sub-tree. CteScan (Tier 2) eliminates this duplication.
- The CTE body is pre-differentiated in dependency order (earlier CTEs before later ones that reference them).
- Column alias support follows the same pattern as the Subquery operator.
Recursive CTEs
Recursive CTEs (WITH RECURSIVE) are supported via two strategies depending on the refresh mode:
FULL Mode
Recursive CTEs work out-of-the-box with refresh_mode = 'FULL'. The defining query is executed as-is via INSERT INTO ... SELECT ..., and PostgreSQL handles the iterative evaluation internally.
DIFFERENTIAL Mode (Three-Strategy Incremental Maintenance)
Recursive CTEs with refresh_mode = 'DIFFERENTIAL' use an automatic three-strategy approach, selected based on column compatibility and change type:
Strategy 1: Semi-Naive Evaluation (INSERT-only changes)
When only INSERT changes are present in the change buffer, pg_stream uses semi-naive evaluation — the standard technique for incremental fixpoint computation. The base case is differentiated normally through the DVM operator tree, then the resulting delta is propagated through the recursive term using a nested WITH RECURSIVE:
WITH RECURSIVE
__pgs_base_delta AS (
-- Normal DVM differentiation of the base case (INSERT rows only)
<differentiated base case>
),
__pgs_rec_delta AS (
-- Seed: base case delta rows
SELECT cols FROM __pgs_base_delta WHERE __pgs_action = 'I'
UNION ALL
-- Seed: new base rows joining existing ST storage
SELECT cols FROM <recursive term with self_ref = ST_storage, base = change_buffer>
UNION ALL
-- Propagation: recursive term applied to growing delta
SELECT cols FROM <recursive term with self_ref = __pgs_rec_delta, base = full>
)
SELECT pgstream.pg_stream_hash(...) AS __pgs_row_id, 'I' AS __pgs_action, cols
FROM __pgs_rec_delta
The cost is proportional to the number of new rows produced by the change, not the full result set.
Strategy 2: Delete-and-Rederive / DRed (mixed INSERT/DELETE/UPDATE changes)
When the change buffer contains DELETE or UPDATE changes, simple propagation is insufficient — a deleted base row may have transitively derived many recursive rows, some of which may still be derivable from alternative paths. DRed handles this in four phases:
- Insert propagation — semi-naive evaluation for the INSERT portion (same as Strategy 1)
- Over-deletion cascade — propagate base-case deletions through the recursive term against ST storage to find all transitively-derived rows that might be invalidated
- Rederivation — re-execute the recursive CTE from the remaining (non-deleted) base rows to restore any over-deleted rows that have alternative derivations
- Combine — final delta = inserts + (over-deletions − rederived rows)
This avoids full recomputation while correctly handling deletions with alternative derivation paths.
Strategy 3: Recomputation Fallback
When the CTE defines more columns than the outer SELECT projects (column mismatch), the incremental strategies cannot be used because the ST storage table lacks columns needed for recursive self-joins. In this case, the full defining query is re-executed and anti-joined against current storage:
WITH __pgs_recomp_new AS (
SELECT pgstream.pg_stream_hash(row_to_json(sub)::text) AS __pgs_row_id, col1, col2, ...
FROM (<defining_query>) sub
),
__pgs_recomp_ins AS (
SELECT n.__pgs_row_id, 'I'::text AS __pgs_action, n.col1, n.col2, ...
FROM __pgs_recomp_new n
LEFT JOIN <storage_table> s ON s.__pgs_row_id = n.__pgs_row_id
WHERE s.__pgs_row_id IS NULL
),
__pgs_recomp_del AS (
SELECT s.__pgs_row_id, 'D'::text AS __pgs_action, s.col1, s.col2, ...
FROM <storage_table> s
LEFT JOIN __pgs_recomp_new n ON n.__pgs_row_id = s.__pgs_row_id
WHERE n.__pgs_row_id IS NULL
)
SELECT * FROM __pgs_recomp_ins
UNION ALL
SELECT * FROM __pgs_recomp_del
The cost is proportional to the full result set size.
Strategy Selection
| CTE columns match ST? | Change type | Strategy |
|---|---|---|
| ✅ Match | INSERT-only | Semi-naive (Strategy 1) |
| ✅ Match | Mixed (INSERT+DELETE/UPDATE) | DRed (Strategy 2) |
| ❌ Mismatch | Any | Recomputation (Strategy 3) |
Notes:
- Non-linear recursion (multiple self-references in the recursive term) is rejected — PostgreSQL restricts the recursive term to reference the CTE at most once.
- The
__pgs_row_idcolumn (xxHash of the JSON-serialized row) is used for row identity. - For write-heavy workloads on very large recursive result sets with frequent mixed changes,
refresh_mode = 'FULL'may still be more efficient than DRed.
Window Functions
Module: src/dvm/operators/window.rs
Handles window functions (ROW_NUMBER, RANK, DENSE_RANK, SUM() OVER, etc.) using partition-based recomputation.
Delta Rule:
When any row in a partition changes (insert, update, or delete), the entire partition's window function output is recomputed:
$$\Delta(\omega_{f, P}(R)) = \omega_{f, P}(R'|{\text{affected partitions}}) - \omega{f, P}(R|_{\text{affected partitions}})$$
Where $P$ is the PARTITION BY key and $f$ is the window function.
Strategy:
- Identify affected partition keys from the child delta.
- Delete old window function results for affected partitions from storage.
- Fetch surviving (unchanged) rows in affected partitions from the child.
- Recompute the window function on the current input for affected partitions.
- Emit the recomputed rows as inserts.
SQL Generation:
-- CTE 1: Affected partition keys from delta
WITH affected_partitions AS (
SELECT DISTINCT <partition_cols> FROM (<child_delta>)
),
-- CTE 2: Surviving rows (not in delta) for affected partitions
surviving AS (
SELECT * FROM <storage_table>
WHERE (<partition_cols>) IN (SELECT * FROM affected_partitions)
AND __pgs_row_id NOT IN (SELECT __pgs_row_id FROM (<child_delta>) WHERE __pgs_action = 'D')
),
-- CTE 3: Recompute window function
recomputed AS (
SELECT *, <window_func> OVER (PARTITION BY <partition_cols> ORDER BY <order_cols>) AS <alias>
FROM surviving
)
-- Delete old results + insert recomputed results
SELECT 'D' AS __pgs_action, ... -- old rows from affected partitions
UNION ALL
SELECT 'I' AS __pgs_action, ... -- recomputed rows
Notes:
- The cost is proportional to the size of affected partitions, not the full table. For workloads where changes spread across few partitions, this is efficient.
- All window expressions must share the same PARTITION BY / ORDER BY clause.
- Without PARTITION BY, the entire table is treated as a single partition — any change triggers a full recomputation.
Window Frame Clauses:
Window frame specifications are fully supported:
- Modes:
ROWS,RANGE,GROUPS - Bounds:
UNBOUNDED PRECEDING,N PRECEDING,CURRENT ROW,N FOLLOWING,UNBOUNDED FOLLOWING - Between syntax:
BETWEEN <start> AND <end> - Exclusion:
EXCLUDE CURRENT ROW,EXCLUDE GROUP,EXCLUDE TIES,EXCLUDE NO OTHERS
Example: SUM(val) OVER (ORDER BY ts ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)
Named WINDOW Clauses:
Named window definitions are resolved from the query-level WINDOW clause:
SELECT id, SUM(val) OVER w, AVG(val) OVER w
FROM data
WINDOW w AS (PARTITION BY category ORDER BY ts)
The parser resolves OVER w by looking up the window definition from the WINDOW clause and merging partition, order, and frame specifications.
Lateral Function (Set-Returning Functions in FROM)
Module: src/dvm/operators/lateral_function.rs
Handles set-returning functions (SRFs) used in the FROM clause with implicit LATERAL semantics: jsonb_array_elements, jsonb_each, jsonb_each_text, unnest, etc.
Delta Rule:
When a source row changes (insert, update, or delete), the SRF expansion is re-evaluated only for that source row:
$$\Delta(R \ltimes f(R.\text{col})) = (R' \ltimes f(R'.\text{col}))|{\text{changed rows}} - (R \ltimes f(R.\text{col}))|{\text{changed rows}}$$
Where $R$ is the source table, $f$ is the SRF, and changed rows are identified via the child delta.
Strategy (Row-Scoped Recomputation):
- Propagate the child delta to identify changed source rows.
- Find all existing ST rows derived from changed source rows (via column matching).
- Delete old SRF expansions for those source rows.
- Re-expand the SRF for inserted/updated source rows.
- Emit deletes + inserts as the final delta.
SQL Generation (4-CTE chain):
-- CTE 1: Changed source rows from child delta
WITH lat_changed AS (
SELECT DISTINCT "__pgs_row_id", "__pgs_action", <child_cols>
FROM <child_delta>
),
-- CTE 2: Old ST rows for changed source rows (to be deleted)
lat_old AS (
SELECT st."__pgs_row_id", st.<all_output_cols>
FROM <st_table> st
WHERE EXISTS (
SELECT 1 FROM lat_changed cs
WHERE st.<col1> IS NOT DISTINCT FROM cs.<col1>
AND st.<col2> IS NOT DISTINCT FROM cs.<col2>
...
)
),
-- CTE 3: Re-expand SRF for inserted/updated source rows
lat_expand AS (
SELECT pg_stream_hash(<all_cols>::text) AS "__pgs_row_id",
cs.<child_cols>, <srf_alias>.<srf_cols>
FROM lat_changed cs,
LATERAL <srf_function>(cs.<arg>) AS <srf_alias>
WHERE cs."__pgs_action" = 'I'
),
-- CTE 4: Final delta
lat_final AS (
SELECT "__pgs_row_id", 'D' AS "__pgs_action", <cols> FROM lat_old
UNION ALL
SELECT "__pgs_row_id", 'I' AS "__pgs_action", <cols> FROM lat_expand
)
Row Identity:
Content-based: hash(child_columns || srf_result_columns). This is stable as long as the same source row produces the same expanded values.
Supported SRFs:
| Function | Output Columns | Notes |
|---|---|---|
jsonb_array_elements(jsonb) | value (jsonb) | Expands JSONB array to rows |
jsonb_array_elements_text(jsonb) | value (text) | Text variant |
jsonb_each(jsonb) | key (text), value (jsonb) | Expands JSONB object to key-value pairs |
jsonb_each_text(jsonb) | key (text), value (text) | Text variant |
unnest(anyarray) | Element type | Unnests PostgreSQL arrays |
| Custom SRFs | User-provided column aliases | AS alias(col1, col2) |
Notes:
- The cost is proportional to the number of changed source rows × average SRF expansion size, not the full table.
WITH ORDINALITYis supported — adds abigintordinality column to the output.ROWS FROM()with multiple functions is not supported (rejected at parse time).- Column aliases (e.g.,
AS child(value)) are used to determine output column names; for known SRFs without aliases, the alias name becomes the column name. - JSON_TABLE (PostgreSQL 17+) —
JSON_TABLE(expr, path COLUMNS (...))is modeled as aLateralFunctionand uses the same row-scoped recomputation strategy. Supported column types: regular, EXISTS, formatted, and nested columns withON ERROR/ON EMPTYbehaviors andPASSINGclauses.
Lateral Subquery (Correlated Subqueries in FROM)
Module: src/dvm/operators/lateral_subquery.rs
Handles correlated subqueries used in the FROM clause with explicit or implicit LATERAL semantics: FROM t, LATERAL (SELECT ... WHERE ref = t.col) AS alias or FROM t LEFT JOIN LATERAL (...) AS alias ON true.
Delta Rule:
When an outer row changes, the correlated subquery is re-executed only for that row:
$$\Delta(R \ltimes Q(R)) = (R' \ltimes Q(R'))|{\text{changed rows}} - (R \ltimes Q(R))|{\text{changed rows}}$$
Where $R$ is the outer table, $Q(R)$ is the correlated subquery, and changed rows are identified via the child delta.
Strategy (Row-Scoped Recomputation):
- Propagate the child delta to identify changed outer rows.
- Find all existing ST rows derived from changed outer rows (via column matching with
IS NOT DISTINCT FROM). - Delete old subquery expansions for those outer rows.
- Re-execute the subquery for inserted/updated outer rows using the original outer alias.
- Emit deletes + inserts as the final delta.
SQL Generation (4-CTE chain):
-- CTE 1: Changed outer rows from child delta
WITH lat_sq_changed AS (
SELECT DISTINCT "__pgs_row_id", "__pgs_action", <child_cols>
FROM <child_delta>
),
-- CTE 2: Old ST rows for changed outer rows (to be deleted)
lat_sq_old AS (
SELECT st."__pgs_row_id", st.<all_output_cols>
FROM <st_table> st
WHERE EXISTS (
SELECT 1 FROM lat_sq_changed cs
WHERE st.<col1> IS NOT DISTINCT FROM cs.<col1>
AND st.<col2> IS NOT DISTINCT FROM cs.<col2>
...
)
),
-- CTE 3: Re-execute subquery for inserted/updated outer rows
lat_sq_expand AS (
SELECT pg_stream_hash(<all_cols>::text) AS "__pgs_row_id",
<outer_alias>.<child_cols>, <sub_alias>.<sub_cols>
FROM lat_sq_changed AS <outer_alias>, -- Original outer alias!
LATERAL (<subquery_sql>) AS <sub_alias>
WHERE <outer_alias>."__pgs_action" = 'I'
),
-- CTE 4: Final delta
lat_sq_final AS (
SELECT "__pgs_row_id", 'D' AS "__pgs_action", <cols> FROM lat_sq_old
UNION ALL
SELECT "__pgs_row_id", 'I' AS "__pgs_action", <cols> FROM lat_sq_expand
)
LEFT JOIN LATERAL Handling:
For queries using LEFT JOIN LATERAL (...) ON true, the expand CTE uses LEFT JOIN LATERAL instead of comma syntax and wraps subquery columns in COALESCE for hash stability:
lat_sq_expand AS (
SELECT pg_stream_hash(<outer_cols>::text || '/' || COALESCE(<sub_cols>::text, '')) AS "__pgs_row_id",
<outer_alias>.<child_cols>, <sub_alias>.<sub_cols>
FROM lat_sq_changed AS <outer_alias>
LEFT JOIN LATERAL (<subquery_sql>) AS <sub_alias> ON true
WHERE <outer_alias>."__pgs_action" = 'I'
)
Row Identity:
Content-based: hash(outer_columns || '/' || subquery_result_columns). For LEFT JOIN with NULL results, COALESCE ensures a stable hash.
Supported Patterns:
| Pattern | Syntax | Notes |
|---|---|---|
| Top-N per group | LATERAL (SELECT ... ORDER BY ... LIMIT N) | Most common use case |
| Correlated aggregate | LATERAL (SELECT SUM(x) FROM t WHERE t.fk = p.pk) | Returns single row per outer row |
| Existence with data | LEFT JOIN LATERAL (...) ON true | Preserves outer rows with NULLs |
| Multi-column lookup | LATERAL (SELECT a, b FROM t WHERE t.fk = p.pk LIMIT 1) | Multiple derived values |
| GROUP BY inside subquery | LATERAL (SELECT type, COUNT(*) FROM t WHERE t.fk = p.pk GROUP BY type) | Multiple rows per outer row |
Key Design Decision: Outer Alias Rewriting
The subquery body contains column references to the outer table (e.g., WHERE li.order_id = o.id). In the expansion CTE, the changed-sources CTE is aliased with the original outer table alias (e.g., lat_sq_changed AS o) so that the subquery's column references resolve naturally without rewriting.
Notes:
- The cost is proportional to the number of changed outer rows × average subquery result size, not the full table.
- The subquery is stored as raw SQL (like
LateralFunction) because it cannot be independently differentiated — it depends on outer row context. - Source table OIDs referenced by the subquery body are extracted at parse time for CDC trigger setup.
- ORDER BY + LIMIT inside the subquery are valid (they apply per-outer-row, not to the stream table).
Semi-Join (EXISTS / IN Subquery)
Module: src/dvm/operators/semi_join.rs
Handles WHERE EXISTS (SELECT ... FROM ...) and WHERE col IN (SELECT ...) patterns. The parser transforms these into a SemiJoin operator with a left (outer) child, a right (inner) child, and a join condition.
Delta Rule:
$$\Delta(L \ltimes R) = \Delta L|{R} + L|{\Delta R \text{ causes existence change}}$$
- Part 1: Outer rows that changed and still satisfy the semi-join condition.
- Part 2: Existing outer rows whose semi-join result flipped due to inner changes (a matching inner row was inserted or deleted).
Strategy (Two-Part Delta):
- Part 1 (outer delta): Filter
delta_leftto rows that have at least one match in the current right-hand snapshot. - Part 2 (inner delta): For each row in the left snapshot, check whether the existence of matching right-hand rows changed between the old and current state. Emit
'I'if a match appeared,'D'if all matches disappeared.
The "old" right-hand state is reconstructed from the current state by reversing the delta: R_old = (R_current EXCEPT ALL delta_right(action='I')) UNION ALL delta_right(action='D').
Row Identity:
- Part 1: Uses
__pgs_row_idfrom the left delta. - Part 2: Content-based hash via
pg_stream_hash_multion left-side columns.
Supported Patterns:
| Pattern | SQL | Notes |
|---|---|---|
EXISTS | WHERE EXISTS (SELECT 1 FROM t WHERE t.fk = s.pk) | Direct semi-join |
IN (subquery) | WHERE id IN (SELECT fk FROM t) | Rewritten to EXISTS with equality |
| Multiple conditions | WHERE EXISTS (... AND ...) | Additional predicates in subquery WHERE |
Anti-Join (NOT EXISTS / NOT IN Subquery)
Module: src/dvm/operators/anti_join.rs
Handles WHERE NOT EXISTS (SELECT ... FROM ...) and WHERE col NOT IN (SELECT ...) patterns. The inverse of the semi-join operator.
Delta Rule:
$$\Delta(L \triangleright R) = \Delta L|{\neg R} + L|{\Delta R \text{ causes existence change}}$$
- Part 1: Outer rows that changed and have no match in the right-hand snapshot.
- Part 2: Existing outer rows whose anti-join result flipped due to inner changes.
Strategy (Two-Part Delta):
- Part 1 (outer delta): Filter
delta_leftto rows withNOT EXISTSin the current right snapshot. - Part 2 (inner delta): For each row in the left snapshot, detect existence changes. Emit
'D'if a match appeared (row no longer qualifies),'I'if all matches disappeared (row now qualifies).
Note the inverted semantics compared to semi-join: a new match means deletion, losing all matches means insertion.
Row Identity: Same as semi-join.
Supported Patterns:
| Pattern | SQL | Notes |
|---|---|---|
NOT EXISTS | WHERE NOT EXISTS (SELECT 1 FROM t WHERE t.fk = s.pk) | Direct anti-join |
NOT IN (subquery) | WHERE id NOT IN (SELECT fk FROM t) | Rewritten to NOT EXISTS with equality |
Scalar Subquery (Correlated SELECT Subquery)
Module: src/dvm/operators/scalar_subquery.rs
Handles scalar subqueries appearing in the SELECT list, e.g., SELECT a, (SELECT max(x) FROM t) AS mx FROM s. The subquery must return exactly one row and one column.
Delta Rule:
$$\Delta(L \times q) = \Delta L \times q' + L \times (q' - q)$$
Where $q$ is the scalar subquery value and $q'$ is the updated value.
Strategy (Two-Part Delta):
- Part 1 (outer delta): Propagate the child delta, appending the current scalar subquery value to each row.
- Part 2 (scalar value change): When the scalar subquery's result changes, emit deletes for all existing outer rows (with the old scalar value) and re-inserts for all outer rows (with the new value). The old scalar value is reconstructed by reversing the inner delta.
SQL Generation (3 or 4 CTEs):
-- Part 1: child delta + current scalar value
WITH sq_outer AS (
SELECT *, (<scalar_subquery>) AS "<alias>"
FROM <child_delta>
),
-- Part 2a: DELETE all outer rows when scalar changed
sq_del AS (
SELECT "__pgs_row_id", 'D' AS "__pgs_action", <cols>
FROM <st_table>
WHERE (<scalar_old>) IS DISTINCT FROM (<scalar_current>)
),
-- Part 2b: INSERT all outer rows with new scalar value
sq_ins AS (
SELECT pg_stream_hash_multi(...) AS "__pgs_row_id",
'I' AS "__pgs_action", <cols>, (<scalar_current>) AS "<alias>"
FROM <source_snapshot>
WHERE (<scalar_old>) IS DISTINCT FROM (<scalar_current>)
)
-- Final: UNION ALL of all parts
SELECT * FROM sq_outer
UNION ALL SELECT * FROM sq_del
UNION ALL SELECT * FROM sq_ins
Row Identity:
- Part 1:
__pgs_row_idfrom the child delta. - Part 2: Content-based hash via
pg_stream_hash_multion all output columns.
Notes:
- The scalar subquery is stored as raw SQL (deparsed from the parse tree).
- The old scalar value is approximated using the same
EXCEPT ALL / UNION ALLreversal technique as semi/anti-join. - If the scalar subquery references a table that changes, all outer rows must be re-evaluated — the delta can be large.
- Source OIDs used by the scalar subquery are captured at parse time for CDC trigger registration.
Operator Tree Construction
The DVM engine builds the operator tree by analyzing the parsed query:
- WITH clause → CTE definitions extracted into a name→body map (non-recursive) or CTE registry (multi-reference)
- FROM clause →
Scannodes for physical tables;Subquerynodes for inlined CTEs and subqueries in FROM;CteScannodes for multi-reference CTEs;LateralFunctionnodes for SRFs and JSON_TABLE in FROM;LateralSubquerynodes for correlated subqueries in FROM - JOIN →
JoinorOuterJoinwrapping two sub-trees - LATERAL SRFs →
LateralFunctionwrapping the left-hand FROM item as its child - LATERAL subqueries →
LateralSubquerywrapping the left-hand FROM item as its child (comma syntax or JOIN LATERAL) - WHERE subqueries →
SemiJoinforEXISTS/IN (subquery),AntiJoinforNOT EXISTS/NOT IN (subquery), extracted from the WHERE clause - Scalar subqueries →
ScalarSubqueryfor(SELECT ...)in the SELECT list, wrapping the child tree - WHERE →
Filterwrapping the scan/join tree (remaining non-subquery predicates) - SELECT list →
Projectfor column selection and expressions - GROUP BY →
Aggregatewrapping the filtered/projected tree - DISTINCT →
Distincton top - UNION ALL →
UnionAllcombining two complete sub-trees - INTERSECT / EXCEPT →
IntersectorExceptcombining two sub-trees with dual-count tracking - Window functions →
Windowwrapping the sub-tree with PARTITION BY / ORDER BY metadata - ORDER BY → silently discarded (storage row order is undefined)
- LIMIT / OFFSET → rejected with a clear error (stream tables materialize the full result set)
For recursive CTEs (WITH RECURSIVE), the query is parsed into an OpTree with RecursiveCte operator nodes. In DIFFERENTIAL mode, the strategy (semi-naive, DRed, or recomputation) is selected automatically based on column compatibility and change type — see the Recursive CTEs section above for details.
The tree is then traversed bottom-up during delta generation: each operator's generate_delta_sql() method composes its SQL fragment around the output of its child operator(s).
Further Reading
- ARCHITECTURE.md — System-wide component overview
- SQL_REFERENCE.md — Complete function reference
- CONFIGURATION.md — GUC tuning guide