dbt-pgtrickle

A dbt package that integrates pg_trickle stream tables into your dbt project via a custom stream_table materialization.

No custom Python adapter required — works with the standard dbt-postgres adapter. Just Jinja SQL macros that call pg_trickle's SQL API.

Prerequisites

| Requirement | Minimum Version |
| --- | --- |
| dbt Core | ≥ 1.9 |
| dbt-postgres adapter | Matching dbt Core version |
| PostgreSQL | 18.x |
| pg_trickle extension | ≥ 0.1.0 (`CREATE EXTENSION pg_trickle;`) |

Installation

Add to your packages.yml:

packages:
  - git: "https://github.com/<org>/pg-trickle.git"
    revision: v0.1.0
    subdirectory: "dbt-pgtrickle"

Then run:

dbt deps

Quick Start

Create a model with materialized='stream_table':

-- models/marts/order_totals.sql
{{
  config(
    materialized='stream_table',
    schedule='5m',
    refresh_mode='DIFFERENTIAL'
  )
}}

SELECT
    customer_id,
    SUM(amount) AS total_amount,
    COUNT(*) AS order_count
FROM {{ source('raw', 'orders') }}
GROUP BY customer_id

Then run and test it:

dbt run --select order_totals   # Creates the stream table
dbt test --select order_totals  # Tests work normally (it's a real table)
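Because the result is a real table, stream table models participate in the dbt DAG like any other model. As a sketch, a hypothetical downstream model (the name customer_segments and the threshold are made up) can ref() it directly:

```sql
-- models/marts/customer_segments.sql (hypothetical downstream model)
-- Stream tables behave like ordinary tables in the DAG, so ref() works as usual.
SELECT
    customer_id,
    CASE WHEN total_amount > 1000 THEN 'high_value' ELSE 'standard' END AS segment
FROM {{ ref('order_totals') }}
```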

Configuration Reference

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| materialized | string | (required) | Must be 'stream_table' |
| schedule | string/null | '1m' | Refresh schedule (e.g., '5m', '1h', cron). null for pg_trickle's CALCULATED schedule. |
| refresh_mode | string | 'DIFFERENTIAL' | 'FULL' or 'DIFFERENTIAL' |
| initialize | bool | true | Populate on creation |
| status | string/null | null | 'ACTIVE' or 'PAUSED'. When set, applies on subsequent runs via alter_stream_table(). |
| stream_table_name | string | model name | Override stream table name |
| stream_table_schema | string | target schema | Override schema |
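Putting the keys together, here is an illustrative model that uses every option from the table above. The model name, source, and columns are invented for the example; in dbt Jinja, `none` is how you pass a null schedule:

```sql
-- models/marts/daily_revenue.sql (illustrative; names and values are made up)
{{
  config(
    materialized='stream_table',
    schedule=none,                        -- null: use pg_trickle's CALCULATED schedule
    refresh_mode='FULL',
    initialize=false,                     -- create empty; populate on first refresh
    status='PAUSED',                      -- applied on later runs via alter_stream_table()
    stream_table_name='daily_revenue_st', -- override the default (model name)
    stream_table_schema='analytics'       -- override the target schema
  )
}}

SELECT
    order_date,
    SUM(amount) AS revenue
FROM {{ source('raw', 'orders') }}
GROUP BY order_date
```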

Project-level defaults

# dbt_project.yml
models:
  my_project:
    marts:
      +materialized: stream_table
      +schedule: '5m'
      +refresh_mode: DIFFERENTIAL

Operations

pgtrickle_refresh — Manual refresh

dbt run-operation pgtrickle_refresh --args '{"model_name": "order_totals"}'

drop_all_stream_tables — Drop dbt-managed stream tables

Drops only stream tables defined as dbt models (safe in shared environments):

dbt run-operation drop_all_stream_tables

drop_all_stream_tables_force — Drop ALL stream tables

Drops everything from the pg_trickle catalog, including non-dbt stream tables:

dbt run-operation drop_all_stream_tables_force

pgtrickle_check_cdc_health — CDC pipeline health

dbt run-operation pgtrickle_check_cdc_health

Raises an error (non-zero exit) if any CDC source is unhealthy.

Freshness Monitoring

Native dbt source freshness is not supported (the last_refresh_at column lives in the catalog, not on the stream table). Use the pgtrickle_check_freshness run-operation instead:

# Check all active stream tables (defaults: warn=600s, error=1800s)
dbt run-operation pgtrickle_check_freshness

# Custom thresholds
dbt run-operation pgtrickle_check_freshness \
  --args '{model_name: order_totals, warn_seconds: 300, error_seconds: 900}'

Exits non-zero when any stream table exceeds the error threshold — safe for CI.

Useful dbt Commands

# List all stream table models
dbt ls --select config.materialized:stream_table

# Full refresh (drop + recreate)
dbt run --select order_totals --full-refresh

# Build models + tests in DAG order
dbt build --select order_totals

Note: dbt build runs stream table models early in the DAG. If downstream models depend on a stream table with initialize: false, the table may not be populated yet.

Testing

Stream tables are standard PostgreSQL heap tables — all dbt tests work normally:

models:
  - name: order_totals
    columns:
      - name: customer_id
        tests:
          - not_null
          - unique

__pgt_row_id Column

pg_trickle adds an internal __pgt_row_id column to stream tables for row identity tracking. This column:

  • Appears in SELECT * and dbt docs generate
  • Does not affect dbt test unless you check column counts
  • Can be documented to reduce confusion:

columns:
  - name: __pgt_row_id
    description: "Internal pg_trickle row identity hash. Ignore this column."
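To keep __pgt_row_id out of downstream models, list columns explicitly instead of using SELECT *. A minimal sketch (the downstream model name is hypothetical):

```sql
-- models/marts/order_totals_clean.sql (hypothetical)
-- Explicit column list: __pgt_row_id stays behind in the stream table.
SELECT
    customer_id,
    total_amount,
    order_count
FROM {{ ref('order_totals') }}
```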

Limitations

| Limitation | Workaround |
| --- | --- |
| No in-place query alteration | Materialization auto-drops and recreates when the query changes |
| __pgt_row_id visible | Document it; exclude it in downstream SELECTs |
| No native dbt source freshness | Use the pgtrickle_check_freshness run-operation |
| No dbt snapshot support | Snapshot the stream table as a regular table |
| Query change detection is whitespace-sensitive | dbt compiles deterministically; unnecessary recreations are safe |
| PostgreSQL 18 required | Extension requirement |
| Shared version tags with pg_trickle extension | Pin to a specific git revision |

Contributing

See AGENTS.md for development guidelines, and the implementation plan for design rationale.

Running tests locally

The quickest way (requires Docker and dbt installed):

# Full run — builds Docker image, starts container, runs tests, cleans up
just test-dbt

# Fast run — reuses existing Docker image (run after first build)
just test-dbt-fast

Or use the script directly with options:

cd dbt-pgtrickle/integration_tests/scripts

# Default: builds image, runs tests with dbt 1.9, cleans up
./run_dbt_tests.sh

# Skip image rebuild (faster iteration)
./run_dbt_tests.sh --skip-build

# Keep the container running after tests (for debugging)
./run_dbt_tests.sh --skip-build --keep-container

# Use a custom port (avoids conflicts with local PostgreSQL)
PGPORT=25432 ./run_dbt_tests.sh

Manual testing against an existing pg_trickle instance

If you already have PostgreSQL 18 + pg_trickle running locally:

export PGHOST=localhost PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=postgres
cd dbt-pgtrickle/integration_tests
dbt deps
dbt seed
dbt run
./scripts/wait_for_populated.sh order_totals 30
dbt test
dbt run-operation drop_all_stream_tables

License

Apache 2.0 — see LICENSE.