What Happens When You INSERT a Row?

This tutorial traces the complete lifecycle of a single INSERT statement on a base table that is referenced by a stream table — from the moment the row is written to the moment the stream table reflects the change.

Setup: A Real-World Example

Suppose you run an e-commerce platform. You have an orders table and a stream table that maintains a running total per customer:

-- Base table
CREATE TABLE orders (
    id    SERIAL PRIMARY KEY,
    customer TEXT NOT NULL,
    amount   NUMERIC(10,2) NOT NULL
);

-- Stream table: always-fresh customer totals
SELECT pgstream.create_stream_table(
    'customer_totals',
    $$
      SELECT customer, SUM(amount) AS total, COUNT(*) AS order_count
      FROM orders GROUP BY customer
    $$,
    '1m',           -- refresh when data is staler than 1 minute
    'DIFFERENTIAL'  -- only process changed rows, not the full table
);

After creation, customer_totals is a real PostgreSQL table:

SELECT * FROM customer_totals;
-- (empty — no orders yet)

Phase 1: The INSERT

A new order arrives:

INSERT INTO orders (customer, amount) VALUES ('alice', 49.99);

What happens inside PostgreSQL

When create_stream_table() was called, pg_stream installed an AFTER INSERT OR UPDATE OR DELETE trigger on the orders table. The trigger fires transparently as part of the user's INSERT; no application changes are required.

The trigger function (pgstream_changes.pg_stream_cdc_fn_<oid>()) executes inside the same transaction as the INSERT and writes a single row into the change buffer table:

pgstream_changes.changes_16384    (where 16384 = orders table OID)
┌───────────┬─────────────┬────────┬─────────┬──────────┬──────────┬────────────┐
│ change_id │ lsn         │ action │ pk_hash │ new_id   │ new_cust │ new_amount │
├───────────┼─────────────┼────────┼─────────┼──────────┼──────────┼────────────┤
│ 1         │ 0/1A3F2B80  │ I      │ -837291 │ 1        │ alice    │ 49.99      │
└───────────┴─────────────┴────────┴─────────┴──────────┴──────────┴────────────┘

Key details:

  • lsn: The current WAL Log Sequence Number (pg_current_wal_lsn()), used to bound which changes belong to which refresh cycle.
  • action: 'I' for INSERT, 'U' for UPDATE, 'D' for DELETE.
  • pk_hash: A pre-computed hash of the primary key (orders.id), used later for efficient row matching.
  • new_* columns: The actual column values from NEW, stored as native PostgreSQL types (not JSONB). There are no old_* values for INSERTs.

The only overhead the trigger adds to the user's transaction is this single INSERT into the buffer table. There is no JSONB serialization, no logical replication slot, and no external process involved.
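
To make the buffer layout concrete, here is a sketch of what the generated change-buffer table for orders could look like. The DDL below is an assumption reconstructed from the columns described above; the actual definition is internal to pg_stream and may differ:

-- Hypothetical sketch of the generated change-buffer table
CREATE TABLE pgstream_changes.changes_16384 (
    change_id    BIGINT GENERATED ALWAYS AS IDENTITY,
    lsn          PG_LSN NOT NULL,       -- pg_current_wal_lsn() at trigger time
    action       "char" NOT NULL,       -- 'I', 'U', or 'D'
    pk_hash      BIGINT NOT NULL,       -- pre-computed hash of orders.id
    new_id       INTEGER,               -- NEW values (set for I and U)
    new_customer TEXT,
    new_amount   NUMERIC(10,2),
    old_id       INTEGER,               -- OLD values (set for U and D)
    old_customer TEXT,
    old_amount   NUMERIC(10,2)
);
CREATE INDEX ON pgstream_changes.changes_16384 (lsn);  -- supports LSN-bounded scans

Since the buffer is an ordinary table, you can peek at pending changes directly:

SELECT change_id, lsn, action, pk_hash, new_customer, new_amount
FROM pgstream_changes.changes_16384
ORDER BY change_id;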

Phase 2: The Scheduler Wakes Up

A background worker called the scheduler runs inside PostgreSQL (registered via shared_preload_libraries). It wakes up every pg_stream.scheduler_interval_ms milliseconds (default: 1000ms) and performs a tick:

  1. Rebuild the DAG (if any stream tables were created/dropped since last tick) — a dependency graph of all stream tables and their source tables.
  2. Topological sort — determine the refresh order so that stream tables depending on other stream tables are refreshed after their dependencies.
  3. For each stream table, check: has its staleness exceeded its schedule?
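
The scheduler exists only if the library was preloaded at server start. A minimal configuration sketch, assuming the library file shares the extension's name (the GUC and its default come from the paragraph above):

# postgresql.conf
shared_preload_libraries = 'pg_stream'
pg_stream.scheduler_interval_ms = 1000    # one tick per second (the default)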

For customer_totals with a '1m' schedule, the scheduler compares:

  • now() minus data_timestamp (the freshness watermark from the last refresh)
  • Against the schedule: 60 seconds

If more than 60 seconds have elapsed and the stream table isn't already being refreshed, the scheduler begins a refresh.
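
Conceptually, the per-table check reduces to a comparison like the one below. This is a sketch only: pgstream.pgs_stream_tables is a hypothetical catalog name, and the real check runs inside the scheduler rather than as user-issued SQL.

SELECT now() - data_timestamp > interval '60 seconds' AS needs_refresh
FROM pgstream.pgs_stream_tables      -- hypothetical catalog table
WHERE stream_table_name = 'customer_totals';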

Phase 3: Frontier Advancement

Before executing the refresh, the scheduler creates a new frontier — a snapshot of how far to read changes from each source table:

Previous frontier: { orders(16384): lsn = 0/1A3F2A00 }
New frontier:      { orders(16384): lsn = 0/1A3F2C00 }

The frontier is a DBSP-inspired version vector. Each source table has its own LSN cursor. The refresh will process all changes in the buffer table where lsn > previous_frontier_lsn AND lsn <= new_frontier_lsn.

This means:

  • Changes committed before the previous refresh are already reflected.
  • Changes committed after the new frontier will be picked up in the next cycle.
  • The INSERT we made (lsn = 0/1A3F2B80) falls within this window.
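
Phase 7 notes that the frontier is persisted as JSONB between refreshes. Its stored shape might look something like this, mapping each source table's OID to its LSN cursor (the exact key layout is an assumption):

{"16384": "0/1A3F2C00"}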

Phase 4: Change Detection — Is There Anything to Do?

Before running the full delta query, the scheduler runs a short-circuit check: does the change buffer actually have any rows in the LSN window?

SELECT count(*)::bigint FROM (
    SELECT 1 FROM pgstream_changes.changes_16384
    WHERE lsn > '0/1A3F2A00'::pg_lsn
    AND lsn <= '0/1A3F2C00'::pg_lsn
    LIMIT <threshold>
) __pgs_capped

The LIMIT caps the count at the adaptive threshold: if the number of buffered changes exceeds a percentage of the source table's row count (default: 10%), the scheduler falls back to a FULL refresh instead of a DIFFERENTIAL one, because applying that many individual deltas would be slower than a bulk reload.
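
One plausible way to compute that threshold is from the planner's row estimate in pg_class; this is an assumption about the mechanism, not something the text confirms:

-- Hypothetical computation of the DIFFERENTIAL-to-FULL fallback threshold
SELECT GREATEST(1, (reltuples * 0.10)::bigint) AS change_threshold
FROM pg_class
WHERE oid = 16384;    -- OID of the orders table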

For our single INSERT, the count is 1 — well below the threshold. The scheduler proceeds with a DIFFERENTIAL refresh.

Phase 5: Delta Query Generation (DVM Engine)

This is where the Differential View Maintenance (DVM) engine does its work. The defining query:

SELECT customer, SUM(amount) AS total, COUNT(*) AS order_count
FROM orders GROUP BY customer

is parsed into an operator tree:

Aggregate(GROUP BY customer, SUM(amount), COUNT(*))
  └── Scan(orders)

The DVM engine differentiates each operator — converting it from "compute the full result" to "compute only what changed":

Step 1: Differentiate the Scan

The Scan(orders) operator becomes a read from the change buffer:

-- Reads only changes in the LSN window, splitting UPDATEs into DELETE+INSERT
WITH __pgs_raw AS (
    SELECT c.pk_hash, c.action,
           c."new_customer", c."old_customer",
           c."new_amount", c."old_amount"
    FROM pgstream_changes.changes_16384 c
    WHERE c.lsn > '0/1A3F2A00'::pg_lsn
    AND   c.lsn <= '0/1A3F2C00'::pg_lsn
)
-- INSERT rows: take new_* values
SELECT pk_hash AS __pgs_row_id, 'I' AS __pgs_action,
       "new_customer" AS customer, "new_amount" AS amount
FROM __pgs_raw WHERE action IN ('I', 'U')
UNION ALL
-- DELETE rows: take old_* values
SELECT pk_hash AS __pgs_row_id, 'D' AS __pgs_action,
       "old_customer" AS customer, "old_amount" AS amount
FROM __pgs_raw WHERE action IN ('D', 'U')

For our single INSERT, this produces:

__pgs_row_id | __pgs_action | customer | amount
-------------|--------------|----------|-------
-837291      | I            | alice    | 49.99

Step 2: Differentiate the Aggregate

The Aggregate differentiation is the heart of incremental maintenance. Instead of re-computing SUM(amount) over the entire orders table, it computes:

-- Delta for SUM: add new values, subtract deleted values
SELECT customer,
       SUM(CASE WHEN __pgs_action = 'I' THEN amount
                WHEN __pgs_action = 'D' THEN -amount END) AS total,
       SUM(CASE WHEN __pgs_action = 'I' THEN 1
                WHEN __pgs_action = 'D' THEN -1 END) AS order_count,
       pgstream.pg_stream_hash(customer::text) AS __pgs_row_id,
       'I' AS __pgs_action
FROM <scan_delta>
GROUP BY customer

For our INSERT of ('alice', 49.99), this yields:

customer | total  | order_count | __pgs_row_id | __pgs_action
---------|--------|-------------|--------------|-------------
alice    | +49.99 | +1          | 7283194      | I

The stream table also uses reference counting: alongside the visible columns it tracks __pgs_count, the number of source rows contributing to each group (not shown in the delta SQL above). When __pgs_count reaches 0, the group's row is deleted.
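
The counting arithmetic can be demonstrated with plain SQL over inline values (illustrative only; pg_stream maintains __pgs_count internally):

SELECT customer,
       SUM(CASE action WHEN 'I' THEN 1 WHEN 'D' THEN -1 END) AS count_delta
FROM (VALUES ('alice', 'I'), ('alice', 'D')) AS t(customer, action)
GROUP BY customer;
-- count_delta = 0: if the stored __pgs_count drops to 0 as a result,
-- the group's row disappears from the stream table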

Phase 6: MERGE Into the Stream Table

The delta is applied to the customer_totals storage table using a single SQL MERGE statement:

MERGE INTO public.customer_totals AS st
USING (<delta_query>) AS d
ON st.__pgs_row_id = d.__pgs_row_id
WHEN MATCHED AND d.__pgs_action = 'D' THEN DELETE
WHEN MATCHED AND d.__pgs_action = 'I' THEN
    UPDATE SET total = st.total + d.total,
               order_count = st.order_count + d.order_count
WHEN NOT MATCHED AND d.__pgs_action = 'I' THEN
    INSERT (__pgs_row_id, customer, total, order_count)
    VALUES (d.__pgs_row_id, d.customer, d.total, d.order_count)

Since alice didn't exist before, this takes the WHEN NOT MATCHED ... INSERT branch, and the delta values become the initial totals. For an existing group, the WHEN MATCHED branch instead adds the delta to the stored totals rather than overwriting them. The stream table now contains:

SELECT * FROM customer_totals;
 customer | total | order_count
----------|-------|------------
 alice    | 49.99 | 1

Phase 7: Cleanup and Bookkeeping

After the MERGE succeeds:

  1. Consumed changes are deleted from the buffer table:

    DELETE FROM pgstream_changes.changes_16384
    WHERE lsn > '0/1A3F2A00'::pg_lsn
    AND lsn <= '0/1A3F2C00'::pg_lsn
    
  2. The frontier is saved to the catalog as JSONB, so the next refresh knows where to start.

  3. The refresh is recorded in pgstream.pgs_refresh_history (queried in the sketch after this list):

    refresh_id | pgs_id | action       | rows_inserted | rows_deleted | status    | initiated_by
    -----------|--------|--------------|---------------|--------------|-----------|-------------
    1          | 1      | DIFFERENTIAL | 1             | 0            | COMPLETED | SCHEDULER
    
  4. The data timestamp on the stream table is advanced, resetting the staleness clock.

  5. The MERGE template is cached in thread-local storage. The next refresh for this stream table skips SQL parsing, operator tree construction, and differentiation — it only substitutes LSN values into the cached template. This saves ~45ms per refresh cycle.
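
The history table is handy when checking what the scheduler has been doing. A query sketch using the columns shown in step 3 above (the real table may contain additional columns):

SELECT refresh_id, pgs_id, action, rows_inserted, rows_deleted,
       status, initiated_by
FROM pgstream.pgs_refresh_history
ORDER BY refresh_id DESC
LIMIT 5;    -- the five most recent refreshes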

What About UPDATE and DELETE?

UPDATE

UPDATE orders SET amount = 59.99 WHERE id = 1;

The trigger writes a single row with action = 'U', capturing both OLD and NEW values:

action | new_amount | old_amount | new_customer | old_customer
-------|------------|------------|--------------|-------------
U      | 59.99      | 49.99      | alice        | alice

The scan differentiation splits this into:

  • DELETE old: (alice, 49.99) with action 'D'
  • INSERT new: (alice, 59.99) with action 'I'

The aggregate differentiation computes: +59.99 - 49.99 = +10.00 for alice's total. The MERGE updates the existing row.
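
Tracing the numbers through both steps makes the arithmetic concrete (the __pgs_row_id value is the pk_hash from Phase 1). Note that order_count nets to zero, because the UPDATE neither added nor removed an order:

Step 1 output (the UPDATE split into D + I):

__pgs_row_id | __pgs_action | customer | amount
-------------|--------------|----------|-------
-837291      | D            | alice    | 49.99
-837291      | I            | alice    | 59.99

Step 2 output (one group-level delta):

customer | total  | order_count
---------|--------|------------
alice    | +10.00 | 0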

DELETE

DELETE FROM orders WHERE id = 1;

The trigger writes action = 'D' with the OLD values. The aggregate differentiation computes -49.99 for the total and -1 for the count. If __pgs_count reaches 0 (alice has no remaining orders), the MERGE deletes alice's row from the stream table entirely.

Performance: Why This Is Fast

Step                       | What it avoids
---------------------------|------------------------------------------------------------------------
Trigger-based CDC          | No logical replication slot, no WAL parsing, no external process
Typed columns              | No JSONB serialization in the trigger, no jsonb_populate_record in the delta query
Pre-computed pk_hash       | No per-row hash computation during the delta query
LSN-bounded reads          | Index scan on the change buffer, not a full table scan
Algebraic differentiation  | Processes only changed rows: O(changes), not O(table size)
MERGE statement            | Single SQL round-trip for all inserts, updates, and deletes
Cached templates           | After the first refresh, delta SQL generation is skipped entirely
Adaptive fallback          | Automatically switches to FULL refresh when changes exceed a threshold

For a table with 10 million rows and 100 changed rows, a DIFFERENTIAL refresh processes only those 100 rows. A FULL refresh would need to scan all 10 million.


Next in This Series