
DeltaForge


Introduction

DeltaForge is a versatile, high-performance Change Data Capture (CDC) engine built in Rust. It streams database changes into downstream systems like Kafka, Redis, and NATS - giving you full control over how events are routed, transformed, and delivered. Built-in schema discovery automatically infers and tracks the shape of your data as it flows through, including deep inspection of nested JSON structures.

Pipelines are defined declaratively in YAML, making it straightforward to onboard new use cases without custom code.

Built with: Rust
Sources: MySQL · PostgreSQL
Processors: JavaScript · Outbox
Sinks: Kafka · Redis · NATS
Output Formats: Native · Debezium · CloudEvents

Why DeltaForge?

Core Capabilities

  • Powered by Rust: Predictable performance, memory safety, and minimal resource footprint.
  • 🔌 Pluggable architecture: Sources, processors, and sinks are modular and independently extensible.
  • 🧩 Declarative pipelines: Define sources, transforms, sinks, and commit policies in version-controlled YAML with environment variable expansion for secrets.
  • 📦 Reliable checkpointing: Resume safely after restarts with at-least-once delivery guarantees.
  • 🔀 Dynamic routing: Route events to per-table topics, streams, or subjects using templates or JavaScript logic.
  • 📤 Transactional outbox: Publish domain events atomically with database writes. Per-aggregate routing, raw payload delivery, zero polling.
  • 🛠️ Cloud-native ready: Single binary, Docker images, JSON logs, Prometheus metrics, and liveness/readiness probes for Kubernetes.

Schema Intelligence

  • 🔍 Schema sensing: Automatically infer and track schema from event payloads, including deep inspection of nested JSON structures.
  • 🗺️ High-cardinality handling: Detect and normalize dynamic map keys (session IDs, trace IDs) to prevent false schema evolution events.
  • 🏷️ Schema fingerprinting: SHA-256 based change detection with schema-to-checkpoint correlation for reliable replay.
  • 🗃️ Source-owned semantics: Preserves native database types (PostgreSQL arrays, MySQL JSON, etc.) instead of normalizing to a universal type system.

Operational Features

  • 🔄 Graceful failover: Handles source failover with automatic schema revalidation - no manual intervention needed.
  • 🧬 Zero-downtime schema evolution: Detects DDL changes and reloads schemas automatically, no pipeline restart needed.
  • 🎯 Flexible table selection: Wildcard patterns (db.*, schema.prefix%) for easy onboarding.
  • 📀 Transaction boundaries: Optionally keep source transactions intact across batches.
  • ⚙️ Commit policies: Control checkpoint behavior with all, required, or quorum modes across multiple sinks.
  • 🔧 Live pipeline management: Pause, resume, patch, and inspect running pipelines via the REST API.
  • 🗄️ Safe initial snapshot: Consistent parallel backfill of existing tables before streaming begins, with binlog/WAL retention validation, background guards, and crash-resume at table granularity.

Use Cases

DeltaForge is designed for:

  • Real-time data synchronization: Keep caches, search indexes, and analytics systems in sync with your primary database.
  • Event-driven architectures: Stream database changes to Kafka or NATS for downstream microservices.
  • Transactional messaging: Use the outbox pattern to publish domain events atomically with database writes - no distributed transactions needed.
  • Audit trails and compliance: Capture every mutation with full before/after images for SOC2, HIPAA, or GDPR requirements.
  • Lightweight ETL: Transform, filter, and route data in-flight with JavaScript processors - no Spark or Flink cluster needed.
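The in-flight transform step can be sketched as a JavaScript processor in the `(event) => [events]` shape used elsewhere in these docs. This is a minimal filter-and-enrich example; the `internal` flag is a hypothetical field for illustration:

```javascript
// Minimal filter-and-enrich processor. Returning an empty array drops the
// event; returning multiple events would fan it out.
const transform = (event) => {
  // Drop rows flagged as internal (hypothetical field, for illustration)
  if (event.after && event.after.internal) {
    return [];
  }
  // Enrich with a processing timestamp
  event.processed_at = Date.now();
  return [event];
};

// Example usage with synthetic update events
const kept = transform({ operation: "update", after: { id: 1, internal: false } });
const dropped = transform({ operation: "update", after: { id: 2, internal: true } });
```

Filtering before the sink keeps downstream topics clean without any extra infrastructure.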

DeltaForge is not a DAG-based stream processor. It is a focused CDC engine meant to replace tools like Debezium when you need a lighter, cloud-native, and more customizable runtime.

Getting Started

| Guide | Description |
|---|---|
| Quickstart | Get DeltaForge running in minutes |
| CDC Overview | Understand Change Data Capture concepts |
| Configuration | Pipeline spec reference |
| Development | Build from source, contribute |

Quick Example

metadata:
  name: orders-to-kafka
  tenant: acme

spec:
  source:
    type: mysql
    config:
      dsn: ${MYSQL_DSN}
      tables: [shop.orders]

  processors:
    - type: javascript
      inline: |
        (event) => {
          event.processed_at = Date.now();
          return [event];
        }

  sinks:
    - type: kafka
      config:
        brokers: ${KAFKA_BROKERS}
        topic: order-events

Installation

Docker (recommended):

docker pull ghcr.io/vnvo/deltaforge:latest

From source:

git clone https://github.com/vnvo/deltaforge.git
cd deltaforge
cargo build --release

See the Development Guide for detailed build instructions and available Docker image variants.

License

DeltaForge is dual-licensed under MIT and Apache 2.0.

Change Data Capture (CDC)

Change Data Capture (CDC) is the practice of streaming database mutations as ordered events so downstream systems can stay in sync without periodic full loads. Rather than asking “what does my data look like now?”, CDC answers “what changed, and when?”

DeltaForge is a CDC engine built to make log-based change streams reliable, observable, and operationally simple in modern, containerized environments. It focuses on row-level CDC from MySQL binlog and Postgres logical replication to keep consumers accurate and latency-aware while minimizing impact on source databases.

In short: DeltaForge tails committed transactions from MySQL and Postgres logs, preserves ordering and transaction boundaries, and delivers events to Kafka and Redis with checkpointed delivery, configurable batching, and Prometheus metrics - without requiring a JVM or distributed coordinator.


Why CDC matters

Traditional data integration relies on periodic batch jobs that query source systems, compare snapshots, and push differences downstream. This approach worked for decades, but modern architectures demand something better.

The batch ETL problem: A nightly sync means your analytics are always a day stale. An hourly sync still leaves gaps and hammers your production database with expensive SELECT * queries during business hours. As data volumes grow, these jobs take longer, fail more often, and compete for the same resources your customers need.

CDC flips the model. Instead of pulling data on a schedule, you subscribe to changes as they happen.

| Aspect | Batch ETL | CDC |
|---|---|---|
| Latency | Minutes to hours | Seconds to milliseconds |
| Source load | High (repeated scans) | Minimal (log tailing) |
| Data freshness | Stale between runs | Near real-time |
| Failure recovery | Re-run entire job | Resume from checkpoint |
| Change detection | Diff comparison | Native from source |

These benefits compound as systems scale and teams decentralize:

  • Deterministic replay: Ordered events allow consumers to reconstruct state or power exactly-once delivery with checkpointing.
  • Polyglot delivery: The same change feed can serve caches, queues, warehouses, and search indexes simultaneously without additional source queries.

How CDC works

All CDC implementations share a common goal: detect changes and emit them as events. The approaches differ in how they detect those changes.

Log-based CDC

Databases maintain transaction logs (MySQL binlog, Postgres WAL) that record every committed change for durability and replication. Log-based CDC reads these logs directly, capturing changes without touching application tables.

┌─────────────┐    commits    ┌─────────────┐    tails     ┌─────────────┐
│ Application │──────────────▶│  Database   │─────────────▶│ CDC Engine  │
└─────────────┘               │   + WAL     │              └──────┬──────┘
                              └─────────────┘                     │
                                                                  ▼
                                                           ┌─────────────┐
                                                           │   Kafka /   │
                                                           │   Redis     │
                                                           └─────────────┘

Advantages:

  • Zero impact on source table performance
  • Captures all changes including those from triggers and stored procedures
  • Preserves transaction boundaries and ordering
  • Can capture deletes without soft-delete columns

Trade-offs:

  • Requires database configuration (replication slots, binlog retention)
  • Schema changes need careful handling
  • Log retention limits how far back you can replay

DeltaForge uses log-based CDC exclusively. This allows DeltaForge to provide stronger ordering guarantees, lower source impact, and simpler operational semantics than hybrid approaches that mix log tailing with polling or triggers.

Trigger-based CDC

Database triggers fire on INSERT, UPDATE, and DELETE operations, writing change records to a shadow table that a separate process polls.

Advantages:

  • Works on databases without accessible transaction logs
  • Can capture application-level context unavailable in logs

Trade-offs:

  • Adds write overhead to every transaction
  • Triggers can be disabled or forgotten during schema migrations
  • Shadow tables require maintenance and can grow unbounded

Polling-based CDC

A process periodically queries tables for rows modified since the last check, typically using an updated_at timestamp or incrementing ID.

Advantages:

  • Simple to implement
  • No special database configuration required

Trade-offs:

  • Cannot reliably detect deletes
  • Requires updated_at columns on every table
  • Polling frequency trades off latency against database load
  • Clock skew and transaction visibility can cause missed or duplicate events

Anatomy of a CDC event

A well-designed CDC event contains everything downstream consumers need to process the change correctly.

{
  "id": "evt_01J7K9X2M3N4P5Q6R7S8T9U0V1",
  "source": {
    "database": "shop",
    "table": "orders",
    "server_id": "mysql-prod-1"
  },
  "operation": "update",
  "timestamp": "2025-01-15T14:32:01.847Z",
  "transaction": {
    "id": "gtid:3E11FA47-71CA-11E1-9E33-C80AA9429562:42",
    "position": 15847293
  },
  "before": {
    "id": 12345,
    "status": "pending",
    "total": 99.99,
    "updated_at": "2025-01-15T14:30:00.000Z"
  },
  "after": {
    "id": 12345,
    "status": "shipped",
    "total": 99.99,
    "updated_at": "2025-01-15T14:32:01.000Z"
  },
  "schema_version": "v3"
}

Key components:

| Field | Purpose |
|---|---|
| operation | INSERT, UPDATE, DELETE, or DDL |
| before / after | Row state before and after the change (enables diff logic) |
| transaction | Groups changes from the same database transaction |
| timestamp | When the change was committed at the source |
| schema_version | Helps consumers handle schema evolution |

Not all fields are present for every operation - before is omitted for INSERTs and after is omitted for DELETEs - but the event envelope and metadata fields are consistent across MySQL and Postgres sources.
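As a sketch of how a consumer might use these fields, the following hypothetical reducer applies events to an in-memory key-value view, upserting from `after` and deleting by `before` (it is not part of DeltaForge itself):

```javascript
// Apply one CDC event to an in-memory materialized view (a Map keyed by
// primary key). Inserts and updates upsert the `after` image; deletes
// remove the row identified by the `before` image.
function applyChange(view, event) {
  switch (event.operation) {
    case "insert":
    case "update":
      view.set(event.after.id, event.after);
      break;
    case "delete":
      view.delete(event.before.id);
      break;
    // DDL and other operations carry no row images to apply
  }
  return view;
}

// Example: replaying an ordered stream reconstructs current state
const view = new Map();
applyChange(view, { operation: "insert", after: { id: 12345, status: "pending" } });
applyChange(view, {
  operation: "update",
  before: { id: 12345, status: "pending" },
  after: { id: 12345, status: "shipped" },
});
```

Because events are ordered, replaying the same stream always yields the same view - the basis for the replay and recovery properties discussed above.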


Real-world use cases

Cache invalidation and population

Problem: Your Redis cache serves product catalog data, but cache invalidation logic is scattered across dozens of services. Some forget to invalidate, others invalidate too aggressively, and debugging staleness issues takes hours.

CDC solution: Stream changes from the products table to a message queue. A dedicated consumer reads the stream and updates or deletes cache keys based on the operation type. This centralizes invalidation logic, provides replay capability for cache rebuilds, and removes cache concerns from application code entirely.

┌──────────┐     CDC      ┌─────────────┐   consumer   ┌─────────────┐
│  MySQL   │─────────────▶│   Stream    │─────────────▶│ Cache Keys  │
│ products │              │  (Kafka/    │              │ product:123 │
└──────────┘              │   Redis)    │              └─────────────┘
                          └─────────────┘

Event-driven microservices

Problem: Your order service needs to notify inventory, shipping, billing, and analytics whenever an order changes state. Direct service-to-service calls create tight coupling and cascade failures.

CDC solution: Publish order changes to a durable message queue. Each downstream service subscribes independently, processes at its own pace, and can replay events after failures or during onboarding. The order service doesn’t need to know about its consumers.

┌─────────────┐     CDC      ┌─────────────┐
│   Orders    │─────────────▶│   Message   │
│   Database  │              │   Queue     │
└─────────────┘              └──────┬──────┘
                    ┌───────────────┼───────────────┐
                    ▼               ▼               ▼
             ┌───────────┐   ┌───────────┐   ┌───────────┐
             │ Inventory │   │ Shipping  │   │ Analytics │
             └───────────┘   └───────────┘   └───────────┘

Search index synchronization

Problem: Your Elasticsearch index drifts from the source of truth. Full reindexing takes hours and blocks search updates during the process.

CDC solution: Stream changes continuously to keep the index synchronized. Use the before image to remove old documents and the after image to index new content.

Data warehouse incremental loads

Problem: Nightly full loads into your warehouse take 6 hours and block analysts until noon. Business users complain that dashboards show yesterday’s numbers.

CDC solution: Stream changes to a staging topic, then micro-batch into your warehouse every few minutes. Analysts get near real-time data without impacting source systems.

Audit logging and compliance

Problem: Regulations require you to maintain a complete history of changes to sensitive data. Application-level audit logging is inconsistent and can be bypassed.

CDC solution: The transaction log captures every committed change regardless of how it was made - application code, admin scripts, or direct SQL. Stream these events to immutable storage for compliance.

Cross-region replication

Problem: You need to replicate data to a secondary region for disaster recovery, but built-in replication doesn’t support the transformations you need.

CDC solution: Stream changes through a CDC pipeline that filters, transforms, and routes events to the target region’s databases or message queues.


Architecture patterns

The outbox pattern

When you need to update a database and publish an event atomically, the outbox pattern provides exactly-once semantics without distributed transactions.

┌─────────────────────────────────────────┐
│              Single Transaction         │
│  ┌─────────────┐    ┌─────────────────┐ │
│  │ UPDATE      │    │ INSERT INTO     │ │
│  │ orders SET  │ +  │ outbox (event)  │ │
│  │ status=...  │    │                 │ │
│  └─────────────┘    └─────────────────┘ │
└─────────────────────────────────────────┘
                      │
                      ▼ CDC
               ┌─────────────┐
               │   Kafka     │
               └─────────────┘
  1. The application writes business data and an event record in the same transaction.
  2. CDC tails the outbox table and publishes events to Kafka.
  3. A cleanup process (or CDC itself) removes processed outbox rows.

This guarantees that events are published if and only if the transaction commits.

When to skip the outbox: If your only requirement is to react to committed database state (not application intent), direct CDC from business tables is often simpler than introducing an outbox. The outbox pattern adds value when you need custom event payloads, explicit event versioning, or when the event schema differs significantly from table structure.
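Using the source and processor options documented in the Configuration chapter, an outbox pipeline can be sketched like this (sink and connection details abbreviated):

```yaml
# Sketch of an outbox pipeline: the source tags rows from shop.outbox,
# and the outbox processor routes them by aggregate and event type.
spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables: [shop.orders, shop.outbox]
      outbox:
        tables: ["shop.outbox"]

  processors:
    - type: outbox
      id: outbox
      topic: "${aggregate_type}.${event_type}"
      default_topic: "events.unrouted"
      raw_payload: true

  sinks:
    - type: kafka
      config:
        id: events-kafka
        brokers: ${KAFKA_BROKERS}
        topic: events
```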

Event sourcing integration

CDC complements event sourcing by bridging legacy systems that weren’t built event-first. Stream changes from existing tables into event stores, then gradually migrate to native event sourcing.

CQRS (Command Query Responsibility Segregation)

CDC naturally supports CQRS by populating read-optimized projections from the write model. Changes flow through the CDC pipeline to update denormalized views, search indexes, or cache layers.


Challenges and solutions

Schema evolution

Databases change. Columns get added, renamed, or removed. Types change. CDC pipelines need to handle this gracefully.

Strategies:

  • Schema registry: Store and version schemas centrally (e.g., Confluent Schema Registry with Avro/Protobuf).
  • Forward compatibility: Add columns as nullable; avoid removing columns that consumers depend on.
  • Consumer tolerance: Design consumers to ignore unknown fields and handle missing optional fields.
  • Processor transforms: Use DeltaForge’s JavaScript processors to normalize schemas before sinks.
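As an illustration of the processor-transform strategy, a hypothetical normalization step might map a renamed column back to the name consumers expect and strip an internal field (the column names here are invented for the example):

```javascript
// Hypothetical schema-normalizing processor: renames a legacy column and
// strips an internal field so downstream consumers see a stable shape.
const normalize = (event) => {
  for (const image of [event.before, event.after]) {
    if (!image) continue;
    if ("cust_id" in image) {        // column renamed upstream
      image.customer_id = image.cust_id;
      delete image.cust_id;
    }
    delete image.internal_notes;     // field consumers must not depend on
  }
  return [event];
};

// Example: an update whose images still use the old column name
const [out] = normalize({
  operation: "update",
  before: { id: 1, cust_id: 7, internal_notes: "x" },
  after: { id: 1, cust_id: 8 },
});
```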

Ordering guarantees

Events must arrive in the correct order for consumers to reconstruct state accurately. A DELETE arriving before its corresponding INSERT would be catastrophic.

DeltaForge guarantees:

  • Source-order preservation per table partition (always enabled)
  • Transaction boundary preservation when respect_source_tx: true is configured in batch settings

The Kafka sink partitions consistently by primary key, so ordering within a partition is preserved through to the consumer.
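Transaction boundary preservation is configured in the batch settings; a minimal sketch (the threshold values here are illustrative, not recommendations):

```yaml
batch:
  max_events: 500
  max_ms: 1000
  respect_source_tx: true   # never split a source transaction across batches
```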

Exactly-once delivery

Network failures, process crashes, and consumer restarts can cause duplicates or gaps. True exactly-once semantics require coordination between source, pipeline, and sink.

DeltaForge approach:

  • Checkpoints track the last committed position in the source log.
  • Configurable commit policies (all, required, quorum) control when checkpoints advance.
  • Kafka sink supports idempotent producers; transactional writes available via exactly_once: true.

Default behavior: DeltaForge provides at-least-once delivery out of the box. Exactly-once semantics require sink support and explicit configuration.
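Putting these pieces together, a stricter delivery configuration might look like the following sketch. The `mode` field name under commit_policy is an assumption based on the policy names above; see the Commit policy reference for the exact schema:

```yaml
# Sketch: gate checkpoints on all sinks and enable transactional Kafka writes.
commit_policy:
  mode: all   # field name assumed; modes are all, required, or quorum

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      exactly_once: true   # transactional producer, per the Kafka sink reference
```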

High availability

Production CDC pipelines need to handle failures without data loss or extended downtime.

Best practices:

  • Run multiple pipeline instances with leader election.
  • Store checkpoints in durable storage (DeltaForge persists to local files, mountable volumes in containers).
  • Monitor lag between source position and checkpoint position.
  • Set up alerts for pipeline failures and excessive lag.

Expectations: DeltaForge's checkpoints ensure no data loss on restart, but the engine does not currently include built-in leader election. For HA deployments, use external coordination (Kubernetes leader election, etcd locks) or run active-passive with health-check-based failover.

Backpressure

When sinks can’t keep up with the change rate, pipelines need to slow down gracefully rather than dropping events or exhausting memory.

DeltaForge handles backpressure through:

  • Configurable batch sizes (max_events, max_bytes, max_ms).
  • In-flight limits (max_inflight) that bound concurrent sink writes.
  • Blocking reads from source when batches queue up.

Performance considerations

Batching trade-offs

| Setting | Low value | High value |
|---|---|---|
| max_events | Lower latency, more overhead | Higher throughput, more latency |
| max_ms | Faster flush, smaller batches | Larger batches, delayed flush |
| max_bytes | Memory-safe, frequent commits | Efficient for large rows |

Start with DeltaForge defaults and tune based on observed latency and throughput.

Source database impact

Log-based CDC has minimal impact, but consider:

  • Replication slot retention: Paused pipelines cause WAL/binlog accumulation.
  • Connection limits: Each pipeline holds a replication connection.
  • Network bandwidth: High-volume tables generate significant log traffic.

Sink throughput

  • Kafka: Tune batch.size, linger.ms, and compression in client_conf.
  • Redis: Use pipelining and connection pooling for high-volume streams.
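For the Kafka sink, these tunables pass through `client_conf` as librdkafka properties. The values below are illustrative starting points, not recommendations:

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      client_conf:
        batch.size: "262144"      # larger producer batches
        linger.ms: "50"           # wait briefly so batches fill
        compression.type: "lz4"   # cheap compression for high-volume streams
```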

Monitoring and observability

CDC pipelines are long-running, stateful processes; without metrics and alerts, failures are silent by default. A healthy pipeline requires visibility into lag, throughput, and errors.

Key metrics to track

| Metric | Description | Alert threshold |
|---|---|---|
| cdc_lag_seconds | Time between event timestamp and processing | > 60s |
| events_processed_total | Throughput counter | Sudden drops |
| checkpoint_lag_events | Events since last checkpoint | > 10,000 |
| sink_errors_total | Failed sink writes | Any sustained errors |
| batch_size_avg | Events per batch | Outside expected range |

DeltaForge exposes Prometheus metrics on the configurable metrics endpoint (default :9000).

Health checks

  • GET /healthz: Liveness probe - is the process running?
  • GET /readyz: Readiness probe - are pipelines connected and processing?
  • GET /pipelines: Detailed status of each pipeline including configuration.

Choosing a CDC solution

When evaluating CDC tools, consider:

| Factor | Questions to ask |
|---|---|
| Source support | Does it support your databases? MySQL binlog? Postgres logical replication? |
| Sink flexibility | Can it write to your target systems? Kafka, Redis, HTTP, custom? |
| Transformation | Can you filter, enrich, or reshape events in-flight? |
| Operational overhead | How much infrastructure does it require? JVM? Distributed coordinator? |
| Resource efficiency | What's the memory/CPU footprint per pipeline? |
| Cloud-native | Does it containerize cleanly? Support health checks? Emit metrics? |

Where DeltaForge fits

DeltaForge intentionally avoids the operational complexity of JVM-based CDC stacks (Kafka Connect-style deployments with Zookeeper, Connect workers, and converter configurations) while remaining compatible with Kafka-centric architectures.

DeltaForge is designed for teams that want:

  • Lightweight runtime: Single binary, minimal memory footprint, no JVM warmup.
  • Config-driven pipelines: YAML specs instead of code for common patterns.
  • Inline transformation: JavaScript processors for custom logic without recompilation.
  • Container-native operations: Built for Kubernetes with health endpoints and Prometheus metrics.

DeltaForge is not designed for:

  • Complex DAG-based stream processing with windowed aggregations
  • Stateful joins across multiple streams
  • Sources beyond MySQL and Postgres (currently)
  • Built-in schema registry integration (use external registries)

If you need those capabilities, consider dedicated stream processors or the broader Kafka ecosystem. DeltaForge excels at getting data out of databases and into your event infrastructure reliably and efficiently.


Getting started

Ready to try CDC with DeltaForge? Head to the Quickstart guide to run your first pipeline in minutes.

For production deployments, review the Development guide for container builds and operational best practices.

Quickstart

Get DeltaForge running in minutes.

1. Prepare a pipeline spec

Create a YAML file that defines your CDC pipeline. Environment variables are expanded at runtime, so secrets stay out of version control.

metadata:
  name: orders-mysql-to-kafka
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  processors:
    - type: javascript
      id: transform
      inline: |
        (event) => {
          event.tags = ["processed"];
          return [event];
        }

  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
        required: true

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000

2. Start DeltaForge

Using Docker (recommended):

docker run --rm \
  -p 8080:8080 -p 9000:9000 \
  -e MYSQL_DSN="mysql://user:pass@host:3306/db" \
  -e KAFKA_BROKERS="kafka:9092" \
  -v $(pwd)/pipeline.yaml:/etc/deltaforge/pipeline.yaml:ro \
  -v deltaforge-checkpoints:/app/data \
  ghcr.io/vnvo/deltaforge:latest \
  --config /etc/deltaforge/pipeline.yaml

From source:

cargo run -p runner -- --config ./pipeline.yaml

Runner options

| Flag | Default | Description |
|---|---|---|
| --config | (required) | Path to pipeline spec file or directory |
| --api-addr | 0.0.0.0:8080 | REST API address |
| --metrics-addr | 0.0.0.0:9095 | Prometheus metrics address |

3. Verify it’s running

Check health and pipeline status:

# Liveness probe
curl http://localhost:8080/healthz

# Readiness with pipeline status
curl http://localhost:8080/readyz

# List all pipelines
curl http://localhost:8080/pipelines

4. Manage pipelines

Control pipelines via the REST API:

# Pause a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/pause

# Resume a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/resume

# Stop a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/stop

Next steps

Configuration

Pipelines are defined as YAML documents that map directly to the PipelineSpec type. Environment variables are expanded before parsing using ${VAR} syntax, so secrets and connection strings can be injected at runtime. Unknown variables (e.g. ${source.table}) pass through for use as routing templates.

Document structure

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: <pipeline-name>
  tenant: <tenant-id>
spec:
  source: { ... }
  processors: [ ... ]
  sinks: [ ... ]
  batch: { ... }
  commit_policy: { ... }
  schema_sensing: { ... }

Metadata

| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Unique pipeline identifier. Used in API routes and metrics. |
| tenant | string | Yes | Business-oriented tenant label for multi-tenancy. |

Spec fields

| Field | Type | Required | Description |
|---|---|---|---|
| source | object | Yes | Database source configuration. See Sources. |
| processors | array | No | Ordered list of processors. See Processors. |
| sinks | array | Yes (at least one) | One or more sinks that receive each batch. See Sinks. |
| sharding | object | No | Optional hint for downstream distribution. |
| connection_policy | object | No | How the runtime establishes upstream connections. |
| batch | object | No | Commit unit thresholds. See Batching. |
| commit_policy | object | No | How sink acknowledgements gate checkpoints. See Commit policy. |
| schema_sensing | object | No | Automatic schema inference from event payloads. See Schema sensing. |

Sources

MySQL

Captures row-level changes via binlog replication. See MySQL source documentation for prerequisites and detailed configuration.

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.order_items
    outbox:
      tables: ["shop.outbox"]
    snapshot:
      mode: initial

| Field | Type | Description |
|---|---|---|
| id | string | Unique identifier for checkpoints and metrics |
| dsn | string | MySQL connection string with replication privileges |
| tables | array | Table patterns to capture; omit for all tables |
| outbox.tables | array | Table patterns to tag as outbox events. Must also appear in tables. Supports globs: shop.outbox, *.outbox, shop.outbox_%. |
| snapshot.mode | string | never (default), initial (run once if no checkpoint exists), always (re-snapshot on every restart) |
| snapshot.max_parallel_tables | int | Tables snapshotted concurrently (default: 8) |
| snapshot.chunk_size | int | Rows per range chunk for integer-PK tables (default: 10000) |

Table patterns support SQL LIKE syntax:

  • db.table - exact match
  • db.prefix% - tables matching prefix
  • db.% - all tables in database

PostgreSQL

Captures row-level changes via logical replication. See PostgreSQL source documentation for prerequisites and detailed configuration.

source:
  type: postgres
  config:
    id: users-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_users
    publication: users_pub
    tables:
      - public.users
      - public.sessions
    start_position: earliest
    outbox:
      prefixes: ["outbox", "order_outbox_%"]
    snapshot:
      mode: initial

| Field | Type | Description |
|---|---|---|
| id | string | Unique identifier |
| dsn | string | PostgreSQL connection string |
| slot | string | Replication slot name |
| publication | string | Publication name |
| tables | array | Table patterns to capture |
| start_position | string | earliest, latest, or lsn |
| outbox.prefixes | array | pg_logical_emit_message prefixes to tag as outbox events. Supports globs: outbox, outbox_%, *. |
| snapshot.mode | string | never (default), initial (run once if no checkpoint exists), always (re-snapshot on every restart) |
| snapshot.max_parallel_tables | int | Tables snapshotted concurrently (default: 8) |
| snapshot.chunk_size | int | Rows per range chunk (default: 10000) |

Note: The source-level outbox field only tags matching events with the __outbox sentinel. Routing and transformation are handled by the outbox processor.


Processors

Processors transform events between source and sinks. They run in order and can filter, enrich, or modify events.

JavaScript

processors:
  - type: javascript
    id: transform
    inline: |
      function processBatch(events) {
        return events.map(e => {
          e.tags = ["processed"];
          return e;
        });
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500

| Field | Type | Description |
|---|---|---|
| id | string | Processor identifier |
| inline | string | JavaScript code |
| limits.cpu_ms | int | CPU time limit |
| limits.mem_mb | int | Memory limit |
| limits.timeout_ms | int | Execution timeout |

Flatten

processors:
  - type: flatten
    id: flat
    separator: "__"
    max_depth: 3
    on_collision: last
    empty_object: preserve
    lists: preserve
    empty_list: drop

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "flatten" | Processor identifier |
| separator | string | "__" | Separator between path segments |
| max_depth | int | unlimited | Stop recursing at this depth; objects at the boundary kept as-is |
| on_collision | string | last | Key collision policy: last, first, or error |
| empty_object | string | preserve | Empty object policy: preserve, drop, or null |
| lists | string | preserve | Array policy: preserve or index |
| empty_list | string | preserve | Empty array policy: preserve, drop, or null |

Outbox

Transforms raw outbox events into routed, sink-ready events. Requires the source to have outbox configured so events are tagged before reaching this processor. See Outbox pattern documentation for full details.

processors:
  - type: outbox
    id: outbox
    topic: "${aggregate_type}.${event_type}"
    default_topic: "events.unrouted"
    raw_payload: true
    columns:
      payload: data
    additional_headers:
      x-trace-id: trace_id

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "outbox" | Processor identifier |
| tables | array | [] | Filter: only process outbox events matching these patterns. Empty = all outbox events. |
| topic | string | - | Topic template resolved against the raw payload using ${field} placeholders |
| default_topic | string | - | Fallback topic when template resolution fails and no topic column exists |
| key | string | - | Key template resolved against raw payload. Default: aggregate_id value. |
| raw_payload | bool | false | Deliver the extracted payload as-is to sinks, bypassing envelope wrapping |
| strict | bool | false | Fail the batch if required fields are missing rather than silently falling back |
| columns.payload | string | payload | Column containing the event payload |
| columns.aggregate_type | string | aggregate_type | Column for aggregate type |
| columns.aggregate_id | string | aggregate_id | Column for aggregate ID |
| columns.event_type | string | event_type | Column for event type |
| columns.topic | string | topic | Column for pre-computed topic override |
| columns.event_id | string | id | Column extracted as df-event-id header |
| additional_headers | map | {} | Forward extra payload fields as routing headers: header-name: column-name |
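The `${field}` template semantics can be illustrated with a small resolver. This is a sketch of the behavior described above (template resolution with a default-topic fallback), not DeltaForge's actual implementation:

```javascript
// Resolve ${field} placeholders against an outbox payload, falling back
// to a default topic when any referenced field is missing.
function resolveTopic(template, payload, defaultTopic) {
  let missing = false;
  const topic = template.replace(/\$\{(\w+)\}/g, (_, field) => {
    if (payload[field] == null) { missing = true; return ""; }
    return String(payload[field]);
  });
  return missing ? defaultTopic : topic;
}

// Example payloads from an outbox table row
const routed = resolveTopic("${aggregate_type}.${event_type}",
  { aggregate_type: "order", event_type: "created" }, "events.unrouted");
const fallback = resolveTopic("${aggregate_type}.${event_type}",
  { aggregate_type: "order" }, "events.unrouted");
```

With strict: true, the real processor fails the batch instead of falling back.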

Sinks

Sinks deliver events to downstream systems. Each sink supports configurable envelope formats and wire encodings to match consumer expectations. See the Sinks documentation for detailed information on multi-sink patterns, commit policies, and failure handling.

Envelope and encoding

All sinks support these serialization options:

| Field | Type | Default | Description |
|---|---|---|---|
| envelope | object | native | Output structure format. See Envelopes. |
| encoding | string | json | Wire encoding format |

Envelope types:

  • native - Direct Debezium payload structure (default, most efficient)
  • debezium - Full {"payload": ...} wrapper
  • cloudevents - CloudEvents 1.0 specification (requires type_prefix)

# Native (default)
envelope:
  type: native

# Debezium wrapper
envelope:
  type: debezium

# CloudEvents
envelope:
  type: cloudevents
  type_prefix: "com.example.cdc"

Kafka

See Kafka sink documentation for detailed configuration options and best practices.

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      envelope:
        type: debezium
      encoding: json
      required: true
      exactly_once: false
      send_timeout_secs: 30
      client_conf:
        security.protocol: SASL_SSL

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| id | string | - | Sink identifier |
| brokers | string | - | Kafka broker addresses |
| topic | string | - | Target topic or template |
| key | string | - | Message key template |
| envelope | object | native | Output format |
| encoding | string | json | Wire encoding |
| required | bool | true | Gates checkpoints |
| exactly_once | bool | false | Transactional mode |
| send_timeout_secs | int | 30 | Send timeout |
| client_conf | map | - | librdkafka overrides |

CloudEvents example:

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: ${KAFKA_BROKERS}
      topic: events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.cdc"
      encoding: json

Redis

See Redis sink documentation for detailed configuration options and best practices.

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: native
      encoding: json
      required: true

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| id | string | - | Sink identifier |
| uri | string | - | Redis connection URI |
| stream | string | - | Redis stream key or template |
| key | string | - | Entry key template |
| envelope | object | native | Output format |
| encoding | string | json | Wire encoding |
| required | bool | true | Gates checkpoints |
| send_timeout_secs | int | 5 | XADD timeout |
| batch_timeout_secs | int | 30 | Pipeline timeout |
| connect_timeout_secs | int | 10 | Connection timeout |

NATS

See NATS sink documentation for detailed configuration options and best practices.

sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      envelope:
        type: native
      encoding: json
      required: true
      send_timeout_secs: 5
      batch_timeout_secs: 30

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| id | string | - | Sink identifier |
| url | string | - | NATS server URL |
| subject | string | - | Subject or template |
| key | string | - | Message key template |
| stream | string | - | JetStream stream name |
| envelope | object | native | Output format |
| encoding | string | json | Wire encoding |
| required | bool | true | Gates checkpoints |
| send_timeout_secs | int | 5 | Publish timeout |
| batch_timeout_secs | int | 30 | Batch timeout |
| connect_timeout_secs | int | 10 | Connection timeout |
| credentials_file | string | - | NATS credentials file |
| username | string | - | Auth username |
| password | string | - | Auth password |
| token | string | - | Auth token |

Batching

batch:
  max_events: 500
  max_bytes: 1048576
  max_ms: 1000
  respect_source_tx: true
  max_inflight: 2

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| max_events | int | 500 | Flush after this many events |
| max_bytes | int | 1048576 | Flush after size reaches limit |
| max_ms | int | 1000 | Flush after time (ms) |
| respect_source_tx | bool | true | Never split source transactions |
| max_inflight | int | 2 | Max concurrent batches |
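
Since the thresholds are soft and combine as "whichever trips first", the flush decision can be sketched as follows (respect_source_tx, which can hold a flush open until a source transaction completes, is not modeled):

```python
def should_flush(n_events: int, n_bytes: int, elapsed_ms: int,
                 max_events: int = 500, max_bytes: int = 1_048_576,
                 max_ms: int = 1000) -> bool:
    # Flush as soon as any soft threshold is reached.
    return (n_events >= max_events
            or n_bytes >= max_bytes
            or elapsed_ms >= max_ms)
```

In practice this means a quiet pipeline flushes on max_ms, while a busy one flushes on max_events or max_bytes.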

Commit policy

commit_policy:
  mode: required

# For quorum mode:
commit_policy:
  mode: quorum
  quorum: 2

| Mode | Description |
|------|-------------|
| all | Every sink must acknowledge before checkpoint |
| required | Only required: true sinks must acknowledge (default) |
| quorum | Checkpoint after quorum sinks acknowledge |
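
The three modes can be read as predicates over the set of acknowledging sinks. A sketch with hypothetical sink ids:

```python
def checkpoint_ready(acked: set, sinks: dict, mode: str, quorum: int = 0) -> bool:
    """sinks maps sink id -> its required flag (bool)."""
    if mode == "all":
        return acked >= set(sinks)                     # every sink acked
    if mode == "required":
        return all(sid in acked                        # every required sink acked
                   for sid, required in sinks.items() if required)
    if mode == "quorum":
        return len(acked) >= quorum                    # any `quorum` sinks acked
    raise ValueError(f"unknown commit mode: {mode}")
```

Under the default required mode, a best-effort sink (required: false) can lag or fail without blocking checkpoints.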

Schema sensing

Schema sensing automatically infers and tracks schema from event payloads. See the Schema Sensing documentation for detailed information on how it works, drift detection, and API endpoints.

Performance tip: Schema sensing can be CPU-intensive, especially with deep JSON inspection. Consider your throughput requirements when configuring:

  • Set enabled: false if you don’t need runtime schema inference
  • Limit deep_inspect.max_depth to avoid traversing deeply nested structures
  • Increase sampling.sample_rate to analyze fewer events (e.g., 1 in 100 instead of 1 in 10)
  • Reduce sampling.warmup_events if you’re confident in schema stability

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3
    max_sample_size: 500
  sampling:
    warmup_events: 50
    sample_rate: 5
    structure_cache: true
    structure_cache_size: 50
  high_cardinality:
    enabled: true
    min_events: 100
    stable_threshold: 0.5
    min_dynamic_fields: 5

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| enabled | bool | false | Enable schema sensing |
| deep_inspect.enabled | bool | true | Inspect nested JSON |
| deep_inspect.max_depth | int | 10 | Max nesting depth |
| deep_inspect.max_sample_size | int | 1000 | Max events for deep analysis |
| sampling.warmup_events | int | 1000 | Events to fully analyze first |
| sampling.sample_rate | int | 10 | After warmup, analyze 1 in N |
| sampling.structure_cache | bool | true | Cache structure fingerprints |
| sampling.structure_cache_size | int | 100 | Max cached structures |
| high_cardinality.enabled | bool | true | Detect dynamic map keys |
| high_cardinality.min_events | int | 100 | Events before classification |
| high_cardinality.stable_threshold | float | 0.5 | Frequency for stable fields |
| high_cardinality.min_dynamic_fields | int | 5 | Min unique fields for map |
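
The warmup and sampling fields interact as "analyze everything first, then 1 in N". A sketch of that schedule; the exact counter bookkeeping, and how the structure cache short-circuits analysis, are assumptions here:

```python
def should_fully_analyze(event_index: int,
                         warmup_events: int = 1000,
                         sample_rate: int = 10) -> bool:
    # Every event is analyzed during warmup.
    if event_index < warmup_events:
        return True
    # Afterwards, only 1 in every sample_rate events gets full analysis.
    return (event_index - warmup_events) % sample_rate == 0
```

Raising sample_rate or lowering warmup_events therefore trades schema-inference fidelity for CPU, as the tuning tips above suggest.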

Complete examples

MySQL to Kafka with Debezium envelope

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-kafka
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  processors:
    - type: javascript
      id: transform
      inline: |
        function processBatch(events) {
          return events.map(event => {
            event.tags = (event.tags || []).concat(["normalized"]);
            return event;
          });
        }
      limits:
        cpu_ms: 50
        mem_mb: 128
        timeout_ms: 500

  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
        envelope:
          type: debezium
        encoding: json
        required: true
        exactly_once: false
        client_conf:
          message.timeout.ms: "5000"

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
    respect_source_tx: true
    max_inflight: 2

  commit_policy:
    mode: required

PostgreSQL to Kafka with CloudEvents

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: users-postgres-to-kafka
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: users-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_users
      publication: users_pub
      tables:
        - public.users
        - public.user_sessions
      start_position: earliest

  sinks:
    - type: kafka
      config:
        id: users-kafka
        brokers: ${KAFKA_BROKERS}
        topic: user-events
        envelope:
          type: cloudevents
          type_prefix: "com.acme.users"
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

Multi-sink with different formats

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-multi-sink
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  sinks:
    # Kafka Connect expects Debezium format
    - type: kafka
      config:
        id: connect-sink
        brokers: ${KAFKA_BROKERS}
        topic: connect-events
        envelope:
          type: debezium
        required: true

    # Lambda expects CloudEvents
    - type: kafka
      config:
        id: lambda-sink
        brokers: ${KAFKA_BROKERS}
        topic: lambda-events
        envelope:
          type: cloudevents
          type_prefix: "com.acme.cdc"
        required: false

    # Redis cache uses native format
    - type: redis
      config:
        id: cache-redis
        uri: ${REDIS_URI}
        stream: orders-cache
        envelope:
          type: native
        required: false

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

MySQL to NATS

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-nats
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items

  sinks:
    - type: nats
      config:
        id: orders-nats
        url: ${NATS_URL}
        subject: orders.events
        stream: ORDERS
        envelope:
          type: native
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

REST API Reference

DeltaForge exposes a REST API for health checks, pipeline management, schema inspection, and drift detection. All endpoints return JSON.

Base URL

Default: http://localhost:8080

Configure with --api-addr:

deltaforge --config pipelines.yaml --api-addr 0.0.0.0:9090

Health Endpoints

Liveness Probe

GET /healthz

Returns ok if the process is running. Use for Kubernetes liveness probes.

Response: 200 OK

ok

Readiness Probe

GET /readyz

Returns pipeline states. Use for Kubernetes readiness probes.

Response: 200 OK

{
  "status": "ready",
  "pipelines": [
    {
      "name": "orders-cdc",
      "status": "running",
      "spec": { ... }
    }
  ]
}
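
On Kubernetes, the two endpoints map directly onto pod probes. A sketch assuming the API listens on the default port 8080; the container name and probe timings are placeholders:

```yaml
containers:
  - name: deltaforge
    ports:
      - containerPort: 8080
    livenessProbe:
      httpGet:
        path: /healthz
        port: 8080
      periodSeconds: 10
    readinessProbe:
      httpGet:
        path: /readyz
        port: 8080
      periodSeconds: 10
```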

Pipeline Management

List Pipelines

GET /pipelines

Returns all pipelines with current status.

Response: 200 OK

[
  {
    "name": "orders-cdc",
    "status": "running",
    "spec": {
      "metadata": { "name": "orders-cdc", "tenant": "acme" },
      "spec": { ... }
    }
  }
]

Get Pipeline

GET /pipelines/{name}

Returns a single pipeline by name.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Errors:

  • 404 Not Found - Pipeline doesn’t exist

Create Pipeline

POST /pipelines
Content-Type: application/json

Creates a new pipeline from a full spec.

Request:

{
  "metadata": {
    "name": "orders-cdc",
    "tenant": "acme"
  },
  "spec": {
    "source": {
      "type": "mysql",
      "config": {
        "id": "mysql-1",
        "dsn": "mysql://user:pass@host/db",
        "tables": ["shop.orders"]
      }
    },
    "processors": [],
    "sinks": [
      {
        "type": "kafka",
        "config": {
          "id": "kafka-1",
          "brokers": "localhost:9092",
          "topic": "orders"
        }
      }
    ]
  }
}

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Errors:

  • 409 Conflict - Pipeline already exists

Update Pipeline

PATCH /pipelines/{name}
Content-Type: application/json

Applies a partial update to an existing pipeline. The pipeline is stopped, updated, and restarted.

Request:

{
  "spec": {
    "batch": {
      "max_events": 1000,
      "max_ms": 500
    }
  }
}

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Errors:

  • 404 Not Found - Pipeline doesn’t exist
  • 400 Bad Request - Name mismatch in patch
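
The server merges the patch into the stored spec before restarting the pipeline. The precise merge semantics are server-defined; a recursive map merge is one plausible mental model, shown here only for intuition:

```python
def deep_merge(base: dict, patch: dict) -> dict:
    """Return base with patch applied; nested dicts merge, scalars replace."""
    out = dict(base)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(out.get(key), dict):
            out[key] = deep_merge(out[key], value)
        else:
            out[key] = value
    return out
```

Under this model, patching spec.batch adjusts only the batch thresholds while the source and sinks are left untouched.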

Delete Pipeline

DELETE /pipelines/{name}

Permanently deletes a pipeline. This removes the pipeline from the runtime and cannot be undone.

Response: 204 No Content

Errors:

  • 404 Not Found - Pipeline doesn’t exist

Pause Pipeline

POST /pipelines/{name}/pause

Pauses ingestion. Events in the buffer are not processed until resumed.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "paused",
  "spec": { ... }
}

Resume Pipeline

POST /pipelines/{name}/resume

Resumes a paused pipeline.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Stop Pipeline

POST /pipelines/{name}/stop

Stops a pipeline. Final checkpoint is saved.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "stopped",
  "spec": { ... }
}

Schema Management

List Database Schemas

GET /pipelines/{name}/schemas

Returns all tracked database schemas for a pipeline. These are the schemas loaded directly from the source database.

Response: 200 OK

[
  {
    "database": "shop",
    "table": "orders",
    "column_count": 5,
    "primary_key": ["id"],
    "fingerprint": "sha256:a1b2c3d4e5f6...",
    "registry_version": 2
  },
  {
    "database": "shop",
    "table": "customers",
    "column_count": 8,
    "primary_key": ["id"],
    "fingerprint": "sha256:f6e5d4c3b2a1...",
    "registry_version": 1
  }
]

Get Schema Details

GET /pipelines/{name}/schemas/{db}/{table}

Returns detailed schema information including all columns.

Response: 200 OK

{
  "database": "shop",
  "table": "orders",
  "columns": [
    {
      "name": "id",
      "type": "bigint(20) unsigned",
      "nullable": false,
      "default": null,
      "extra": "auto_increment"
    },
    {
      "name": "customer_id",
      "type": "bigint(20)",
      "nullable": false,
      "default": null
    }
  ],
  "primary_key": ["id"],
  "fingerprint": "sha256:a1b2c3d4..."
}

Schema Sensing

Schema sensing automatically infers schema structure from JSON event payloads. This is useful for sources that don’t provide schema metadata or for detecting schema evolution in JSON columns.

List Inferred Schemas

GET /pipelines/{name}/sensing/schemas

Returns all schemas inferred via sensing for a pipeline.

Response: 200 OK

[
  {
    "table": "orders",
    "fingerprint": "sha256:abc123...",
    "sequence": 3,
    "event_count": 1500,
    "stabilized": true,
    "first_seen": "2025-01-15T10:30:00Z",
    "last_seen": "2025-01-15T14:22:00Z"
  }
]

| Field | Description |
|-------|-------------|
| table | Table name (or table:column for JSON column sensing) |
| fingerprint | SHA-256 content hash of current schema |
| sequence | Monotonic version number (increments on evolution) |
| event_count | Total events observed |
| stabilized | Whether schema has stopped sampling (structure stable) |
| first_seen | First observation timestamp |
| last_seen | Most recent observation timestamp |

Get Inferred Schema Details

GET /pipelines/{name}/sensing/schemas/{table}

Returns detailed inferred schema including all fields.

Response: 200 OK

{
  "table": "orders",
  "fingerprint": "sha256:abc123...",
  "sequence": 3,
  "event_count": 1500,
  "stabilized": true,
  "fields": [
    {
      "name": "id",
      "types": ["integer"],
      "nullable": false,
      "optional": false
    },
    {
      "name": "metadata",
      "types": ["object"],
      "nullable": true,
      "optional": false,
      "nested_field_count": 5
    },
    {
      "name": "tags",
      "types": ["array"],
      "nullable": false,
      "optional": true,
      "array_element_types": ["string"]
    }
  ],
  "first_seen": "2025-01-15T10:30:00Z",
  "last_seen": "2025-01-15T14:22:00Z"
}

Export JSON Schema

GET /pipelines/{name}/sensing/schemas/{table}/json-schema

Exports the inferred schema as a standard JSON Schema document.

Response: 200 OK

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "orders",
  "type": "object",
  "properties": {
    "id": { "type": "integer" },
    "metadata": { "type": ["object", "null"] },
    "tags": {
      "type": "array",
      "items": { "type": "string" }
    }
  },
  "required": ["id", "metadata"]
}

Get Sensing Cache Statistics

GET /pipelines/{name}/sensing/stats

Returns cache performance statistics for schema sensing.

Response: 200 OK

{
  "tables": [
    {
      "table": "orders",
      "cached_structures": 3,
      "max_cache_size": 100,
      "cache_hits": 1450,
      "cache_misses": 50
    }
  ],
  "total_cache_hits": 1450,
  "total_cache_misses": 50,
  "hit_rate": 0.9667
}

Drift Detection

Drift detection compares expected database schema against observed data patterns to detect mismatches, unexpected nulls, and type drift.

Get Drift Results

GET /pipelines/{name}/drift

Returns drift detection results for all tables in a pipeline.

Response: 200 OK

[
  {
    "table": "orders",
    "has_drift": true,
    "columns": [
      {
        "column": "amount",
        "expected_type": "decimal(10,2)",
        "observed_types": ["string"],
        "mismatch_count": 42,
        "examples": ["\"99.99\""]
      }
    ],
    "events_analyzed": 1500,
    "events_with_drift": 42
  }
]

Get Table Drift

GET /pipelines/{name}/drift/{table}

Returns drift detection results for a specific table.

Response: 200 OK

{
  "table": "orders",
  "has_drift": false,
  "columns": [],
  "events_analyzed": 1000,
  "events_with_drift": 0
}

Errors:

  • 404 Not Found - Table not found or no drift data available

Error Responses

All error responses follow this format:

{
  "error": "Description of the error"
}

| Status Code | Meaning |
|-------------|---------|
| 400 Bad Request | Invalid request body or parameters |
| 404 Not Found | Resource doesn’t exist |
| 409 Conflict | Resource already exists |
| 500 Internal Server Error | Unexpected server error |

Pipelines

Each pipeline is created from a single PipelineSpec. The runtime spawns the source, processors, and sinks defined in the spec and coordinates them with batching and checkpointing.

  • 🔄 Live control: pause, resume, or stop pipelines through the REST API without redeploying.
  • 📦 Coordinated delivery: batching and commit policy keep sinks consistent even when multiple outputs are configured.

Lifecycle controls

The REST API addresses pipelines by metadata.name and returns PipeInfo records containing the live spec and status.

  • GET /healthz - liveness probe.
  • GET /readyz - readiness with pipeline states.
  • GET /pipelines - list pipelines.
  • POST /pipelines - create from a full spec.
  • PATCH /pipelines/{name} - merge a partial spec (for example, adjust batch thresholds) and restart the pipeline.
  • POST /pipelines/{name}/pause - pause ingestion and coordination.
  • POST /pipelines/{name}/resume - resume a paused pipeline.
  • POST /pipelines/{name}/stop - stop a running pipeline.

Pausing halts both source ingestion and the coordinator. Resuming re-enables both ends so buffered events can drain cleanly.

Processors

Processors run in the declared order for each batch. The built-in processor type is JavaScript, powered by deno_core.

  • type: javascript
    • id: processor label.
    • inline: JS source. Export a processBatch(events) function that returns the transformed batch.
    • limits (optional): resource guardrails (cpu_ms, mem_mb, timeout_ms).

Batching

The coordinator builds batches using soft thresholds:

  • max_events: flush after this many events.
  • max_bytes: flush after the serialized size reaches this limit.
  • max_ms: flush after this much time has elapsed since the batch started.
  • respect_source_tx: when true, never split a single source transaction across batches.
  • max_inflight: cap the number of batches being processed concurrently.

Commit policy

When multiple sinks are configured, checkpoints can wait for different acknowledgement rules:

  • all: every sink must acknowledge.
  • required (default): only sinks marked required: true must acknowledge; others are best-effort.
  • quorum: checkpoint after at least quorum sinks acknowledge.

Sources

DeltaForge captures database changes through pluggable source connectors. Each source is configured under spec.source with a type field and a config object. Environment variables are expanded before parsing using ${VAR} syntax.

Supported Sources

| Source | Status | Description |
|--------|--------|-------------|
| mysql | ✅ Production | MySQL binlog CDC with GTID support |
| postgres | ✅ Production | PostgreSQL logical replication via pgoutput |
| turso | 🔧 Beta | Turso/libSQL CDC with multiple modes |

Common Behavior

All sources share these characteristics:

  • Checkpointing: Progress is automatically saved and resumed on restart
  • Schema tracking: Table schemas are loaded and fingerprinted for change detection
  • At-least-once delivery: Events may be redelivered after failures; sinks should be idempotent
  • Batching: Events are batched according to the pipeline’s batch configuration
  • Transaction boundaries: respect_source_tx: true (default) keeps source transactions intact

Source Interface

Sources implement a common trait that provides:

trait Source {
    fn checkpoint_key(&self) -> &str;
    async fn run(&self, tx: Sender<Event>, checkpoint_store: Arc<dyn CheckpointStore>) -> SourceHandle;
}

The returned SourceHandle supports pause/resume and graceful cancellation.

Adding Custom Sources

The source interface is pluggable. To add a new source:

  1. Implement the Source trait
  2. Add configuration parsing in deltaforge-config
  3. Register the source type in the pipeline builder

See existing sources for implementation patterns.

MySQL

MySQL source

DeltaForge tails the MySQL binlog to capture row-level changes with automatic checkpointing and schema tracking.

Prerequisites

MySQL Server Configuration

Ensure your MySQL server has binary logging enabled with row-based format:

-- Required server settings (my.cnf or SET GLOBAL)
log_bin = ON
binlog_format = ROW
binlog_row_image = FULL  -- Recommended for complete before-images

If binlog_row_image is not FULL, DeltaForge will warn at startup and before-images on UPDATE/DELETE events may be incomplete.

User Privileges

Create a dedicated replication user with the required grants:

-- Create user with mysql_native_password (required by binlog connector)
CREATE USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'your_password';

-- Replication privileges (required)
GRANT REPLICATION REPLICA, REPLICATION CLIENT ON *.* TO 'deltaforge'@'%';

-- Schema introspection (required for table discovery)
GRANT SELECT, SHOW VIEW ON your_database.* TO 'deltaforge'@'%';

FLUSH PRIVILEGES;

For capturing all databases, grant SELECT on *.* instead.

Configuration

Set spec.source.type to mysql and provide a config object:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| id | string | Yes | Unique identifier used for checkpoints, server_id derivation, and metrics |
| dsn | string | Yes | MySQL connection string with replication privileges |
| tables | array | No | Table patterns to capture; omit or leave empty to capture all user tables |

Table Patterns

The tables field supports flexible pattern matching using SQL LIKE syntax:

tables:
  - shop.orders          # exact match: database "shop", table "orders"
  - shop.order_%         # LIKE pattern: tables starting with "order_" in "shop"
  - analytics.*          # wildcard: all tables in "analytics" database
  - %.audit_log          # cross-database: "audit_log" table in any database
  # omit entirely to capture all user tables (excludes system schemas)

System schemas (mysql, information_schema, performance_schema, sys) are always excluded.
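
A sketch of how these patterns can be matched, treating % (SQL LIKE) and * as "any sequence". Single-character _ wildcard handling is omitted, and the real matcher may differ:

```python
import re

def table_matches(pattern: str, table: str) -> bool:
    # Escape regex metacharacters, then turn the two wildcard forms into ".*".
    regex = re.escape(pattern).replace("%", ".*").replace(r"\*", ".*")
    return re.fullmatch(regex, table) is not None
```

Exact patterns match only themselves, while % and * expand to any run of characters within or across the database qualifier.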

Example

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.order_items

Resume Behavior

DeltaForge automatically checkpoints progress and resumes from the last position on restart. The resume strategy follows this priority:

  1. GTID: Preferred if the MySQL server has GTID enabled. Provides the most reliable resume across binlog rotations and failovers.
  2. File:position: Used when GTID is not available. Resumes from the exact binlog file and byte offset.
  3. Binlog tail: On first run with no checkpoint, starts from the current end of the binlog (no historical replay).

Checkpoints are stored using the id field as the key.

Snapshot (Initial Load)

DeltaForge can perform a consistent initial snapshot of existing table data before starting binlog streaming. This is the recommended approach for migrating existing tables without manual backfills.

How it works

DeltaForge opens all worker connections simultaneously, each with START TRANSACTION WITH CONSISTENT SNAPSHOT. The binlog position is captured after all workers have started - InnoDB guarantees every visible row was committed at or before that position, so CDC streaming from there has no gaps.

No FLUSH TABLES WITH READ LOCK or RELOAD privilege is required.

Additional privileges

-- Required for snapshot (SELECT is already needed for introspection)
GRANT SELECT ON your_database.* TO 'deltaforge'@'%';

Configuration

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
    snapshot:
      mode: initial           # initial | always | never (default: never)
      max_parallel_tables: 8  # tables snapshotted concurrently
      chunk_size: 10000       # rows per chunk for integer-PK tables

| Field | Default | Description |
|-------|---------|-------------|
| mode | never | initial: run once if no checkpoint exists; always: re-snapshot on every restart; never: skip |
| max_parallel_tables | 8 | Tables snapshotted concurrently |
| chunk_size | 10000 | Rows per range chunk (integer single-column PK tables only; others do a full scan) |
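
For integer-PK tables, chunking amounts to slicing the key space into inclusive ranges, each of which would back one range query against the table. A sketch of that slicing:

```python
def pk_chunks(min_pk: int, max_pk: int, chunk_size: int = 10_000):
    """Yield inclusive (lo, hi) primary-key ranges covering [min_pk, max_pk]."""
    lo = min_pk
    while lo <= max_pk:
        hi = min(lo + chunk_size - 1, max_pk)
        yield (lo, hi)
        lo = hi + 1
```

Smaller chunks keep individual snapshot transactions short at the cost of more round trips.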

Snapshot events

Snapshot rows are emitted as Op::Read events (Debezium op: "r"), distinguishable from live CDC Op::Create events. The binlog position captured at snapshot time becomes the CDC resume point, so no rows are missed or duplicated.

Resume after interruption

If the snapshot is interrupted, DeltaForge resumes at table granularity on the next restart - already-completed tables are skipped.

Binlog retention safety

DeltaForge validates binlog retention before starting a snapshot and monitors it throughout. This prevents the silent failure mode where a long snapshot completes successfully but CDC startup then fails because the captured binlog position was purged.

Preflight checks (before any rows are read):

  • Fails hard if log_bin=0 or binlog_format != ROW
  • Estimates snapshot duration from table sizes and max_parallel_tables
  • Warns at ≥50% of binlog_expire_logs_seconds usage; HIGH RISK at ≥80%
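
The two warning levels reduce to a ratio of estimated snapshot duration to binlog retention. A sketch of just the threshold check; the duration estimate itself comes from table sizes and max_parallel_tables and is not modeled here:

```python
def retention_risk(estimated_snapshot_secs: float,
                   binlog_expire_logs_secs: float) -> str:
    usage = estimated_snapshot_secs / binlog_expire_logs_secs
    if usage >= 0.8:
        return "high-risk"   # snapshot may outlive the binlog it must resume from
    if usage >= 0.5:
        return "warn"
    return "ok"
```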

During snapshot:

  • Background task polls SHOW BINARY LOGS every 30s
  • Cancels the snapshot immediately if the captured file is purged

After all tables complete:

  • Synchronous final check before writing finished=true
  • finished=true means the position is confirmed valid for CDC resume, not just that rows were emitted

If you see retention risk warnings, the recommended actions are:

  1. Increase binlog_expire_logs_seconds to cover the estimated snapshot duration
  2. Use a read replica as the snapshot source to avoid load on the primary

Server ID Handling

MySQL replication requires each replica to have a unique server_id. DeltaForge derives this automatically from the source id using a CRC32 hash:

server_id = 1 + (CRC32(id) % 4,000,000,000)

When running multiple DeltaForge instances against the same MySQL server, ensure each has a unique id to avoid server_id conflicts.
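
The derivation is easy to reproduce. A sketch using Python's zlib.crc32, assuming the id is hashed as UTF-8 bytes (an encoding detail the formula above does not spell out):

```python
import zlib

def derive_server_id(source_id: str) -> int:
    # server_id = 1 + (CRC32(id) % 4,000,000,000), per the formula above.
    return 1 + (zlib.crc32(source_id.encode("utf-8")) % 4_000_000_000)
```

Distinct ids almost always yield distinct server_ids; the operational requirement is simply that each instance is configured with a unique id.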

Schema Tracking

DeltaForge has a built-in schema registry to track table schemas, per source. For MySQL source:

  • Schemas are preloaded at startup by querying INFORMATION_SCHEMA
  • Each schema is fingerprinted using SHA-256 for change detection
  • Events carry schema_version (fingerprint) and schema_sequence (monotonic counter)
  • Schema-to-checkpoint correlation enables reliable replay

Schema changes (DDL) trigger automatic reload of affected table schemas.

Timeouts and Heartbeats

| Behavior | Value | Description |
|----------|-------|-------------|
| Heartbeat interval | 15s | Server sends heartbeat if no events |
| Read timeout | 90s | Maximum wait for next binlog event |
| Inactivity timeout | 60s | Triggers reconnect if no data received |
| Connect timeout | 30s | Maximum time to establish connection |

These values are currently fixed. Reconnection uses exponential backoff with jitter.

Event Format

Each captured row change produces an event with:

  • op: insert, update, or delete
  • before: Previous row state (updates and deletes only, requires binlog_row_image = FULL)
  • after: New row state (inserts and updates only)
  • table: Fully qualified table name (database.table)
  • tx_id: GTID if available
  • checkpoint: Binlog position for resume
  • schema_version: Schema fingerprint
  • schema_sequence: Monotonic sequence for schema correlation

Troubleshooting

Connection Issues

If you see authentication errors mentioning mysql_native_password:

ALTER USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'password';

Missing Before-Images

If UPDATE/DELETE events have incomplete before data:

SET GLOBAL binlog_row_image = 'FULL';

Binlog Not Enabled

Check binary logging status:

SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW BINARY LOG STATUS;  -- or SHOW MASTER STATUS on older versions

Privilege Issues

Verify grants for your user:

SHOW GRANTS FOR 'deltaforge'@'%';

Required grants include REPLICATION REPLICA and REPLICATION CLIENT.

PostgreSQL

PostgreSQL source

DeltaForge captures row-level changes from PostgreSQL using logical replication with the pgoutput plugin.

Prerequisites

PostgreSQL Server Configuration

Enable logical replication in postgresql.conf:

# Required settings
wal_level = logical
max_replication_slots = 10    # At least 1 per DeltaForge pipeline
max_wal_senders = 10          # At least 1 per DeltaForge pipeline

Restart PostgreSQL after changing these settings.

User Privileges

Create a replication user with the required privileges:

-- Create user with replication capability
CREATE ROLE deltaforge WITH LOGIN REPLICATION PASSWORD 'your_password';

-- Grant connect access
GRANT CONNECT ON DATABASE your_database TO deltaforge;

-- Grant schema usage and table access for schema introspection
GRANT USAGE ON SCHEMA public TO deltaforge;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO deltaforge;

-- For automatic publication/slot creation (optional)
-- If you prefer manual setup, skip this and create them yourself
ALTER ROLE deltaforge SUPERUSER;  -- Or use manual setup below

pg_hba.conf

Ensure your pg_hba.conf allows replication connections:

# TYPE  DATABASE        USER            ADDRESS                 METHOD
host    replication     deltaforge      0.0.0.0/0               scram-sha-256
host    your_database   deltaforge      0.0.0.0/0               scram-sha-256

Replication Slot and Publication

DeltaForge can automatically create the replication slot and publication on first run. Alternatively, create them manually:

-- Create publication for specific tables
CREATE PUBLICATION my_pub FOR TABLE public.orders, public.order_items;

-- Or for all tables
CREATE PUBLICATION my_pub FOR ALL TABLES;

-- Create replication slot
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');

Replica Identity

For complete before-images on UPDATE and DELETE operations, set tables to REPLICA IDENTITY FULL:

ALTER TABLE public.orders REPLICA IDENTITY FULL;
ALTER TABLE public.order_items REPLICA IDENTITY FULL;

The replica identity mode determines what the before-image contains:

  • FULL: Complete row data in before-images
  • DEFAULT (primary key): Only primary key columns in before-images
  • NOTHING: No before-images at all

DeltaForge warns at startup if tables don’t have REPLICA IDENTITY FULL.

Configuration

Set spec.source.type to postgres and provide a config object:

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| id | string | Yes | - | Unique identifier for checkpoints and metrics |
| dsn | string | Yes | - | PostgreSQL connection string |
| slot | string | Yes | - | Replication slot name |
| publication | string | Yes | - | Publication name |
| tables | array | Yes | - | Table patterns to capture |
| start_position | string/object | No | earliest | Where to start when no checkpoint exists |

DSN Formats

DeltaForge accepts both URL-style and key=value DSN formats:

# URL style
dsn: "postgres://user:pass@localhost:5432/mydb"

# Key=value style
dsn: "host=localhost port=5432 user=deltaforge password=pass dbname=mydb"

Table Patterns

The tables field supports flexible pattern matching:

tables:
  - public.orders          # exact match: schema "public", table "orders"
  - public.order_%         # LIKE pattern: tables starting with "order_"
  - myschema.*             # wildcard: all tables in "myschema"
  - %.audit_log            # cross-schema: "audit_log" table in any schema
  - orders                 # defaults to public schema: "public.orders"

System schemas (pg_catalog, information_schema, pg_toast) are always excluded.

Start Position

Controls where replication begins when no checkpoint exists:

# Start from the earliest available position (slot's restart_lsn)
start_position: earliest

# Start from current WAL position (skip existing data)
start_position: latest

# Start from a specific LSN
start_position:
  lsn: "0/16B6C50"

Example

source:
  type: postgres
  config:
    id: orders-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables:
      - public.orders
      - public.order_items
    start_position: earliest

Resume Behavior

DeltaForge checkpoints progress using PostgreSQL’s LSN (Log Sequence Number):

  1. With checkpoint: Resumes from the stored LSN
  2. Without checkpoint: Uses the slot’s confirmed_flush_lsn or restart_lsn
  3. New slot: Starts from pg_current_wal_lsn() or the configured start_position

Checkpoints are stored using the id field as the key.
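
A textual LSN like `0/16B6C50` is two hex numbers: the high and low 32 bits of the 64-bit WAL position. Converting it makes positions directly comparable, which is how "resume from the stored LSN" works in practice (the comparison below is illustrative, not DeltaForge's internal logic):

```python
def parse_lsn(lsn: str) -> int:
    """Convert a PostgreSQL LSN like '0/16B6C50' to a 64-bit integer."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)

checkpoint_lsn = parse_lsn("0/16B6C50")
slot_restart_lsn = parse_lsn("0/16B0000")

# A stored checkpoint ahead of the slot's restart_lsn means some WAL
# has already been processed and can be skipped on resume.
print(checkpoint_lsn > slot_restart_lsn)  # True
```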

Snapshot (Initial Load)

DeltaForge performs a consistent initial snapshot using PostgreSQL’s exported snapshot mechanism before starting logical replication.

How it works

A coordinator connection exports a snapshot and captures the current WAL LSN in a single round trip. Worker connections each import the shared snapshot into their own REPEATABLE READ transaction - all workers see the same consistent DB state with no locks held on the source.

Tables with a single integer primary key use PK-range chunking. All others fall back to ctid page-range chunking.

Configuration

source:
  type: postgres
  config:
    id: orders-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables:
      - public.orders
    snapshot:
      mode: initial           # initial | always | never (default: never)
      max_parallel_tables: 8  # tables snapshotted concurrently
      chunk_size: 10000       # rows per chunk for integer-PK tables

| Field | Default | Description |
|---|---|---|
| mode | never | initial: run once if no checkpoint exists; always: re-snapshot on every restart; never: skip |
| max_parallel_tables | 8 | Tables snapshotted concurrently |
| chunk_size | 10000 | Rows per range chunk (integer PK tables only; others use ctid chunking) |

Snapshot events

Snapshot rows are emitted as Op::Read events (Debezium op: "r"), distinguishable from live CDC Op::Create events. The WAL LSN captured at snapshot time becomes the CDC resume point - no rows are missed or duplicated.

Resume after interruption

If the snapshot is interrupted, DeltaForge resumes at table granularity on the next restart - already-completed tables are skipped.

WAL slot retention safety

DeltaForge validates replication slot health before starting a snapshot and monitors it throughout. This prevents the slot from being invalidated during a long snapshot, which would make the captured LSN unreachable for CDC resume.

Preflight checks (before any rows are read):

  • Fails hard if the slot does not exist or is already invalidated
  • Fails hard if wal_status=lost
  • Warns if wal_status=unreserved (WAL retention no longer guaranteed)
  • Estimates WAL generated during snapshot (~2× data size) against max_slot_wal_keep_size; warns at ≥50%, HIGH RISK at ≥80%
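
The headroom estimate in the last check can be sketched as simple threshold arithmetic (the ~2× heuristic and the 50%/80% thresholds come from the list above; the exact internal formula may differ):

```python
def wal_risk(data_bytes: int, max_slot_wal_keep_bytes: int) -> str:
    """Classify snapshot WAL-retention risk using the documented thresholds."""
    estimated_wal = 2 * data_bytes  # snapshot generates roughly 2x data size in WAL
    ratio = estimated_wal / max_slot_wal_keep_bytes
    if ratio >= 0.8:
        return "HIGH RISK"
    if ratio >= 0.5:
        return "warn"
    return "ok"

GB = 1024 ** 3
print(wal_risk(3 * GB, 10 * GB))  # 6 GB estimated vs 10 GB keep size -> warn
```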

During snapshot:

  • Background task polls pg_replication_slots every 30s
  • Cancels immediately on slot invalidation or disappearance
  • Warns but continues on wal_status=unreserved

After all tables complete:

  • Synchronous final check before writing finished=true
  • finished=true means the position is confirmed valid for CDC resume, not just that rows were emitted

If you see WAL retention risk warnings:

ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
SELECT pg_reload_conf();

Type Handling

DeltaForge preserves PostgreSQL’s native type semantics:

| PostgreSQL Type | JSON Representation |
|---|---|
| boolean | true / false |
| integer, bigint | JSON number |
| real, double precision | JSON number |
| numeric | JSON string (preserves precision) |
| text, varchar | JSON string |
| json, jsonb | Parsed JSON object/array |
| bytea | {"_base64": "..."} |
| uuid | JSON string |
| timestamp, date, time | ISO 8601 string |
| Arrays (int[], text[], etc.) | JSON array |
| TOAST unchanged | {"_unchanged": true} |

Event Format

Each captured row change produces an event with:

  • op: insert, update, delete, or truncate
  • before: Previous row state (updates and deletes, requires appropriate replica identity)
  • after: New row state (inserts and updates)
  • table: Fully qualified table name (schema.table)
  • tx_id: PostgreSQL transaction ID (xid)
  • checkpoint: LSN position for resume
  • schema_version: Schema fingerprint
  • schema_sequence: Monotonic sequence for schema correlation
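
Putting the fields together, an update event might look like this (values are illustrative; the exact wire layout depends on the configured envelope):

```json
{
  "op": "update",
  "table": "public.orders",
  "before": { "id": 42, "status": "pending" },
  "after":  { "id": 42, "status": "shipped" },
  "tx_id": 771203,
  "checkpoint": "0/16B6C50",
  "schema_version": "a1b2c3",
  "schema_sequence": 17
}
```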

WAL Management

Logical replication slots prevent WAL segments from being recycled until the consumer confirms receipt. To avoid disk space issues:

  1. Monitor slot lag: Check pg_replication_slots.restart_lsn vs pg_current_wal_lsn()
  2. Set retention limits: Configure max_slot_wal_keep_size (PostgreSQL 13+)
  3. Handle stale slots: Drop unused slots with pg_drop_replication_slot('slot_name')

-- Check slot status and lag
SELECT slot_name, 
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;

Troubleshooting

Connection Issues

If you see authentication errors:

-- Verify user has replication privilege
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'deltaforge';

-- Check pg_hba.conf allows replication connections
-- Ensure the line type includes "replication" database

Missing Before-Images

If UPDATE/DELETE events have incomplete before data:

-- Check current replica identity
SELECT relname, relreplident 
FROM pg_class 
WHERE relname = 'your_table';
-- d = default, n = nothing, f = full, i = index

-- Set to FULL for complete before-images
ALTER TABLE your_table REPLICA IDENTITY FULL;

Slot/Publication Not Found

-- List existing publications
SELECT * FROM pg_publication;

-- List existing slots
SELECT * FROM pg_replication_slots;

-- Create if missing
CREATE PUBLICATION my_pub FOR TABLE public.orders;
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');

WAL Disk Usage Growing

-- Check slot lag
SELECT slot_name, 
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;

-- If slot is inactive and not needed, drop it
SELECT pg_drop_replication_slot('unused_slot');

Logical Replication Not Enabled

-- Check wal_level
SHOW wal_level;  -- Should be 'logical'

-- If not, update postgresql.conf and restart PostgreSQL
-- wal_level = logical

Turso Source

⚠️ STATUS: EXPERIMENTAL / PAUSED

The Turso source is not yet ready for production use. Native CDC in Turso/libSQL is still evolving and has limitations:

  • CDC is per-connection (only changes from the enabling connection are captured)
  • File locking prevents concurrent access
  • sqld Docker image doesn’t have CDC support yet

This documentation is retained for reference. The code exists but is not officially supported.


The Turso source captures changes from Turso and libSQL databases. It supports multiple CDC modes to work with different database configurations and Turso versions.

CDC Modes

DeltaForge supports four CDC modes for Turso/libSQL:

| Mode | Description | Requirements |
|---|---|---|
| native | Uses Turso’s built-in CDC via turso_cdc table | Turso v0.1.2+ with CDC enabled |
| triggers | Shadow tables populated by database triggers | Standard SQLite/libSQL |
| polling | Tracks changes via rowid comparison | Any SQLite/libSQL (inserts only) |
| auto | Automatic fallback: native → triggers → polling | Any |

Native Mode

Native mode uses Turso’s built-in CDC capabilities. This is the most efficient mode when available.

Requirements:

  • Turso database with CDC enabled
  • Turso server v0.1.2 or later

How it works:

  1. Queries the turso_cdc system table for changes
  2. Uses bin_record_json_object() to extract row data as JSON
  3. Tracks position via change ID in checkpoints

Triggers Mode

Triggers mode uses shadow tables and database triggers to capture changes. This works with standard SQLite and libSQL without requiring native CDC support.

How it works:

  1. Creates shadow tables (_df_cdc_{table}) for each tracked table
  2. Installs INSERT/UPDATE/DELETE triggers that write to shadow tables
  3. Polls shadow tables for new change records
  4. Cleans up processed records periodically

Polling Mode

Polling mode uses rowid tracking to detect new rows. This is the simplest mode but only captures inserts (not updates or deletes).

How it works:

  1. Tracks the maximum rowid seen per table
  2. Queries for rows with rowid greater than last seen
  3. Emits insert events for new rows

Limitations:

  • Only captures INSERT operations
  • Cannot detect UPDATE or DELETE
  • Requires tables to have accessible rowid (not WITHOUT ROWID tables)
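
The rowid-tracking loop can be sketched with Python's stdlib sqlite3 as a stand-in for a libSQL connection (illustrative, not DeltaForge's implementation):

```python
import sqlite3

def poll_new_rows(con, table, last_rowid):
    """Emit insert events for rows whose rowid is beyond the last seen one."""
    rows = con.execute(
        f"SELECT rowid, * FROM {table} WHERE rowid > ? ORDER BY rowid",
        (last_rowid,),
    ).fetchall()
    max_rowid = rows[-1][0] if rows else last_rowid  # new checkpoint position
    events = [{"op": "insert", "table": table, "after": r[1:]} for r in rows]
    return events, max_rowid

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, kind TEXT)")
con.execute("INSERT INTO events (kind) VALUES ('signup'), ('login')")

events, pos = poll_new_rows(con, "events", 0)
print(len(events), pos)   # 2 2

con.execute("INSERT INTO events (kind) VALUES ('logout')")
events, pos = poll_new_rows(con, "events", pos)
print(len(events), pos)   # 1 3
```

Note how an UPDATE to an existing row would never appear: its rowid is unchanged, which is exactly the limitation described above.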

Auto Mode

Auto mode tries each CDC mode in order and uses the first one that works:

  1. Try native mode (check for turso_cdc table)
  2. Try triggers mode (check for existing CDC triggers)
  3. Fall back to polling mode

This is useful for deployments where the database capabilities may vary.

Configuration

source:
  type: turso
  config:
    id: turso-main
    url: "libsql://your-db.turso.io"
    auth_token: "${TURSO_AUTH_TOKEN}"
    tables: ["users", "orders", "order_items"]
    cdc_mode: auto
    poll_interval_ms: 1000
    native_cdc:
      level: data

Configuration Options

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| id | string | Yes | | Logical identifier for metrics and logging |
| url | string | Yes | | Database URL (libsql://, http://, or file path) |
| auth_token | string | No | | Authentication token for Turso cloud |
| tables | array | Yes | | Tables to track (supports wildcards) |
| cdc_mode | string | No | auto | CDC mode: native, triggers, polling, auto |
| poll_interval_ms | integer | No | 1000 | Polling interval in milliseconds |
| native_cdc.level | string | No | data | Native CDC level: binlog or data |

Table Patterns

The tables field supports patterns:

tables:
  - users              # Exact match
  - order%             # Prefix match (order, orders, order_items)
  - "*"                # All tables (excluding system tables)

System tables and DeltaForge infrastructure tables are automatically excluded:

  • sqlite_* — SQLite system tables
  • _df_* — DeltaForge CDC shadow tables
  • _litestream* — Litestream replication tables
  • _turso* — Turso internal tables
  • turso_cdc — Turso CDC system table

Native CDC Levels

When using native mode, you can choose the CDC level:

| Level | Description |
|---|---|
| data | Only row data changes (default, more efficient) |
| binlog | Full binlog-style events with additional metadata |

Examples

Local Development

source:
  type: turso
  config:
    id: local-dev
    url: "http://127.0.0.1:8080"
    tables: ["users", "orders"]
    cdc_mode: auto
    poll_interval_ms: 500

Turso Cloud

source:
  type: turso
  config:
    id: turso-prod
    url: "libsql://mydb-myorg.turso.io"
    auth_token: "${TURSO_AUTH_TOKEN}"
    tables: ["*"]
    cdc_mode: native
    poll_interval_ms: 1000

SQLite File (Polling Only)

source:
  type: turso
  config:
    id: sqlite-file
    url: "file:./data/myapp.db"
    tables: ["events", "audit_log"]
    cdc_mode: polling
    poll_interval_ms: 2000

Checkpoints

Turso checkpoints track position differently depending on the CDC mode. For native mode, the checkpoint stores the last processed change ID:
{
  "mode": "native",
  "last_change_id": 12345,
  "table_positions": {}
}

For polling mode, positions are tracked per-table:

{
  "mode": "polling",
  "last_change_id": null,
  "table_positions": {
    "users": 1000,
    "orders": 2500
  }
}

Schema Loading

The Turso source includes a schema loader that:

  • Queries PRAGMA table_info() for column metadata
  • Detects SQLite type affinities (INTEGER, TEXT, REAL, BLOB, NUMERIC)
  • Identifies primary keys and autoincrement columns
  • Handles WITHOUT ROWID tables
  • Checks for existing CDC triggers

Schema information is available via the REST API at /pipelines/{name}/schemas.

Notes

  • WITHOUT ROWID tables: Polling mode cannot track WITHOUT ROWID tables. Use triggers or native mode instead.
  • Type affinity: SQLite uses type affinity rather than strict types. The schema loader maps declared types to SQLite affinities.
  • Trigger cleanup: In triggers mode, processed change records are cleaned up automatically based on checkpoint position.
  • Connection handling: The source maintains a single connection and reconnects automatically on failure.

Processors

Processors can transform, modify or take extra action per event batch between the source and sinks. They run in the order listed in spec.processors, and each receives the output of the previous one. A processor can modify events, filter them out, or add routing metadata.

Available Processors

| Processor | Description |
|---|---|
| javascript | Custom transformations using V8-powered JavaScript |
| outbox | Transactional outbox pattern - extracts payload, resolves topic, sets routing headers |
| flatten | Flatten nested JSON objects into combined keys |

JavaScript

Run arbitrary JavaScript against each event batch. Uses the V8 engine via deno_core for near-native speed with configurable resource limits.

processors:
  - type: javascript
    id: enrich
    inline: |
      function processBatch(events) {
        return events.map(e => {
          e.tags = ["processed"];
          return e;
        });
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | (required) | Processor identifier |
| inline | string | (required) | JavaScript source code |
| limits.cpu_ms | int | 50 | CPU time limit per batch |
| limits.mem_mb | int | 128 | V8 heap memory limit |
| limits.timeout_ms | int | 500 | Wall-clock timeout per batch |

Performance

JavaScript processing adds a fixed overhead per batch plus linear scaling per event. Benchmarks show ~70K+ events/sec throughput with typical transforms — roughly 61× slower than native Rust processors, but sufficient for most workloads. If you need higher throughput, keep transform logic simple or move heavy processing downstream.

Tips

  • Return an empty array to drop all events in a batch.
  • Return multiple copies of an event to fan out.
  • JavaScript numbers are f64 — integer columns wider than 53 bits may lose precision. Use string representation for such values.

Outbox

The outbox processor transforms events captured by the outbox pattern into routed, sink-ready events. It extracts business fields from the raw outbox payload, resolves the destination topic, and passes through all non-outbox events unchanged.

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    default_topic: "events.unrouted"

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "outbox" | Processor identifier |
| tables | array | [] | Only process outbox events matching these patterns. Empty = all. |
| topic | string | | Topic template with ${field} placeholders (resolved against raw payload columns) |
| default_topic | string | | Fallback when template resolution fails |
| columns | object | (defaults below) | Field name mappings |
| additional_headers | map | {} | Forward extra payload fields as routing headers. Key = header name, value = column name. |
| raw_payload | bool | false | Deliver payload as-is, bypassing envelope wrapping |

Column Defaults

| Key | Default | Description |
|---|---|---|
| payload | "payload" | Field containing the event body |
| aggregate_type | "aggregate_type" | Domain aggregate type |
| aggregate_id | "aggregate_id" | Aggregate identifier |
| event_type | "event_type" | Domain event type |
| topic | "topic" | Optional explicit topic override in payload |

See the Outbox Pattern guide for source configuration, complete examples, and multi-outbox routing.
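
Topic template resolution against raw payload columns can be sketched like this (a hedged illustration of the `${field}` substitution and `default_topic` fallback, not the actual implementation):

```python
import re

def resolve_topic(template: str, payload: dict, default_topic: str) -> str:
    """Resolve a ${field} topic template against raw outbox payload columns.

    Falls back to default_topic when any placeholder cannot be resolved.
    """
    def substitute(m):
        field = m.group(1)
        if field not in payload or payload[field] is None:
            raise KeyError(field)  # trigger fallback
        return str(payload[field])

    try:
        return re.sub(r"\$\{(\w+)\}", substitute, template)
    except KeyError:
        return default_topic

row = {"aggregate_type": "order", "event_type": "created"}
print(resolve_topic("${aggregate_type}.${event_type}", row, "events.unrouted"))
# order.created
print(resolve_topic("${tenant}.${event_type}", row, "events.unrouted"))
# events.unrouted
```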

Flatten

Flattens nested JSON objects in event payloads into top-level keys joined by a configurable separator. Works on every object-valued field present on the event without assuming any particular envelope structure - CDC before/after, outbox business payloads, or any custom fields introduced by upstream processors are all handled uniformly.

processors:
  - type: flatten
    id: flat
    separator: "__"
    max_depth: 3
    on_collision: last
    empty_object: preserve
    lists: preserve
    empty_list: drop

Input:

{
  "after": {
    "order_id": "abc",
    "customer": {
      "id": 1,
      "address": { "city": "Berlin", "zip": "10115" }
    },
    "tags": ["vip"],
    "meta": {}
  }
}

Output (with defaults — separator: "__", empty_object: preserve, lists: preserve):

{
  "after": {
    "order_id": "abc",
    "customer__id": 1,
    "customer__address__city": "Berlin",
    "customer__address__zip": "10115",
    "tags": ["vip"],
    "meta": {}
  }
}

Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "flatten" | Processor identifier |
| separator | string | "__" | Separator inserted between path segments |
| max_depth | int | unlimited | Stop recursing at this depth; objects at the boundary are kept as opaque leaves |
| on_collision | string | last | What to do when two paths produce the same key: last, first, or error |
| empty_object | string | preserve | How to handle {} values: preserve, drop, or null |
| lists | string | preserve | How to handle array values: preserve (keep as-is) or index (expand to field__0, field__1, …) |
| empty_list | string | preserve | How to handle [] values: preserve, drop, or null |

max_depth in practice

max_depth counts nesting levels from the top of the payload object. Objects at the boundary are treated as opaque leaves rather than expanded further, still subject to empty_object policy.

# max_depth: 2

depth 0: customer               -> object, recurse
depth 1: customer__address      -> object, recurse
depth 2: customer__address__geo -> STOP, kept as leaf {"lat": 52.5, "lng": 13.4}

Without max_depth, a deeply nested or recursive payload could produce a large number of keys. Setting a limit is recommended for payloads with variable or unknown nesting depth.

Collision policy

A collision occurs when two input paths produce the same flattened key - typically when a payload already contains a pre-flattened key (e.g. "a__b": 1) alongside a nested object ("a": {"b": 2}).

  • last - the last path to write a key wins (default, never fails)
  • first - the first path to write a key wins, subsequent writes are ignored
  • error - the batch fails immediately, useful in strict pipelines where collisions indicate a schema problem
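
A minimal Python sketch of the rules above (separator, max_depth, on_collision); the list and empty-value policies are omitted for brevity, and this is an illustration rather than the Rust implementation:

```python
def flatten(obj, sep="__", max_depth=None, on_collision="last"):
    """Flatten nested dicts into joined keys; arrays and scalars stay as leaves."""
    out = {}

    def emit(key, value):
        if key in out:
            if on_collision == "error":
                raise ValueError(f"key collision: {key}")
            if on_collision == "first":
                return  # keep the first value written
        out[key] = value  # "last" wins by overwriting

    def walk(value, key, depth):
        expandable = isinstance(value, dict) and value  # {} stays a leaf
        within = max_depth is None or depth < max_depth
        if expandable and within:
            for k, v in value.items():
                walk(v, f"{key}{sep}{k}", depth + 1)
        else:
            emit(key, value)

    for k, v in obj.items():
        walk(v, k, 0)
    return out

after = {
    "order_id": "abc",
    "customer": {"id": 1, "address": {"city": "Berlin", "zip": "10115"}},
    "tags": ["vip"],
    "meta": {},
}
print(flatten(after))
```

Running it on the example payload from earlier reproduces the documented output, including `tags` and `meta` preserved as-is.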

Working with outbox payloads

After the outbox processor runs, event.after holds the extracted business payload - there is no before. The flatten processor handles this naturally since it operates on whatever fields are present:

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
  - type: flatten
    id: flat
    separator: "."
    empty_list: drop

Envelope interaction

The flatten processor runs on the raw Event struct before sink delivery. Envelope wrapping happens inside the sink, after all processors have run. This means the envelope always wraps already-flattened data, no special configuration needed.

Source → [flatten processor] → Sink (envelope → bytes)

All envelope formats work as expected:

// Native
{ "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" }, "op": "c" }

// Debezium
{ "payload": { "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" }, "op": "c" } }

// CloudEvents
{ "specversion": "1.0", ..., "data": { "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" } } }

Outbox + raw_payload: true

When the outbox processor is configured with raw_payload: true, the sink delivers event.after directly, bypassing the envelope entirely. If the flatten processor runs after outbox, the raw payload delivered to the sink is the flattened object — which is the intended behavior for analytics sinks that can’t handle nested JSON.

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    raw_payload: true       # sink delivers event.after directly, no envelope
  - type: flatten
    id: flat
    separator: "__"
    empty_list: drop

The flatten processor runs second, so by the time the sink delivers the raw payload it is already flat.

Analytics sink example

When sending to column-oriented sinks (ClickHouse, BigQuery, S3 Parquet) that don’t handle nested JSON:

processors:
  - type: flatten
    id: flat
    separator: "__"
    lists: index          # expand arrays to indexed columns
    empty_object: drop    # remove sparse marker objects
    empty_list: drop      # remove empty arrays

Processor Chain

Processors execute in order. Events flow through each processor sequentially:

Source → [Processor 1] → [Processor 2] → ... → Sinks

Each processor receives a Vec<Event> and returns a Vec<Event>. This means processors can:

  • Transform: Modify event fields in place
  • Filter: Return a subset of events (drop unwanted ones)
  • Fan-out: Return more events than received
  • Route: Set event.routing.topic to override sink defaults
  • Enrich: Add headers via event.routing.headers

Non-outbox events always pass through the outbox processor untouched, so it is safe to combine it with JavaScript processors in any order.

Adding Custom Processors

The processor interface is a simple trait:

#![allow(unused)]
fn main() {
#[async_trait]
pub trait Processor: Send + Sync {
    fn id(&self) -> &str;
    async fn process(&self, events: Vec<Event>) -> Result<Vec<Event>>;
}
}

To add a new processor:

  1. Implement the Processor trait in crates/processors
  2. Add a config variant to ProcessorCfg in deltaforge-config
  3. Register the build step in build_processors()

Sinks

Sinks receive batches from the coordinator after processors run. Each sink lives under spec.sinks and can be marked as required or best-effort via the required flag. Checkpoint behavior is governed by the pipeline’s commit policy.

Envelope and Encoding

All sinks support configurable envelope formats and wire encodings. See the Envelopes and Encodings page for detailed documentation.

| Option | Values | Default | Description |
|---|---|---|---|
| envelope | native, debezium, cloudevents | native | Output JSON structure |
| encoding | json | json | Wire format |

Quick example:

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      envelope:
        type: cloudevents
        type_prefix: "com.example.cdc"
      encoding: json

Available Sinks

| Sink | Description |
|---|---|
| kafka | Kafka producer sink |
| nats | NATS JetStream sink |
| redis | Redis stream sink |

Multiple sinks in one pipeline

You can combine multiple sinks in one pipeline to fan out events to different destinations. However, multi-sink pipelines introduce complexity that requires careful consideration.

Why multiple sinks are challenging

Different performance characteristics: Kafka might handle 100K events/sec while a downstream HTTP webhook processes 100/sec. The slowest sink becomes the bottleneck for the entire pipeline.

Independent failure modes: Each sink can fail independently. Redis might be healthy while Kafka experiences broker failures. Without proper handling, a single sink failure could block the entire pipeline or cause data loss.

No distributed transactions: DeltaForge cannot atomically commit across heterogeneous systems. If Kafka succeeds but Redis fails mid-batch, you face a choice: retry Redis (risking duplicates in Kafka) or skip Redis (losing data there).

Checkpoint semantics: The checkpoint represents “how far we’ve processed from the source.” With multiple sinks, when is it safe to advance? After one sink succeeds? All of them? A majority?

Read the required and commit_policy sections below for options to manage these challenges.

The required flag

The required flag on each sink determines whether that sink must acknowledge successful delivery before the checkpoint advances:

sinks:
  - type: kafka
    config:
      id: primary-kafka
      required: true    # Must succeed for checkpoint to advance
      
  - type: redis
    config:
      id: cache-redis
      required: false   # Best-effort; failures don't block checkpoint

When required: true (default): The sink must acknowledge the batch before the checkpoint can advance. If this sink fails, the pipeline blocks and retries until it succeeds or the operator intervenes.

When required: false: The sink is best-effort. Failures are logged but don’t prevent the checkpoint from advancing. Use this for non-critical destinations where some data loss is acceptable.

Commit policy

The commit_policy works with the required flag to determine checkpoint behavior:

| Policy | Behavior |
|---|---|
| all | Every sink (regardless of required flag) must acknowledge |
| required | Only sinks with required: true must acknowledge (default) |
| quorum | At least N sinks must acknowledge |

commit_policy:
  mode: required   # Only wait for required sinks

sinks:
  - type: kafka
    config:
      required: true   # Checkpoint waits for this
  - type: redis  
    config:
      required: false  # Checkpoint doesn't wait for this
  - type: nats
    config:
      required: true   # Checkpoint waits for this
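
The checkpoint decision for each policy reduces to a small predicate over per-sink acknowledgements. A hedged sketch (illustrative decision logic, not DeltaForge's internals):

```python
def checkpoint_may_advance(acks, sinks, mode, quorum=None):
    """Decide whether the checkpoint can advance for a batch.

    acks:  {sink_id: acknowledged?}
    sinks: {sink_id: required_flag}
    """
    if mode == "all":
        return all(acks[s] for s in sinks)
    if mode == "required":
        return all(acks[s] for s, required in sinks.items() if required)
    if mode == "quorum":
        return sum(acks[s] for s in sinks) >= quorum
    raise ValueError(f"unknown commit mode: {mode}")

sinks = {"kafka": True, "redis": False, "nats": True}
acks = {"kafka": True, "redis": False, "nats": True}  # redis failed this batch

print(checkpoint_may_advance(acks, sinks, "required"))          # True
print(checkpoint_may_advance(acks, sinks, "all"))               # False
print(checkpoint_may_advance(acks, sinks, "quorum", quorum=2))  # True
```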

Practical patterns

Primary + secondary: One critical sink (Kafka for durability) marked required: true, with secondary sinks (Redis for caching, testing or experimentation) marked required: false.

Quorum for redundancy: Three sinks with commit_policy.mode: quorum and quorum: 2. Checkpoint advances when any two succeed, providing fault tolerance.

All-or-nothing: Use commit_policy.mode: all when every destination is critical and you need the strongest consistency guarantee, at the cost of delivery throughput.

Multi-format fan-out

For sending the same events to different consumers that expect different formats:

sinks:
  # Kafka Connect expects Debezium format
  - type: kafka
    config:
      id: connect-sink
      brokers: ${KAFKA_BROKERS}
      topic: connect-events
      envelope:
        type: debezium
      required: true

  # Lambda expects CloudEvents
  - type: kafka
    config:
      id: lambda-sink
      brokers: ${KAFKA_BROKERS}
      topic: lambda-events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.cdc"
      required: false

  # Analytics wants raw events
  - type: redis
    config:
      id: analytics-redis
      uri: ${REDIS_URI}
      stream: analytics
      envelope:
        type: native
      required: false

This allows each consumer to receive events in their preferred format without post-processing.

Redis

Redis sink

The Redis sink publishes events to a Redis Stream for real-time consumption with consumer groups.

When to use Redis

Redis Streams shine when you need low-latency event delivery with simple operational requirements and built-in consumer group support.

Real-world applications

| Use Case | Description |
|---|---|
| Real-time notifications | Push database changes instantly to WebSocket servers for live UI updates |
| Cache invalidation | Trigger cache eviction when source records change; keep Redis cache consistent |
| Session synchronization | Replicate user session changes across application instances in real-time |
| Rate limiting state | Stream counter updates for distributed rate limiting decisions |
| Live dashboards | Feed real-time metrics and KPIs to dashboard backends |
| Job queuing | Use CDC events to trigger background job processing with consumer groups |
| Feature flags | Propagate feature flag changes instantly across all application instances |

Pros and cons

| Pros | Cons |
|---|---|
| Ultra-low latency - Sub-millisecond publish; ideal for real-time apps | Memory-bound - All data in RAM; expensive for high-volume retention |
| Simple operations - Single binary, minimal configuration | Limited retention - Not designed for long-term event storage |
| Consumer groups - Built-in competing consumers with acknowledgements | Durability trade-offs - AOF/RDB persistence has limitations |
| Familiar tooling - redis-cli, widespread client library support | Single-threaded - CPU-bound for very high throughput |
| Versatile - Combine with caching, pub/sub, and data structures | No native replay - XRANGE exists but no offset management |
| Atomic operations - MULTI/EXEC for transactional guarantees | Cluster complexity - Sharding requires careful key design |

Configuration

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders.events
      required: true

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | | Sink identifier |
| uri | string | | Redis connection URI |
| stream | string | | Redis stream key |
| required | bool | true | Gates checkpoints |

Consuming events

# Create consumer group
redis-cli XGROUP CREATE orders.events mygroup $ MKSTREAM

# Read as consumer (blocking)
redis-cli XREADGROUP GROUP mygroup consumer1 BLOCK 5000 COUNT 10 STREAMS orders.events '>'

# Acknowledge processing
redis-cli XACK orders.events mygroup 1234567890123-0

# Check pending (unacknowledged) messages
redis-cli XPENDING orders.events mygroup

Simple subscription

# Read from the beginning of the stream
redis-cli XREAD COUNT 10 STREAMS orders.events 0-0

# Block for new entries
redis-cli XREAD BLOCK 5000 STREAMS orders.events $

Go consumer example

import (
    "context"
    "encoding/json"
    "time"

    "github.com/redis/go-redis/v9"
)

ctx := context.Background()
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

// Create consumer group (once); a BUSYGROUP error means it already exists
rdb.XGroupCreateMkStream(ctx, "orders.events", "mygroup", "0")

for {
    streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    "mygroup",
        Consumer: "worker1",
        Streams:  []string{"orders.events", ">"},
        Count:    10,
        Block:    5 * time.Second,
    }).Result()
    
    if err != nil {
        continue
    }
    
    for _, stream := range streams {
        for _, msg := range stream.Messages {
            var event Event
            json.Unmarshal([]byte(msg.Values["event"].(string)), &event)
            process(event)
            rdb.XAck(ctx, "orders.events", "mygroup", msg.ID)
        }
    }
}

Rust consumer example

#![allow(unused)]
fn main() {
use redis::AsyncCommands;
use redis::streams::{StreamReadOptions, StreamReadReply};

let client = redis::Client::open("redis://localhost:6379")?;
let mut con = client.get_async_connection().await?;

// Create consumer group (ignore the error if it already exists)
let _: () = redis::cmd("XGROUP")
    .arg("CREATE").arg("orders.events").arg("mygroup").arg("0").arg("MKSTREAM")
    .query_async(&mut con).await.unwrap_or(());

loop {
    let opts = StreamReadOptions::default()
        .group("mygroup", "worker1")
        .count(10)
        .block(5000);
    let reply: StreamReadReply = con
        .xread_options(&["orders.events"], &[">"], &opts)
        .await?;

    for stream in reply.keys {
        for msg in stream.ids {
            let raw: String = redis::from_redis_value(&msg.map["event"])?;
            let event: Event = serde_json::from_str(&raw)?;
            process(event);
            let _: i64 = con.xack("orders.events", "mygroup", &[&msg.id]).await?;
        }
    }
}
}

Python consumer example

import redis
import json

r = redis.Redis.from_url("redis://localhost:6379")

# Create consumer group (once)
try:
    r.xgroup_create("orders.events", "mygroup", id="0", mkstream=True)
except redis.ResponseError:
    pass  # Group already exists

# Consume events
while True:
    events = r.xreadgroup("mygroup", "worker1", {"orders.events": ">"}, count=10, block=5000)
    for stream, messages in events:
        for msg_id, data in messages:
            event = json.loads(data[b"event"])
            process(event)
            r.xack("orders.events", "mygroup", msg_id)

Failure modes

| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore Redis; check network |
| Authentication failure | NOAUTH / WRONGPASS | Fails fast, no retry | Fix auth details in URI |
| OOM (Out of Memory) | OOM command not allowed | Fails batch; retries | Increase maxmemory; enable eviction or trim streams |
| Stream doesn’t exist | Auto-created by XADD | No failure | N/A (XADD creates stream) |
| Connection timeout | Command hangs | Timeout after configured duration | Check network; increase timeout |
| Cluster MOVED/ASK | Redirect errors | Automatic redirect (if cluster mode) | Ensure cluster client configured |
| Replication lag | Writes to replica fail | Fails with READONLY | Write to master only |
| Max stream length | If MAXLEN enforced | Oldest entries trimmed | Expected behavior; not a failure |
| Network partition | Intermittent timeouts | Retries; may have gaps | Restore network |

Failure scenarios and data guarantees

Redis OOM during batch delivery

  1. DeltaForge sends batch of 100 events via pipeline
  2. 50 events written, Redis hits maxmemory
  3. Pipeline fails atomically (all or nothing per pipeline)
  4. DeltaForge retries entire batch
  5. If OOM persists: batch blocked until memory available
  6. Checkpoint only saved after ALL events acknowledged

DeltaForge crash after XADD, before checkpoint

  1. Batch written to Redis stream successfully
  2. DeltaForge crashes before saving checkpoint
  3. On restart: replays from last checkpoint
  4. Result: Duplicate events in stream (at-least-once)
  5. Consumer must handle idempotently (check event.id)

Redis failover (Sentinel/Cluster)

  1. Master fails, Sentinel promotes replica
  2. In-flight XADD may fail with connection error
  3. DeltaForge reconnects to new master
  4. Retries failed batch
  5. Possible duplicates if original write succeeded

Handling duplicates in consumers

# Idempotent consumer using event ID
processed_ids = set()  # Or use Redis SET for distributed dedup

for msg_id, data in messages:
    event = json.loads(data[b"event"])
    event_id = event["id"]
    
    if event_id in processed_ids:
        r.xack("orders.events", "mygroup", msg_id)
        continue  # Skip duplicate
    
    process(event)
    processed_ids.add(event_id)
    r.xack("orders.events", "mygroup", msg_id)
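The in-memory `processed_ids` set above grows without bound. One way to bound it is a time-windowed cache; the sketch below is illustrative only (the TTL value and the `DedupCache` structure are assumptions, not part of DeltaForge), and a shared store such as a Redis SET would replace it for distributed consumers:

```python
import time

class DedupCache:
    """Remember event IDs for a limited window to bound memory."""
    def __init__(self, ttl_seconds=3600):
        self.ttl = ttl_seconds
        self.seen = {}  # event_id -> timestamp of first sighting

    def is_duplicate(self, event_id, now=None):
        now = now if now is not None else time.time()
        # Evict entries older than the TTL window before checking
        self.seen = {k: t for k, t in self.seen.items() if now - t < self.ttl}
        if event_id in self.seen:
            return True
        self.seen[event_id] = now
        return False

cache = DedupCache(ttl_seconds=60)
assert cache.is_duplicate("evt-1") is False  # first sighting
assert cache.is_duplicate("evt-1") is True   # replay within the window
assert cache.is_duplicate("evt-1", now=time.time() + 120) is False  # expired
```

The window should be at least as long as the worst-case replay gap (checkpoint interval plus restart time).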

Monitoring

DeltaForge exposes these metrics for Redis sink monitoring:

# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink}      # Events delivered
deltaforge_sink_batch_total{pipeline,sink}       # Batches delivered  
deltaforge_sink_latency_seconds{pipeline,sink}   # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"}  # Stage timing

For Redis server visibility, use Redis’s built-in monitoring:

# Monitor commands in real-time
redis-cli MONITOR

# Get server stats
redis-cli INFO stats

# Check memory usage
redis-cli INFO memory

# Stream-specific info
redis-cli XINFO STREAM orders.events
redis-cli XINFO GROUPS orders.events

Stream management

# Check stream length
redis-cli XLEN orders.events

# Trim to last 10000 entries (approximate)
redis-cli XTRIM orders.events MAXLEN ~ 10000

# Trim to exact length
redis-cli XTRIM orders.events MAXLEN 10000

# View consumer group info
redis-cli XINFO GROUPS orders.events

# Check pending messages
redis-cli XPENDING orders.events mygroup

# Claim stuck messages (after 60 seconds)
redis-cli XCLAIM orders.events mygroup worker2 60000 <message-id>

# Delete processed messages (careful!)
redis-cli XDEL orders.events <message-id>

Notes

  • Redis Streams provide at-least-once delivery with consumer group acknowledgements
  • Use MAXLEN ~ trimming to prevent unbounded memory growth (approximate is faster)
  • Consider Redis Cluster for horizontal scaling with multiple streams
  • Combine with Redis pub/sub for fan-out to ephemeral subscribers
  • For durability, enable AOF persistence with appendfsync everysec or always
  • Monitor memory usage closely; Redis will reject writes when maxmemory is reached

Kafka

Kafka sink

The Kafka sink publishes batches to a Kafka topic using rdkafka.

When to use Kafka

Kafka excels as the backbone for event-driven architectures where durability, ordering, and replay capabilities are critical.

Real-world applications

| Use Case | Description |
|---|---|
| Event sourcing | Store all state changes as an immutable log; rebuild application state by replaying events |
| Microservices integration | Decouple services with async messaging; each service consumes relevant topics |
| Real-time analytics pipelines | Feed CDC events to Spark, Flink, or ksqlDB for streaming transformations |
| Data lake ingestion | Stream database changes to S3/HDFS via Kafka Connect for analytics and ML |
| Audit logging | Capture every database mutation for compliance, debugging, and forensics |
| Cross-datacenter replication | Use MirrorMaker 2 to replicate topics across regions for DR |

Pros and cons

| Pros | Cons |
|---|---|
| Durability - Configurable replication ensures no data loss | Operational complexity - Requires ZooKeeper/KRaft, careful tuning |
| Ordering guarantees - Per-partition ordering with consumer groups | Latency - Batching and replication add milliseconds of delay |
| Replay capability - Configurable retention allows reprocessing | Resource intensive - High disk I/O and memory requirements |
| Ecosystem - Connect, Streams, Schema Registry, ksqlDB | Learning curve - Partitioning, offsets, consumer groups to master |
| Throughput - Handles millions of messages per second | Cold start - Cluster setup and topic configuration overhead |
| Exactly-once semantics - Transactions for critical workloads | Cost - Managed services can be expensive at scale |

Configuration

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      required: true
      exactly_once: false
      client_conf:
        message.timeout.ms: "5000"
        acks: "all"

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | | Sink identifier |
| brokers | string | | Comma-separated broker list |
| topic | string | | Destination topic |
| required | bool | true | Gates checkpoints |
| exactly_once | bool | false | Enable EOS semantics |
| client_conf | map | {} | librdkafka overrides |
| Setting | Recommended | Description |
|---|---|---|
| acks | all | Wait for all replicas for durability |
| message.timeout.ms | 30000 | Total time to deliver a message |
| retries | 2147483647 | Retry indefinitely (with backoff) |
| enable.idempotence | true | Prevent duplicates on retry |
| compression.type | lz4 | Balance between CPU and bandwidth |
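Applied through client_conf, the recommended settings above look like this (the values mirror the table's suggestions; they are tuning guidance, not required defaults):

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      client_conf:
        acks: "all"
        message.timeout.ms: "30000"
        retries: "2147483647"
        enable.idempotence: "true"
        compression.type: "lz4"
```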

Consuming events

Kafka CLI

# Consume from beginning
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning

# Consume with consumer group
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --group deltaforge-consumers

Go consumer example

config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest

group, _ := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)

for {
    err := group.Consume(ctx, []string{"orders"}, &handler{})
    if err != nil {
        log.Printf("Consumer error: %v", err)
    }
}

type handler struct{}

func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        var event Event
        json.Unmarshal(msg.Value, &event)
        process(event)
        session.MarkMessage(msg, "")
    }
    return nil
}

Rust consumer example

#![allow(unused)]
fn main() {
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::Message;

let consumer: StreamConsumer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("group.id", "my-group")
    .set("auto.offset.reset", "earliest")
    .create()?;

consumer.subscribe(&["orders"])?;

loop {
    match consumer.recv().await {
        Ok(msg) => {
            let payload = msg.payload_view::<str>().unwrap()?;
            let event: Event = serde_json::from_str(payload)?;
            process(event);
            consumer.commit_message(&msg, CommitMode::Async)?;
        }
        Err(e) => eprintln!("Kafka error: {}", e),
    }
}
}

Python consumer example

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    event = message.value
    process(event)
    consumer.commit()

Failure modes

| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Broker unavailable | Connection refused, timeout | Retries with backoff; blocks checkpoint | Restore broker; check network |
| Topic not found | UnknownTopicOrPartition | Fails batch; retries | Create topic or enable auto-create |
| Authentication failure | SaslAuthenticationFailed | Fails fast, no retry | Fix credentials in config |
| Authorization failure | TopicAuthorizationFailed | Fails fast, no retry | Grant ACLs for producer |
| Message too large | MessageSizeTooLarge | Fails message permanently | Increase message.max.bytes or filter large events |
| Leader election | NotLeaderForPartition | Automatic retry after metadata refresh | Wait for election; usually transient |
| Disk full | KafkaStorageException | Retries indefinitely | Add disk space; purge old segments |
| Network partition | Timeouts, partial failures | Retries; may produce duplicates | Restore network; idempotence prevents dups |
| SSL/TLS errors | Handshake failures | Fails fast | Fix certificates, verify truststore |

Failure scenarios and data guarantees

Broker failure during batch delivery

  1. DeltaForge sends batch of 100 events
  2. 50 events delivered, broker crashes
  3. rdkafka detects failure, retries remaining 50
  4. If idempotence enabled: no duplicates
  5. If not: possible duplicates of events near failure point
  6. Checkpoint only saved after ALL events acknowledged

DeltaForge crash after Kafka ack, before checkpoint

  1. Batch delivered to Kafka successfully
  2. DeltaForge crashes before saving checkpoint
  3. On restart: replays from last checkpoint
  4. Result: Duplicate events in Kafka (at-least-once)
  5. Consumer must handle idempotently
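Idempotent handling reduces to a small guard around processing; a sketch in Python (the event shape follows the native envelope, and the plain set standing in for the ID store would be Redis or a database in production; the `id` field is an assumption about how your pipeline stamps events):

```python
import json

def handle(raw_message, processed_ids, apply_fn):
    """Apply a CDC event at most once per event ID.

    raw_message: JSON bytes as delivered by the sink.
    processed_ids: set of already-applied event IDs (shared store in production).
    apply_fn: side-effecting business logic.
    """
    event = json.loads(raw_message)
    event_id = event["id"]  # assumed: pipeline stamps a unique "id" per event
    if event_id in processed_ids:
        return False  # duplicate from an at-least-once replay; skip
    apply_fn(event)
    processed_ids.add(event_id)
    return True

seen, applied = set(), []
msg = json.dumps({"id": "evt-42", "op": "c", "after": {"total": 99.99}}).encode()
assert handle(msg, seen, applied.append) is True   # first delivery applied
assert handle(msg, seen, applied.append) is False  # replayed duplicate skipped
assert len(applied) == 1
```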

Monitoring recommendations

DeltaForge exposes these metrics for Kafka sink monitoring:

# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink}      # Events delivered
deltaforge_sink_batch_total{pipeline,sink}       # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink}   # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"}  # Stage timing

For deeper Kafka broker visibility, monitor your Kafka cluster directly:

  • Broker metrics via JMX or Kafka’s built-in metrics
  • Consumer lag via kafka-consumer-groups.sh
  • Topic throughput via broker dashboards

Note: Internal rdkafka producer statistics (message queues, broker RTT, etc.) are not currently exposed by DeltaForge. This is a potential future enhancement.

Notes

  • Combine Kafka with other sinks to fan out data; use commit policy to control checkpoint behavior
  • For exactly-once semantics, ensure your Kafka cluster supports transactions (2.5+)
  • Adjust client_conf for durability (acks=all) or performance based on your requirements
  • Consider partitioning strategy for ordering guarantees within partitions
  • Enable enable.idempotence=true to prevent duplicates during retries

NATS

NATS sink

The NATS sink publishes events to a NATS JetStream stream for durable, at-least-once delivery.

When to use NATS

NATS JetStream is ideal when you need a lightweight, high-performance messaging system with persistence, without the operational overhead of Kafka.

Real-world applications

| Use Case | Description |
|---|---|
| Edge computing | Lightweight footprint perfect for IoT gateways and edge nodes syncing to cloud |
| Microservices mesh | Request-reply and pub/sub patterns with automatic load balancing |
| Multi-cloud sync | Leaf nodes and superclusters for seamless cross-cloud data replication |
| Kubernetes-native events | NATS Operator for cloud-native deployment; sidecar-friendly architecture |
| Real-time gaming | Low-latency state synchronization for multiplayer game servers |
| Financial data feeds | Stream market data with subject-based routing and wildcards |
| Command and control | Distribute configuration changes and commands to distributed systems |

Pros and cons

| Pros | Cons |
|---|---|
| Lightweight - Single binary ~20MB; minimal resource footprint | Smaller ecosystem - Fewer connectors and integrations than Kafka |
| Simple operations - Zero external dependencies; easy clustering | Younger persistence - JetStream newer than Kafka’s battle-tested log |
| Low latency - Sub-millisecond message delivery | Community size - Smaller community than Kafka or Redis |
| Flexible patterns - Pub/sub, queues, request-reply, streams | Tooling maturity - Fewer monitoring and management tools |
| Subject hierarchy - Powerful wildcard routing (orders.>, *.events) | Learning curve - JetStream concepts differ from traditional queues |
| Multi-tenancy - Built-in accounts and security isolation | Less enterprise adoption - Fewer case studies at massive scale |
| Cloud-native - Designed for Kubernetes and distributed systems | |

Configuration

sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      required: true
      send_timeout_secs: 5
      batch_timeout_secs: 30

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | | Sink identifier |
| url | string | | NATS server URL |
| subject | string | | Subject to publish to |
| stream | string | | JetStream stream name |
| required | bool | true | Gates checkpoints |
| send_timeout_secs | int | 5 | Publish timeout |
| batch_timeout_secs | int | 30 | Batch timeout |
| connect_timeout_secs | int | 10 | Connection timeout |

Authentication options

# Credentials file
credentials_file: /etc/nats/creds/user.creds

# Username/password
username: ${NATS_USER}
password: ${NATS_PASSWORD}

# Token
token: ${NATS_TOKEN}

JetStream setup

Before using the NATS sink with JetStream, create a stream that captures your subject:

# Using NATS CLI
nats stream add ORDERS \
  --subjects "orders.>" \
  --retention limits \
  --storage file \
  --replicas 3 \
  --max-age 7d

# Verify stream
nats stream info ORDERS

Consuming events

NATS CLI

# Subscribe to subject (ephemeral)
nats sub "orders.>"

# Create durable consumer
nats consumer add ORDERS orders-processor \
  --pull \
  --ack explicit \
  --deliver all \
  --max-deliver 3 \
  --filter "orders.events"

# Consume messages
nats consumer next ORDERS orders-processor --count 10

Go consumer example

nc, _ := nats.Connect("nats://localhost:4222")
js, _ := nc.JetStream()

// Bind to (or create) the durable pull consumer; the durable name is the
// second argument, so no extra nats.Durable option is needed
sub, _ := js.PullSubscribe("orders.events", "orders-processor",
    nats.AckExplicit(),
)

for {
    msgs, _ := sub.Fetch(10, nats.MaxWait(5*time.Second))
    for _, msg := range msgs {
        var event Event
        json.Unmarshal(msg.Data, &event)
        process(event)
        msg.Ack()
    }
}

Rust consumer example

#![allow(unused)]
fn main() {
use async_nats::jetstream;
use futures::StreamExt;

let client = async_nats::connect("nats://localhost:4222").await?;
let js = jetstream::new(client);

let stream = js.get_stream("ORDERS").await?;
let consumer = stream.get_consumer("orders-processor").await?;

let mut messages = consumer.messages().await?;
while let Some(msg) = messages.next().await {
    let msg = msg?;
    let event: Event = serde_json::from_slice(&msg.payload)?;
    process(event);
    msg.ack().await?;
}
}

Monitoring

DeltaForge exposes these metrics for NATS sink monitoring:

# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink}      # Events delivered
deltaforge_sink_batch_total{pipeline,sink}       # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink}   # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"}  # Stage timing

For NATS server visibility, use the NATS CLI or monitoring endpoint:

# Server info
nats server info

# JetStream account info
nats account info

# Stream statistics
nats stream info ORDERS

# Consumer statistics  
nats consumer info ORDERS orders-processor

# Real-time event monitoring
nats events

NATS also exposes a monitoring endpoint (default :8222) with JSON stats:

  • http://localhost:8222/varz - General server stats
  • http://localhost:8222/jsz - JetStream stats
  • http://localhost:8222/connz - Connection stats

Subject design patterns

| Pattern | Example | Use Case |
|---|---|---|
| Hierarchical | orders.us.created | Regional routing |
| Wildcard single | orders.*.created | Any region, specific event |
| Wildcard multi | orders.> | All order events |
| Versioned | v1.orders.events | API versioning |
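The wildcard semantics above can be modeled as a token-by-token match; the sketch below is a simplified model of NATS subject matching for intuition only, not the server's implementation:

```python
def subject_matches(pattern, subject):
    """'*' matches exactly one token; '>' matches one or more trailing tokens."""
    p, s = pattern.split("."), subject.split(".")
    for i, tok in enumerate(p):
        if tok == ">":
            return len(s) > i       # '>' must cover at least one token
        if i >= len(s):
            return False            # subject ran out of tokens
        if tok != "*" and tok != s[i]:
            return False            # literal token mismatch
    return len(p) == len(s)         # no wildcard tail: lengths must agree

assert subject_matches("orders.>", "orders.us.created")
assert subject_matches("orders.*.created", "orders.eu.created")
assert not subject_matches("orders.*.created", "orders.eu.fr.created")
assert not subject_matches("orders.>", "orders")  # '>' needs a trailing token
```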

Failure modes

| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore NATS; check network |
| Stream not found | stream not found error | Fails batch; no retry | Create stream or remove stream config |
| Authentication failure | authorization violation | Fails fast, no retry | Fix credentials |
| Subject mismatch | no responders (core NATS) | Fails if no subscribers | Add subscribers or use JetStream |
| JetStream disabled | jetstream not enabled | Fails fast | Enable JetStream on server |
| Storage full | insufficient resources | Retries; eventually fails | Add storage; adjust retention |
| Message too large | message size exceeds maximum | Fails message permanently | Increase max_payload or filter large events |
| Cluster partition | Intermittent failures | Retries with backoff | Restore network; wait for quorum |
| Slow consumer | Publish backpressure | Slows down; may timeout | Scale consumers; increase buffer |
| TLS errors | Handshake failures | Fails fast | Fix certificates |

Failure scenarios and data guarantees

NATS server restart during batch delivery

  1. DeltaForge sends batch of 100 events
  2. 50 events published, server restarts
  3. async_nats detects disconnect, starts reconnecting
  4. After reconnect, DeltaForge retries remaining 50
  5. JetStream deduplication prevents duplicates (if enabled)
  6. Checkpoint only saved after ALL events acknowledged

DeltaForge crash after JetStream ack, before checkpoint

  1. Batch published to JetStream successfully
  2. DeltaForge crashes before saving checkpoint
  3. On restart: replays from last checkpoint
  4. Result: Duplicate events in stream (at-least-once)
  5. Consumer must handle idempotently (check event.id)

Stream storage exhausted

  1. JetStream stream hits max_bytes or max_msgs limit
  2. With discard old: the oldest messages are removed and the publish succeeds
  3. With discard new: the publish is rejected
  4. DeltaForge retries on rejection
  5. Resolution: increase limits or switch to discard old

JetStream acknowledgement levels

# Stream configuration affects durability
nats stream add ORDERS \
  --replicas 3 \           # R=3 for production
  --retention limits \     # or 'workqueue' for single consumer
  --discard old \          # Remove oldest when full
  --max-age 7d \           # Auto-expire after 7 days
  --storage file           # Persistent (vs memory)

| Replicas | Guarantee | Use Case |
|---|---|---|
| R=1 | Single node; lost if node fails | Development, non-critical |
| R=3 | Survives 1 node failure | Production default |
| R=5 | Survives 2 node failures | Critical data |

Handling duplicates in consumers

// Use event ID for idempotency
processedIDs := make(map[string]bool)  // Or use Redis/DB

for _, msg := range msgs {
    var event Event
    json.Unmarshal(msg.Data, &event)
    
    if processedIDs[event.ID] {
        msg.Ack()  // Already processed
        continue
    }
    
    if err := process(event); err == nil {
        processedIDs[event.ID] = true
    }
    msg.Ack()
}

Notes

  • When stream is specified, the sink verifies the stream exists at connection time
  • Without stream, events are published to core NATS (no persistence guarantees)
  • Connection pooling ensures efficient reuse across batches
  • Use replicated streams (--replicas 3) for production durability
  • Combine with other sinks to fan out data; use commit policy to control checkpoint behavior
  • JetStream provides exactly-once semantics when combined with message deduplication

Envelopes and Encodings

DeltaForge supports configurable envelope formats and wire encodings for sink output. This allows you to match the output format expected by your downstream consumers without forcing code changes on them.

Overview

Every CDC event flows through two stages before being written to a sink:

Event -> Envelope (structure) -> Encoding (bytes) -> Sink
  • Envelope: Controls the JSON structure of the output (what fields exist, how they’re nested)
  • Encoding: Controls the wire format (JSON bytes, future: Avro, Protobuf)

Envelope Formats

Native (default)

The native envelope serializes events directly with minimal overhead. This is DeltaForge’s own format, optimized for efficiency and practical use cases.

Note

The native envelope format may evolve over time as we adapt to user needs and optimize for the lowest possible overhead. If you need a stable, standardized format, consider using debezium or cloudevents envelopes which follow their respective established specifications.

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      envelope:
        type: native

Output:

{
  "before": null,
  "after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
  "source": {
    "version": "0.1.0",
    "connector": "mysql",
    "name": "orders-db",
    "ts_ms": 1700000000000,
    "db": "shop",
    "table": "customers",
    "server_id": 1,
    "file": "mysql-bin.000003",
    "pos": 12345
  },
  "op": "c",
  "ts_ms": 1700000000000
}

When to use:

  • Maximum performance with lowest overhead
  • Custom consumers that parse the payload directly
  • When format stability is less important than efficiency
  • Internal systems where you control both producer and consumer

Debezium

The Debezium envelope wraps the event in a {"schema": null, "payload": ...} structure, following the Debezium event format specification.

This uses schemaless mode (schema: null), which is equivalent to Debezium’s JsonConverter with schemas.enable=false. This is the recommended configuration for most production deployments as it avoids the overhead of inline schemas.

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      envelope:
        type: debezium

Output:

{
  "schema": null,
  "payload": {
    "before": null,
    "after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
    "source": {
      "version": "0.1.0",
      "connector": "mysql",
      "name": "orders-db",
      "ts_ms": 1700000000000,
      "db": "shop",
      "table": "customers"
    },
    "op": "c",
    "ts_ms": 1700000000000
  }
}

When to use:

  • Kafka Connect consumers expecting full Debezium format
  • Existing Debezium-based pipelines you’re migrating from
  • Tools that specifically parse the payload wrapper
  • When you need a stable, well-documented format with broad ecosystem support

Note: For Schema Registry integration with Avro encoding (planned), schema handling will move to the encoding layer where schema IDs are embedded in the wire format.

CloudEvents

The CloudEvents envelope restructures events to the CloudEvents 1.0 specification, a CNCF project that defines a vendor-neutral format for event data. This format strictly follows the CloudEvents spec and is guaranteed to remain compliant.

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      envelope:
        type: cloudevents
        type_prefix: "com.example.cdc"

Output:

{
  "specversion": "1.0",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "source": "deltaforge/orders-db/shop.customers",
  "type": "com.example.cdc.created",
  "time": "2024-01-15T10:30:00.000Z",
  "datacontenttype": "application/json",
  "subject": "shop.customers",
  "data": {
    "before": null,
    "after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
    "op": "c"
  }
}

The type field is constructed from your type_prefix plus the operation:

  • com.example.cdc.created (INSERT)
  • com.example.cdc.updated (UPDATE)
  • com.example.cdc.deleted (DELETE)
  • com.example.cdc.snapshot (READ/snapshot)
  • com.example.cdc.truncated (TRUNCATE)

When to use:

  • AWS EventBridge, Azure Event Grid, or other CloudEvents-native platforms
  • Serverless architectures (Lambda, Cloud Functions)
  • Event-driven microservices using CloudEvents SDKs
  • Standardized event routing based on type field
  • When you need a vendor-neutral, CNCF-backed standard format
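The restructuring can be illustrated with a small sketch that builds a CloudEvents 1.0 envelope from a native event. Field choices mirror the example output above; the UUID generation and timestamp formatting are assumptions about details DeltaForge does not document here:

```python
import uuid
from datetime import datetime, timezone

# Debezium-compatible op codes -> CloudEvents type suffix
OP_SUFFIX = {"c": "created", "u": "updated", "d": "deleted",
             "r": "snapshot", "t": "truncated"}

def to_cloudevent(native, type_prefix):
    """Wrap a native-envelope event in a CloudEvents 1.0 structure."""
    src = native["source"]
    subject = f'{src["db"]}.{src["table"]}'
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": f'deltaforge/{src["name"]}/{subject}',
        "type": f'{type_prefix}.{OP_SUFFIX[native["op"]]}',
        "time": datetime.fromtimestamp(native["ts_ms"] / 1000, tz=timezone.utc)
                .strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z",
        "datacontenttype": "application/json",
        "subject": subject,
        "data": {"before": native["before"], "after": native["after"],
                 "op": native["op"]},
    }

native = {
    "before": None,
    "after": {"id": 1, "name": "Alice"},
    "source": {"name": "orders-db", "db": "shop", "table": "customers"},
    "op": "c",
    "ts_ms": 1700000000000,
}
ce = to_cloudevent(native, "com.example.cdc")
assert ce["type"] == "com.example.cdc.created"
assert ce["subject"] == "shop.customers"
```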

Wire Encodings

JSON (default)

Standard UTF-8 JSON encoding. Human-readable and widely supported.

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      encoding: json

Content-Type: application/json

When to use:

  • Development and debugging
  • Consumers that expect JSON
  • When human readability matters
  • Most use cases (good default)

Future: Avro

Coming soon: Avro encoding with Schema Registry integration for compact binary serialization and schema evolution support.

# Future configuration (not yet implemented)
sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      encoding:
        type: avro
        schema_registry: http://schema-registry:8081

Configuration Examples

Kafka with CloudEvents

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: order-events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.orders"
      encoding: json
      required: true

Redis with Debezium envelope

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: debezium
      encoding: json

NATS with native envelope

sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      envelope:
        type: native
      encoding: json

Multi-sink with different formats

Different consumers may expect different formats. Configure each sink independently:

sinks:
  # Kafka Connect expects Debezium format
  - type: kafka
    config:
      id: connect-sink
      brokers: ${KAFKA_BROKERS}
      topic: connect-events
      envelope:
        type: debezium
      required: true

  # Lambda expects CloudEvents
  - type: kafka
    config:
      id: lambda-sink
      brokers: ${KAFKA_BROKERS}
      topic: lambda-events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.cdc"
      required: false

  # Analytics wants raw events
  - type: redis
    config:
      id: analytics-redis
      uri: ${REDIS_URI}
      stream: analytics
      envelope:
        type: native

Operation Mapping

DeltaForge uses Debezium-compatible operation codes:

| Operation | Code | Description |
|---|---|---|
| Create/Insert | c | New row inserted |
| Update | u | Existing row modified |
| Delete | d | Row deleted |
| Read | r | Snapshot read (initial load) |
| Truncate | t | Table truncated |

These codes appear in the op field regardless of envelope format.

Performance Considerations

| Envelope | Overhead | Format Stability | Use Case |
|---|---|---|---|
| Native | Baseline (minimal) | May evolve | High-throughput, internal systems |
| Debezium | ~14 bytes | Stable (follows Debezium spec) | Kafka Connect, Debezium ecosystem |
| CloudEvents | ~150-200 bytes | Stable (follows CNCF spec) | Serverless, event-driven architectures |

The native envelope is recommended for maximum throughput when you control both ends of the pipeline. For interoperability with external systems or when format stability is critical, use debezium or cloudevents.

Defaults

If not specified, sinks use:

  • Envelope: native
  • Encoding: json

The native envelope provides the lowest overhead for high-throughput scenarios. If you need format stability guarantees, use debezium or cloudevents which adhere to their respective established specifications.

Dynamic Routing

Dynamic routing controls where each CDC event is delivered - which Kafka topic, Redis stream, or NATS subject receives it. By default, all events go to the single destination configured in the sink (static routing). With dynamic routing, events can be split across destinations based on their content, pipeline attributes, or other per-event metadata.

Overview

There are two routing mechanisms, and they compose naturally:

  1. Template strings in sink config - resolve per-event from event fields
  2. JavaScript ev.route() - programmatic per-event routing in processors

When both are used, ev.route() overrides take highest priority, then template resolution, then the static config value.

Template Routing

Replace static topic/stream/subject strings with templates containing ${...} variables. Templates are compiled once at startup and resolved per-event with zero regex overhead.

Kafka

sinks:
  - type: kafka
    config:
      id: kafka-routed
      brokers: ${KAFKA_BROKERS}
      topic: "cdc.${source.db}.${source.table}"
      key: "${after.customer_id}"
      envelope:
        type: debezium

Events from shop.orders -> topic cdc.shop.orders, partitioned by customer_id.

Redis

sinks:
  - type: redis
    config:
      id: redis-routed
      uri: ${REDIS_URI}
      stream: "events:${source.table}"
      key: "${after.id}"

Events from orders -> stream events:orders. The key value appears as the df-key field in each stream entry.

NATS

sinks:
  - type: nats
    config:
      id: nats-routed
      url: ${NATS_URL}
      subject: "cdc.${source.db}.${source.table}"
      key: "${after.id}"
      stream: CDC

Events from shop.orders -> subject cdc.shop.orders. The key value appears as the df-key NATS header.

Available Variables

| Variable | Description | Example value |
|---|---|---|
| ${source.table} | Table name | orders |
| ${source.db} | Database name | shop |
| ${source.schema} | Schema name (PostgreSQL) | public |
| ${source.connector} | Source type | mysql |
| ${op} | Operation code | c, u, d, r, t |
| ${after.<field>} | Field from after image | 42, cust-abc |
| ${before.<field>} | Field from before image | old-value |
| ${tenant_id} | Pipeline tenant ID | acme |

Missing fields resolve to an empty string. A warning is logged once per unique template, not per event.

Static strings (no ${...}) are detected at parse time and have zero overhead on the hot path - no allocation, no resolution.
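The resolution behavior can be modeled in a few lines. This is a simplified sketch, not DeltaForge's implementation (the real engine compiles templates at startup and avoids regex on the hot path; regex is used here only for brevity):

```python
import re

def resolve(template, event):
    """Resolve ${a.b} placeholders from nested event fields; missing -> ''."""
    def lookup(match):
        value = event
        for part in match.group(1).split("."):
            if not isinstance(value, dict) or part not in value:
                return ""  # missing fields resolve to an empty string
            value = value[part]
        return str(value)
    return re.sub(r"\$\{([^}]+)\}", lookup, template)

event = {"source": {"db": "shop", "table": "orders"},
         "after": {"customer_id": 42}}
assert resolve("cdc.${source.db}.${source.table}", event) == "cdc.shop.orders"
assert resolve("${after.customer_id}", event) == "42"
assert resolve("${after.missing}", event) == ""
```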

Env Vars vs Templates

Both use ${...} syntax. The config loader expands environment variables first. Unknown variables pass through as templates for runtime resolution:

brokers: ${KAFKA_BROKERS}      # env var - expanded at load time
topic: "cdc.${source.table}"   # template - passed through to runtime
key: "${after.customer_id}"    # template - resolved per-event

JavaScript Routing

For routing logic that goes beyond field substitution, use ev.route() in a JavaScript processor. This lets you make conditional routing decisions based on event content.

processors:
  - type: javascript
    id: smart-router
    inline: |
      function processBatch(events) {
        for (const ev of events) {
          if (!ev.after) continue;

          if (ev.after.total_amount > 10000) {
            ev.route({
              topic: "orders.priority",
              key: String(ev.after.customer_id),
              headers: {
                "x-tier": "high-value",
                "x-amount": String(ev.after.total_amount)
              }
            });
          } else {
            ev.route({
              topic: "orders.standard",
              key: String(ev.after.customer_id)
            });
          }
        }
        return events;
      }

ev.route() fields

| Field | Type | Description |
|---|---|---|
| topic | string | Override destination (topic, stream, or subject) |
| key | string | Override message/partition key |
| headers | object | Key-value pairs added to the message |

All fields are optional. Only set fields override; omitted fields fall through to config templates or static values.

Calling ev.route() replaces any previous routing on that event - it does not merge.

How headers are delivered

| Sink | Key delivery | Header delivery |
|---|---|---|
| Kafka | Kafka message key | Kafka message headers |
| Redis | df-key field in stream entry | df-headers field (JSON) |
| NATS | df-key NATS header | Individual NATS headers |

Resolution Order

For each event, the destination is resolved in priority order:

ev.route() override  →  config template  →  static config value

Specifically:

  1. If the event has routing.topic set (via ev.route() or programmatically), use it
  2. If the sink config contains a template (has ${...}), resolve it from event fields
  3. Otherwise, use the static config string

The same order applies independently to key and headers.
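The priority chain can be expressed directly; a sketch where `route_override` stands in for a value set by ev.route() and `resolve_template` for per-event template resolution (both names are illustrative, not DeltaForge APIs):

```python
def resolve_destination(route_override, config_value, resolve_template):
    """ev.route() override -> config template -> static config value."""
    if route_override is not None:
        return route_override                  # 1. explicit per-event override
    if "${" in config_value:
        return resolve_template(config_value)  # 2. template resolved per event
    return config_value                        # 3. static string used as-is

resolver = lambda tpl: "cdc.shop.orders"  # stand-in for template resolution
assert resolve_destination("orders.priority", "cdc.${source.table}", resolver) \
    == "orders.priority"
assert resolve_destination(None, "cdc.${source.table}", resolver) \
    == "cdc.shop.orders"
assert resolve_destination(None, "orders", resolver) == "orders"
```

The same function applies unchanged to key and headers, since each is resolved independently through the same chain.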

Examples

See the example configurations shipped with DeltaForge for complete, runnable pipelines.

Outbox Pattern

The transactional outbox pattern guarantees that domain events are published whenever the corresponding database change is committed - no two-phase commit required. DeltaForge supports this natively for both MySQL and PostgreSQL with zero application-side polling.

How It Works

┌─────────────────┐     ┌──────────────┐     ┌──────────────────┐     ┌──────┐
│  Application    │────>│   Database   │────>│   DeltaForge     │────>│ Sink │
│  (writes data   │     │  (outbox     │     │  (captures +     │     │      │
│   + outbox msg) │     │   table/WAL) │     │   transforms)    │     │      │
└─────────────────┘     └──────────────┘     └──────────────────┘     └──────┘
  1. Your application writes business data and an outbox message in the same transaction.
  2. DeltaForge captures the outbox event through the database’s native replication stream.
  3. The OutboxProcessor extracts the payload, resolves the destination topic, and sets routing headers.
  4. The transformed event flows to sinks like any other CDC event.

Because the outbox write is part of the application transaction, the event is guaranteed to be published if and only if the transaction commits.
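Step 3 can be sketched as a small transform. This is a conceptual model of what the OutboxProcessor does with a captured outbox row, not its actual code; the topic convention (lowercased aggregate_type) and return shape are assumptions for illustration:

```python
import json

def transform_outbox(row):
    """Turn an outbox row into a routed domain event.

    row: the 'after' image of the outbox insert (or the parsed WAL message).
    Returns (destination, key, payload): per-aggregate routing with the raw
    payload delivered as the message body.
    """
    destination = row["aggregate_type"].lower()   # e.g. topic "order" (assumed convention)
    key = row["aggregate_id"]                     # keys ordering per aggregate
    payload = row["payload"]
    if isinstance(payload, str):                  # MySQL JSON columns arrive as text
        payload = json.loads(payload)
    return destination, key, {"event_type": row["event_type"], **payload}

row = {"aggregate_type": "Order", "aggregate_id": "42",
       "event_type": "OrderCreated", "payload": '{"total": 99.99}'}
dest, key, body = transform_outbox(row)
assert (dest, key) == ("order", "42")
assert body == {"event_type": "OrderCreated", "total": 99.99}
```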

Source Configuration

Each database uses its native mechanism — there is nothing to install or poll.

PostgreSQL — WAL Messages

PostgreSQL uses pg_logical_emit_message() to write messages directly into the WAL. No table is needed.

-- In your application transaction:
BEGIN;
INSERT INTO orders (id, total) VALUES (42, 99.99);
SELECT pg_logical_emit_message(
  true,     -- transactional: tied to the enclosing TX
  'outbox', -- prefix: matched by DeltaForge
  '{"aggregate_type":"Order","aggregate_id":"42","event_type":"OrderCreated","payload":{"total":99.99}}'
);
COMMIT;

Source config:

source:
  type: postgres
  config:
    id: orders-pg
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables: [public.orders]
    outbox:
      prefixes: [outbox]
| Field | Type | Description |
|-------|------|-------------|
| outbox.prefixes | array | WAL message prefixes to capture. Supports glob patterns: outbox (exact), outbox_% (prefix match), * (all). |

The message prefix becomes source.table on the event, so processors can filter by prefix when multiple outbox channels share a pipeline.

MySQL — Table Inserts

MySQL uses a regular table. For production, use the BLACKHOLE storage engine so rows are written to the binlog but never stored on disk.

-- Create outbox table (BLACKHOLE = no disk storage, binlog only)
CREATE TABLE outbox (
  id        INT AUTO_INCREMENT PRIMARY KEY,
  aggregate_type VARCHAR(64),
  aggregate_id   VARCHAR(64),
  event_type     VARCHAR(64),
  payload        JSON
) ENGINE=BLACKHOLE;
-- In your application transaction:
BEGIN;
INSERT INTO orders (id, total) VALUES (42, 99.99);
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', '42', 'OrderCreated', '{"total": 99.99}');
COMMIT;

Source config:

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.outbox
    outbox:
      tables: ["shop.outbox"]
| Field | Type | Description |
|-------|------|-------------|
| outbox.tables | array | Table patterns to tag as outbox events. Supports globs: shop.outbox (exact), *.outbox (any database), shop.outbox_% (prefix). |

Note: The outbox table must be included in the source’s tables list so DeltaForge subscribes to its binlog events. Only INSERTs are captured - UPDATE and DELETE on the outbox table are ignored.

Outbox Processor

The OutboxProcessor transforms raw outbox events into routed, sink-ready events. Add it to your processors list:

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    default_topic: "events.unrouted"
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| id | string | "outbox" | Processor identifier |
| tables | array | [] | Filter: only process outbox events matching these patterns. Empty = all outbox events. |
| topic | string | - | Topic template resolved against the raw payload using ${field} placeholders |
| default_topic | string | - | Fallback topic when template resolution fails and no topic column exists |
| columns | object | (see below) | Column name mappings for extracting outbox fields |
| additional_headers | map | {} | Forward extra payload fields as routing headers. Key = header name, value = column name. |
| raw_payload | bool | false | When true, deliver the extracted payload as-is to sinks, bypassing envelope wrapping (native/debezium/cloudevents). Metadata is still available via routing headers. |
| key | string | - | Key template resolved against the raw payload. Sets routing.key for sink partitioning. Default: the aggregate_id value. |
| strict | bool | false | When true, fail the batch if required fields are missing (topic, payload, aggregate_type, aggregate_id, event_type). When false, missing fields are silently skipped. |

Column Mappings

Column mappings control header extraction and payload rewriting - they tell the processor which fields correspond to aggregate_type, aggregate_id, etc. for setting df-* headers. The topic template resolves directly against the raw payload, so you reference your actual column names there.

| Column | Default | Header | Description |
|--------|---------|--------|-------------|
| payload | "payload" | - | Event body. Extracted and promoted to event.after. |
| aggregate_type | "aggregate_type" | df-aggregate-type | Aggregate root type (e.g. Order). |
| aggregate_id | "aggregate_id" | df-aggregate-id | Aggregate root ID. Also used as the default routing key. |
| event_type | "event_type" | df-event-type | Domain event type (e.g. OrderCreated). |
| topic | "topic" | - | Per-row topic override (used when the template is absent). |
| event_id | "id" | df-event-id | Event identity for idempotency/dedup. |

If your outbox payload uses non-default field names, override them:

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    columns:
      payload: data           # default: "payload"
      aggregate_type: type    # default: "aggregate_type"
      aggregate_id: key       # default: "aggregate_id"
      event_type: action      # default: "event_type"
      topic: destination      # default: "topic"
      event_id: uuid          # default: "id"

Additional Headers

Forward arbitrary payload fields as routing headers. This is useful when migrating from Debezium’s table.fields.additional.placement or when downstream consumers need extra metadata:

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    additional_headers:
      x-trace-id: trace_id
      x-correlation-id: correlation_id
      x-source-region: region

Each key becomes a header name; each value is the column name in the outbox payload. Missing columns are silently skipped - there is no error if a row doesn't contain the field.

Typed Extraction

Header values are extracted as strings regardless of the source JSON type. Numeric IDs, booleans, and string values are all stringified automatically:

| JSON value | Header value |
|------------|--------------|
| "abc-123" | abc-123 |
| 42 | 42 |
| true | true |
| null / missing | (skipped) |
| {} / [] | (skipped) |
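The extraction rule amounts to: stringify scalars, skip everything else. A sketch using a simplified JSON value type (a stand-in for illustration, not DeltaForge's actual types):

```rust
/// Minimal JSON value model, standing in for a real JSON type.
#[allow(dead_code)]
enum Json {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
    Arr(Vec<Json>),
    Obj(Vec<(String, Json)>),
}

/// Stringify a payload field for header delivery: scalars become strings,
/// null and container values are skipped entirely.
fn header_value(v: &Json) -> Option<String> {
    match v {
        Json::Str(s) => Some(s.clone()),
        Json::Num(n) => Some(n.to_string()),
        Json::Bool(b) => Some(b.to_string()),
        Json::Null | Json::Arr(_) | Json::Obj(_) => None, // skipped
    }
}
```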

What the Processor Does

  1. Identifies outbox events by the __outbox sentinel on source.schema (set by the source).
  2. Extracts aggregate_type, aggregate_id, event_type, and payload from event.after.
  3. Resolves the topic using a three-step cascade:
    • Template resolved against the raw payload (e.g. ${domain}.${action} — use your actual column names)
    • Column value (a topic field in the payload, configurable via columns.topic)
    • default_topic fallback
  4. Rewrites event.after to just the payload content.
  5. Sets routing headers: df-event-id, df-aggregate-type, df-aggregate-id, df-event-type, plus any additional_headers mappings.
  6. Sets routing key using the key template (or falls back to aggregate_id).
  7. Marks raw delivery if raw_payload: true — sinks serialize event.after directly, skipping envelope wrapping.
  8. Clears the __outbox sentinel so the event looks like a normal CDC event to sinks.
  9. Drops non-INSERT outbox events (UPDATE/DELETE on the outbox table are meaningless).
  10. Validates in strict mode (strict: true): fails the batch with an error if any required field is missing (topic, payload, aggregate_type, aggregate_id, event_type). The error names the missing fields so operators can fix the schema. In lenient mode (default), missing fields are silently skipped.
  11. Passes through all non-outbox events unchanged.
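The topic cascade in step 3 can be sketched as a standalone function (illustrative names only; the real processor operates on full events):

```rust
/// Topic resolution cascade from step 3:
/// template against raw payload -> per-row topic column -> default_topic.
fn resolve_topic(
    template: Option<&str>,
    render: impl Fn(&str) -> Option<String>, // ${field} expansion against the raw payload
    topic_column: Option<&str>,              // value of the payload's topic field, if present
    default_topic: Option<&str>,
) -> Option<String> {
    template
        .and_then(|t| render(t))                     // 1. template resolution
        .or_else(|| topic_column.map(String::from))  // 2. per-row topic column
        .or_else(|| default_topic.map(String::from)) // 3. default_topic fallback
}
```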

Multi-Outbox Routing

When a source captures multiple outbox channels, use the tables filter to scope each processor:

processors:
  - type: outbox
    tables: [orders_outbox]
    topic: "orders.${event_type}"

  - type: outbox
    tables: [payments_outbox]
    topic: "payments.${event_type}"
    columns:
      payload: data

Complete Examples

PostgreSQL → Kafka

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: order-events
  tenant: acme
spec:
  source:
    type: postgres
    config:
      id: pg1
      dsn: ${POSTGRES_DSN}
      publication: orders_pub
      slot: orders_slot
      tables: [public.orders]
      outbox:
        prefixes: [outbox]

  processors:
    - type: outbox
      topic: "${aggregate_type}.${event_type}"
      default_topic: "events.unrouted"
      raw_payload: true

  sinks:
    - type: kafka
      config:
        id: k1
        brokers: ${KAFKA_BROKERS}
        topic: "events.fallback"

With raw_payload: true, outbox events hit the wire as just the extracted payload JSON. Regular CDC events (from public.orders) still use the sink’s configured envelope.

MySQL → Kafka (Multi-Outbox)

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: shop-events
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: m1
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.orders_outbox
        - shop.payments_outbox
      outbox:
        tables: ["shop.*_outbox"]

  processors:
    - type: outbox
      tables: [orders_outbox]
      topic: "orders.${event_type}"
      raw_payload: true

    - type: outbox
      tables: [payments_outbox]
      topic: "payments.${event_type}"
      raw_payload: true

  sinks:
    - type: kafka
      config:
        id: k1
        brokers: ${KAFKA_BROKERS}
        topic: "events.default"

Migrating from Debezium

If you’re using Debezium’s outbox event router with a custom schema, DeltaForge’s column mappings and additional_headers map directly. For example, given this Debezium-style outbox table:

CREATE TABLE outbox_events (
  id UUID PRIMARY KEY,
  aggregatetype VARCHAR(64),
  aggregateid VARCHAR(64),
  type VARCHAR(64),
  payload JSONB,
  traceid VARCHAR(64),
  tenant VARCHAR(32)
);

Debezium config:

transforms.outbox.table.field.event.id=id
transforms.outbox.table.field.event.key=aggregateid
transforms.outbox.table.field.event.type=type
transforms.outbox.table.field.event.payload=payload
transforms.outbox.table.fields.additional.placement=traceid:header,tenant:header

DeltaForge equivalent:

processors:
  - type: outbox
    topic: "${aggregatetype}.${type}"
    key: "${aggregateid}"
    raw_payload: true
    columns:
      event_id: id
      aggregate_type: aggregatetype
      aggregate_id: aggregateid
      event_type: type
      payload: payload
    additional_headers:
      x-trace-id: traceid
      x-tenant: tenant

Key differences from Debezium:

  • Topic template references raw column names directly — ${aggregatetype}, not ${aggregate_type}
  • raw_payload: true delivers the extracted payload as-is — same behavior as Debezium’s outbox event router
  • Column mappings only affect header extraction (df-* headers) and payload rewriting
  • additional_headers replaces table.fields.additional.placement
  • No SMT chain — everything is in one processor config

Observability

The outbox processor emits Prometheus-compatible metrics:

| Metric | Labels | Description |
|--------|--------|-------------|
| deltaforge_outbox_transformed_total | - | Events successfully transformed |
| deltaforge_outbox_dropped_total | reason | Events dropped or rejected |

Drop reasons:

| Reason | Meaning |
|--------|---------|
| non_insert | UPDATE/DELETE on the outbox table (expected, harmless) |
| non_object | event.after is not a JSON object |
| null_payload | event.after is null |
| strict_missing_fields | Strict mode: a required field is missing (batch fails) |

In strict mode, strict_missing_fields increments the counter and returns an error that halts the batch - the pipeline will not silently lose events.

Tips

  • PostgreSQL WAL messages are lightweight. No table, no index, no vacuum - just a single WAL entry per message. Prefer this over table-based outbox when using PostgreSQL.
  • MySQL BLACKHOLE engine avoids storing outbox rows on disk. The row is written to the binlog and immediately discarded. Use it in production to avoid unbounded table growth.
  • Outbox events coexist with normal CDC. The processor passes through all non-outbox events untouched, so you can mix regular table capture and outbox in the same pipeline.
  • Topic templates use ${field} syntax, same as dynamic routing. The template resolves directly against the raw outbox payload columns - use your actual column names like ${domain}.${action}, no remapping needed.
  • At-least-once delivery applies to outbox events just like regular CDC events. Downstream consumers should be idempotent - use the df-event-id header for idempotency, or aggregate_id + event_type as a composite dedup key.
  • Malformed events are logged and dropped by default. Non-INSERT operations, null payloads, and non-object payloads produce a WARN-level log with the table name and reason. They do not propagate to sinks. Enable strict: true to fail the batch instead of dropping - this ensures operators are alerted to schema issues before events are lost.

Architecture

This document describes DeltaForge’s internal architecture, design decisions, and how the major components interact.

Design Principles

Source-Owned Semantics

DeltaForge avoids imposing a universal data model on all sources. Instead, each database source defines and owns its schema semantics:

  • MySQL captures MySQL-specific types, collations, and engine information
  • PostgreSQL captures PostgreSQL-specific types, OIDs, and replica identity
  • Future sources (MongoDB, ClickHouse, TiDB) will capture their native semantics

This approach means downstream consumers receive schemas that accurately reflect the source database rather than a lowest-common-denominator normalization.

Delivery Guarantees First

The checkpoint system is designed around a single invariant:

Checkpoints are only saved after events have been successfully delivered.

This ordering guarantees at-least-once delivery. A crash between checkpoint and delivery would lose events; DeltaForge prevents this by always checkpointing after sink acknowledgment.

Configuration Over Code

Pipelines are defined declaratively in YAML. This enables:

  • Version-controlled pipeline definitions
  • Environment-specific configuration via variable expansion
  • Rapid iteration without recompilation

Component Overview

┌─────────────────────────────────────────────────────────────────┐
│                        DeltaForge Runtime                       │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│   Sources   │   Schema    │ Coordinator │    Sinks    │ Control │
│             │  Registry   │  + Batch    │             │  Plane  │
├─────────────┼─────────────┼─────────────┼─────────────┼─────────┤
│ MySQL       │ Durable     │ Batching    │ Kafka       │ REST API│
│ PostgreSQL  │ Schema      │ Commit      │ Redis       │ Metrics │
│             │ Registry    │ Policy      │ NATS        │ Health  │
└─────────────┴──────┬──────┴─────────────┴─────────────┴─────────┘
                     │
          ┌──────────┴──────────┐
          │   Storage Backend   │
          │  (SQLite / PG /     │
          │   Memory)           │
          ├─────────────────────┤
          │ KV · Log · Slot     │
          │ Queue               │
          └─────────────────────┘

Data Flow

Event Lifecycle

1. Source reads from database log (binlog/WAL)
        │
        ▼
2. Schema loader maps table_id to schema
        │
        ▼
3. Event constructed with before/after images
        │
        ▼
4. Event sent to coordinator via channel
        │
        ▼
5. Coordinator batches events
        │
        ▼
6. Processors transform batch (JavaScript)
        │
        ▼
7. Sinks deliver batch concurrently
        │
        ▼
8. Commit policy evaluated
        │
        ▼
9. Checkpoint saved (if policy satisfied)

Event Structure

Every CDC event shares a common structure:

#![allow(unused)]
fn main() {
pub struct Event {
    pub source_id: String,          // Source identifier
    pub database: String,           // Database name
    pub table: String,              // Table name
    pub op: Op,                     // Insert, Update, Delete, Ddl
    pub tx_id: Option<u64>,         // Source transaction ID
    pub before: Option<Value>,      // Previous row state
    pub after: Option<Value>,       // New row state
    pub schema_version: Option<String>,  // Schema fingerprint
    pub schema_sequence: Option<u64>,    // For replay lookups
    pub ddl: Option<Value>,         // DDL payload if op == Ddl
    pub timestamp: DateTime<Utc>,   // Event timestamp
    pub checkpoint: Option<CheckpointMeta>,  // Position info
    pub size_bytes: usize,          // For batching
}
}

Schema Registry

Role

The schema registry serves three purposes:

  1. Map table IDs to schemas: Binlog events reference tables by ID; the registry resolves these to full schema metadata
  2. Detect schema changes: Fingerprint comparison identifies when DDL has modified a table
  3. Enable replay: Sequence numbers correlate events with the schema active when they were produced

Schema Sensing

At startup, the schema loader auto-discovers tables via pattern expansion and loads their schemas from the live database catalog before any CDC events arrive.

Pattern expansion supports wildcards:

| Pattern | Matches |
|---------|---------|
| db.table | Exact table |
| db.* | All tables in the database |
| db.prefix% | Tables matching the prefix |
| %.table | The table in any database |
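The matching semantics can be sketched as follows. This illustrates the pattern rules above, not the actual implementation:

```rust
/// Match one side of a `db.table` pattern: `*` and `%` alone match anything,
/// a trailing `%` does a prefix match, otherwise it is an exact comparison.
fn part_matches(pattern: &str, name: &str) -> bool {
    if pattern == "*" || pattern == "%" {
        true
    } else if let Some(prefix) = pattern.strip_suffix('%') {
        name.starts_with(prefix)
    } else {
        pattern == name
    }
}

/// Split a pattern on the first `.` and match database and table parts.
fn table_matches(pattern: &str, db: &str, table: &str) -> bool {
    match pattern.split_once('.') {
        Some((p_db, p_table)) => part_matches(p_db, db) && part_matches(p_table, table),
        None => false,
    }
}
```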

DDL detection works through cache invalidation. When the binlog delivers a QueryEvent (DDL), the affected database’s cache is cleared. On the next row event for that table, the schema is re-fetched from INFORMATION_SCHEMA, fingerprinted, and registered as a new version. No separate DDL history log is maintained — the live catalog is always the source of truth.

Failover reconciliation (verifying schemas against a new primary after server identity change) is planned but not yet implemented.

Schema versions are persisted via DurableSchemaRegistry, which uses the StorageBackend Log primitive. On startup the log is replayed to populate an in-memory cache, so hot-path reads have the same performance as the previous in-memory implementation. Cold-start reconstruction is always possible from the log alone.

Schema Registration Flow

1. Schema loader fetches from INFORMATION_SCHEMA
        │
        ▼
2. Compute fingerprint (SHA-256 of structure)
        │
        ▼
3. Check registry for existing schema with same fingerprint
        │
        ├── Found: Return existing version (idempotent)
        │
        └── Not found: Allocate new version number
                │
                ▼
4. Store with: version, fingerprint, JSON, timestamp, sequence, checkpoint

Sequence Numbers

The registry maintains a global monotonic counter. Each schema version receives a sequence number at registration. Events carry this sequence, enabling accurate schema lookup during replay:

Timeline:
─────────────────────────────────────────────────────────────►
     │              │                    │
Schema v1      Schema v2           Schema v3
(seq=1)        (seq=15)            (seq=42)
     │              │                    │
     └──events 1-14─┘──events 15-41─────┘──events 42+──►

Replay at seq=20: Use schema v2 (registered at seq=15, before seq=42)
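The replay lookup reduces to "latest schema version registered at or before the event's sequence". A minimal sketch over (sequence, version) pairs, assuming they are sorted ascending by sequence:

```rust
/// Given schema versions tagged with their registration sequence (sorted
/// ascending), pick the schema active when an event with `event_seq` was
/// produced: the latest version whose sequence is <= the event's sequence.
fn schema_for_replay(versions: &[(u64, u32)], event_seq: u64) -> Option<u32> {
    versions
        .iter()
        .filter(|v| v.0 <= event_seq) // registered at or before the event
        .last()                       // latest such registration
        .map(|v| v.1)
}
```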

Checkpoint Store

Timing Guarantee

The checkpoint is saved only after sinks acknowledge delivery:

┌────────┐   events   ┌────────┐   ack    ┌────────────┐
│ Source │ ─────────▶ │  Sink  │ ───────▶ │ Checkpoint │
└────────┘            └────────┘          │   Store    │
                                          └────────────┘

If the process crashes after sending to sink but before checkpoint, events will be replayed. This is the “at-least-once” guarantee — duplicates are possible, but loss is not.

Storage Backends

Checkpoints are stored via BackendCheckpointStore, a thin adapter over the StorageBackend KV primitive. See Storage for backend configuration and the full namespace map.

| Backend | Persistence | Use Case |
|---------|-------------|----------|
| SqliteStorageBackend | SQLite file | Single-instance production |
| MemoryStorageBackend | None | Testing, ephemeral deployments |
| PostgresStorageBackend | External DB | HA, multi-instance |

Checkpoint-Schema Correlation

When registering schemas, the current checkpoint can be attached:

#![allow(unused)]
fn main() {
registry.register_with_checkpoint(
    tenant, db, table,
    &fingerprint,
    &schema_json,
    Some(&checkpoint_bytes),  // Current binlog position
).await?;
}

This creates a link between schema versions and source positions, enabling accurate schema lookup during replay.

Coordinator

The coordinator orchestrates event flow between source and sinks:

Batching

Events are accumulated until a threshold triggers flush:

  • max_events: Event count limit
  • max_bytes: Total serialized size limit
  • max_ms: Time since batch started
  • respect_source_tx: Never split source transactions
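These thresholds combine as a simple OR, with respect_source_tx acting as a veto while a transaction is open. A sketch (field names mirror the config keys; the real coordinator's types differ):

```rust
/// Batch thresholds, mirroring the config keys above.
struct BatchPolicy {
    max_events: usize,
    max_bytes: usize,
    max_ms: u64,
}

/// Flush when any threshold is reached, but never mid-transaction:
/// `tx_open` is true while respect_source_tx keeps a source TX unsplit.
fn should_flush(
    policy: &BatchPolicy,
    events: usize,
    bytes: usize,
    elapsed_ms: u64,
    tx_open: bool,
) -> bool {
    if tx_open {
        return false; // never split a source transaction
    }
    events >= policy.max_events || bytes >= policy.max_bytes || elapsed_ms >= policy.max_ms
}
```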

Commit Policy

When multiple sinks are configured, the commit policy determines when the checkpoint advances:

#![allow(unused)]
fn main() {
match policy {
    All => required_acks == total_sinks,
    Required => required_acks == sinks.filter(|s| s.required).count(),
    Quorum(n) => required_acks >= n,
}
}

Processor Pipeline

Processors run in declared order, transforming batches:

events ──▶ Processor 1 ──▶ Processor 2 ──▶ ... ──▶ transformed events

Each processor can filter, transform, or enrich events. The JavaScript processor uses deno_core for sandboxed execution.

Hot Paths

Critical performance paths have been optimized:

  1. Event construction - Minimal allocations, reuse buffers
  2. Checkpoint serialization - Opaque bytes avoid repeated JSON encoding
  3. Sink delivery - Batch operations reduce round trips
  4. Schema lookup - In-memory cache with stable fingerprints

Benchmarking

Performance is tracked via:

  • Micro-benchmarks for specific operations
  • End-to-end benchmarks using the Coordinator component
  • Regression detection in CI

Future Architecture

Planned enhancements:

  • Initial snapshot/backfill using the Slot primitive for cursor tracking
  • Event store: time-based replay and schema evolution using the Log primitive
  • Distributed coordination: leader election via the Slot primitive with TTL-based leases
  • Additional sources: MongoDB, SQL Server, TiDB
  • PostgreSQL storage validation: chaos/recovery testing to bring it to production parity with SQLite

Checkpoints

Checkpoints record pipeline progress so ingestion can resume from the last successfully delivered position. DeltaForge guarantees at-least-once delivery by saving checkpoints only after events have been acknowledged by sinks.

Core Guarantee: At-Least-Once Delivery

Checkpoints are only saved after events have been successfully delivered to sinks.

If a checkpoint were saved before delivery, a crash between those two points would silently lose events. DeltaForge prevents this by always checkpointing after sink acknowledgment.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Source    │────▶│  Processor  │────▶│    Sink     │────▶│ Checkpoint  │
│   (read)    │     │ (transform) │     │  (deliver)  │     │   (save)    │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘

On crash: events since the last checkpoint are replayed. Consumers should be idempotent or use deduplication.
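A consumer-side deduplication guard can be as simple as tracking seen df-event-id values. A sketch (in-memory only; a real consumer would persist seen IDs, or dedup on aggregate_id plus event_type):

```rust
use std::collections::HashSet;

/// Idempotency guard for an at-least-once consumer: process an event only
/// the first time its df-event-id is seen.
struct Deduper {
    seen: HashSet<String>,
}

impl Deduper {
    fn new() -> Self {
        Deduper { seen: HashSet::new() }
    }

    /// Returns true if the event should be processed (first sighting);
    /// false if it is a replayed duplicate.
    fn accept(&mut self, event_id: &str) -> bool {
        self.seen.insert(event_id.to_string())
    }
}
```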

Storage

Checkpoints are stored via the unified StorageBackend - see Storage for backend configuration. All pipelines share the same storage backend; each pipeline’s checkpoint is keyed by its source ID.

| Backend | Persistence | Use Case |
|---------|-------------|----------|
| SqliteStorageBackend | SQLite file on disk | Single-instance production |
| MemoryStorageBackend | None (lost on restart) | Testing |
| PostgresStorageBackend | External database | HA, multi-instance |

Checkpoint Contents

MySQL

Tracks binlog position:

file:     binlog.000042
pos:      12345
gtid_set: (optional, if GTID replication is enabled)

PostgreSQL

Tracks replication stream LSN:

lsn:    0/1A2B3C4D
tx_id:  (optional)

Checkpoint-Schema Correlation

When a schema change is detected, the current checkpoint position is recorded alongside the new schema version. This ensures that during replay, each event is interpreted with the schema that was active when it was produced - even if the table has since been altered.

Commit Policy

When multiple sinks are configured, the commit policy controls when the checkpoint advances:

| Policy | Behaviour |
|--------|-----------|
| all | All sinks must acknowledge |
| required | Only sinks marked required: true must acknowledge |
| quorum(n) | At least n sinks must acknowledge |

Set required: true only on sinks where delivery is mandatory for correctness. Optional sinks can fail without blocking the checkpoint.

Operations

Inspecting Checkpoints

sqlite3 ./data/deltaforge.db \
  "SELECT key, length(value) FROM kv WHERE namespace = 'checkpoints';"

Resetting a Pipeline

To force a pipeline to re-read from the beginning, delete its checkpoint:

# Via API
curl -X DELETE http://localhost:8080/pipelines/{name}/checkpoint

# Directly in SQLite
sqlite3 ./data/deltaforge.db \
  "DELETE FROM kv WHERE namespace = 'checkpoints' AND key = '{source-id}';"

Best Practices

  • Back up deltaforge.db regularly - it contains both checkpoints and schema history
  • Monitor checkpoint lag to detect stuck pipelines
  • Use smaller batch sizes for more frequent checkpoints at the cost of throughput
  • Test recovery by killing the process and verifying no events are lost or duplicated

Storage

DeltaForge uses a unified storage layer that backs all runtime state: checkpoints, schema registry, FSM state, leases, and quarantine queues. A single StorageBackend instance is shared across subsystems, so there is one operational concern instead of many.

Backends

| Backend | Persistence | Use Case |
|---------|-------------|----------|
| MemoryStorageBackend | None (lost on restart) | Testing, ephemeral deployments |
| SqliteStorageBackend | SQLite file on disk | Single-instance production |
| PostgresStorageBackend | External database | HA, multi-instance deployments |

Configuration

The backend is selected via CLI flags:

# SQLite (default for production)
deltaforge --config pipeline.yaml --storage-backend sqlite --storage-path ./data/deltaforge.db

# In-memory (testing only — all state lost on restart)
deltaforge --config pipeline.yaml --storage-backend memory

Or via the config file:

storage:
  backend: sqlite
  path: ./data/deltaforge.db

# or:
storage:
  backend: postgres
  dsn: "host=localhost dbname=deltaforge_storage user=df password=secret"

The SQLite backend creates the database file and parent directories automatically on first start. The PostgreSQL backend runs schema migrations on first connect - no manual table creation is needed.

PostgreSQL backend is implemented and available under the postgres feature flag, but has not yet received the same chaos/recovery validation as the SQLite backend. Treat it as beta for production use.

Primitives

The StorageBackend trait exposes four primitives. All operations are namespaced - different subsystems share the same backend without key collisions.

KV (Key-Value with optional TTL)

General-purpose persistent key-value store. Used for checkpoints, FSM state, and leases.

#![allow(unused)]
fn main() {
backend.kv_put("checkpoints", "mysql-pipe1", &bytes).await?;
backend.kv_put_with_ttl("leases", "pipe1", b"alive", ttl_secs).await?;
backend.kv_get("checkpoints", "mysql-pipe1").await?;
backend.kv_delete("checkpoints", "mysql-pipe1").await?;
backend.kv_list("checkpoints", Some("mysql-")).await?; // prefix filter
}

Log (Append-only, globally sequenced)

Immutable append-only log with a global monotonic sequence counter. Used for the schema registry. The sequence is global across all keys in the namespace - interleaved appends to different keys produce strictly increasing sequence numbers.

#![allow(unused)]
fn main() {
let seq = backend.log_append("schemas", "acme/shop/orders", &entry).await?;
let all  = backend.log_list("schemas", "acme/shop/orders").await?;
let tail = backend.log_since("schemas", "acme/shop/orders", since_seq).await?;
}

Slot (Versioned, compare-and-swap)

Single versioned value with CAS semantics. Used for snapshot cursors and leader election. Concurrent CAS operations with the same expected version are serialized - exactly one wins.

#![allow(unused)]
fn main() {
let ver = backend.slot_upsert("snapshots", "pipe/orders", &state).await?;
let won = backend.slot_cas("snapshots", "pipe/orders", ver, &new_state).await?;
let cur = backend.slot_get("snapshots", "pipe/orders").await?; // (version, bytes)
}

Queue (Ordered, ack-based)

Ordered queue with explicit acknowledgement. Used for quarantine and DLQ. Items are not removed until acked; oldest items can be dropped when the queue is full.

#![allow(unused)]
fn main() {
let id = backend.queue_push("quarantine", "pipe/orders", &event_bytes).await?;
let items = backend.queue_peek("quarantine", "pipe/orders", 10).await?;
backend.queue_ack("quarantine", "pipe/orders", id).await?;
backend.queue_drop_oldest("quarantine", "pipe/orders", n).await?;
let len = backend.queue_len("quarantine", "pipe/orders").await?;
}

Namespace Map

Each subsystem owns a fixed namespace. Keys within a namespace never collide across subsystems.

| Namespace | Primitive | Used by | Key format |
|-----------|-----------|---------|------------|
| checkpoints | KV | BackendCheckpointStore | {source-id} |
| schemas | Log | DurableSchemaRegistry | {tenant}/{db}/{table} |
| schemas | KV | DurableSchemaRegistry | {tenant}/{db}/{table} (index) |
| snapshots | Slot | Snapshot/backfill (planned) | {pipeline}/{table} |
| fsm | KV | FSM state (planned) | {pipeline} |
| leases | KV + TTL | Leader election (planned) | {pipeline} |
| quarantine | Queue | Quarantine (planned) | {pipeline}/{table} |
| dlq | Queue | Dead-letter queue (planned) | {pipeline} |

Adapters

Existing pipeline code uses higher-level interfaces - it never touches the StorageBackend directly.

BackendCheckpointStore

Implements the CheckpointStore trait on top of KV. Drop-in replacement for the old FileCheckpointStore and SqliteCheckpointStore.

#![allow(unused)]
fn main() {
let store = BackendCheckpointStore::new(Arc::clone(&backend));
// CheckpointStore trait methods work unchanged
store.put_raw("mysql-pipe1", &bytes).await?;
store.get_raw("mysql-pipe1").await?;
}

DurableSchemaRegistry

Implements the schema registry on top of the Log primitive. On startup it replays the log to populate an in-memory cache, so hot-path reads are identical in performance to the old InMemoryRegistry. Writes go through the log, enabling cold-start reconstruction.

#![allow(unused)]
fn main() {
// Production: async construction with log replay
let registry = DurableSchemaRegistry::new(Arc::clone(&backend)).await?;

// Tests: sync construction, no replay (MemoryStorageBackend, empty log)
let registry = DurableSchemaRegistry::for_testing();
}

PipelineManager Wiring

PipelineManager::with_backend() wires both subsystems from a single backend:

#![allow(unused)]
fn main() {
let backend: ArcStorageBackend = SqliteStorageBackend::open("./data/deltaforge.db")?;
let manager = PipelineManager::with_backend(backend).await?;
}

Internally this creates a BackendCheckpointStore and a DurableSchemaRegistry from the same backend instance. All pipelines share the same storage file.

Schema Registry

DeltaForge’s schema registry tracks table schemas across time, enabling accurate event interpretation during replay and providing change detection for DDL operations.

Design Philosophy: Source-Owned Schemas

DeltaForge takes a fundamentally different approach to schema handling than many CDC tools. Rather than normalizing all database schemas into a universal type system, each source defines and owns its schema semantics.

This means:

  • MySQL schemas capture MySQL semantics - column types like bigint(20) unsigned, varchar(255), and json are preserved exactly as MySQL defines them
  • PostgreSQL schemas capture PostgreSQL semantics - arrays, custom types, and pg-specific attributes remain intact
  • No lossy normalization - you don’t lose precision or database-specific information by forcing everything into a common format

This design avoids the maintenance burden of keeping a universal type system synchronized across all databases, and it ensures that downstream consumers receive schemas that accurately reflect the source database’s capabilities and constraints.

The SourceSchema Trait

Every CDC source implements the SourceSchema trait, which provides a common interface for fingerprinting and column access while allowing source-specific schema representations:

pub trait SourceSchema: Serialize + DeserializeOwned + Send + Sync {
    /// Source type identifier (e.g., "mysql", "postgres", "mongodb").
    fn source_kind(&self) -> &'static str;

    /// Content-addressable fingerprint for change detection.
    /// Two schemas with the same fingerprint are identical.
    fn fingerprint(&self) -> String;

    /// Column/field names in ordinal order.
    fn column_names(&self) -> Vec<&str>;

    /// Primary key column names.
    fn primary_key(&self) -> Vec<&str>;

    /// Human-readable description.
    fn describe(&self) -> String;
}

Fingerprinting

Schema fingerprints use SHA-256 hashing over JSON-serialized content to provide:

  • Stability - the same schema always produces the same fingerprint
  • Change detection - any structural change produces a different fingerprint
  • Content-addressability - fingerprints can be used as cache keys or deduplication identifiers

pub fn compute_fingerprint<T: Serialize>(value: &T) -> String {
    let json = serde_json::to_vec(value).unwrap_or_default();
    let hash = Sha256::digest(&json);
    format!("sha256:{}", hex::encode(hash))
}

The fingerprint only includes structurally significant fields. For MySQL, this means columns and primary key are included, but engine and charset are excluded since they don’t affect how CDC events should be interpreted.
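
The significant-fields idea can be sketched with std's hasher standing in for SHA-256 (illustrative only; the real implementation hashes the JSON serialization with SHA-256, and `FingerprintFields` here is a hypothetical helper struct):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Only structurally significant fields participate in the hash;
// engine/charset are deliberately absent.
#[derive(Hash)]
struct FingerprintFields {
    columns: Vec<(String, String)>, // (name, column_type)
    primary_key: Vec<String>,
}

fn fingerprint(columns: &[(&str, &str)], primary_key: &[&str]) -> u64 {
    let fields = FingerprintFields {
        columns: columns
            .iter()
            .map(|(n, t)| (n.to_string(), t.to_string()))
            .collect(),
        primary_key: primary_key.iter().map(|s| s.to_string()).collect(),
    };
    let mut h = DefaultHasher::new();
    fields.hash(&mut h);
    h.finish()
}

fn main() {
    // Same columns and key => same fingerprint, regardless of engine/charset.
    let a = fingerprint(&[("id", "bigint"), ("email", "varchar(255)")], &["id"]);
    let b = fingerprint(&[("id", "bigint"), ("email", "varchar(255)")], &["id"]);
    assert_eq!(a, b);
    // Any structural change (dropped column) changes the fingerprint.
    let c = fingerprint(&[("id", "bigint")], &["id"]);
    assert_ne!(a, c);
}
```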

MySQL Schema Implementation

The MySqlTableSchema struct captures comprehensive MySQL table metadata:

pub struct MySqlTableSchema {
    /// Columns in ordinal order
    pub columns: Vec<MySqlColumn>,

    /// Primary key column names
    pub primary_key: Vec<String>,

    /// Storage engine (InnoDB, MyISAM, etc.)
    pub engine: Option<String>,

    /// Default charset
    pub charset: Option<String>,

    /// Default collation
    pub collation: Option<String>,
}

pub struct MySqlColumn {
    pub name: String,
    pub column_type: String,      // e.g., "bigint(20) unsigned"
    pub data_type: String,        // e.g., "bigint"
    pub nullable: bool,
    pub ordinal_position: u32,
    pub default_value: Option<String>,
    pub extra: Option<String>,    // e.g., "auto_increment"
    pub comment: Option<String>,
    pub char_max_length: Option<i64>,
    pub numeric_precision: Option<i64>,
    pub numeric_scale: Option<i64>,
}

Schema information is fetched from INFORMATION_SCHEMA at startup and cached for the pipeline’s lifetime.

Schema Registry Architecture

The schema registry serves three core functions:

  1. Version tracking - maintains a history of schema versions per table
  2. Change detection - compares fingerprints to detect DDL changes
  3. Replay correlation - associates schemas with checkpoint positions for accurate replay

Schema Versions

Each registered schema version includes:

| Field | Description |
|---|---|
| version | Per-table version number (starts at 1) |
| hash | Content fingerprint for deduplication |
| schema_json | Full schema as JSON |
| registered_at | Registration timestamp |
| sequence | Global monotonic sequence number |
| checkpoint | Source checkpoint at registration time |

Sequence Numbers for Replay

The registry maintains a global monotonic sequence counter. When a schema is registered, it receives the next sequence number. Events carry this sequence number, enabling the replay engine to look up the correct schema version:

// During replay: find schema active at event's sequence
let schema = registry.get_at_sequence(tenant, db, table, event.schema_sequence);

This ensures events are always interpreted with the schema that was active when they were produced, even if the table structure has since changed.
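
The lookup semantics can be sketched as "latest version registered at or before the event's sequence" (a simplified model; the real registry is keyed by tenant, database, and table):

```rust
// Hypothetical, simplified version record; assumes versions are stored
// in ascending sequence order.
struct SchemaVersion {
    sequence: u64,
    version: u32,
}

/// Return the latest version whose sequence is <= the event's sequence.
fn get_at_sequence(versions: &[SchemaVersion], event_seq: u64) -> Option<&SchemaVersion> {
    versions.iter().rev().find(|v| v.sequence <= event_seq)
}

fn main() {
    let versions = vec![
        SchemaVersion { sequence: 10, version: 1 },
        SchemaVersion { sequence: 57, version: 2 }, // registered after a DDL change
    ];
    // An event produced between the two registrations uses version 1...
    assert_eq!(get_at_sequence(&versions, 42).unwrap().version, 1);
    // ...while events after the DDL use version 2.
    assert_eq!(get_at_sequence(&versions, 99).unwrap().version, 2);
    // Events older than any registration have no schema.
    assert!(get_at_sequence(&versions, 5).is_none());
}
```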

Checkpoint Correlation

Schemas can be registered with an associated checkpoint, creating a correlation between schema versions and source positions:

registry.register_with_checkpoint(
    tenant, db, table,
    &fingerprint,
    &schema_json,
    Some(checkpoint_bytes),  // Optional: binlog position when schema was observed
).await?;

This correlation supports scenarios like:

  • Replaying events from a specific checkpoint with the correct schema
  • Determining which schema was active at a particular binlog position
  • Rolling back schema state along with checkpoint rollback

Schema Loader

The MySqlSchemaLoader handles schema discovery and caching:

Pattern Expansion

Tables are specified using patterns that support wildcards:

| Pattern | Description |
|---|---|
| `db.table` | Exact match |
| `db.*` | All tables in database |
| `db.prefix%` | Tables starting with prefix |
| `%.table` | Table in any database |
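
The matching rules above can be sketched in a few lines (an illustrative model, not the loader's actual implementation; `%` mirrors SQL `LIKE` prefix semantics):

```rust
/// Match a `db.table` pattern against a concrete database and table name.
fn matches(pattern: &str, db: &str, table: &str) -> bool {
    let Some((pat_db, pat_table)) = pattern.split_once('.') else {
        return false; // patterns always have a db and a table part
    };
    part_matches(pat_db, db) && part_matches(pat_table, table)
}

fn part_matches(pat: &str, value: &str) -> bool {
    match pat {
        "*" | "%" => true, // whole-part wildcard
        p if p.ends_with('%') => value.starts_with(&p[..p.len() - 1]),
        p => p == value,
    }
}

fn main() {
    assert!(matches("shop.orders", "shop", "orders"));       // exact
    assert!(matches("shop.*", "shop", "anything"));          // all tables in db
    assert!(matches("shop.order_%", "shop", "order_items")); // prefix
    assert!(matches("%.orders", "other_db", "orders"));      // any database
    assert!(!matches("shop.orders", "shop", "customers"));
}
```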

Preloading

At startup, the loader expands patterns and preloads all matching schemas:

let schema_loader = MySqlSchemaLoader::new(dsn, registry, tenant);
let tracked_tables = schema_loader.preload(&["shop.orders", "shop.order_%"]).await?;

This ensures schemas are available before the first CDC event arrives.

Caching

Loaded schemas are cached to avoid repeated INFORMATION_SCHEMA queries:

// Fast path: return cached schema
if let Some(cached) = cache.get(&(db, table)) {
    return Ok(cached.clone());
}

// Slow path: fetch from database, register, cache
let schema = fetch_schema(db, table).await?;
let version = registry.register(...).await?;
cache.insert((db, table), loaded_schema);

DDL Handling

When the binlog contains DDL events, the schema loader responds:

| DDL Type | Action |
|---|---|
| CREATE TABLE | Schema loaded on first row event |
| ALTER TABLE | Cache invalidated, reloaded on next row |
| DROP TABLE | Cache entry removed |
| TRUNCATE | No schema change |
| RENAME TABLE | Old removed, new loaded on first row |

DDL detection uses the QueryEvent type in the binlog. On DDL, the entire database’s schema cache is invalidated since MySQL doesn’t always specify the exact table in DDL events.
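
The database-wide invalidation described above can be sketched like this (the cache shape is illustrative; the real loader's cache holds full schema records):

```rust
use std::collections::HashMap;

// Because MySQL DDL events don't always name the affected table,
// every cached entry for the database is evicted on DDL.
fn invalidate_database(cache: &mut HashMap<(String, String), String>, db: &str) {
    cache.retain(|(cached_db, _table), _schema| cached_db != db);
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert(("shop".to_string(), "orders".to_string()), "v1".to_string());
    cache.insert(("shop".to_string(), "customers".to_string()), "v1".to_string());
    cache.insert(("billing".to_string(), "invoices".to_string()), "v1".to_string());

    // ALTER TABLE observed for some table in `shop`:
    invalidate_database(&mut cache, "shop");

    // Only entries from other databases survive; `shop` schemas are
    // reloaded lazily on the next row event.
    assert_eq!(cache.len(), 1);
    assert!(cache.contains_key(&("billing".to_string(), "invoices".to_string())));
}
```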

API Endpoints

Reload Schemas

Force reload schemas from the database:

curl -X POST http://localhost:8080/pipelines/{name}/schemas/reload

This clears the cache and re-fetches schemas for all tracked tables.

List Cached Schemas

View currently cached schemas:

curl http://localhost:8080/pipelines/{name}/schemas

Limitations

  • In-memory registry by default - Schema versions are lost on restart unless the registry is backed by persistent storage (see PipelineManager::with_backend; a PostgreSQL backend for HA is planned)
  • No cross-pipeline sharing - Each pipeline maintains its own registry instance
  • Pattern expansion at startup - New tables matching patterns require pipeline restart or reload

Best Practices

  1. Use explicit table patterns in production to avoid accidentally capturing unwanted tables
  2. Monitor schema reload times - slow reloads may indicate overly broad patterns
  3. Trigger schema reload after DDL if your deployment process modifies schemas
  4. Include schema version in downstream events for consumers that need schema evolution awareness

Troubleshooting

Unknown table_id Errors

WARN write_rows for unknown table_id, table_id=42

The binlog contains row events for a table not in the table_map. This happens when:

  • A table was created after the CDC stream started
  • Table patterns don’t match the table

Solution: Trigger a schema reload via the REST API.

Schema Fetch Returned 0 Columns

WARN schema fetch returned 0 columns, db=shop, table=orders

Usually indicates:

  • Table doesn’t exist
  • MySQL user lacks SELECT privilege on INFORMATION_SCHEMA
  • Table was dropped between detection and schema load

Slow Schema Loading

WARN slow schema fetch, db=shop, table=orders, ms=350

Consider:

  • Narrowing table patterns to reduce the number of tables
  • Using exact table names instead of wildcards
  • Verifying network latency to the MySQL server

Schema Sensing

Schema sensing automatically infers and tracks schema structure from JSON event payloads. This complements the schema registry by discovering schema from data rather than database metadata.

When to Use Schema Sensing

Schema sensing is useful when:

  • Source doesn’t provide schema: Some sources emit JSON without metadata
  • JSON columns: Database JSON/JSONB columns have dynamic structure
  • Schema evolution tracking: Detect when payload structure changes over time
  • Downstream integration: Generate JSON Schema for consumers
  • Dynamic map keys: Session IDs, trace IDs, or other high-cardinality keys in JSON

How It Works

┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
│    Event     │────▶│  Schema Sensor  │────▶│  Inferred Schema │
│   Payload    │     │   (sampling)    │     │   + Fingerprint  │
└──────────────┘     └─────────────────┘     └──────────────────┘
                              │
                              ▼
                     ┌─────────────────┐
                     │ Structure Cache │
                     │ + HC Classifier │
                     └─────────────────┘
  1. Observation: Events flow through the sensor during batch processing
  2. Sampling: Not every event is fully analyzed (configurable rate)
  3. Deep inspection: Nested JSON structures are recursively analyzed
  4. High-cardinality detection: Dynamic map keys are classified and normalized
  5. Fingerprinting: Schema changes are detected via SHA-256 fingerprints
  6. Caching: Repeated structures skip full analysis for performance
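
The warmup-then-sample policy in steps 1-2 can be sketched as follows (an illustrative model; the field names mirror the `sampling` configuration keys below):

```rust
// Every event is fully analyzed during warmup; afterwards only
// 1 in `sample_rate` events gets a full analysis pass.
struct Sampler {
    warmup_events: u64,
    sample_rate: u64,
    seen: u64,
}

impl Sampler {
    fn should_analyze(&mut self) -> bool {
        self.seen += 1;
        if self.seen <= self.warmup_events {
            return true; // warmup phase: analyze everything
        }
        (self.seen - self.warmup_events) % self.sample_rate == 0
    }
}

fn main() {
    let mut s = Sampler { warmup_events: 3, sample_rate: 5, seen: 0 };
    let analyzed: Vec<bool> = (0..13).map(|_| s.should_analyze()).collect();
    // First 3 events always analyzed, then every 5th of the next 10.
    assert_eq!(analyzed.iter().filter(|&&a| a).count(), 3 + 2);
}
```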

High-Cardinality Key Handling

JSON payloads often contain dynamic keys like session IDs, trace IDs, or user-generated identifiers:

{
  "id": 1,
  "sessions": {
    "sess_abc123": {"user_id": 42, "started_at": 1700000000},
    "sess_xyz789": {"user_id": 43, "started_at": 1700000001}
  }
}

Without special handling, each unique key (sess_abc123, sess_xyz789) triggers a “schema evolution” event, causing:

  • 0% cache hit rate
  • Constant false evolution alerts
  • Unbounded schema growth

How It Works

DeltaForge uses probabilistic data structures (HyperLogLog, SpaceSaving) to classify fields:

| Classification | Description | Example |
|---|---|---|
| Stable fields | Appear in most events | `id`, `type`, `timestamp` |
| Dynamic fields | Unique per event, high cardinality | `sess_*`, `trace_*`, `uuid_*` |

When dynamic fields are detected, the schema sensor:

  1. Normalizes keys: Replaces sess_abc123 with <dynamic> placeholder
  2. Uses adaptive hashing: Structure cache ignores dynamic key names
  3. Produces stable fingerprints: Same schema despite different keys
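
Step 1 can be sketched as a key-normalization pass (illustrative types; the real sensor works on full JSON values and classification state):

```rust
use std::collections::BTreeSet;

// Once a path is classified as a dynamic map, any key not in the
// stable-field set collapses to a single `<dynamic>` placeholder,
// so the structure hash no longer depends on raw key names.
fn normalize_keys(keys: &[&str], stable_fields: &BTreeSet<&str>) -> BTreeSet<String> {
    keys.iter()
        .map(|k| {
            if stable_fields.contains(k) {
                k.to_string()
            } else {
                "<dynamic>".to_string()
            }
        })
        .collect()
}

fn main() {
    let stable: BTreeSet<&str> = ["version"].into_iter().collect();
    // Two events with different session keys normalize identically,
    // so their fingerprints match despite different raw keys.
    let a = normalize_keys(&["version", "sess_abc123"], &stable);
    let b = normalize_keys(&["version", "sess_xyz789"], &stable);
    assert_eq!(a, b);
}
```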

Results

| Scenario | Without HC | With HC |
|---|---|---|
| Nested dynamic keys | 100% evolution rate | <1% evolution rate |
| Top-level dynamic keys | 0% cache hits | >99% cache hits |
| Stable structs | Baseline | ~20% overhead during warmup, then ~0% |

Configuration

Example

spec:
  schema_sensing:
    enabled: true
    
    deep_inspect:
      enabled: true
      max_depth: 3
      max_sample_size: 500
    
    sampling:
      warmup_events: 50
      sample_rate: 5
      structure_cache: true
      structure_cache_size: 50
    
    high_cardinality:
      enabled: true
      min_events: 100
      stable_threshold: 0.5
      min_dynamic_fields: 5
      confidence_threshold: 0.7
      reevaluate_interval: 10000

Options

| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable schema sensing |
| **deep_inspect** | | | |
| enabled | bool | false | Inspect nested JSON |
| max_depth | int | 3 | Max nesting depth |
| max_sample_size | int | 500 | Max events for deep analysis |
| **sampling** | | | |
| warmup_events | int | 50 | Full analysis before sampling |
| sample_rate | int | 5 | After warmup, analyze 1 in N |
| structure_cache | bool | true | Cache structure hashes |
| structure_cache_size | int | 50 | Max cached per table |
| **high_cardinality** | | | |
| enabled | bool | true | Detect dynamic map keys |
| min_events | int | 100 | Events before classification |
| stable_threshold | float | 0.5 | Frequency for stable fields |
| min_dynamic_fields | int | 5 | Min unique for map detection |
| confidence_threshold | float | 0.7 | Required confidence |
| reevaluate_interval | int | 10000 | Re-check interval (0=never) |

Inferred Types

Schema sensing infers these JSON types:

| Type | Description |
|---|---|
| null | JSON null value |
| boolean | true/false |
| integer | Whole numbers |
| number | Floating point numbers |
| string | Text values |
| array | JSON arrays (element types tracked) |
| object | Nested objects (fields recursively analyzed) |
For fields with varying types across events, all observed types are recorded.
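
Type-union tracking can be sketched with a tiny value enum standing in for JSON values (illustrative only; the real sensor inspects full JSON payloads):

```rust
use std::collections::BTreeSet;

// Minimal stand-in for a JSON value.
enum Json {
    Null,
    Bool(bool),
    Int(i64),
    Num(f64),
    Str(String),
}

// Map a value to the inferred type names from the table above.
fn json_type(v: &Json) -> &'static str {
    match v {
        Json::Null => "null",
        Json::Bool(_) => "boolean",
        Json::Int(_) => "integer",
        Json::Num(_) => "number",
        Json::Str(_) => "string",
    }
}

fn main() {
    // The same field observed as an integer, then a string, then null:
    let observed = [Json::Int(42), Json::Str("42".to_string()), Json::Null];
    // All observed types are recorded, not just the latest one.
    let types: BTreeSet<&str> = observed.iter().map(json_type).collect();
    assert_eq!(
        types.into_iter().collect::<Vec<_>>(),
        vec!["integer", "null", "string"]
    );
}
```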

Schema Evolution

When schema structure changes, the sensor:

  1. Detects change: Fingerprint differs from previous version
  2. Increments sequence: Monotonic version number increases
  3. Logs evolution: Emits structured log with old/new fingerprints
  4. Updates cache: New structure becomes current

Evolution events are available via the REST API and can trigger alerts.

Stabilization

After observing enough events, a schema “stabilizes”:

  • Warmup phase completes
  • Structure stops changing
  • Sampling rate takes effect
  • Cache hit rate increases

Stabilized schemas have stabilized: true in API responses.

API Access

List Inferred Schemas

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas

Get Schema Details

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders

Export as JSON Schema

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/json-schema

Cache Statistics

curl http://localhost:8080/pipelines/my-pipeline/sensing/stats

Dynamic Map Classifications

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/classifications

Response:

{
  "table": "orders",
  "paths": {
    "": {"stable_fields": ["id", "type", "timestamp"], "has_dynamic_fields": false},
    "sessions": {"stable_fields": [], "has_dynamic_fields": true, "unique_keys": 1523},
    "metadata": {"stable_fields": ["version"], "has_dynamic_fields": true, "unique_keys": 42}
  }
}

Drift Detection

Schema sensing integrates with drift detection to compare:

  • Expected schema: From database metadata (schema registry)
  • Observed schema: From event payloads (schema sensing)

When mismatches occur, drift events are recorded:

| Drift Type | Description |
|---|---|
| unexpected_null | Non-nullable column has null values |
| type_mismatch | Observed type differs from declared type |
| undeclared_column | Field in data not in schema |
| missing_column | Schema field never seen in data |
| json_structure_change | JSON column structure changed |

Access drift data via:

curl http://localhost:8080/pipelines/my-pipeline/drift

Performance Considerations

Sampling Tradeoffs

| Setting | Effect |
|---|---|
| Higher warmup_events | Better initial accuracy, slower stabilization |
| Higher sample_rate | Lower CPU usage, slower evolution detection |
| Larger structure_cache_size | More memory, better hit rate |

High-throughput pipelines (>10k events/sec):

sampling:
  warmup_events: 100
  sample_rate: 10
  structure_cache: true
  structure_cache_size: 100
high_cardinality:
  enabled: true
  min_events: 200

Schema evolution monitoring:

sampling:
  warmup_events: 25
  sample_rate: 2
  structure_cache: true
high_cardinality:
  enabled: true
  min_events: 50

Payloads with dynamic keys (session stores, feature flags):

sampling:
  structure_cache: true
  structure_cache_size: 50
high_cardinality:
  enabled: true
  min_events: 100
  min_dynamic_fields: 3
  stable_threshold: 0.5

Development/debugging:

sampling:
  warmup_events: 10
  sample_rate: 1  # Analyze every event
high_cardinality:
  enabled: true
  min_events: 20  # Faster classification

Example: JSON Column Sensing

For tables with JSON columns, sensing reveals the internal structure:

# Database schema shows: metadata JSON
# Sensing reveals:
fields:
  - name: "metadata.user_agent"
    types: ["string"]
    nullable: false
  - name: "metadata.ip_address"
    types: ["string"]
    nullable: true
  - name: "metadata.tags"
    types: ["array"]
    array_element_types: ["string"]

This enables downstream consumers to understand JSON column structure without manual documentation.
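
The dotted-path flattening shown above can be sketched as a small recursion (a minimal model with a stand-in value enum, not the sensor's actual code):

```rust
use std::collections::BTreeMap;

// Minimal stand-in for a JSON value: leaves and nested objects.
enum Val {
    Str(&'static str),
    Obj(BTreeMap<&'static str, Val>),
}

// Collect leaf field paths like `metadata.user_agent`.
fn flatten(prefix: &str, v: &Val, out: &mut Vec<String>) {
    match v {
        Val::Str(_) => out.push(prefix.to_string()),
        Val::Obj(m) => {
            for (k, child) in m {
                let path = if prefix.is_empty() {
                    k.to_string()
                } else {
                    format!("{prefix}.{k}")
                };
                flatten(&path, child, out);
            }
        }
    }
}

fn main() {
    let mut meta = BTreeMap::new();
    meta.insert("user_agent", Val::Str("curl/8.0"));
    meta.insert("ip_address", Val::Str("127.0.0.1"));
    let mut root = BTreeMap::new();
    root.insert("metadata", Val::Obj(meta));

    let mut paths = Vec::new();
    flatten("", &Val::Obj(root), &mut paths);
    assert_eq!(paths, vec!["metadata.ip_address", "metadata.user_agent"]);
}
```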

Metrics

Schema sensing emits these Prometheus metrics:

| Metric | Type | Labels | Description |
|---|---|---|---|
| `deltaforge_schema_events_total` | Counter | table | Total events observed |
| `deltaforge_schema_cache_hits_total` | Counter | table | Structure cache hits |
| `deltaforge_schema_cache_misses_total` | Counter | table | Structure cache misses |
| `deltaforge_schema_evolutions_total` | Counter | table | Schema evolutions detected |
| `deltaforge_schema_tables_total` | Gauge | - | Tables with detected schemas |
| `deltaforge_schema_dynamic_maps_total` | Gauge | - | Paths classified as dynamic maps |
| `deltaforge_schema_sensing_seconds` | Histogram | table | Per-event sensing latency |

Example Queries

# Cache hit rate per table
sum(rate(deltaforge_schema_cache_hits_total[5m])) by (table)
/
sum(rate(deltaforge_schema_events_total[5m])) by (table)

# Schema evolution rate (should be near zero after warmup)
sum(rate(deltaforge_schema_evolutions_total[5m])) by (table)

# P99 sensing latency
histogram_quantile(0.99, rate(deltaforge_schema_sensing_seconds_bucket[5m]))

Observability Playbook

DeltaForge ships a Prometheus exporter, structured logging, and a panic hook. The runtime emits source ingress counters, batching/processor histograms, and sink latency/throughput metrics, so operators can build production dashboards immediately. The tables below capture what is wired today and the remaining gaps to make the platform production-ready for data and infra engineers.

What exists today

  • Prometheus endpoint served at /metrics (default 0.0.0.0:9000) with descriptors for pipeline counts, source/sink counters, and a stage latency histogram. The recorder is installed automatically when metrics are enabled.
  • Structured logging via tracing_subscriber with JSON output by default, optional targets, and support for RUST_LOG overrides.
  • Panic hook increments a deltaforge_panics_total counter and logs captured panics before delegating to the default hook.

Instrumentation gaps and recommendations

The sections below call out concrete metrics and log events to add per component. All metrics should include pipeline, tenant, and component identifiers where applicable so users can aggregate across fleets.

Sources (MySQL/Postgres)

| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | `deltaforge_source_events_total{pipeline,source,table}` counter increments when MySQL events are handed to the coordinator. | Surfaces ingress per table and pipeline. |
| ✅ Implemented | `deltaforge_source_reconnects_total{pipeline,source}` counter when binlog reads reconnect. | Makes retry storms visible. |
| 🚧 Gap | `deltaforge_source_lag_seconds{pipeline,source}` gauge based on binlog/WAL position vs. server time. | Alert when sources fall behind. |
| 🚧 Gap | `deltaforge_source_idle_seconds{pipeline,source}` gauge updated when no events arrive within the inactivity window. | Catch stuck readers before downstream backlogs form. |

Coordinator and batching

| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | `deltaforge_batch_events{pipeline}` and `deltaforge_batch_bytes{pipeline}` histograms in Coordinator::process_deliver_and_maybe_commit. | Tune batching policies with data. |
| ✅ Implemented | `deltaforge_stage_latency_seconds{pipeline,stage,trigger}` histogram for processor stage. | Provides batch timing per trigger (timer/limits/shutdown). |
| ✅ Implemented | `deltaforge_processor_latency_seconds{pipeline,processor}` histogram around every processor invocation. | Identify slow user functions. |
| 🚧 Gap | `deltaforge_pipeline_channel_depth{pipeline}` gauge from mpsc::Sender::capacity()/len(). | Detect backpressure between sources and coordinator. |
| 🚧 Gap | Checkpoint outcome counters/logs (`deltaforge_checkpoint_success_total` / `_failure_total`). | Alert on persistence regressions and correlate to data loss risk. |

Sinks (Kafka/Redis/custom)

| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | `deltaforge_sink_events_total{pipeline,sink}` counter and `deltaforge_sink_latency_seconds{pipeline,sink}` histogram around each send. | Throughput and responsiveness per sink. |
| ✅ Implemented | `deltaforge_sink_batch_total{pipeline,sink}` counter incremented per send. | Number of batches sent per sink. |
| 🚧 Gap | Error taxonomy in `deltaforge_sink_failures_total` (add kind/details). | Easier alerting on specific failure classes (auth, timeout, schema). |
| 🚧 Gap | Backpressure gauge for client buffers (rdkafka queue, Redis pipeline depth). | Early signal before errors occur. |
| 🚧 Gap | Drop/skip counters from processors/sinks. | Auditing and reconciliation. |

Control plane and health endpoints

| Need | Suggested metric/log | Rationale |
|---|---|---|
| API request accounting | `deltaforge_api_requests_total{route,method,status}` counter and latency histogram using Axum middleware. | Production-grade visibility of operator actions. |
| Ready/liveness transitions | Logs with pipeline counts and per-pipeline status when readiness changes. | Explain probe failures in log aggregation. |
| Pipeline lifecycle | Counters for create/patch/stop actions with success/error labels; include tenant and caller metadata in logs. | Auditable control-plane operations. |

Examples

Complete pipeline configurations demonstrating common DeltaForge use cases. Each example is ready to run with minimal modifications.

Available Examples

| Example | Source | Sink(s) | Key Features |
|---|---|---|---|
| MySQL to Redis | MySQL | Redis | JavaScript processor, PII redaction |
| Turso to Kafka | Turso/libSQL | Kafka | Native CDC, CloudEvents envelope |
| PostgreSQL to NATS | PostgreSQL | NATS | Logical replication, CloudEvents |
| Multi-Sink Fan-Out | MySQL | Kafka + Redis + NATS | Multiple envelopes, selective checkpointing |
| Event Filtering | MySQL | Kafka | JavaScript filtering, PII redaction |
| Schema Sensing | PostgreSQL | Kafka | JSON schema inference, drift detection |
| Production Kafka | PostgreSQL | Kafka | SASL/SSL auth, exactly-once, tuning |
| Cache Invalidation | MySQL | Redis | CDC stream for cache invalidation workers |
| Audit Trail | PostgreSQL | Kafka | Compliance logging, PII redaction |
| Analytics Preprocessing | MySQL | Kafka + Redis | Metrics enrichment, analytics stream |
| Outbox Pattern | MySQL | Kafka | Transactional outbox, raw payload, per-aggregate routing |

Quick Start

  1. Set environment variables for your database and sink connections
  2. Copy the example to a .yaml file
  3. Run DeltaForge:
    cargo run -p runner -- --config your-pipeline.yaml
    

Examples by Category

Getting Started

| Example | Description |
|---|---|
| MySQL to Redis | Simple pipeline with JavaScript transformation |
| Turso to Kafka | Edge database to Kafka with CloudEvents |
| PostgreSQL to NATS | PostgreSQL logical replication to NATS |

Production Patterns

| Example | Description |
|---|---|
| Production Kafka | Authentication, exactly-once, performance tuning |
| Multi-Sink Fan-Out | Multiple sinks with different formats |
| Cache Invalidation | CDC stream for cache invalidation |
| Outbox Pattern | Transactional outbox with raw payload delivery |

Data Processing

| Example | Description |
|---|---|
| Event Filtering | Filter, drop, and redact events |
| Schema Sensing | Automatic JSON schema discovery |
| Analytics Preprocessing | Prepare events for analytics platforms |

Compliance & Auditing

| Example | Description |
|---|---|
| Audit Trail | SOC2/HIPAA/GDPR-compliant change tracking |

Examples by Feature

Envelope Formats

JavaScript Processors

| Use Case | Example |
|---|---|
| PII Redaction | MySQL to Redis, Audit Trail |
| Event Filtering | Event Filtering |
| Enrichment | Turso to Kafka, Analytics Preprocessing |
| Cache Key Generation | Cache Invalidation |
| Audit Metadata | Audit Trail |

Multi-Sink Patterns

| Pattern | Example |
|---|---|
| Fan-out with different formats | Multi-Sink Fan-Out |
| Primary + best-effort secondary | Multi-Sink Fan-Out, Analytics Preprocessing |

Customizing Examples

Change Envelope Format

All sinks support configurable envelopes. Add to any sink config:

sinks:
  - type: kafka
    config:
      # ... other config
      envelope:
        type: cloudevents          # or: native, debezium
        type_prefix: "com.example" # required for cloudevents
      encoding: json

See Envelopes and Encodings for details.

Add Multiple Sinks

Fan out to multiple destinations with different formats:

sinks:
  - type: kafka
    config:
      id: primary-kafka
      envelope:
        type: debezium
      required: true    # Must succeed for checkpoint
  - type: redis
    config:
      id: cache-redis
      envelope:
        type: native
      required: false   # Best-effort, won't block checkpoint

See Sinks documentation for multi-sink patterns.

Enable Schema Sensing

Automatically discover JSON structure in your data:

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3
  sampling:
    warmup_events: 100
    sample_rate: 10

See Schema Sensing for configuration options.

More Resources

MySQL to Redis

This example streams MySQL binlog events into a Redis stream with an inline JavaScript transformation for PII redaction.

Overview

| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript email redaction |
| Sink | Redis Streams |
| Envelope | Native (configurable) |

Pipeline Configuration

metadata:
  name: orders-mysql-to-redis
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  processors:
    - type: javascript
      id: redact-email
      inline: |
        function processBatch(events) {
          return events.map((event) => {
            if (event.after && event.after.email) {
              event.after.email = "[redacted]";
            }
            return event;
          });
        }
      limits:
        timeout_ms: 500

  sinks:
    - type: redis
      config:
        id: orders-redis
        uri: ${REDIS_URI}
        stream: orders
        envelope:
          type: native
        encoding: json
        required: true

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000

  commit_policy:
    mode: required

Running the Example

1. Set Environment Variables

export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export REDIS_URI="redis://localhost:6379"

2. Start DeltaForge

# Save config as mysql-redis.yaml
cargo run -p runner -- --config mysql-redis.yaml

3. Insert Test Data

INSERT INTO shop.orders (email, total, status)
VALUES ('alice@example.com', 99.99, 'pending');

4. Verify in Redis

./dev.sh redis-read orders 10

You should see the event with the email redacted:

{
  "before": null,
  "after": {
    "id": 1,
    "email": "[redacted]",
    "total": 99.99,
    "status": "pending"
  },
  "op": "c",
  "ts_ms": 1700000000000
}

Variations

With Debezium Envelope

For Kafka Connect compatibility downstream:

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: debezium

Multi-Sink Fan-Out

Add Kafka alongside Redis for durability:

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      envelope:
        type: debezium
      required: true    # Critical path

  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: native
      required: false   # Best-effort

With this configuration, checkpoints only wait for Kafka. Redis failures won’t block the pipeline.

With Schema Sensing

Automatically track schema changes:

spec:
  # ... source and sinks config ...

  schema_sensing:
    enabled: true
    deep_inspect:
      enabled: true
      max_depth: 3
    sampling:
      warmup_events: 100
      sample_rate: 10

Key Concepts Demonstrated

  • JavaScript Processors: Transform events in-flight with custom logic
  • PII Redaction: Mask sensitive data before it reaches downstream systems
  • Envelope Configuration: Choose output format based on consumer needs
  • Commit Policy: Control checkpoint behavior with required flag

Example: Turso to Kafka

This example demonstrates streaming changes from a Turso database to Kafka with schema sensing enabled.

Use Case

You have a Turso database (or local libSQL) and want to:

  • Stream table changes to a Kafka topic
  • Automatically detect schema structure from JSON payloads
  • Transform events with JavaScript before publishing

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: turso2kafka
  tenant: acme

spec:
  source:
    type: turso
    config:
      id: turso-main
      # Local libSQL for development
      url: "http://127.0.0.1:8080"
      # For Turso cloud:
      # url: "libsql://your-db.turso.io"
      # auth_token: "${TURSO_AUTH_TOKEN}"
      tables: ["users", "orders", "order_items"]
      poll_interval_ms: 1000
      cdc_mode: auto

  processors:
    - type: javascript
      id: enrich
      inline: |
        function processBatch(events) {
          return events.map(event => {
            // Add custom metadata to events
            event.source_type = "turso";
            event.processed_at = new Date().toISOString();
            return event;
          });
        }

  sinks:
    - type: kafka
      config:
        id: kafka-main
        brokers: "${KAFKA_BROKERS}"
        topic: turso.changes
        required: true
        exactly_once: false
        client_conf:
          message.timeout.ms: "5000"
          acks: "all"

  batch:
    max_events: 100
    max_bytes: 1048576
    max_ms: 500
    respect_source_tx: false
    max_inflight: 2

  commit_policy:
    mode: required

  schema_sensing:
    enabled: true
    deep_inspect:
      enabled: true
      max_depth: 3
    sampling:
      warmup_events: 50
      sample_rate: 5
      structure_cache: true

Running the Example

1. Start Infrastructure

# Start Kafka and other services
./dev.sh up

# Create the target topic
./dev.sh k-create turso.changes 6

2. Start Local libSQL (Optional)

For local development without Turso cloud:

# Using sqld (libSQL server)
sqld --http-listen-addr 127.0.0.1:8080

# Or with Docker
docker run -p 8080:8080 ghcr.io/libsql/sqld:latest

3. Create Test Tables

CREATE TABLE users (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  name TEXT NOT NULL,
  email TEXT UNIQUE,
  metadata TEXT  -- JSON column
);

CREATE TABLE orders (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  user_id INTEGER NOT NULL,
  total REAL NOT NULL,
  status TEXT DEFAULT 'pending',
  created_at TEXT DEFAULT CURRENT_TIMESTAMP
);

4. Run DeltaForge

# Save config as turso-kafka.yaml
cargo run -p runner -- --config turso-kafka.yaml

5. Insert Test Data

INSERT INTO users (name, email, metadata) 
VALUES ('Alice', 'alice@example.com', '{"role": "admin", "tags": ["vip"]}');

INSERT INTO orders (user_id, total, status) 
VALUES (1, 99.99, 'completed');

6. Verify Events in Kafka

./dev.sh k-consume turso.changes --from-beginning

You should see events like:

{
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "tenant_id": "acme",
  "table": "users",
  "op": "insert",
  "after": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com",
    "metadata": "{\"role\": \"admin\", \"tags\": [\"vip\"]}"
  },
  "source_type": "turso",
  "processed_at": "2025-01-15T10:30:00.000Z",
  "timestamp": "2025-01-15T10:30:00.123Z"
}

Monitoring

Check Pipeline Status

curl http://localhost:8080/pipelines/turso2kafka

View Inferred Schemas

# List all inferred schemas
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas

# Get details for users table
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users

# Export as JSON Schema
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users/json-schema

Check Drift Detection

curl http://localhost:8080/pipelines/turso2kafka/drift

Turso Cloud Configuration

For production with Turso cloud:

source:
  type: turso
  config:
    id: turso-prod
    url: "libsql://mydb-myorg.turso.io"
    auth_token: "${TURSO_AUTH_TOKEN}"
    tables: ["*"]
    cdc_mode: native
    poll_interval_ms: 1000
    native_cdc:
      level: data

Set the auth token via environment variable:

export TURSO_AUTH_TOKEN="your-token-here"

Notes

  • CDC Mode: auto tries native CDC first, then falls back to triggers or polling
  • Poll Interval: Lower values reduce latency but increase database load
  • Schema Sensing: Automatically discovers JSON structure in text columns
  • Exactly Once: Set to false for higher throughput; use true if your Kafka cluster supports exactly-once semantics (EOS)

PostgreSQL to NATS

This example streams PostgreSQL logical replication changes to NATS JetStream using the CloudEvents envelope format, a good fit for serverless and event-driven architectures.

Overview

| Component | Configuration |
|---|---|
| Source | PostgreSQL logical replication |
| Processor | None (passthrough) |
| Sink | NATS JetStream |
| Envelope | CloudEvents 1.0 |

Use Case

You have a PostgreSQL database and want to:

  • Stream changes to NATS for event-driven microservices
  • Use CloudEvents format for AWS Lambda, Azure Functions, or Knative
  • Leverage JetStream for durable, replay-capable event streams

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: users-postgres-to-nats
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: users-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_users
      publication: users_pub
      tables:
        - public.users
        - public.profiles
        - public.user_sessions
      start_position: earliest

  sinks:
    - type: nats
      config:
        id: users-nats
        url: ${NATS_URL}
        subject: users.events
        stream: USERS
        envelope:
          type: cloudevents
          type_prefix: "com.acme.users"
        encoding: json
        required: true
        send_timeout_secs: 5
        batch_timeout_secs: 30

  batch:
    max_events: 500
    max_ms: 500
    respect_source_tx: true

  commit_policy:
    mode: required

Prerequisites

PostgreSQL Setup

-- Enable logical replication (postgresql.conf)
-- wal_level = logical

-- Create publication for the tables
CREATE PUBLICATION users_pub FOR TABLE users, profiles, user_sessions;

-- Verify publication
SELECT * FROM pg_publication_tables WHERE pubname = 'users_pub';

NATS JetStream Setup

# Start NATS with JetStream enabled
./dev.sh up

# Create the stream
./dev.sh nats-stream-add USERS 'users.>'

Running the Example

1. Set Environment Variables

export POSTGRES_DSN="postgres://user:password@localhost:5432/mydb"
export NATS_URL="nats://localhost:4222"

2. Start DeltaForge

cargo run -p runner -- --config postgres-nats.yaml

3. Insert Test Data

INSERT INTO users (name, email, created_at)
VALUES ('Alice', 'alice@example.com', NOW());

UPDATE users SET email = 'alice.new@example.com' WHERE name = 'Alice';

4. Verify in NATS

./dev.sh nats-sub 'users.>'

You should see CloudEvents-formatted messages:

{
  "specversion": "1.0",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "source": "deltaforge/users-postgres/public.users",
  "type": "com.acme.users.created",
  "time": "2025-01-15T10:30:00.000Z",
  "datacontenttype": "application/json",
  "subject": "public.users",
  "data": {
    "before": null,
    "after": {
      "id": 1,
      "name": "Alice",
      "email": "alice@example.com",
      "created_at": "2025-01-15T10:30:00.000Z"
    },
    "op": "c"
  }
}
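For consumers, the envelope fields above can be unpacked directly. A minimal JavaScript sketch, where the `msg` object mirrors the sample message and the `unpack` helper is hypothetical (the `source` layout `deltaforge/<source-id>/<table>` follows the sample above):

```javascript
// Unpack a CloudEvents 1.0 envelope as shown in the sample message above.
const msg = {
  specversion: "1.0",
  id: "550e8400-e29b-41d4-a716-446655440000",
  source: "deltaforge/users-postgres/public.users",
  type: "com.acme.users.created",
  datacontenttype: "application/json",
  subject: "public.users",
  data: { before: null, after: { id: 1, name: "Alice" }, op: "c" }
};

function unpack(event) {
  // `source` is deltaforge/<source-id>/<table>
  const [, sourceId, table] = event.source.split("/");
  return { sourceId, table, op: event.data.op, row: event.data.after };
}

const { table, op, row } = unpack(msg);
// table === "public.users", op === "c", row.name === "Alice"
```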

Variations

With Debezium Envelope

For compatibility with existing Debezium consumers:

sinks:
  - type: nats
    config:
      id: users-nats
      url: ${NATS_URL}
      subject: users.events
      stream: USERS
      envelope:
        type: debezium

With Authentication

sinks:
  - type: nats
    config:
      id: users-nats
      url: ${NATS_URL}
      subject: users.events
      stream: USERS
      envelope:
        type: cloudevents
        type_prefix: "com.acme.users"
      credentials_file: /path/to/nats.creds
      # Or use username/password:
      # username: ${NATS_USER}
      # password: ${NATS_PASS}

Starting from Latest

Skip existing data and only capture new changes:

source:
  type: postgres
  config:
    id: users-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_users
    publication: users_pub
    tables:
      - public.users
    start_position: latest

Key Concepts Demonstrated

  • PostgreSQL Logical Replication: Production-ready CDC with slot management
  • CloudEvents Format: Standard envelope for cloud-native event routing
  • JetStream Durability: Replay-capable event streams with consumer acknowledgment
  • Transaction Preservation: respect_source_tx: true keeps related changes together

Multi-Sink Fan-Out

This example demonstrates streaming changes to multiple destinations simultaneously, each with a different envelope format tailored to its consumers.

Overview

Component    Configuration
Source       MySQL binlog CDC
Processor    JavaScript enrichment
Sinks        Kafka (Debezium) + Redis (Native) + NATS (CloudEvents)
Pattern      Fan-out with format adaptation

Use Case

You have a MySQL database and need to:

  • Send to Kafka Connect (requires Debezium format)
  • Populate a Redis cache (native format for efficiency)
  • Trigger serverless functions via NATS (CloudEvents format)
  • Handle sink failures independently without blocking the pipeline

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-fan-out
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items

  processors:
    - type: javascript
      id: enrich
      inline: |
        function processBatch(events) {
          return events.map(event => {
            // Add routing hints via tags (tags is a valid Event field)
            event.tags = event.tags || [];
            
            if (event.table.includes('orders')) {
              event.tags.push('entity:order');
              // High-value orders get priority tag
              if (event.after && event.after.total > 1000) {
                event.tags.push('priority:high');
              }
            }
            
            event.tags.push('enriched');
            return event;
          });
        }
      limits:
        timeout_ms: 500

  sinks:
    # Primary: Kafka for data warehouse / Kafka Connect
    # Uses Debezium format for ecosystem compatibility
    - type: kafka
      config:
        id: warehouse-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders.cdc
        envelope:
          type: debezium
        encoding: json
        required: true           # Must succeed for checkpoint
        exactly_once: false
        client_conf:
          acks: "all"

    # Secondary: Redis for real-time cache
    # Uses native format for minimal overhead
    - type: redis
      config:
        id: cache-redis
        uri: ${REDIS_URI}
        stream: orders:cache
        envelope:
          type: native
        encoding: json
        required: false          # Best-effort, won't block pipeline

    # Tertiary: NATS for serverless triggers
    # Uses CloudEvents for Lambda/Functions compatibility
    - type: nats
      config:
        id: serverless-nats
        url: ${NATS_URL}
        subject: orders.events
        stream: ORDERS
        envelope:
          type: cloudevents
          type_prefix: "com.acme.orders"
        encoding: json
        required: false          # Best-effort

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 500
    respect_source_tx: true

  commit_policy:
    mode: required    # Only wait for required sinks (Kafka)

How It Works

Checkpoint Behavior

With commit_policy.mode: required:

Source Event
    │
    ▼
┌─────────────────────────────────────────────────────┐
│              Parallel Sink Delivery                 │
├─────────────────┬─────────────────┬─────────────────┤
│ Kafka           │ Redis           │ NATS            │
│ required: true  │ required: false │ required: false │
│                 │                 │                 │
│ ✓ Must succeed  │ ✓ Best-effort   │ ✓ Best-effort   │
└────────┬────────┴────────┬────────┴──────┬──────────┘
         │                 │               │
         ▼                 │               │
    Checkpoint <───────────┘               │
    advances                               │
    (even if Redis/NATS fail)              │
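The gating rule pictured above can be expressed compactly. A hypothetical helper (not DeltaForge's internal API) that decides whether a checkpoint may advance under each commit_policy mode:

```javascript
// results: one entry per sink delivery attempt in the batch.
// Returns true when the checkpoint may advance under the given policy.
function mayCommit(results, policy) {
  switch (policy.mode) {
    case "required": // every required sink must succeed
      return results.filter(r => r.required).every(r => r.ok);
    case "quorum":   // at least `quorum` required sinks must succeed
      return results.filter(r => r.required && r.ok).length >= policy.quorum;
    case "all":      // every sink, required or not, must succeed
      return results.every(r => r.ok);
  }
}

const results = [
  { id: "warehouse-kafka", required: true,  ok: true  },
  { id: "cache-redis",     required: false, ok: false },
  { id: "serverless-nats", required: false, ok: false },
];
mayCommit(results, { mode: "required" }); // → true: the required Kafka sink succeeded
mayCommit(results, { mode: "all" });      // → false: best-effort sinks failed
```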

Output Formats

Kafka (Debezium):

{
  "payload": {
    "before": null,
    "after": {"id": 1, "total": 1500.00, "status": "pending"},
    "source": {"connector": "mysql", "db": "shop", "table": "orders"},
    "op": "c",
    "ts_ms": 1700000000000
  }
}

Redis (Native):

{
  "before": null,
  "after": {"id": 1, "total": 1500.00, "status": "pending"},
  "source": {"connector": "mysql", "db": "shop", "table": "orders"},
  "op": "c",
  "ts_ms": 1700000000000
}

NATS (CloudEvents):

{
  "specversion": "1.0",
  "id": "evt-123",
  "source": "deltaforge/orders-mysql/shop.orders",
  "type": "com.acme.orders.created",
  "time": "2025-01-15T10:30:00.000Z",
  "data": {
    "before": null,
    "after": {"id": 1, "total": 1500.00, "status": "pending"},
    "op": "c"
  }
}

Running the Example

1. Set Environment Variables

export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
export REDIS_URI="redis://localhost:6379"
export NATS_URL="nats://localhost:4222"

2. Start Infrastructure

./dev.sh up
./dev.sh k-create orders.cdc 6
./dev.sh nats-stream-add ORDERS 'orders.>'

3. Start DeltaForge

cargo run -p runner -- --config fan-out.yaml

4. Insert Test Data

INSERT INTO shop.orders (customer_id, total, status)
VALUES (1, 1500.00, 'pending');

5. Verify Each Sink

# Kafka
./dev.sh k-consume orders.cdc --from-beginning

# Redis
./dev.sh redis-read orders:cache 10

# NATS
./dev.sh nats-sub 'orders.>'

Variations

Quorum Mode

Require 2 of 3 sinks to succeed:

sinks:
  - type: kafka
    config:
      id: kafka-1
      required: true   # Counts toward quorum
  - type: redis
    config:
      id: redis-1
      required: true   # Counts toward quorum
  - type: nats
    config:
      id: nats-1
      required: true   # Counts toward quorum

commit_policy:
  mode: quorum
  quorum: 2            # Any 2 must succeed

All-or-Nothing

Require all sinks to succeed (strongest consistency):

commit_policy:
  mode: all

⚠️ Warning: mode: all means any sink failure blocks the entire pipeline. Use only when all destinations are equally critical.

Key Concepts Demonstrated

  • Multi-Sink Fan-Out: Single source to multiple destinations
  • Format Adaptation: Different envelope per consumer requirement
  • Selective Checkpointing: required flag controls which sinks gate progress
  • Failure Isolation: Non-critical sinks don’t block the pipeline
  • Tag-Based Enrichment: Use event.tags for routing metadata

Processor Constraints: JavaScript processors can only modify event.before, event.after, and event.tags. Arbitrary top-level fields would be lost during serialization.

Event Filtering with JavaScript

This example demonstrates using JavaScript processors to filter and selectively drop events before they reach sinks.

Overview

Component    Configuration
Source       MySQL binlog CDC
Processor    JavaScript filter + redaction
Sink         Kafka
Pattern      Event filtering and transformation

Use Case

You have a MySQL database and want to:

  • Filter out events from certain tables or with specific conditions
  • Drop low-value events to reduce downstream load
  • Redact sensitive fields conditionally

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: filtered-orders
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items
        - shop.audit_log        # We'll filter most of these out

  processors:
    - type: javascript
      id: filter-and-redact
      inline: |
        function processBatch(events) {
          return events
            // 1. Filter out audit_log events except errors
            .filter(event => {
              if (event.table === 'shop.audit_log') {
                // Only keep error-level audit events
                return event.after && event.after.level === 'error';
              }
              return true;
            })
            
            // 2. Filter out soft-deleted records
            .filter(event => {
              if (event.after && event.after.deleted_at !== null) {
                return false;  // Drop soft-deleted records
              }
              return true;
            })
            
            // 3. Filter out test/staging data
            .filter(event => {
              if (event.after && event.after.email) {
                // Drop test accounts
                if (event.after.email.endsWith('@test.local')) {
                  return false;
                }
              }
              return true;
            })
            
            // 4. Transform remaining events
            .map(event => {
              // Redact PII for non-admin tables
              if (event.after) {
                if (event.after.email) {
                  event.after.email = maskEmail(event.after.email);
                }
                if (event.after.phone) {
                  event.after.phone = '[redacted]';
                }
              }
              
              // Add filter tag (tags is a valid Event field)
              event.tags = (event.tags || []).concat(['filtered']);
              
              return event;
            });
        }
        
        // helper: mask email keeping domain
        function maskEmail(email) {
          const [local, domain] = email.split('@');
          if (!domain) return '[invalid-email]';
          const masked = local.charAt(0) + '***' + local.charAt(local.length - 1);
          return masked + '@' + domain;
        }
      limits:
        timeout_ms: 1000
        mem_mb: 128

  sinks:
    - type: kafka
      config:
        id: filtered-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders.filtered
        envelope:
          type: native
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 500
    respect_source_tx: true

  commit_policy:
    mode: required

Filter Patterns

Drop Events (Return false from the Filter)

.filter(event => {
  // return false to drop the event
  if (event.table === 'internal_logs') {
    return false;
  }
  return true;
})

Conditional Field-Based Filtering

.filter(event => {
  // drop events where status is 'draft'
  if (event.after && event.after.status === 'draft') {
    return false;
  }
  return true;
})

Drop by Operation Type

.filter(event => {
  // only keep inserts and updates, drop deletes
  return event.op === 'c' || event.op === 'u';
})

Sample Events (Volume Reduction)

// keep only 10% of events (for high-volume tables)
.filter(event => {
  if (event.table === 'high_volume_table') {
    return Math.random() < 0.1;
  }
  return true;
})
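Outside a pipeline, these predicates can be exercised against plain event objects. A standalone sketch with hypothetical events (the 'c'/'u'/'d' op codes match the Debezium convention used elsewhere in these examples):

```javascript
// Apply the op-type and draft-status filters to a hypothetical batch.
const events = [
  { table: "shop.orders", op: "c", after: { id: 1, status: "pending" } },
  { table: "shop.orders", op: "u", after: { id: 2, status: "draft" } },
  { table: "shop.orders", op: "d", after: null },
];

const kept = events
  .filter(e => e.op === "c" || e.op === "u")              // drop deletes
  .filter(e => !(e.after && e.after.status === "draft")); // drop drafts

// kept contains only the first event (the pending insert)
```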

Running the Example

1. Set Environment Variables

export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"

2. Start DeltaForge

cargo run -p runner -- --config filtered-orders.yaml

3. Insert Test Data

-- This will be captured and transformed
INSERT INTO shop.orders (customer_email, total, status)
VALUES ('alice@example.com', 99.99, 'pending');

-- This will be filtered out (test account)
INSERT INTO shop.orders (customer_email, total, status)
VALUES ('test@test.local', 50.00, 'pending');

-- This will be filtered out (soft-deleted)
INSERT INTO shop.orders (customer_email, total, status, deleted_at)
VALUES ('bob@example.com', 75.00, 'pending', NOW());

-- Audit log - will be filtered out (not error level)
INSERT INTO shop.audit_log (level, message)
VALUES ('info', 'User logged in');

-- Audit log - will be kept (error level)
INSERT INTO shop.audit_log (level, message)
VALUES ('error', 'Payment failed');

4. Verify Filtered Output

./dev.sh k-consume orders.filtered --from-beginning

You should only see:

  • Alice’s order (with masked email: a***e@example.com)
  • The error-level audit log entry
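The masking behaviour is easy to check in isolation. The maskEmail helper from the pipeline above, restated as a standalone sketch:

```javascript
// Mask the local part of an email, keeping its first and last characters
// and the full domain (same logic as the pipeline's helper above).
function maskEmail(email) {
  const [local, domain] = email.split("@");
  if (!domain) return "[invalid-email]";
  const masked = local.charAt(0) + "***" + local.charAt(local.length - 1);
  return masked + "@" + domain;
}

maskEmail("alice@example.com"); // → "a***e@example.com"
maskEmail("not-an-email");      // → "[invalid-email]"
```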

Performance Considerations

Tip: Filtering early reduces downstream load. If you’re filtering out 50% of events, your sinks process half the data.

processors:
  - type: javascript
    id: filter
    inline: |
      function processBatch(events) {
        // Filter FIRST, then transform
        return events
          .filter(e => shouldKeep(e))  // Reduces array size
          .map(e => transform(e));      // Processes fewer events
      }
    limits:
      timeout_ms: 500    # Increase if filtering logic is complex
      mem_mb: 128        # Increase for large batches

Key Concepts Demonstrated

  • Event Filtering: Drop events before they reach sinks
  • Conditional Logic: Filter based on table, operation, or field values
  • PII Redaction: Mask sensitive data in remaining events
  • Sampling: Rate-limit high-volume event streams

Processor Constraints: JavaScript processors can only modify event.before, event.after, and event.tags. Arbitrary top-level fields like event.filtered_at would be lost during serialization.

Schema Sensing and Drift Detection

This example demonstrates DeltaForge’s automatic schema inference for JSON columns and drift detection capabilities.

Overview

Component    Configuration
Source       PostgreSQL logical replication
Processor    None
Sink         Kafka
Feature      Schema sensing with deep JSON inspection

Use Case

You have a PostgreSQL database with JSON/JSONB columns and want to:

  • Automatically discover the structure of JSON payloads
  • Detect when JSON schemas change over time (drift)
  • Export inferred schemas as JSON Schema for downstream validation
  • Monitor schema evolution without manual tracking

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: products-with-sensing
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: products-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_products
      publication: products_pub
      tables:
        - public.products
        - public.product_variants
      start_position: earliest

  sinks:
    - type: kafka
      config:
        id: products-kafka
        brokers: ${KAFKA_BROKERS}
        topic: products.changes
        envelope:
          type: debezium
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

  # Schema sensing configuration
  schema_sensing:
    enabled: true
    
    deep_inspect:
      enabled: true
      max_depth: 5           # How deep to traverse nested JSON
      max_sample_size: 500   # Events to analyze for deep inspection
    
    sampling:
      warmup_events: 100     # Analyze every event during warmup
      sample_rate: 20        # After warmup, analyze 1 in 20 events
      structure_cache: true  # Cache seen structures to avoid re-analysis
      structure_cache_size: 100
    
    tracking:
      detect_drift: true     # Enable drift detection
      drift_threshold: 0.1   # Alert if >10% of events have new fields
    
    output:
      emit_schemas: true     # Include schema info in API responses
      json_schema_format: draft-07
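The warmup_events and sample_rate settings interact as the config comments describe: every event is analyzed until warmup_events have been seen, after which only one in sample_rate events is analyzed. One plausible reading of that 1-in-N rule as a sketch (hypothetical helper, not DeltaForge's internals):

```javascript
// Decide whether event number `n` (1-based) should be analyzed,
// given the sampling settings from the pipeline config above.
function shouldAnalyze(n, { warmup_events, sample_rate }) {
  if (n <= warmup_events) return true;            // warmup: analyze everything
  return (n - warmup_events) % sample_rate === 0; // then 1 in sample_rate
}

const cfg = { warmup_events: 100, sample_rate: 20 };
shouldAnalyze(50, cfg);  // → true  (within warmup)
shouldAnalyze(101, cfg); // → false (between samples)
shouldAnalyze(120, cfg); // → true  (20th event after warmup)
```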

Table Structure

CREATE TABLE products (
  id SERIAL PRIMARY KEY,
  name TEXT NOT NULL,
  sku TEXT UNIQUE,
  price DECIMAL(10,2),
  
  -- JSON columns that schema sensing will analyze
  metadata JSONB,          -- Product attributes, tags, etc.
  specifications JSONB,    -- Technical specs
  
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE product_variants (
  id SERIAL PRIMARY KEY,
  product_id INTEGER REFERENCES products(id),
  variant_name TEXT,
  
  -- Nested JSON with variable structure
  attributes JSONB,        -- Color, size, material, etc.
  pricing JSONB,           -- Regional pricing, discounts
  
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Create publication
CREATE PUBLICATION products_pub FOR TABLE products, product_variants;

Sample Data

-- Insert products with JSON metadata
INSERT INTO products (name, sku, price, metadata, specifications) VALUES
(
  'Wireless Headphones',
  'WH-1000',
  299.99,
  '{
    "brand": "AudioTech",
    "category": "Electronics",
    "tags": ["wireless", "bluetooth", "noise-canceling"],
    "ratings": {"average": 4.5, "count": 1250}
  }',
  '{
    "battery_life_hours": 30,
    "driver_size_mm": 40,
    "frequency_response": {"min_hz": 20, "max_hz": 20000},
    "connectivity": ["bluetooth", "aux"]
  }'
);

-- Insert variant with nested attributes
INSERT INTO product_variants (product_id, variant_name, attributes, pricing) VALUES
(
  1,
  'Midnight Black',
  '{
    "color": {"name": "Midnight Black", "hex": "#1a1a2e"},
    "material": "Premium Plastic",
    "weight_grams": 250
  }',
  '{
    "base_price": 299.99,
    "regional": {
      "US": {"price": 299.99, "currency": "USD"},
      "EU": {"price": 279.99, "currency": "EUR"}
    },
    "discounts": [
      {"code": "SAVE20", "percent": 20, "expires": "2025-12-31"}
    ]
  }'
);

Running the Example

1. Set Environment Variables

export POSTGRES_DSN="postgres://user:password@localhost:5432/shop"
export KAFKA_BROKERS="localhost:9092"

2. Start DeltaForge

cargo run -p runner -- --config products-sensing.yaml

3. Insert Data and Let Sensing Analyze

-- Insert several products to build schema profile
INSERT INTO products (name, sku, price, metadata, specifications) VALUES
('Smart Watch', 'SW-200', 399.99, 
 '{"brand": "TechWear", "tags": ["fitness", "smart"]}',
 '{"battery_days": 7, "water_resistant": true}');

Using the Schema Sensing API

List Inferred Schemas

curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas

Response:

{
  "schemas": [
    {
      "table": "public.products",
      "columns": {
        "metadata": {
          "type": "object",
          "inferred_at": "2025-01-15T10:30:00Z",
          "sample_count": 150
        },
        "specifications": {
          "type": "object",
          "inferred_at": "2025-01-15T10:30:00Z",
          "sample_count": 150
        }
      }
    }
  ]
}

Get Detailed Schema for a Table

curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas/public.products

Response:

{
  "table": "public.products",
  "json_columns": {
    "metadata": {
      "inferred_schema": {
        "type": "object",
        "properties": {
          "brand": {"type": "string"},
          "category": {"type": "string"},
          "tags": {"type": "array", "items": {"type": "string"}},
          "ratings": {
            "type": "object",
            "properties": {
              "average": {"type": "number"},
              "count": {"type": "integer"}
            }
          }
        }
      },
      "sample_count": 150,
      "last_updated": "2025-01-15T10:35:00Z"
    }
  }
}

Export as JSON Schema

curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas/public.products/json-schema

Response:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "public.products.metadata",
  "type": "object",
  "properties": {
    "brand": {"type": "string"},
    "category": {"type": "string"},
    "tags": {
      "type": "array",
      "items": {"type": "string"}
    },
    "ratings": {
      "type": "object",
      "properties": {
        "average": {"type": "number"},
        "count": {"type": "integer"}
      }
    }
  }
}

Check Drift Detection

curl http://localhost:8080/pipelines/products-with-sensing/drift

Response:

{
  "drift_detected": true,
  "tables": {
    "public.products": {
      "metadata": {
        "new_fields": ["promotion", "seasonal"],
        "removed_fields": [],
        "type_changes": [],
        "drift_percentage": 0.15,
        "first_seen": "2025-01-15T11:00:00Z"
      }
    }
  }
}
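The drift_percentage in the response above is the share of recently sampled payloads that carried fields absent from the inferred schema, compared against tracking.drift_threshold. A rough sketch of that comparison (the internals are assumed; field names mirror the config above):

```javascript
// Flag drift when the fraction of sampled payloads containing
// previously unseen fields exceeds the configured threshold.
function detectDrift(sampled, knownFields, threshold) {
  const withNewFields = sampled.filter(payload =>
    Object.keys(payload).some(k => !knownFields.includes(k))
  ).length;
  const driftPercentage = withNewFields / sampled.length;
  return { driftPercentage, driftDetected: driftPercentage > threshold };
}

const known = ["brand", "category", "tags", "ratings"];
const sampled = [
  { brand: "AudioTech", category: "Electronics" },
  { brand: "TechWear", promotion: "summer" },   // new field
  { brand: "TechWear", seasonal: true },        // new field
  { brand: "AudioTech" },
];
detectDrift(sampled, known, 0.1); // → driftPercentage 0.5, driftDetected true
```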

Get Sensing Statistics

curl http://localhost:8080/pipelines/products-with-sensing/sensing/stats

Response:

{
  "total_events_analyzed": 1500,
  "total_events_sampled": 250,
  "cache_hits": 1250,
  "cache_misses": 250,
  "tables_tracked": 2,
  "json_columns_tracked": 4
}

Performance Tuning

Performance tip: Schema sensing can be CPU-intensive. Tune based on your throughput needs.

High-Throughput Configuration

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3           # Limit depth for faster processing
    max_sample_size: 200   # Fewer samples
  sampling:
    warmup_events: 50      # Shorter warmup
    sample_rate: 100       # Analyze 1 in 100 events
    structure_cache: true
    structure_cache_size: 200  # Larger cache

Development/Debugging Configuration

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 10          # Full depth
    max_sample_size: 1000  # More samples
  sampling:
    warmup_events: 500     # Longer warmup
    sample_rate: 1         # Analyze every event

Key Concepts Demonstrated

  • Automatic Schema Inference: Discover JSON structure without manual definition
  • Deep JSON Inspection: Traverse nested objects and arrays
  • Drift Detection: Alert when schemas change unexpectedly
  • JSON Schema Export: Generate standard schemas for validation
  • Sampling Strategy: Balance accuracy vs. performance

Production Kafka Configuration

This example demonstrates a production-ready Kafka sink configuration with authentication, high-availability settings, and performance tuning.

Overview

Component    Configuration
Source       PostgreSQL logical replication
Processor    None
Sink         Kafka with SASL/SSL authentication
Pattern      Production-grade reliability

Use Case

You’re deploying DeltaForge to production and need:

  • Secure authentication (SASL/SCRAM or mTLS)
  • High availability with proper acknowledgment settings
  • Optimal batching and compression for throughput
  • Exactly-once semantics for critical data

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-to-kafka-prod
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: orders-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_orders
      publication: orders_pub
      tables:
        - public.orders
        - public.order_items
        - public.payments
      start_position: earliest

  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders.cdc.events
        envelope:
          type: debezium
        encoding: json
        required: true
        
        # enable exactly-once semantics
        exactly_once: true
        
        # timeout for individual sends
        send_timeout_secs: 60
        
        # librdkafka client configuration
        client_conf:
          # security - SASL/SCRAM authentication
          security.protocol: "SASL_SSL"
          sasl.mechanism: "SCRAM-SHA-512"
          sasl.username: "${KAFKA_USERNAME}"
          sasl.password: "${KAFKA_PASSWORD}"
          
          # SSL/TLS configuration
          ssl.ca.location: "/etc/ssl/certs/kafka-ca.pem"
          ssl.endpoint.identification.algorithm: "https"
          
          # reliability - wait for all replicas
          acks: "all"
          
          # idempotence (required for exactly-once)
          enable.idempotence: "true"
          
          # retries and timeouts
          retries: "2147483647"
          retry.backoff.ms: "100"
          delivery.timeout.ms: "300000"
          request.timeout.ms: "30000"
          
          # kafka batching for throughput
          batch.size: "65536"
          linger.ms: "10"
          
          # compression
          compression.type: "lz4"
          
          # buffer management
          queue.buffering.max.messages: "100000"
          queue.buffering.max.kbytes: "1048576"

  batch:
    max_events: 1000
    max_bytes: 1048576
    max_ms: 100
    respect_source_tx: true
    max_inflight: 4

  commit_policy:
    mode: required

Security Configurations

SASL/SCRAM (Username/Password)

client_conf:
  security.protocol: "SASL_SSL"
  sasl.mechanism: "SCRAM-SHA-512"
  sasl.username: "${KAFKA_USERNAME}"
  sasl.password: "${KAFKA_PASSWORD}"
  ssl.ca.location: "/etc/ssl/certs/ca.pem"

mTLS (Mutual TLS)

client_conf:
  security.protocol: "SSL"
  ssl.ca.location: "/etc/ssl/certs/kafka-ca.pem"
  ssl.certificate.location: "/etc/ssl/certs/client.pem"
  ssl.key.location: "/etc/ssl/private/client.key"
  ssl.key.password: "${SSL_KEY_PASSWORD}"

AWS MSK with IAM

client_conf:
  security.protocol: "SASL_SSL"
  sasl.mechanism: "AWS_MSK_IAM"
  sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required;"
  sasl.client.callback.handler.class: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"

Confluent Cloud

client_conf:
  security.protocol: "SASL_SSL"
  sasl.mechanism: "PLAIN"
  sasl.username: "${CONFLUENT_API_KEY}"
  sasl.password: "${CONFLUENT_API_SECRET}"

Performance Tuning

High Throughput

Optimize for maximum events per second:

client_conf:
  acks: "1"                    # Leader-only ack (faster, less safe)
  batch.size: "131072"         # 128KB batches
  linger.ms: "50"              # Wait longer to fill batches
  compression.type: "lz4"      # Fast compression
  
batch:
  max_events: 5000             # Larger batches
  max_ms: 200                  # More time to accumulate
  max_inflight: 8              # More concurrent requests

Low Latency

Optimize for minimal delay:

client_conf:
  acks: "1"                    # Don't wait for replicas
  batch.size: "16384"          # Smaller batches
  linger.ms: "0"               # Send immediately
  compression.type: "none"     # Skip compression
  
batch:
  max_events: 100              # Smaller batches
  max_ms: 10                   # Flush quickly
  max_inflight: 2              # Limit in-flight

Maximum Durability

Optimize for zero data loss:

client_conf:
  acks: "all"                  # All replicas must ack
  enable.idempotence: "true"   # Prevent duplicates
  max.in.flight.requests.per.connection: "5"  # Must be at most 5 when idempotence is enabled
  retries: "2147483647"        # Infinite retries
  
exactly_once: true             # Transactional producer

batch:
  respect_source_tx: true      # Preserve source transactions
  max_inflight: 1              # Strict ordering

Running the Example

1. Set Environment Variables

export POSTGRES_DSN="postgres://user:password@localhost:5432/orders"
export KAFKA_BROKERS="kafka1:9093,kafka2:9093,kafka3:9093"
export KAFKA_USERNAME="deltaforge"
export KAFKA_PASSWORD="secret"

2. Create Kafka Topic

kafka-topics.sh --create \
  --topic orders.cdc.events \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --bootstrap-server ${KAFKA_BROKERS}

3. Start DeltaForge

cargo run -p runner --release -- --config kafka-prod.yaml

Monitoring

Key Metrics to Watch

  • deltaforge_sink_events_sent_total — Events delivered
  • deltaforge_sink_send_latency_seconds — Delivery latency
  • deltaforge_sink_errors_total — Delivery failures
  • deltaforge_checkpoint_lag_events — Events pending checkpoint

Health Check

curl http://localhost:8080/health

Pipeline Status

curl http://localhost:8080/pipelines/orders-to-kafka-prod

Key Concepts Demonstrated

  • SASL/SSL Authentication: Secure broker connections
  • Exactly-Once Semantics: Transactional producer for no duplicates
  • Acknowledgment Modes: Trade-off between durability and latency
  • Batching & Compression: Optimize throughput
  • Production Tuning: Real-world configuration patterns

Redis Cache Invalidation

This example demonstrates streaming database changes to a Redis stream where a worker can consume them to invalidate cache entries, ensuring cache consistency without polling.

Overview

Component    Configuration
Source       MySQL binlog CDC
Processor    JavaScript cache key generator
Sink         Redis Streams
Pattern      CDC-driven cache invalidation

Use Case

You have a MySQL database with Redis caching and want to:

  • Stream change events that trigger cache invalidation
  • Avoid stale cache issues without TTL-based expiration
  • Generate cache keys matching your application’s key format

Architecture

┌─────────┐     ┌─────────────┐     ┌────────────────┐     ┌─────────┐
│  MySQL  │────>│ DeltaForge  │────>│ Redis Stream   │────>│ Worker  │
│         │     │             │     │ (invalidations)│     │         │
└─────────┘     └─────────────┘     └────────────────┘     └────┬────┘
                                                                │
                                    ┌───────────────┐           │
                                    │  Redis Cache  │<──────────┘
                                    │  (DEL keys)   │
                                    └───────────────┘
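On the worker side, each consumed event carries the cache keys computed by the pipeline's processor, so invalidation reduces to issuing DEL for those keys. A logic-only sketch (the actual Redis calls are omitted; `_cache_keys` matches the field the processor writes into the event):

```javascript
// Turn a consumed invalidation event into the DEL commands a worker
// would issue against the cache (Redis client calls omitted).
function invalidationCommands(event) {
  const record = event.after || event.before; // deletes carry keys on `before`
  const keys = (record && record._cache_keys) || [];
  return keys.map(key => ["DEL", key]);
}

const event = {
  table: "app.users",
  op: "u",
  after: { id: 7, _cache_keys: ["user:7", "user:7:profile"] }
};
invalidationCommands(event); // → [["DEL","user:7"], ["DEL","user:7:profile"]]
```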

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: cache-invalidation
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: app-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - app.users
        - app.products
        - app.orders
        - app.inventory

  processors:
    - type: javascript
      id: generate-cache-keys
      inline: |
        function processBatch(events) {
          return events.map(event => {
            const keys = generateCacheKeys(event);
            const strategy = getStrategy(event);
            
            // Store cache keys in event.after metadata
            // (we can modify event.after since it's a JSON Value)
            if (event.after) {
              event.after._cache_keys = keys;
              event.after._invalidation_strategy = strategy;
            } else if (event.before) {
              // For deletes, add to before
              event.before._cache_keys = keys;
              event.before._invalidation_strategy = strategy;
            }
            
            // Add tags for routing/filtering
            event.tags = (event.tags || []).concat([
              'cache:invalidate',
              `strategy:${strategy}`,
              `keys:${keys.length}`
            ]);
            
            return event;
          });
        }
        
        function generateCacheKeys(event) {
          const table = event.table.split('.')[1];
          const keys = [];
          const record = event.after || event.before;
          
          if (!record || !record.id) return keys;
          const id = record.id;
          
          switch (table) {
            case 'users':
              keys.push(`user:${id}`);
              if (record.email) {
                keys.push(`user:email:${record.email}`);
              }
              // If email changed, invalidate old email key
              if (event.before && event.before.email && 
                  event.before.email !== record.email) {
                keys.push(`user:email:${event.before.email}`);
              }
              keys.push(`user:${id}:orders`);
              keys.push(`user:${id}:profile`);
              break;
              
            case 'products':
              keys.push(`product:${id}`);
              keys.push(`product:${id}:details`);
              if (record.category_id) {
                keys.push(`category:${record.category_id}:products`);
              }
              break;
              
            case 'orders':
              keys.push(`order:${id}`);
              if (record.user_id) {
                keys.push(`user:${record.user_id}:orders`);
              }
              break;
              
            case 'inventory':
              keys.push(`inventory:${record.product_id}`);
              keys.push(`product:${record.product_id}:stock`);
              break;
          }
          
          return keys;
        }
        
        function getStrategy(event) {
          const table = event.table.split('.')[1];
          if (table === 'inventory') return 'immediate';
          if (event.op === 'd') return 'immediate';
          return 'batched';
        }
      limits:
        timeout_ms: 500
        mem_mb: 128

  sinks:
    - type: redis
      config:
        id: invalidation-stream
        uri: ${REDIS_URI}
        stream: cache:invalidations
        envelope:
          type: native
        encoding: json
        required: true

  batch:
    max_events: 100
    max_ms: 100
    respect_source_tx: true

  commit_policy:
    mode: required

JavaScript Processor Constraints

Important: The processor stores cache keys in event.after._cache_keys (or event.before for deletes) because we can only modify existing Event fields. Arbitrary top-level fields like event.cache_keys would be lost.
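For example, an update to app.users serializes roughly like this after the processor runs (abridged; field values illustrative):

```json
{
  "before": { "id": 1, "email": "alice@example.com" },
  "after": {
    "id": 1,
    "email": "alice.new@example.com",
    "_cache_keys": [
      "user:1",
      "user:email:alice.new@example.com",
      "user:email:alice@example.com",
      "user:1:orders",
      "user:1:profile"
    ],
    "_invalidation_strategy": "batched"
  },
  "op": "u",
  "tags": ["cache:invalidate", "strategy:batched", "keys:5"]
}
```

The worker below reads `_cache_keys` from whichever of after/before is present.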

Cache Worker (Consumer)

// cache-invalidation-worker.js
const Redis = require('ioredis');

const redis = new Redis(process.env.REDIS_URI);
const streamKey = 'cache:invalidations';
const consumerGroup = 'cache-workers';
const consumerId = `worker-${process.pid}`;

async function processInvalidations() {
  try {
    await redis.xgroup('CREATE', streamKey, consumerGroup, '0', 'MKSTREAM');
  } catch (e) {
    if (!e.message.includes('BUSYGROUP')) throw e;
  }

  console.log(`Starting cache invalidation worker: ${consumerId}`);

  while (true) {
    try {
      const results = await redis.xreadgroup(
        'GROUP', consumerGroup, consumerId,
        'COUNT', 100, 'BLOCK', 5000,
        'STREAMS', streamKey, '>'
      );

      if (!results) continue;

      for (const [stream, messages] of results) {
        for (const [id, fields] of messages) {
          const event = JSON.parse(fields[1]);
          await invalidateKeys(event);
          await redis.xack(streamKey, consumerGroup, id);
        }
      }
    } catch (error) {
      console.error('Worker error:', error);
      await new Promise(r => setTimeout(r, 1000));
    }
  }
}

async function invalidateKeys(event) {
  // Get cache keys from the event payload
  const record = event.after || event.before;
  const cacheKeys = record?._cache_keys || [];
  
  if (cacheKeys.length === 0) return;

  for (const key of cacheKeys) {
    if (key.includes('*')) {
      await scanAndDelete(key);
    } else {
      await redis.del(key);
    }
  }

  console.log(`Invalidated ${cacheKeys.length} keys for ${event.table}`);
}

async function scanAndDelete(pattern) {
  let cursor = '0';
  do {
    const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100);
    cursor = nextCursor;
    if (keys.length > 0) {
      await redis.del(...keys);
    }
  } while (cursor !== '0');
}

processInvalidations().catch(console.error);

Running the Example

1. Set Environment Variables

export MYSQL_DSN="mysql://user:password@localhost:3306/app"
export REDIS_URI="redis://localhost:6379"

2. Start DeltaForge

cargo run -p runner -- --config cache-invalidation.yaml

3. Start Worker(s)

node cache-invalidation-worker.js

4. Test Invalidation

UPDATE app.users SET email = 'alice.new@example.com' WHERE id = 1;

Worker output:

Invalidated 5 keys for app.users

Key Concepts Demonstrated

  • CDC to Stream: DeltaForge captures changes and writes to Redis Streams
  • Custom Key Generation: Processor computes cache keys for downstream worker
  • Consumer Groups: Scalable worker processing with acknowledgments
  • Before/After Diffing: Compute invalidation keys for both old and new values

Note: DeltaForge streams events to Redis; a separate worker (shown above) consumes them and performs the actual cache invalidation.

Audit Trail and Compliance Logging

This example demonstrates building an immutable audit trail for compliance requirements (SOC2, HIPAA, GDPR) by capturing all database changes with full context.

Overview

ComponentConfiguration
SourcePostgreSQL logical replication
ProcessorJavaScript audit enrichment
SinkKafka (durable audit log)
PatternCompliance-grade change tracking

Use Case

You need to meet compliance requirements and want to:

  • Capture every change to sensitive tables with before/after values
  • Add audit metadata (classification, retention, regulations)
  • Redact sensitive fields while preserving change records
  • Store immutable audit logs for required retention periods

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: compliance-audit-trail
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: app-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_audit
      publication: audit_pub
      tables:
        - public.users
        - public.user_profiles
        - public.payment_methods
        - public.transactions
        - public.roles
        - public.permissions
        - public.user_roles
      start_position: earliest

  processors:
    - type: javascript
      id: audit-enrichment
      inline: |
        function processBatch(events) {
          return events.map(event => {
            const table = event.table.split('.')[1];
            
            // Build audit tags (event.tags is a valid Event field)
            const auditTags = [
              'audited',
              `sensitivity:${classifySensitivity(table)}`,
              `classification:${getDataClassification(table)}`,
              `retention:${getRetentionPeriod(table)}d`
            ];
            
            // Add regulation tags
            for (const reg of getApplicableRegulations(table)) {
              auditTags.push(`regulation:${reg}`);
            }
            
            // Track changed fields for updates
            if (event.before && event.after) {
              const changed = detectChangedFields(event.before, event.after);
              for (const field of changed) {
                auditTags.push(`changed:${field}`);
              }
            }
            
            event.tags = (event.tags || []).concat(auditTags);
            
            // Redact sensitive fields in before/after
            if (event.before) {
              event.before = sanitizeRecord(event.before, table);
            }
            if (event.after) {
              event.after = sanitizeRecord(event.after, table);
            }
            
            return event;
          });
        }
        
        function classifySensitivity(table) {
          const highSensitivity = ['users', 'payment_methods', 'transactions'];
          const mediumSensitivity = ['user_profiles', 'user_roles'];
          
          if (highSensitivity.includes(table)) return 'HIGH';
          if (mediumSensitivity.includes(table)) return 'MEDIUM';
          return 'LOW';
        }
        
        function sanitizeRecord(record, table) {
          if (!record) return null;
          const sanitized = { ...record };
          
          const sensitiveFields = {
            'users': ['password_hash', 'ssn', 'tax_id'],
            'payment_methods': ['card_number', 'cvv', 'account_number'],
            'user_profiles': ['date_of_birth']
          };
          
          const fieldsToMask = sensitiveFields[table] || [];
          
          for (const field of fieldsToMask) {
            if (sanitized[field] !== undefined) {
              sanitized[`_${field}_redacted`] = true;
              sanitized[field] = '[REDACTED]';
            }
          }
          
          return sanitized;
        }
        
        function detectChangedFields(before, after) {
          const changed = [];
          const allKeys = new Set([...Object.keys(before), ...Object.keys(after)]);
          for (const key of allKeys) {
            if (JSON.stringify(before[key]) !== JSON.stringify(after[key])) {
              changed.push(key);
            }
          }
          return changed;
        }
        
        function getRetentionPeriod(table) {
          if (['transactions', 'payment_methods'].includes(table)) return 2555;
          if (['users', 'user_profiles'].includes(table)) return 2190;
          return 1095;
        }
        
        function getDataClassification(table) {
          if (['payment_methods', 'transactions'].includes(table)) return 'PCI';
          if (['users', 'user_profiles'].includes(table)) return 'PII';
          return 'INTERNAL';
        }
        
        function getApplicableRegulations(table) {
          const regs = [];
          if (['users', 'user_profiles'].includes(table)) regs.push('GDPR', 'CCPA');
          if (['payment_methods', 'transactions'].includes(table)) regs.push('PCI-DSS', 'SOX');
          return regs;
        }
      limits:
        timeout_ms: 1000
        mem_mb: 256

  sinks:
    - type: kafka
      config:
        id: audit-kafka
        brokers: ${KAFKA_BROKERS}
        topic: audit.trail.events
        envelope:
          type: debezium
        encoding: json
        required: true
        exactly_once: true
        client_conf:
          acks: "all"
          enable.idempotence: "true"
          compression.type: "gzip"

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

JavaScript Processor Constraints

Important: The JavaScript processor can only modify fields that exist on DeltaForge’s Event struct. You can:

  • Modify event.before and event.after values (JSON objects)
  • Set event.tags (array of strings)
  • Filter out events (return empty array)

You cannot add arbitrary top-level fields like event.audit or event.metadata - they will be lost during serialization. This limitation will be addressed soon.

This example uses event.tags to store audit metadata as key:value strings that downstream systems can parse.

PostgreSQL Setup

-- Create publication for audited tables
CREATE PUBLICATION audit_pub FOR TABLE 
  public.users,
  public.user_profiles,
  public.payment_methods,
  public.transactions,
  public.roles,
  public.permissions,
  public.user_roles
WITH (publish = 'insert, update, delete');

-- Enable REPLICA IDENTITY FULL to capture before values on updates
ALTER TABLE public.users REPLICA IDENTITY FULL;
ALTER TABLE public.user_profiles REPLICA IDENTITY FULL;
ALTER TABLE public.payment_methods REPLICA IDENTITY FULL;
ALTER TABLE public.transactions REPLICA IDENTITY FULL;

Sample Audit Event Output

With Debezium envelope:

{
  "payload": {
    "before": {
      "id": 42,
      "email": "alice@old-domain.com",
      "name": "Alice Smith",
      "password_hash": "[REDACTED]",
      "_password_hash_redacted": true
    },
    "after": {
      "id": 42,
      "email": "alice@new-domain.com",
      "name": "Alice Smith",
      "password_hash": "[REDACTED]",
      "_password_hash_redacted": true
    },
    "source": {
      "version": "0.1.0",
      "connector": "postgres",
      "name": "app-postgres",
      "ts_ms": 1705312199123,
      "db": "app",
      "schema": "public",
      "table": "users"
    },
    "op": "u",
    "ts_ms": 1705312200000,
    "tags": [
      "audited",
      "sensitivity:HIGH",
      "classification:PII",
      "retention:2190d",
      "regulation:GDPR",
      "regulation:CCPA",
      "changed:email"
    ]
  }
}

Parsing Audit Tags

Downstream consumers can parse the structured tags:

// Parse audit tags into an object
function parseAuditTags(tags) {
  const audit = {};
  for (const tag of tags || []) {
    if (tag.includes(':')) {
      const [key, value] = tag.split(':', 2);
      if (audit[key]) {
        if (!Array.isArray(audit[key])) {
          audit[key] = [audit[key]];
        }
        audit[key].push(value);
      } else {
        audit[key] = value;
      }
    } else {
      audit[tag] = true;
    }
  }
  return audit;
}

// Example: parseAuditTags(event.tags)
// Returns: {
//   audited: true,
//   sensitivity: "HIGH",
//   classification: "PII",
//   retention: "2190d",
//   regulation: ["GDPR", "CCPA"],
//   changed: "email"
// }

Kafka Topic Configuration

kafka-topics.sh --create \
  --topic audit.trail.events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=189216000000 \
  --config cleanup.policy=delete \
  --config min.insync.replicas=2 \
  --config compression.type=gzip \
  --bootstrap-server ${KAFKA_BROKERS}

Running the Example

1. Set Environment Variables

export POSTGRES_DSN="postgres://user:password@localhost:5432/app"
export KAFKA_BROKERS="kafka1:9092,kafka2:9092,kafka3:9092"

2. Start DeltaForge

cargo run -p runner -- --config audit-trail.yaml

3. Verify Audit Events

./dev.sh k-consume audit.trail.events --from-beginning

Querying Audit Data

-- Find high-sensitivity changes
SELECT * FROM audit_events 
WHERE ARRAY_CONTAINS(payload.tags, 'sensitivity:HIGH');

-- Find all email changes
SELECT * FROM audit_events
WHERE ARRAY_CONTAINS(payload.tags, 'changed:email');

-- Find PCI-regulated changes
SELECT * FROM audit_events
WHERE ARRAY_CONTAINS(payload.tags, 'regulation:PCI-DSS');

Key Concepts Demonstrated

  • Full Change Capture: Before and after values with REPLICA IDENTITY FULL
  • PII Redaction: Sensitive fields masked, presence tracked via _field_redacted
  • Tag-Based Metadata: Audit info stored in event.tags as parseable strings
  • Immutable Storage: Exactly-once delivery to append-only Kafka log
  • Compliance Tagging: Retention periods, classifications, regulations as tags

Real-Time Analytics Preprocessing Pipeline

This example demonstrates preparing CDC events for real-time analytics by enriching them with dimensions, metrics, and routing tags before they reach downstream consumers.

Overview

ComponentConfiguration
SourceMySQL binlog CDC
ProcessorJavaScript analytics enrichment
SinksKafka (stream processing) + Redis (real-time counters)
PatternAnalytics-ready event preparation

Use Case

You have an e-commerce MySQL database and want to:

  • Stream order events to real-time dashboards
  • Feed a worker that maintains live counters in Redis
  • Prepare events for stream processing (Flink, Spark Streaming)

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: ecommerce-analytics
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: ecommerce-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items
        - shop.payments
        - shop.cart_events

  processors:
    - type: javascript
      id: analytics-enrichment
      inline: |
        function processBatch(events) {
          return events.map(event => {
            const table = event.table.split('.')[1];
            const record = event.after || event.before;
            
            // Add analytics metadata to event.after
            // (we can modify event.after since it's a JSON Value)
            if (event.after) {
              event.after._analytics = {
                event_type: `${table}.${mapOperation(event.op)}`,
                dimensions: extractDimensions(table, record, event),
                metrics: extractMetrics(table, record)
              };
            }
            
            // Add routing tags
            event.tags = generateTags(table, record, event);
            
            return event;
          });
        }
        
        function mapOperation(op) {
          return { 'c': 'created', 'u': 'updated', 'd': 'deleted', 'r': 'snapshot' }[op] || op;
        }
        
        function extractDimensions(table, record, event) {
          if (!record) return {};
          
          const dims = {
            hour_of_day: new Date(event.ts_ms).getUTCHours(),
            day_of_week: new Date(event.ts_ms).getUTCDay()
          };
          
          switch (table) {
            case 'orders':
              dims.customer_id = record.customer_id;
              dims.status = record.status;
              dims.channel = record.channel || 'web';
              break;
            case 'order_items':
              dims.order_id = record.order_id;
              dims.product_id = record.product_id;
              break;
            case 'payments':
              dims.order_id = record.order_id;
              dims.payment_method = record.method;
              break;
          }
          
          return dims;
        }
        
        function extractMetrics(table, record) {
          if (!record) return {};
          
          const metrics = { event_count: 1 };
          
          switch (table) {
            case 'orders':
              metrics.order_total = parseFloat(record.total) || 0;
              metrics.item_count = parseInt(record.item_count) || 0;
              break;
            case 'order_items':
              metrics.quantity = parseInt(record.quantity) || 1;
              metrics.line_total = parseFloat(record.line_total) || 0;
              break;
            case 'payments':
              metrics.payment_amount = parseFloat(record.amount) || 0;
              break;
          }
          
          return metrics;
        }
        
        function generateTags(table, record, event) {
          const tags = [table, event.op];
          
          if (!record) return tags;
          
          if (table === 'orders' && parseFloat(record.total) > 500) {
            tags.push('high_value');
          }
          if (record.status) {
            tags.push(`status:${record.status}`);
          }
          
          return tags;
        }
      limits:
        timeout_ms: 500
        mem_mb: 256

  sinks:
    - type: kafka
      config:
        id: analytics-kafka
        brokers: ${KAFKA_BROKERS}
        topic: analytics.events
        envelope:
          type: native
        encoding: json
        required: true
        client_conf:
          compression.type: "lz4"

    - type: redis
      config:
        id: realtime-redis
        uri: ${REDIS_URI}
        stream: analytics:realtime
        envelope:
          type: native
        encoding: json
        required: false

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 100
    respect_source_tx: false

  commit_policy:
    mode: required

JavaScript Processor Constraints

Important: Analytics metadata is stored in event.after._analytics because the processor can only modify existing Event fields (before, after, tags). Arbitrary top-level fields would be lost during serialization.

Sample Event Output

{
  "before": null,
  "after": {
    "id": 98765,
    "customer_id": 12345,
    "total": 299.99,
    "status": "pending",
    "channel": "mobile",
    "_analytics": {
      "event_type": "orders.created",
      "dimensions": {
        "hour_of_day": 10,
        "day_of_week": 3,
        "customer_id": 12345,
        "status": "pending",
        "channel": "mobile"
      },
      "metrics": {
        "event_count": 1,
        "order_total": 299.99,
        "item_count": 3
      }
    }
  },
  "source": {
    "connector": "mysql",
    "db": "shop",
    "table": "orders"
  },
  "op": "c",
  "ts_ms": 1705312200000,
  "tags": ["orders", "c", "status:pending"]
}
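Downstream, a stream processor can flatten the enriched event into a single analytics row. A minimal sketch (toRow is a hypothetical consumer-side helper, not part of DeltaForge):

```javascript
// Flatten an enriched native event into a flat analytics row
function toRow(event) {
  const analytics = (event.after && event.after._analytics) || {};
  return {
    event_type: analytics.event_type,
    ts_ms: event.ts_ms,
    ...(analytics.dimensions || {}),
    ...(analytics.metrics || {})
  };
}

// Abridged version of the sample event above
const sample = {
  after: {
    id: 98765,
    _analytics: {
      event_type: 'orders.created',
      dimensions: { customer_id: 12345, channel: 'mobile' },
      metrics: { event_count: 1, order_total: 299.99, item_count: 3 }
    }
  },
  op: 'c',
  ts_ms: 1705312200000
};

const row = toRow(sample);
// row.event_type === 'orders.created', row.order_total === 299.99
```

A row in this shape drops straight into a Flink or Spark Streaming aggregation keyed on the dimension columns.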

Redis Counter Worker

const Redis = require('ioredis');
const redis = new Redis(process.env.REDIS_URI);

async function processAnalyticsEvents() {
  let lastId = '0';
  
  while (true) {
    const results = await redis.xread(
      'COUNT', 100, 'BLOCK', 1000,
      'STREAMS', 'analytics:realtime', lastId
    );
    
    if (!results) continue;
    
    for (const [stream, messages] of results) {
      for (const [id, fields] of messages) {
        const event = JSON.parse(fields[1]);
        await updateCounters(event);
        lastId = id;
      }
    }
  }
}

async function updateCounters(event) {
  const pipe = redis.pipeline();
  const now = new Date();
  const hourKey = `${now.getUTCFullYear()}:${now.getUTCMonth()+1}:${now.getUTCDate()}:${now.getUTCHours()}`;
  const dayKey = `${now.getUTCFullYear()}:${now.getUTCMonth()+1}:${now.getUTCDate()}`;
  
  // Get analytics from event.after
  const analytics = event.after?._analytics || {};
  const eventType = analytics.event_type || `${event.table}.${event.op}`;
  const metrics = analytics.metrics || {};
  const dimensions = analytics.dimensions || {};
  
  // Event counts
  pipe.hincrby(`stats:events:${hourKey}`, eventType, 1);
  
  // Order-specific counters
  if (eventType === 'orders.created') {
    pipe.incr(`stats:orders:count:${hourKey}`);
    pipe.incrbyfloat(`stats:orders:revenue:${hourKey}`, metrics.order_total || 0);
    
    if (dimensions.channel) {
      pipe.hincrby(`stats:orders:channel:${dayKey}`, dimensions.channel, 1);
    }
  }
  
  // High-value alerts
  if (event.tags?.includes('high_value')) {
    pipe.lpush('alerts:high_value_orders', JSON.stringify({
      order_id: event.after?.id,
      total: metrics.order_total
    }));
    pipe.ltrim('alerts:high_value_orders', 0, 99);
  }
  
  pipe.expire(`stats:events:${hourKey}`, 90000);
  await pipe.exec();
}

processAnalyticsEvents().catch(console.error);
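The worker buckets counters by UTC hour and day, so any dashboard reading them back must build the same key suffixes. A small helper (hypothetical; it just mirrors the worker's key format):

```javascript
// Build the stats key suffixes used by the worker above (UTC; months are 1-based)
function bucketKeys(date) {
  const dayKey = `${date.getUTCFullYear()}:${date.getUTCMonth() + 1}:${date.getUTCDate()}`;
  return { dayKey, hourKey: `${dayKey}:${date.getUTCHours()}` };
}

const { dayKey, hourKey } = bucketKeys(new Date(Date.UTC(2024, 0, 15, 10, 30)));
// dayKey  === '2024:1:15'
// hourKey === '2024:1:15:10'
// e.g. HGETALL stats:events:2024:1:15:10 returns per-event-type counts for that hour
```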

Running the Example

1. Set Environment Variables

export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
export REDIS_URI="redis://localhost:6379"

2. Create Kafka Topic

./dev.sh k-create analytics.events 12

3. Start DeltaForge

cargo run -p runner -- --config analytics-pipeline.yaml

4. Start Counter Worker

node realtime-counter-worker.js

Key Concepts Demonstrated

  • Event Enrichment: Add dimensions/metrics in event.after._analytics
  • Tag-Based Filtering: Use event.tags for high-value order detection
  • Multi-Sink Fan-Out: Kafka for stream processing + Redis for worker consumption
  • Worker Pattern: Separate worker consumes Redis stream to update counters

Outbox Pattern

This example demonstrates the transactional outbox pattern - writing business data and a domain event in the same database transaction, then streaming the event to Kafka via DeltaForge.

Overview

ComponentConfiguration
SourceMySQL binlog CDC
ProcessorOutbox (extract + route)
SinkKafka (per-aggregate topics)
PatternTransactional outbox with raw payload delivery

Use Case

Your application writes orders and needs to publish domain events such as OrderCreated and OrderShipped to Kafka. You want:

  • Atomicity: event published if and only if the transaction commits
  • Clean payloads: consumers receive the application’s JSON, not CDC envelopes
  • Per-aggregate routing: events land on Order.OrderCreated, Payment.PaymentReceived, etc.
  • Zero polling: DeltaForge tails the binlog, no application-side outbox relay

Database Setup

-- Business table
CREATE TABLE orders (
  id         INT AUTO_INCREMENT PRIMARY KEY,
  customer   VARCHAR(64),
  total      DECIMAL(10,2),
  status     VARCHAR(32),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Outbox table (BLACKHOLE = binlog only, no disk storage)
CREATE TABLE outbox (
  id             INT AUTO_INCREMENT PRIMARY KEY,
  aggregate_type VARCHAR(64),
  aggregate_id   VARCHAR(64),
  event_type     VARCHAR(64),
  payload        JSON
) ENGINE=BLACKHOLE;

Application Code

Write both in the same transaction:

BEGIN;

INSERT INTO orders (customer, total, status)
VALUES ('alice', 149.99, 'pending');

INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES (
  'Order',
  LAST_INSERT_ID(),
  'OrderCreated',
  JSON_OBJECT('customer', 'alice', 'total', 149.99, 'status', 'pending')
);

COMMIT;

If the transaction rolls back, neither the order nor the event exists. If it commits, both do.

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-outbox
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.outbox
      outbox:
        tables: ["shop.outbox"]

  processors:
    - type: outbox
      topic: "${aggregate_type}.${event_type}"
      default_topic: events.unrouted
      raw_payload: true

  sinks:
    - type: kafka
      config:
        id: events-kafka
        brokers: ${KAFKA_BROKERS}
        topic: "cdc.${source.db}.${source.table}"
        envelope:
          type: debezium
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 500
    respect_source_tx: true

  commit_policy:
    mode: required

What lands on Kafka

Outbox event -> topic Order.OrderCreated:

{"customer": "alice", "total": 149.99, "status": "pending"}

Raw payload, no envelope. Kafka headers carry metadata: df-aggregate-type: Order, df-aggregate-id: 1, df-event-type: OrderCreated.

CDC event -> topic cdc.shop.orders (from sink’s topic template):

{
  "payload": {
    "before": null,
    "after": {"id": 1, "customer": "alice", "total": 149.99, "status": "pending"},
    "source": {"connector": "mysql", "db": "shop", "table": "orders"},
    "op": "c",
    "ts_ms": 1700000000000
  }
}

Full Debezium envelope. Both flow through the same pipeline - the outbox processor only touches events tagged as outbox.
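Conceptually, the outbox processor expands its topic template against each outbox row's columns. An illustrative JavaScript sketch of that expansion (the actual implementation is in Rust; renderTopic is not a DeltaForge API):

```javascript
// Expand ${column} placeholders against an outbox row (illustrative)
function renderTopic(template, row) {
  return template.replace(/\$\{(\w+)\}/g, (_, name) =>
    row[name] !== undefined ? String(row[name]) : '');
}

renderTopic('${aggregate_type}.${event_type}',
            { aggregate_type: 'Order', event_type: 'OrderCreated' });
// → 'Order.OrderCreated'
```

In the real processor, rows that cannot be routed fall through to default_topic.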

Running the Example

1. Start Infrastructure

./dev.sh up
./dev.sh k-create Order.OrderCreated 3
./dev.sh k-create Order.OrderShipped 3
./dev.sh k-create cdc.shop.orders 3

2. Set Environment Variables

export MYSQL_DSN="mysql://deltaforge:dfpw@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"

3. Start DeltaForge

cargo run -p runner -- --config outbox.yaml

4. Insert Test Data

BEGIN;
INSERT INTO shop.orders (customer, total, status) VALUES ('alice', 149.99, 'pending');
INSERT INTO shop.outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', LAST_INSERT_ID(), 'OrderCreated',
        '{"customer":"alice","total":149.99,"status":"pending"}');
COMMIT;

5. Verify

# Outbox event (raw payload, per-aggregate topic)
./dev.sh k-inspect Order.OrderCreated

# CDC event (Debezium envelope, per-table topic)
./dev.sh k-inspect cdc.shop.orders

Variations

PostgreSQL with WAL Messages

PostgreSQL doesn’t need an outbox table - write directly to the WAL:

BEGIN;
INSERT INTO orders (customer, total, status) VALUES ('alice', 149.99, 'pending');
SELECT pg_logical_emit_message(
  true, 'outbox',
  '{"aggregate_type":"Order","aggregate_id":"1","event_type":"OrderCreated","payload":{"customer":"alice","total":149.99}}'
);
COMMIT;

The matching source configuration:

source:
  type: postgres
  config:
    id: orders-pg
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables: [public.orders]
    outbox:
      prefixes: [outbox]

No table, no index, no vacuum - just a WAL entry.

Multiple Outbox Channels

Route order events and payment events to different topic hierarchies:

processors:
  - type: outbox
    tables: [orders_outbox]
    topic: "orders.${event_type}"
    raw_payload: true

  - type: outbox
    tables: [payments_outbox]
    topic: "payments.${event_type}"
    raw_payload: true
    columns:
      payload: data

Additional Headers for Tracing

Forward trace and correlation IDs as Kafka headers:

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    raw_payload: true
    additional_headers:
      x-trace-id: trace_id
      x-correlation-id: correlation_id

The application supplies the extra columns in its outbox insert:

INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload, trace_id, correlation_id)
VALUES ('Order', '42', 'OrderCreated', '{"total":99.99}', 'abc-123', 'req-456');

Migrating from Debezium

If your outbox table uses Debezium’s column names (aggregatetype, aggregateid, type):

processors:
  - type: outbox
    topic: "${aggregatetype}.${type}"
    raw_payload: true
    columns:
      aggregate_type: aggregatetype
      aggregate_id: aggregateid
      event_type: type
    additional_headers:
      x-trace-id: traceid
      x-tenant: tenant

Column mappings control header extraction. The topic template uses raw column names directly.

Key Concepts Demonstrated

  • Transactional outbox: atomicity without distributed transactions
  • BLACKHOLE engine: binlog-only storage for zero-cost outbox tables on MySQL
  • Raw payload delivery: raw_payload: true bypasses envelope wrapping - consumers get exactly what the application wrote
  • Mixed pipeline: outbox events and CDC events coexist in the same pipeline with different serialization
  • Per-aggregate routing: topic template routes events by aggregate_type and event_type

Troubleshooting

Common issues and quick checks when running DeltaForge.

  • 🩺 Health-first: start with /healthz and /readyz to pinpoint failing components.

Runner fails to start

  • Confirm the config path passed to --config exists and is readable.
  • Validate YAML syntax and that required fields like metadata.name and spec.source are present.
  • Ensure environment variables referenced in the spec are set (dsn, brokers, uri, etc.).

Pipelines remain unready

  • Check the /readyz endpoint for per-pipeline status and error messages.
  • Verify upstream credentials allow replication (MySQL binlog). Other engines are experimental unless explicitly documented.
  • Inspect sink connectivity; a required sink that cannot connect will block checkpoints.

Slow throughput

  • Increase batch.max_events or batch.max_bytes to reduce flush frequency.
  • Adjust max_inflight to allow more concurrent batches if sinks can handle parallelism.
  • Reduce processor work or add guardrails (limits) to prevent slow JavaScript from stalling the pipeline.
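For example, a throughput-oriented batch section might look like this (values illustrative; tune against what your sinks can absorb):

```yaml
batch:
  max_events: 2000      # larger batches, fewer sink flushes
  max_bytes: 4194304    # 4 MiB
  max_ms: 250           # still bounds end-to-end latency
```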

Checkpoints not advancing

  • Review the commit policy: mode: all or required sinks that are unavailable will block progress.
  • Look for sink-specific errors (for example, Kafka broker unreachability or Redis backpressure).
  • Pause and resume the pipeline to force a clean restart after addressing the underlying issue.

DeltaForge

Development Guide

Use this guide to build, test, and extend DeltaForge. It covers local workflows, optional dependency containers, and how to work with Docker images.

All contributions are welcome and highly appreciated.

Local prerequisites

  • Rust toolchain 1.89+ (install via rustup).
  • Optional: Docker or Podman for running the dev dependency stack and the container image.

Workspace layout

  • crates/deltaforge-core : shared event model, pipeline engine, and checkpointing primitives.
  • crates/deltaforge-config : YAML config parsing, environment variable expansion, and pipeline spec types.
  • crates/sources : database CDC readers (MySQL binlog, Postgres logical replication) implemented as pluggable sources.
  • crates/processors : JavaScript-based processors and support code for transforming batches.
  • crates/sinks : sink implementations (Kafka producer, Redis streams, NATS JetStream) plus sink utilities.
  • crates/rest-api : HTTP control plane with health/readiness and pipeline lifecycle endpoints.
  • crates/runner : CLI entrypoint that wires the runtime, metrics, and control plane together.

Use these crate boundaries as reference points when adding new sources, sinks, or pipeline behaviors.
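To make the sink boundary concrete, here is a hypothetical sketch of what a pluggable sink could look like. The actual trait in crates/deltaforge-core will differ (it is presumably async and batch-oriented); the names Sink, Event, and StdoutSink are illustrative only:

```rust
// Hypothetical sketch - not the real deltaforge-core trait.
struct Event {
    table: String,
    payload: String,
}

trait Sink {
    fn name(&self) -> &str;
    /// Write a batch, returning how many events were delivered.
    fn write(&mut self, batch: &[Event]) -> Result<usize, String>;
}

struct StdoutSink;

impl Sink for StdoutSink {
    fn name(&self) -> &str {
        "stdout"
    }
    fn write(&mut self, batch: &[Event]) -> Result<usize, String> {
        for ev in batch {
            println!("{} -> {}", ev.table, ev.payload);
        }
        Ok(batch.len())
    }
}

fn main() {
    let mut sink = StdoutSink;
    let batch = vec![Event {
        table: "users".into(),
        payload: "{\"id\":1}".into(),
    }];
    let delivered = sink.write(&batch).expect("write failed");
    println!("{} delivered {} event(s)", sink.name(), delivered);
}
```

A new sink implementation would live in crates/sinks and register itself with the runner, mirroring how the Kafka, Redis, and NATS sinks are wired today.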

Start dev dependencies

Bring up the optional backing services (Postgres, MySQL, Kafka, Redis) with Docker Compose:

docker compose -f docker-compose.dev.yml up -d

Each service is exposed on localhost for local runs (Postgres on 5432, MySQL on 3306, Kafka on 9092, Redis on 6379). The MySQL container seeds demo data from ./init-scripts and configures the binlog settings required for CDC.
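As context for what the compose file sets up, a MySQL service configured for CDC typically needs row-based binlog settings along these lines. This fragment is illustrative, not a copy of docker-compose.dev.yml:

```yaml
# Illustrative fragment - not the actual docker-compose.dev.yml.
services:
  mysql:
    image: mysql:8.0
    ports: ["3306:3306"]
    command:
      - --log-bin=mysql-bin        # enable the binlog
      - --binlog-format=ROW        # row-level events, required for CDC
      - --binlog-row-image=FULL    # full before/after row images
    volumes:
      - ./init-scripts:/docker-entrypoint-initdb.d:ro  # seed demo data
```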

Prefer the convenience dev.sh wrapper to keep common tasks consistent:

./dev.sh up     # start the dependency stack
./dev.sh down   # stop and remove it
./dev.sh ps     # see container status

Build and test locally

Run the usual Rust workflow from the repo root:

cargo fmt --all
cargo clippy --workspace --all-targets --all-features
cargo test --workspace

Or use the helper script for a single command that mirrors CI expectations:

./dev.sh build         # build project (debug)
./dev.sh build-release # build project (release)
./dev.sh run           # run with examples/dev.yaml
./dev.sh fmt           # format code
./dev.sh lint          # clippy with warnings as errors
./dev.sh test          # full test suite
./dev.sh check         # fmt --check + clippy + tests (mirrors CI)
./dev.sh cov           # generate coverage report

Docker images

Use pre-built images

Multi-arch images (amd64/arm64) are published to GHCR and Docker Hub:

# Minimal (~57MB, scratch-based, no shell)
docker pull ghcr.io/vnvo/deltaforge:latest
docker pull vnvohub/deltaforge:latest

# Debug (~140MB, includes shell for troubleshooting)
docker pull ghcr.io/vnvo/deltaforge:latest-debug
docker pull vnvohub/deltaforge:latest-debug
Variant        Size     Base         Use case
latest         ~57MB    scratch      Production
latest-debug   ~140MB   debian-slim  Troubleshooting, has shell

Build locally

Two Dockerfiles are provided:

# Minimal image (~57MB)
docker build -t deltaforge:local .

# Debug image (~140MB, includes shell)
docker build -t deltaforge:local-debug -f Dockerfile.debug .

Or use the dev helper:

./dev.sh docker            # build minimal image
./dev.sh docker-debug      # build debug image
./dev.sh docker-test       # test minimal image runs
./dev.sh docker-test-debug # test debug image runs
./dev.sh docker-all        # build and test all variants
./dev.sh docker-shell      # open shell in debug container

Build multi-arch locally

To build for both amd64 and arm64:

./dev.sh docker-multi-setup  # create buildx builder (once)
./dev.sh docker-multi        # build both architectures

Note: Multi-arch builds use QEMU emulation and take ~30-35 minutes. The images are not loaded locally - use --push to push to a registry.

Run the image

Run the container by mounting your pipeline specs and exposing the API and metrics ports:

docker run --rm \
  -p 8080:8080 -p 9000:9000 \
  -v $(pwd)/examples/dev.yaml:/etc/deltaforge/pipeline.yaml:ro \
  -v deltaforge-checkpoints:/app/data \
  ghcr.io/vnvo/deltaforge:latest \
  --config /etc/deltaforge/pipeline.yaml

Notes:

  • The container listens on 0.0.0.0:8080 for the control plane API with metrics on :9000.
  • Checkpoints are written to /app/data/df_checkpoints.json; mount a volume to persist them across restarts.
  • Environment variables inside the YAML are expanded before parsing.
  • Pass any other runner flags as needed (e.g., --api-addr or --metrics-addr).

Debug a running container

Use the debug image to troubleshoot:

# Run with shell access
docker run --rm -it --entrypoint /bin/bash ghcr.io/vnvo/deltaforge:latest-debug

# Exec into a running container
docker exec -it <container_id> /bin/bash

Dev helper commands

The dev.sh script provides shortcuts for common tasks:

./dev.sh help  # show all commands

Infrastructure

./dev.sh up           # start Postgres, MySQL, Kafka, Redis
./dev.sh down         # stop and remove containers
./dev.sh ps           # list running services

Kafka

./dev.sh k-list                         # list topics
./dev.sh k-create <topic>               # create topic
./dev.sh k-consume <topic> --from-beginning
./dev.sh k-produce <topic>              # interactive producer

Redis

./dev.sh redis-cli                      # open redis-cli
./dev.sh redis-read <stream>            # read from stream

Database shells

./dev.sh pg-sh       # psql into Postgres
./dev.sh mysql-sh    # mysql into MySQL

Documentation

./dev.sh docs        # serve docs locally (opens browser)
./dev.sh docs-build  # build docs

Pre-release checks

./dev.sh release-check  # run all checks + build all Docker variants

Contributing

  1. Fork the repository
  2. Create a branch from main (e.g., feature/new-sink, fix/checkpoint-bug)
  3. Make your changes
  4. Run ./dev.sh check to ensure CI will pass
  5. Submit a PR against main

Things to Remember

Tests

There are a few #[ignore] tests; run them when making deep changes to the sources, pipeline coordination, or anything that affects core functionality.

Logging and metrics hygiene

  • Include pipeline, tenant, source_id/sink_id, and batch_id fields on all warnings/errors to make traces joinable in log aggregation tools.
  • Normalize retry/backoff logs so they include the attempt count and sleep duration; consider a structured reason field alongside error details for dashboards.
  • Emit periodic info-level summaries (e.g., every N batches) reporting batches processed, average batch size, lag, and sink latency percentiles pulled from the metrics registry; these give operators human-friendly breadcrumbs.
  • Add metrics in a backward-compatible way: prefer new metric names over redefining existing ones to avoid breaking dashboards. Validate cardinality (bounded label sets) before merging.
  • Gate noisy logs behind levels (debug for per-event traces, info for batch summaries, warn/error for retries and failures).
  • Exercise the new metrics in integration tests by asserting counters change when sending synthetic events through pipelines.
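The first two bullets can be sketched as follows. In the real codebase this would go through the tracing macros with structured fields; this self-contained sketch just shows the joinable field set (pipeline, tenant, sink_id, batch_id) plus retry context, with all names illustrative:

```rust
// Sketch: one way to keep retry warnings joinable in log aggregation.
// Field names follow the guidance above; the real code would use the
// `tracing` crate's structured fields rather than a formatted string.
fn retry_warn(
    pipeline: &str,
    tenant: &str,
    sink_id: &str,
    batch_id: u64,
    attempt: u32,
    backoff_ms: u64,
    reason: &str,
) -> String {
    format!(
        "level=warn pipeline={pipeline} tenant={tenant} sink_id={sink_id} \
         batch_id={batch_id} attempt={attempt} backoff_ms={backoff_ms} reason={reason}"
    )
}

fn main() {
    // Every retry log carries the same joinable keys plus a structured
    // reason, so dashboards can group by reason and correlate by batch_id.
    let line = retry_warn("orders", "acme", "kafka-main", 42, 3, 800, "broker_unreachable");
    println!("{line}");
}
```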

Roadmap