
DeltaForge


Introduction

DeltaForge is a versatile, high-performance Change Data Capture (CDC) engine built in Rust. It streams database changes into downstream systems such as Kafka, Redis, NATS, and HTTP endpoints, giving you full control over how events are routed, transformed, and delivered. It supports JSON and Avro encoding (with Confluent Schema Registry), end-to-end exactly-once delivery via Kafka transactions, and built-in schema discovery that automatically infers and tracks the shape of your data as it flows through.

Pipelines are defined declaratively in YAML, making it straightforward to onboard new use cases without custom code.

| Built with | Sources | Processors | Sinks | Output Formats |
|---|---|---|---|---|
| Rust | MySQL · PostgreSQL | JavaScript · Outbox | Kafka · Redis · NATS · HTTP | JSON · Avro · Native · Debezium · CloudEvents |

Why DeltaForge?

Core Capabilities

  • Powered by Rust : Predictable performance, memory safety, and minimal resource footprint.
  • 🔌 Pluggable architecture : Sources, processors, and sinks are modular and independently extensible.
  • 🧩 Declarative pipelines : Define sources, transforms, sinks, and commit policies in version-controlled YAML with environment variable expansion for secrets.
  • 📦 Reliable checkpointing : Per-sink independent checkpoints. Exactly-once delivery via Kafka transactions. At-least-once with dedup for NATS and Redis.
  • 🔁 Avro encoding : Confluent wire format with Schema Registry. DDL-derived schemas with exact types and nullability. Safe defaults for unsigned integers, enums, and timestamps.
  • 🪦 Dead letter queue : Poison events routed to DLQ instead of blocking the pipeline. REST API for inspection, filtering, and replay.
  • 🔀 Dynamic routing : Route events to per-table topics, streams, or subjects using templates or JavaScript logic.
  • 📤 Transactional outbox : Publish domain events atomically with database writes. Per-aggregate routing, raw payload delivery, zero polling.
  • 🛠️ Cloud-native ready : Single binary, Docker images, JSON logs, Prometheus metrics, and liveness/readiness probes for Kubernetes.

Schema Intelligence

  • 🔍 Schema sensing : Automatically infer and track schema from event payloads, including deep inspection of nested JSON structures.
  • 🗺️ High-cardinality handling : Detect and normalize dynamic map keys (session IDs, trace IDs) to prevent false schema evolution events.
  • 🏷️ Schema fingerprinting : SHA-256 based change detection with schema-to-checkpoint correlation for reliable replay.
  • 🗃️ Source-owned semantics : Preserves native database types (PostgreSQL arrays, MySQL JSON, etc.) instead of normalizing to a universal type system.

Operational Features

  • 🔄 Graceful failover : Handles source failover with automatic schema revalidation - no manual intervention needed.
  • 🧬 Zero-downtime schema evolution : Detects DDL changes and reloads schemas automatically, no pipeline restart needed.
  • 🎯 Flexible table selection : Wildcard patterns (db.*, schema.prefix%) for easy onboarding.
  • 📀 Transaction boundaries : Optionally keep source transactions intact across batches.
  • ⚙️ Commit policies : Control checkpoint behavior with all, required, or quorum modes across multiple sinks.
  • 🔧 Live pipeline management : Pause, resume, patch, and inspect running pipelines via the REST API.
  • 🗄️ Safe initial snapshot : Consistent parallel backfill of existing tables before streaming begins, with binlog/WAL retention validation, background guards, and crash-resume at table granularity.

Use Cases

DeltaForge is designed for:

  • Real-time data synchronization : Keep caches, search indexes, and analytics systems in sync with your primary database.
  • Event-driven architectures : Stream database changes to Kafka or NATS for downstream microservices.
  • Transactional messaging : Use the outbox pattern to publish domain events atomically with database writes - no distributed transactions needed.
  • Audit trails and compliance : Capture every mutation with full before/after images for SOC2, HIPAA, or GDPR requirements.
  • Lightweight ETL : Transform, filter, and route data in-flight with JavaScript processors - no Spark or Flink cluster needed.

DeltaForge is not a DAG-based stream processor. It is a focused CDC engine meant to replace tools like Debezium when you need a lighter, cloud-native, and more customizable runtime.

Getting Started

| Guide | Description |
|---|---|
| Quickstart | Get DeltaForge running in minutes |
| CDC Overview | Understand Change Data Capture concepts |
| Configuration | Pipeline spec reference |
| Development | Build from source, contribute |

Quick Example

metadata:
  name: orders-to-kafka
  tenant: acme

spec:
  source:
    type: mysql
    config:
      dsn: ${MYSQL_DSN}
      tables: [shop.orders]

  processors:
    - type: javascript
      inline: |
        (event) => {
          event.processed_at = Date.now();
          return [event];
        }

  sinks:
    - type: kafka
      config:
        brokers: ${KAFKA_BROKERS}
        topic: order-events

Installation

Docker (recommended):

docker pull ghcr.io/vnvo/deltaforge:latest

From source:

git clone https://github.com/vnvo/deltaforge.git
cd deltaforge
cargo build --release

See the Development Guide for detailed build instructions and available Docker image variants.

License

DeltaForge is dual-licensed under MIT and Apache 2.0.

Change Data Capture (CDC)

Change Data Capture (CDC) is the practice of streaming database mutations as ordered events so downstream systems can stay in sync without periodic full loads. Rather than asking “what does my data look like now?”, CDC answers “what changed, and when?”

DeltaForge is a CDC engine built to make log-based change streams reliable, observable, and operationally simple in modern, containerized environments. It focuses on row-level CDC from MySQL binlog and Postgres logical replication to keep consumers accurate and latency-aware while minimizing impact on source databases.

In short: DeltaForge tails committed transactions from MySQL and Postgres logs, preserves ordering and transaction boundaries, and delivers events to Kafka and Redis with checkpointed delivery, configurable batching, and Prometheus metrics - without requiring a JVM or distributed coordinator.


Why CDC matters

Traditional data integration relies on periodic batch jobs that query source systems, compare snapshots, and push differences downstream. This approach worked for decades, but modern architectures demand something better.

The batch ETL problem: A nightly sync means your analytics are always a day stale. An hourly sync still leaves gaps and hammers your production database with expensive SELECT * queries during business hours. As data volumes grow, these jobs take longer, fail more often, and compete for the same resources your customers need.

CDC flips the model. Instead of pulling data on a schedule, you subscribe to changes as they happen.

| Aspect | Batch ETL | CDC |
|---|---|---|
| Latency | Minutes to hours | Seconds to milliseconds |
| Source load | High (repeated scans) | Minimal (log tailing) |
| Data freshness | Stale between runs | Near real-time |
| Failure recovery | Re-run entire job | Resume from checkpoint |
| Change detection | Diff comparison | Native from source |

These benefits compound as systems scale and teams decentralize:

  • Deterministic replay: Ordered events allow consumers to reconstruct state or power exactly-once delivery with checkpointing.
  • Polyglot delivery: The same change feed can serve caches, queues, warehouses, and search indexes simultaneously without additional source queries.

How CDC works

All CDC implementations share a common goal: detect changes and emit them as events. The approaches differ in how they detect those changes.

Log-based CDC

Databases maintain transaction logs (MySQL binlog, Postgres WAL) that record every committed change for durability and replication. Log-based CDC reads these logs directly, capturing changes without touching application tables.

┌─────────────┐    commits    ┌─────────────┐    tails     ┌─────────────┐
│ Application │──────────────▶│  Database   │─────────────▶│ CDC Engine  │
└─────────────┘               │   + WAL     │              └──────┬──────┘
                              └─────────────┘                     │
                                                                  ▼
                                                           ┌─────────────┐
                                                           │   Kafka /   │
                                                           │   Redis     │
                                                           └─────────────┘

Advantages:

  • Zero impact on source table performance
  • Captures all changes including those from triggers and stored procedures
  • Preserves transaction boundaries and ordering
  • Can capture deletes without soft-delete columns

Trade-offs:

  • Requires database configuration (replication slots, binlog retention)
  • Schema changes need careful handling
  • Log retention limits how far back you can replay

DeltaForge uses log-based CDC exclusively. This allows DeltaForge to provide stronger ordering guarantees, lower source impact, and simpler operational semantics than hybrid approaches that mix log tailing with polling or triggers.

Trigger-based CDC

Database triggers fire on INSERT, UPDATE, and DELETE operations, writing change records to a shadow table that a separate process polls.

Advantages:

  • Works on databases without accessible transaction logs
  • Can capture application-level context unavailable in logs

Trade-offs:

  • Adds write overhead to every transaction
  • Triggers can be disabled or forgotten during schema migrations
  • Shadow tables require maintenance and can grow unbounded

Polling-based CDC

A process periodically queries tables for rows modified since the last check, typically using an updated_at timestamp or incrementing ID.

Advantages:

  • Simple to implement
  • No special database configuration required

Trade-offs:

  • Cannot reliably detect deletes
  • Requires updated_at columns on every table
  • Polling frequency trades off latency against database load
  • Clock skew and transaction visibility can cause missed or duplicate events

Anatomy of a CDC event

A well-designed CDC event contains everything downstream consumers need to process the change correctly.

{
  "id": "evt_01J7K9X2M3N4P5Q6R7S8T9U0V1",
  "source": {
    "database": "shop",
    "table": "orders",
    "server_id": "mysql-prod-1"
  },
  "operation": "update",
  "timestamp": "2025-01-15T14:32:01.847Z",
  "transaction": {
    "id": "gtid:3E11FA47-71CA-11E1-9E33-C80AA9429562:42",
    "position": 15847293
  },
  "before": {
    "id": 12345,
    "status": "pending",
    "total": 99.99,
    "updated_at": "2025-01-15T14:30:00.000Z"
  },
  "after": {
    "id": 12345,
    "status": "shipped",
    "total": 99.99,
    "updated_at": "2025-01-15T14:32:01.000Z"
  },
  "schema_version": "v3"
}

Key components:

| Field | Purpose |
|---|---|
| operation | INSERT, UPDATE, DELETE, or DDL |
| before / after | Row state before and after the change (enables diff logic) |
| transaction | Groups changes from the same database transaction |
| timestamp | When the change was committed at the source |
| schema_version | Helps consumers handle schema evolution |

Not all fields are present for every operation: before is omitted for INSERTs and after is omitted for DELETEs. The event envelope and metadata fields, however, are consistent across MySQL and Postgres sources.


Real-world use cases

Cache invalidation and population

Problem: Your Redis cache serves product catalog data, but cache invalidation logic is scattered across dozens of services. Some forget to invalidate, others invalidate too aggressively, and debugging staleness issues takes hours.

CDC solution: Stream changes from the products table to a message queue. A dedicated consumer reads the stream and updates or deletes cache keys based on the operation type. This centralizes invalidation logic, provides replay capability for cache rebuilds, and removes cache concerns from application code entirely.

┌──────────┐     CDC      ┌─────────────┐   consumer   ┌─────────────┐
│  MySQL   │─────────────▶│   Stream    │─────────────▶│ Cache Keys  │
│ products │              │  (Kafka/    │              │ product:123 │
└──────────┘              │   Redis)    │              └─────────────┘
                          └─────────────┘
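A consumer for this pattern can be sketched as follows (the event shape follows the CDC event anatomy shown earlier; `cacheCommand` and the `product:` key scheme are illustrative, not a DeltaForge API):

```javascript
// Illustrative cache-invalidation consumer: map a CDC event to a cache command.
function cacheCommand(event) {
  // Deletes carry only a `before` image; inserts/updates carry `after`.
  const key = `product:${(event.after || event.before).id}`;
  switch (event.operation) {
    case "delete":
      return { op: "DEL", key };                       // row gone -> drop entry
    default:
      return { op: "SET", key, value: event.after };   // insert/update -> refresh
  }
}
```

Because the stream is replayable, the same function can rebuild the entire cache from a checkpoint after a flush.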

Event-driven microservices

Problem: Your order service needs to notify inventory, shipping, billing, and analytics whenever an order changes state. Direct service-to-service calls create tight coupling and cascade failures.

CDC solution: Publish order changes to a durable message queue. Each downstream service subscribes independently, processes at its own pace, and can replay events after failures or during onboarding. The order service doesn’t need to know about its consumers.

┌─────────────┐     CDC      ┌─────────────┐
│   Orders    │─────────────▶│   Message   │
│   Database  │              │   Queue     │
└─────────────┘              └──────┬──────┘
                    ┌───────────────┼───────────────┐
                    ▼               ▼               ▼
             ┌───────────┐   ┌───────────┐   ┌───────────┐
             │ Inventory │   │ Shipping  │   │ Analytics │
             └───────────┘   └───────────┘   └───────────┘

Search index synchronization

Problem: Your Elasticsearch index drifts from the source of truth. Full reindexing takes hours and blocks search updates during the process.

CDC solution: Stream changes continuously to keep the index synchronized. Use the before image to remove old documents and the after image to index new content.

Data warehouse incremental loads

Problem: Nightly full loads into your warehouse take 6 hours and block analysts until noon. Business users complain that dashboards show yesterday’s numbers.

CDC solution: Stream changes to a staging topic, then micro-batch into your warehouse every few minutes. Analysts get near real-time data without impacting source systems.

Audit logging and compliance

Problem: Regulations require you to maintain a complete history of changes to sensitive data. Application-level audit logging is inconsistent and can be bypassed.

CDC solution: The transaction log captures every committed change regardless of how it was made - application code, admin scripts, or direct SQL. Stream these events to immutable storage for compliance.

Cross-region replication

Problem: You need to replicate data to a secondary region for disaster recovery, but built-in replication doesn’t support the transformations you need.

CDC solution: Stream changes through a CDC pipeline that filters, transforms, and routes events to the target region’s databases or message queues.


Architecture patterns

The outbox pattern

When you need to update a database and publish an event atomically, the outbox pattern provides exactly-once semantics without distributed transactions.

┌─────────────────────────────────────────┐
│              Single Transaction         │
│  ┌─────────────┐    ┌─────────────────┐ │
│  │ UPDATE      │    │ INSERT INTO     │ │
│  │ orders SET  │ +  │ outbox (event)  │ │
│  │ status=...  │    │                 │ │
│  └─────────────┘    └─────────────────┘ │
└─────────────────────────────────────────┘
                      │
                      ▼ CDC
               ┌─────────────┐
               │   Kafka     │
               └─────────────┘
  1. The application writes business data and an event record in the same transaction.
  2. CDC tails the outbox table and publishes events to Kafka.
  3. A cleanup process (or CDC itself) removes processed outbox rows.

This guarantees that events are published if and only if the transaction commits.

When to skip the outbox: If your only requirement is to react to committed database state (not application intent), direct CDC from business tables is often simpler than introducing an outbox. The outbox pattern adds value when you need custom event payloads, explicit event versioning, or when the event schema differs significantly from table structure.

Event sourcing integration

CDC complements event sourcing by bridging legacy systems that weren’t built event-first. Stream changes from existing tables into event stores, then gradually migrate to native event sourcing.

CQRS (Command Query Responsibility Segregation)

CDC naturally supports CQRS by populating read-optimized projections from the write model. Changes flow through the CDC pipeline to update denormalized views, search indexes, or cache layers.


Challenges and solutions

Schema evolution

Databases change. Columns get added, renamed, or removed. Types change. CDC pipelines need to handle this gracefully.

Strategies:

  • Schema registry: Store and version schemas centrally (e.g., Confluent Schema Registry with Avro/Protobuf).
  • Forward compatibility: Add columns as nullable; avoid removing columns that consumers depend on.
  • Consumer tolerance: Design consumers to ignore unknown fields and handle missing optional fields.
  • Processor transforms: Use DeltaForge’s JavaScript processors to normalize schemas before sinks.
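The processor-transform strategy might look like this sketch, assuming the batch-style `processBatch` signature shown in the Configuration section (`KNOWN_FIELDS` is an illustrative consumer contract, not part of DeltaForge):

```javascript
// Sketch of a normalizing processor: keep only fields consumers know,
// defaulting missing optionals to null so consumers never see surprises.
const KNOWN_FIELDS = ["id", "status", "total"]; // illustrative contract

function processBatch(events) {
  return events.map((e) => {
    const after = {};
    for (const f of KNOWN_FIELDS) {
      after[f] = e.after && f in e.after ? e.after[f] : null; // missing -> null
    }
    e.after = after; // columns added by later DDL are dropped here
    return e;
  });
}
```

This confines schema-evolution handling to one place in the pipeline instead of every consumer.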

Ordering guarantees

Events must arrive in the correct order for consumers to reconstruct state accurately. A DELETE arriving before its corresponding INSERT would be catastrophic.

DeltaForge guarantees:

  • Source-order preservation per table partition (always enabled)
  • Transaction boundary preservation when respect_source_tx: true is configured in batch settings

The Kafka sink uses consistent partitioning by primary key, so ordering is maintained within a partition at the consumer.
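A minimal sketch of enabling transaction-boundary preservation (placement of the flag inside the `batch` section is assumed from the examples on this page):

```yaml
spec:
  batch:
    max_events: 500
    respect_source_tx: true   # never split a source transaction across batches
```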

Delivery guarantees

Network failures, process crashes, and consumer restarts can cause duplicates or gaps. End-to-end exactly-once requires coordination between source, pipeline, and sink.

DeltaForge approach:

  • Checkpoints track the last committed position in the source log.
  • Configurable commit policies (all, required, quorum) control when checkpoints advance.
  • Kafka: end-to-end exactly-once via transactional producer (exactly_once: true). Consumers set isolation.level=read_committed.
  • NATS: at-least-once with server-side dedup via Nats-Msg-Id header within duplicate_window.
  • Redis: at-least-once with consumer-side dedup via idempotency_key field.

Default behavior: DeltaForge provides at-least-once delivery out of the box. End-to-end exactly-once is available for Kafka with exactly_once: true. See the Guarantees page for the full delivery tier matrix.
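Enabling the Kafka exactly-once tier might look like this (placement of `exactly_once` under the sink config is an assumption based on this page's sink examples):

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: order-events
      exactly_once: true   # transactional producer; consumers should set
                           # isolation.level=read_committed
```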

High availability

Production CDC pipelines need to handle failures without data loss or extended downtime.

Best practices:

  • Run multiple pipeline instances with leader election.
  • Store checkpoints in durable storage (DeltaForge persists to local files, mountable volumes in containers).
  • Monitor lag between source position and checkpoint position.
  • Set up alerts for pipeline failures and excessive lag.

Expectations: DeltaForge checkpoints ensure no data loss on restart, but does not currently include built-in leader election. For HA deployments, use external coordination (Kubernetes leader election, etcd locks) or run active-passive with health-check-based failover.

Backpressure

When sinks can’t keep up with the change rate, pipelines need to slow down gracefully rather than dropping events or exhausting memory.

DeltaForge handles backpressure through:

  • Configurable batch sizes (max_events, max_bytes, max_ms).
  • In-flight limits (max_inflight) that bound concurrent sink writes.
  • Blocking reads from source when batches queue up.
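Put together, a backpressure-aware configuration might look like this sketch (the exact placement of `max_inflight` is an assumption):

```yaml
spec:
  batch:
    max_events: 500      # flush after this many events...
    max_bytes: 1048576   # ...or this many bytes...
    max_ms: 1000         # ...or this much elapsed time, whichever comes first
  # max_inflight bounds concurrent sink writes (placement assumed)
```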

Performance considerations

Batching trade-offs

| Setting | Low value | High value |
|---|---|---|
| max_events | Lower latency, more overhead | Higher throughput, more latency |
| max_ms | Faster flush, smaller batches | Larger batches, delayed flush |
| max_bytes | Memory-safe, frequent commits | Efficient for large rows |

Start with DeltaForge defaults and tune based on observed latency and throughput.

Source database impact

Log-based CDC has minimal impact, but consider:

  • Replication slot retention: Paused pipelines cause WAL/binlog accumulation.
  • Connection limits: Each pipeline holds a replication connection.
  • Network bandwidth: High-volume tables generate significant log traffic.

Sink throughput

  • Kafka: Tune batch.size, linger.ms, and compression in client_conf.
  • Redis: Use pipelining and connection pooling for high-volume streams.
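A hedged sketch of Kafka sink tuning via `client_conf` (property values are illustrative starting points, not recommendations):

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: order-events
      client_conf:
        batch.size: "131072"       # larger producer batches
        linger.ms: "20"            # wait briefly to fill batches
        compression.type: "zstd"   # trade CPU for network bandwidth
```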

Monitoring and observability

CDC pipelines are long-running, stateful processes; without metrics and alerts, failures are silent by default. A healthy pipeline requires visibility into lag, throughput, and errors.

Key metrics to track

| Metric | Description | Alert threshold |
|---|---|---|
| cdc_lag_seconds | Time between event timestamp and processing | > 60s |
| events_processed_total | Throughput counter | Sudden drops |
| checkpoint_lag_events | Events since last checkpoint | > 10,000 |
| sink_errors_total | Failed sink writes | Any sustained errors |
| batch_size_avg | Events per batch | Outside expected range |

DeltaForge exposes Prometheus metrics on the configurable metrics endpoint (default :9000).

Health checks

  • GET /health: Liveness probe - is the process running?
  • GET /ready: Readiness probe - are pipelines connected and processing?
  • GET /pipelines: Detailed status of each pipeline including configuration.

Choosing a CDC solution

When evaluating CDC tools, consider:

| Factor | Questions to ask |
|---|---|
| Source support | Does it support your databases? MySQL binlog? Postgres logical replication? |
| Sink flexibility | Can it write to your target systems? Kafka, Redis, HTTP, custom? |
| Transformation | Can you filter, enrich, or reshape events in-flight? |
| Operational overhead | How much infrastructure does it require? JVM? Distributed coordinator? |
| Resource efficiency | What’s the memory/CPU footprint per pipeline? |
| Cloud-native | Does it containerize cleanly? Support health checks? Emit metrics? |

Where DeltaForge fits

DeltaForge intentionally avoids the operational complexity of JVM-based CDC stacks (Kafka Connect-style deployments with Zookeeper, Connect workers, and converter configurations) while remaining compatible with Kafka-centric architectures.

DeltaForge is designed for teams that want:

  • Lightweight runtime: Single binary, minimal memory footprint, no JVM warmup.
  • Config-driven pipelines: YAML specs instead of code for common patterns.
  • Inline transformation: JavaScript processors for custom logic without recompilation.
  • Container-native operations: Built for Kubernetes with health endpoints and Prometheus metrics.

DeltaForge is not designed for:

  • Complex DAG-based stream processing with windowed aggregations
  • Stateful joins across multiple streams
  • Sources beyond MySQL and Postgres (currently)
  • Built-in schema registry integration (use external registries)

If you need those capabilities, consider dedicated stream processors or the broader Kafka ecosystem. DeltaForge excels at getting data out of databases and into your event infrastructure reliably and efficiently.


Getting started

Ready to try CDC with DeltaForge? Head to the Quickstart guide to run your first pipeline in minutes.

For production deployments, review the Development guide for container builds and operational best practices.

Quickstart

Get DeltaForge running in minutes.

1. Prepare a pipeline spec

Create a YAML file that defines your CDC pipeline. Environment variables are expanded at runtime, so secrets stay out of version control.

metadata:
  name: orders-mysql-to-kafka
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  processors:
    - type: javascript
      id: transform
      inline: |
        (event) => {
          event.tags = ["processed"];
          return [event];
        }

  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
        required: true

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000

2. Start DeltaForge

Using Docker (recommended):

docker run --rm \
  -p 8080:8080 -p 9000:9000 \
  -e MYSQL_DSN="mysql://user:pass@host:3306/db" \
  -e KAFKA_BROKERS="kafka:9092" \
  -v $(pwd)/pipeline.yaml:/etc/deltaforge/pipeline.yaml:ro \
  -v deltaforge-checkpoints:/app/data \
  ghcr.io/vnvo/deltaforge:latest \
  --config /etc/deltaforge/pipeline.yaml

From source:

cargo run -p runner -- --config ./pipeline.yaml

Runner options

| Flag | Default | Description |
|---|---|---|
| --config | (required) | Path to pipeline spec file or directory |
| --api-addr | 0.0.0.0:8080 | REST API address |
| --metrics-addr | 0.0.0.0:9095 | Prometheus metrics address |

3. Verify it’s running

Check health and pipeline status:

# Liveness probe
curl http://localhost:8080/health

# Readiness with pipeline status
curl http://localhost:8080/ready

# List all pipelines
curl http://localhost:8080/pipelines

4. Manage pipelines

Control pipelines via the REST API:

# Pause a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/pause

# Resume a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/resume

# Stop a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/stop

Next steps

Configuration

Pipelines are defined as YAML documents that map directly to the PipelineSpec type. Environment variables are expanded before parsing using ${VAR} syntax, so secrets and connection strings can be injected at runtime. Unknown variables (e.g. ${source.table}) pass through for use as routing templates.

Document structure

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: <pipeline-name>
  tenant: <tenant-id>
spec:
  source: { ... }
  processors: [ ... ]
  sinks: [ ... ]
  batch: { ... }
  commit_policy: { ... }
  schema_sensing: { ... }

Metadata

| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Unique pipeline identifier. Used in API routes and metrics. |
| tenant | string | Yes | Business-oriented tenant label for multi-tenancy. |

Spec fields

| Field | Type | Required | Description |
|---|---|---|---|
| source | object | Yes | Database source configuration. See Sources. |
| processors | array | No | Ordered list of processors. See Processors. |
| sinks | array | Yes (at least one) | One or more sinks that receive each batch. See Sinks. |
| sharding | object | No | Optional hint for downstream distribution. |
| connection_policy | object | No | How the runtime establishes upstream connections. |
| batch | object | No | Commit unit thresholds. See Batching. |
| commit_policy | object | No | How sink acknowledgements gate checkpoints. See Commit policy. |
| schema_sensing | object | No | Automatic schema inference from event payloads. See Schema sensing. |
| journal | object | No | Event journal (DLQ). See Dead Letter Queue. |

Sources

MySQL

Captures row-level changes via binlog replication. See MySQL source documentation for prerequisites and detailed configuration.

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.order_items
    outbox:
      tables: ["shop.outbox"]
    snapshot:
      mode: initial
    on_schema_drift: adapt

| Field | Type | Description |
|---|---|---|
| id | string | Unique identifier for checkpoints and metrics |
| dsn | string | MySQL connection string with replication privileges |
| tables | array | Table patterns to capture; omit for all tables |
| outbox.tables | array | Table patterns to tag as outbox events. Must also appear in tables. Supports globs: shop.outbox, *.outbox, shop.outbox_%. |
| snapshot.mode | string | never (default); initial - run once if no checkpoint exists; always - re-snapshot on every restart |
| snapshot.max_parallel_tables | int | Tables snapshotted concurrently (default: 8) |
| snapshot.chunk_size | int | Rows per range chunk for integer-PK tables (default: 10000) |
| on_schema_drift | string | adapt (default) - reload schema and continue after failover drift; halt - stop and require operator intervention. See Failover Handling. |

Table patterns support SQL LIKE syntax:

  • db.table - exact match
  • db.prefix% - tables matching prefix
  • db.% - all tables in database
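For example, a `tables` list mixing the three pattern styles (table names illustrative):

```yaml
source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders        # exact match
      - shop.order_%       # every table whose name starts with "order_"
      - analytics.%        # every table in the analytics database
```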

PostgreSQL

Captures row-level changes via logical replication. See PostgreSQL source documentation for prerequisites and detailed configuration.

source:
  type: postgres
  config:
    id: users-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_users
    publication: users_pub
    tables:
      - public.users
      - public.sessions
    start_position: earliest
    outbox:
      prefixes: ["outbox", "order_outbox_%"]
    snapshot:
      mode: initial
    on_schema_drift: adapt

| Field | Type | Description |
|---|---|---|
| id | string | Unique identifier |
| dsn | string | PostgreSQL connection string |
| slot | string | Replication slot name |
| publication | string | Publication name |
| tables | array | Table patterns to capture |
| start_position | string | earliest, latest, or lsn |
| outbox.prefixes | array | pg_logical_emit_message prefixes to tag as outbox events. Supports globs: outbox, outbox_%, *. |
| snapshot.mode | string | never (default); initial - run once if no checkpoint exists; always - re-snapshot on every restart |
| snapshot.max_parallel_tables | int | Tables snapshotted concurrently (default: 8) |
| snapshot.chunk_size | int | Rows per range chunk (default: 10000) |
| on_schema_drift | string | adapt (default) - reload schema and continue after failover drift; halt - stop and require operator intervention. See Failover Handling. |

Note: The source-level outbox field only tags matching events with the __outbox sentinel. Routing and transformation are handled by the outbox processor.


Processors

Processors transform events between source and sinks. They run in order and can filter, enrich, or modify events.

JavaScript

processors:
  - type: javascript
    id: transform
    inline: |
      function processBatch(events) {
        return events.map(e => {
          e.tags = ["processed"];
          return e;
        });
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500

| Field | Type | Description |
|---|---|---|
| id | string | Processor identifier |
| inline | string | JavaScript code |
| limits.cpu_ms | int | CPU time limit |
| limits.mem_mb | int | Memory limit |
| limits.timeout_ms | int | Execution timeout |

Flatten

processors:
  - type: flatten
    id: flat
    separator: "__"
    max_depth: 3
    on_collision: last
    empty_object: preserve
    lists: preserve
    empty_list: drop

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "flatten" | Processor identifier |
| separator | string | "__" | Separator between path segments |
| max_depth | int | unlimited | Stop recursing at this depth; objects at the boundary kept as-is |
| on_collision | string | last | Key collision policy: last, first, or error |
| empty_object | string | preserve | Empty object policy: preserve, drop, or null |
| lists | string | preserve | Array policy: preserve or index |
| empty_list | string | preserve | Empty array policy: preserve, drop, or null |
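To make the defaults concrete, here is an illustrative model of the flatten behavior with the default `__` separator, `lists: preserve`, and `empty_object: preserve` (a sketch, not DeltaForge's actual implementation):

```javascript
// Illustrative model of the flatten processor's default behavior.
function flatten(obj, separator = "__", prefix = "") {
  const out = {};
  for (const [k, v] of Object.entries(obj)) {
    const key = prefix ? `${prefix}${separator}${k}` : k;
    if (v !== null && typeof v === "object" && !Array.isArray(v)) {
      if (Object.keys(v).length === 0) {
        out[key] = v; // empty_object: preserve (default)
      } else {
        Object.assign(out, flatten(v, separator, key)); // recurse into objects
      }
    } else {
      out[key] = v; // scalars and arrays are kept as-is (lists: preserve)
    }
  }
  return out;
}
```

So `{ order: { customer: { name: "a" } } }` becomes `{ "order__customer__name": "a" }`, while arrays pass through untouched.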

Filter

processors:
  - type: filter
    id: only-active-orders
    ops: [create, update]
    tables:
      include: ["shop.orders"]
      exclude: ["*.tmp"]
    fields:
      - path: status
        op: eq
        value: "active"
      - path: total
        op: gte
        value: 100
    match: all

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "filter" | Processor identifier |
| ops | list | [] | Op types to keep. Empty = all. create, update, delete, read, truncate |
| tables.include | list | [] | Table glob patterns to include. Empty = all |
| tables.exclude | list | [] | Table glob patterns to exclude. Takes priority over include |
| fields | list | [] | Field predicates against event.after. See Filter operators |
| match | string | all | all - every predicate must match; any - at least one |
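The field-predicate semantics can be modeled like this (illustrative only; shows the `eq` and `gte` operators with `all` vs `any` matching, assuming top-level paths):

```javascript
// Illustrative model of field predicates against event.after.
function matches(event, predicates, match = "all") {
  const ops = {
    eq: (a, b) => a === b,
    gte: (a, b) => a >= b,
  };
  const results = predicates.map((p) => ops[p.op](event.after?.[p.path], p.value));
  return match === "all" ? results.every(Boolean) : results.some(Boolean);
}
```

With the example config above, an active order with total 150 passes, while an active order with total 50 passes only under `match: any`.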

Outbox

Transforms raw outbox events into routed, sink-ready events. Requires the source to have outbox configured so events are tagged before reaching this processor. See Outbox pattern documentation for full details.

processors:
  - type: outbox
    id: outbox
    topic: "${aggregate_type}.${event_type}"
    default_topic: "events.unrouted"
    raw_payload: true
    columns:
      payload: data
    additional_headers:
      x-trace-id: trace_id
| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "outbox" | Processor identifier |
| tables | array | [] | Filter: only process outbox events matching these patterns. Empty = all outbox events |
| topic | string | - | Topic template resolved against the raw payload using ${field} placeholders |
| default_topic | string | - | Fallback topic when template resolution fails and no topic column exists |
| key | string | aggregate_id value | Key template resolved against the raw payload |
| raw_payload | bool | false | Deliver the extracted payload as-is to sinks, bypassing envelope wrapping |
| strict | bool | false | Fail the batch if required fields are missing rather than silently falling back |
| columns.payload | string | payload | Column containing the event payload |
| columns.aggregate_type | string | aggregate_type | Column for aggregate type |
| columns.aggregate_id | string | aggregate_id | Column for aggregate ID |
| columns.event_type | string | event_type | Column for event type |
| columns.topic | string | topic | Column for pre-computed topic override |
| columns.event_id | string | id | Column extracted as df-event-id header |
| additional_headers | map | {} | Forward extra payload fields as routing headers: header-name: column-name |
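Template resolution against the raw payload can be sketched as follows (illustrative Python, not the actual Rust implementation):

```python
import re

def resolve_topic(template, payload, default_topic):
    """Resolve ${field} placeholders from the raw outbox payload.

    Any missing field makes resolution fail, falling back to default_topic.
    """
    try:
        return re.sub(r"\$\{(\w+)\}", lambda m: str(payload[m.group(1)]), template)
    except KeyError:
        return default_topic

payload = {"aggregate_type": "order", "event_type": "created"}
print(resolve_topic("${aggregate_type}.${event_type}", payload, "events.unrouted"))
# → order.created
print(resolve_topic("${missing}.x", payload, "events.unrouted"))
# → events.unrouted
```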

Sinks

Sinks deliver events to downstream systems. Each sink supports configurable envelope formats and wire encodings to match consumer expectations. See the Sinks documentation for detailed information on multi-sink patterns, commit policies, and failure handling.

Envelope and encoding

All sinks support these serialization options:

| Field | Type | Default | Description |
|---|---|---|---|
| envelope | object | native | Output structure format. See Envelopes |
| encoding | string | json | Wire encoding format |

Envelope types:

  • native - the Debezium-style payload without a wrapper (default, most efficient)
  • debezium - Full {"payload": ...} wrapper
  • cloudevents - CloudEvents 1.0 specification (requires type_prefix)
# Native (default)
envelope:
  type: native

# Debezium wrapper
envelope:
  type: debezium

# CloudEvents
envelope:
  type: cloudevents
  type_prefix: "com.example.cdc"
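For orientation, a CloudEvents 1.0 structured envelope looks roughly like the sketch below. The attribute mapping (source path, type composition) is an assumption for illustration, not DeltaForge's exact output; only specversion, id, source, and type are required by the CloudEvents spec.

```python
import json
import uuid

def to_cloudevent(event, type_prefix):
    """Wrap a CDC event in a CloudEvents 1.0 structured envelope.

    The field mapping here is hypothetical, shown only to convey the shape.
    """
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": "/deltaforge/" + event["table"],
        "type": f"{type_prefix}.{event['table']}.{event['op']}",
        "datacontenttype": "application/json",
        "data": event["after"],
    }

ce = to_cloudevent({"table": "shop.orders", "op": "create", "after": {"id": 1}},
                   "com.example.cdc")
print(json.dumps(ce, indent=2))
```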

Kafka

See Kafka sink documentation for detailed configuration options and best practices.

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      envelope:
        type: debezium
      encoding: json
      required: true
      exactly_once: false
      send_timeout_secs: 30
      client_conf:
        security.protocol: SASL_SSL
| Field | Type | Default | Description |
|---|---|---|---|
| id | string | - | Sink identifier |
| brokers | string | - | Kafka broker addresses |
| topic | string | - | Target topic or template |
| key | string | - | Message key template |
| envelope | object | native | Output format |
| encoding | string | json | Wire encoding |
| required | bool | true | Gates checkpoints |
| exactly_once | bool | false | Transactional mode |
| send_timeout_secs | int | 30 | Send timeout |
| client_conf | map | - | librdkafka overrides |

CloudEvents example:

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: ${KAFKA_BROKERS}
      topic: events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.cdc"
      encoding: json

Redis

See Redis sink documentation for detailed configuration options and best practices.

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: native
      encoding: json
      required: true
| Field | Type | Default | Description |
|---|---|---|---|
| id | string | - | Sink identifier |
| uri | string | - | Redis connection URI |
| stream | string | - | Redis stream key or template |
| key | string | - | Entry key template |
| envelope | object | native | Output format |
| encoding | string | json | Wire encoding |
| required | bool | true | Gates checkpoints |
| send_timeout_secs | int | 5 | XADD timeout |
| batch_timeout_secs | int | 30 | Pipeline timeout |
| connect_timeout_secs | int | 10 | Connection timeout |

NATS

See NATS sink documentation for detailed configuration options and best practices.

sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      envelope:
        type: native
      encoding: json
      required: true
      send_timeout_secs: 5
      batch_timeout_secs: 30
| Field | Type | Default | Description |
|---|---|---|---|
| id | string | - | Sink identifier |
| url | string | - | NATS server URL |
| subject | string | - | Subject or template |
| key | string | - | Message key template |
| stream | string | - | JetStream stream name |
| envelope | object | native | Output format |
| encoding | string | json | Wire encoding |
| required | bool | true | Gates checkpoints |
| send_timeout_secs | int | 5 | Publish timeout |
| batch_timeout_secs | int | 30 | Batch timeout |
| connect_timeout_secs | int | 10 | Connection timeout |
| credentials_file | string | - | NATS credentials file |
| username | string | - | Auth username |
| password | string | - | Auth password |
| token | string | - | Auth token |

Batching

batch:
  max_events: 500
  max_bytes: 1048576
  max_ms: 1000
  respect_source_tx: true
  max_inflight: 2
| Field | Type | Default | Description |
|---|---|---|---|
| max_events | int | 500 | Flush after this many events |
| max_bytes | int | 1048576 | Flush after serialized size reaches this limit |
| max_ms | int | 1000 | Flush after this much time (ms) |
| respect_source_tx | bool | true | Never split source transactions |
| max_inflight | int | 2 | Max concurrent batches |
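The thresholds are soft and combine with OR: crossing any one of them triggers a flush. A minimal sketch of that decision:

```python
def should_flush(n_events, n_bytes, elapsed_ms,
                 max_events=500, max_bytes=1_048_576, max_ms=1000):
    """A batch is flushed as soon as any soft threshold is crossed."""
    return n_events >= max_events or n_bytes >= max_bytes or elapsed_ms >= max_ms

print(should_flush(500, 10_000, 120))  # True  (event count reached)
print(should_flush(42, 10_000, 120))   # False (no threshold crossed)
```

With `respect_source_tx: true`, a flush is additionally deferred until the current source transaction is complete, so a transaction is never split across batches.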

Commit policy

commit_policy:
  mode: required

# For quorum mode:
commit_policy:
  mode: quorum
  quorum: 2
| Mode | Description |
|---|---|
| all | Every sink must acknowledge before checkpoint |
| required | Only required: true sinks must acknowledge (default) |
| quorum | Checkpoint after quorum sinks acknowledge |
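The three modes can be sketched as a single decision function (illustrative Python; the engine itself is Rust):

```python
def can_checkpoint(acks, sinks, mode="required", quorum=None):
    """Decide whether a checkpoint may be written.

    acks:  set of sink ids that acknowledged the batch
    sinks: {sink_id: required_flag}
    """
    if mode == "all":
        return set(sinks) <= acks
    if mode == "required":
        return all(sid in acks for sid, required in sinks.items() if required)
    if mode == "quorum":
        return len(acks) >= quorum
    raise ValueError(f"unknown mode: {mode}")

sinks = {"kafka": True, "redis": False}
print(can_checkpoint({"kafka"}, sinks, "required"))   # True  (only kafka is required)
print(can_checkpoint({"kafka"}, sinks, "all"))        # False (redis has not acked)
print(can_checkpoint({"kafka"}, sinks, "quorum", 2))  # False (1 of 2 acked)
```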

Schema sensing

Schema sensing automatically infers and tracks schema from event payloads. See the Schema Sensing documentation for detailed information on how it works, drift detection, and API endpoints.

Performance tip: Schema sensing can be CPU-intensive, especially with deep JSON inspection. Consider your throughput requirements when configuring:

  • Set enabled: false if you don’t need runtime schema inference
  • Limit deep_inspect.max_depth to avoid traversing deeply nested structures
  • Increase sampling.sample_rate to analyze fewer events (e.g., 1 in 100 instead of 1 in 10)
  • Reduce sampling.warmup_events if you’re confident in schema stability
schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3
    max_sample_size: 500
  sampling:
    warmup_events: 50
    sample_rate: 5
    structure_cache: true
    structure_cache_size: 50
  high_cardinality:
    enabled: true
    min_events: 100
    stable_threshold: 0.5
    min_dynamic_fields: 5
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable schema sensing |
| deep_inspect.enabled | bool | true | Inspect nested JSON |
| deep_inspect.max_depth | int | 10 | Max nesting depth |
| deep_inspect.max_sample_size | int | 1000 | Max events for deep analysis |
| sampling.warmup_events | int | 1000 | Events to fully analyze first |
| sampling.sample_rate | int | 10 | After warmup, analyze 1 in N |
| sampling.structure_cache | bool | true | Cache structure fingerprints |
| sampling.structure_cache_size | int | 100 | Max cached structures |
| high_cardinality.enabled | bool | true | Detect dynamic map keys |
| high_cardinality.min_events | int | 100 | Events before classification |
| high_cardinality.stable_threshold | float | 0.5 | Frequency threshold for stable fields |
| high_cardinality.min_dynamic_fields | int | 5 | Min unique fields to classify as a map |
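The interaction between `warmup_events` and `sample_rate` can be sketched like this (illustrative; the exact sampling schedule is an implementation detail):

```python
def should_analyze(event_index, warmup_events=1000, sample_rate=10):
    """Fully analyze the first `warmup_events`, then analyze 1 in `sample_rate`."""
    if event_index < warmup_events:
        return True
    return (event_index - warmup_events) % sample_rate == 0

# With warmup_events=50 and sample_rate=5, out of the first 1000 events:
analyzed = sum(should_analyze(i, warmup_events=50, sample_rate=5) for i in range(1000))
print(analyzed)  # 50 warmup + every 5th of the remaining 950 = 240
```

This is why raising `sample_rate` (e.g. from 10 to 100) directly reduces CPU spent on inference once the warmup phase is over.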

Complete examples

MySQL to Kafka with Debezium envelope

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-kafka
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  processors:
    - type: javascript
      id: transform
      inline: |
        function processBatch(events) {
          return events.map(event => {
            event.tags = (event.tags || []).concat(["normalized"]);
            return event;
          });
        }
      limits:
        cpu_ms: 50
        mem_mb: 128
        timeout_ms: 500

  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
        envelope:
          type: debezium
        encoding: json
        required: true
        exactly_once: false
        client_conf:
          message.timeout.ms: "5000"

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
    respect_source_tx: true
    max_inflight: 2

  commit_policy:
    mode: required

PostgreSQL to Kafka with CloudEvents

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: users-postgres-to-kafka
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: users-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_users
      publication: users_pub
      tables:
        - public.users
        - public.user_sessions
      start_position: earliest

  sinks:
    - type: kafka
      config:
        id: users-kafka
        brokers: ${KAFKA_BROKERS}
        topic: user-events
        envelope:
          type: cloudevents
          type_prefix: "com.acme.users"
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

Multi-sink with different formats

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-multi-sink
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  sinks:
    # Kafka Connect expects Debezium format
    - type: kafka
      config:
        id: connect-sink
        brokers: ${KAFKA_BROKERS}
        topic: connect-events
        envelope:
          type: debezium
        required: true

    # Lambda expects CloudEvents
    - type: kafka
      config:
        id: lambda-sink
        brokers: ${KAFKA_BROKERS}
        topic: lambda-events
        envelope:
          type: cloudevents
          type_prefix: "com.acme.cdc"
        required: false

    # Redis cache uses native format
    - type: redis
      config:
        id: cache-redis
        uri: ${REDIS_URI}
        stream: orders-cache
        envelope:
          type: native
        required: false

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

MySQL to NATS

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-nats
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items

  sinks:
    - type: nats
      config:
        id: orders-nats
        url: ${NATS_URL}
        subject: orders.events
        stream: ORDERS
        envelope:
          type: native
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

REST API Reference

DeltaForge exposes a REST API for health checks, pipeline management, schema inspection, and drift detection. All endpoints return JSON.

Base URL

Default: http://localhost:8080

Configure with --api-addr:

deltaforge --config pipelines.yaml --api-addr 0.0.0.0:9090

Health Endpoints

Liveness Probe

GET /health

Returns ok when the process is running and all pipelines are healthy. Returns 503 if any pipeline has entered a failed state (e.g. position lost after failover, binlog purged, unrecoverable source error). Use for Kubernetes liveness probes — a 503 indicates the process should be restarted.

Response: 200 OK — all pipelines healthy

{"status": "healthy", "pipelines": 3}

Response: 503 Service Unavailable — one or more pipelines failed

{"status": "unhealthy", "failed_pipelines": ["orders-cdc"]}

Readiness Probe

GET /ready

Returns pipeline states. Use for Kubernetes readiness probes.

Response: 200 OK

{
  "status": "ready",
  "pipelines": [
    {
      "name": "orders-cdc",
      "status": "running",
      "spec": { ... }
    }
  ]
}

Pipeline Management

List Pipelines

GET /pipelines
GET /pipelines?label=env:prod
GET /pipelines?label=env:prod&label=team:platform

Returns all pipelines with current status. Filter by labels with AND logic. Key-only filter (?label=env) matches any value.

Response: 200 OK

[
  {
    "name": "orders-cdc",
    "status": "running",
    "spec": {
      "metadata": { "name": "orders-cdc", "tenant": "acme" },
      "spec": { ... }
    }
  }
]
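The label filter semantics (AND across filters, key-only matching any value) can be sketched as:

```python
def label_match(pipeline_labels, filters):
    """AND logic: every filter must match. "key:value" matches exactly,
    a bare "key" matches any value for that key."""
    for f in filters:
        key, _, value = f.partition(":")
        if key not in pipeline_labels:
            return False
        if value and pipeline_labels[key] != value:
            return False
    return True

labels = {"env": "prod", "team": "platform"}
print(label_match(labels, ["env:prod", "team:platform"]))  # True
print(label_match(labels, ["env"]))                        # True (key-only filter)
print(label_match(labels, ["env:staging"]))                # False
```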

Get Pipeline

GET /pipelines/{name}

Returns a single pipeline by name with operational status.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... },
  "ops": {
    "uptime_seconds": 3600.5,
    "dlq_entries": 0,
    "sink_errors": {},
    "checkpoints": [
      {"sink_id": "kafka-primary", "position": {"file": "mysql-bin.000005", "pos": 12345}, "age_seconds": 0.3}
    ]
  }
}

Errors:

  • 404 Not Found - Pipeline doesn’t exist

Create Pipeline

POST /pipelines
Content-Type: application/json

Creates a new pipeline from a full spec.

Request:

{
  "metadata": {
    "name": "orders-cdc",
    "tenant": "acme"
  },
  "spec": {
    "source": {
      "type": "mysql",
      "config": {
        "id": "mysql-1",
        "dsn": "mysql://user:pass@host/db",
        "tables": ["shop.orders"]
      }
    },
    "processors": [],
    "sinks": [
      {
        "type": "kafka",
        "config": {
          "id": "kafka-1",
          "brokers": "localhost:9092",
          "topic": "orders"
        }
      }
    ]
  }
}

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Errors:

  • 409 Conflict - Pipeline already exists

Update Pipeline

PATCH /pipelines/{name}
Content-Type: application/json

Applies a partial update to an existing pipeline. The spec is merged, the pipeline is restarted from its last saved checkpoint, and the new config takes effect immediately. Only the fields present in the request body are changed — omitted fields retain their current values.

If the pipeline is currently stopped, PATCH applies the new config and restarts it from the saved checkpoint. This is the recommended way to tune throughput settings before resuming after a planned stop.

Request:

{
  "spec": {
    "batch": {
      "max_events": 1000,
      "max_ms": 500
    }
  }
}

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Errors:

  • 404 Not Found - Pipeline doesn’t exist
  • 400 Bad Request - Invalid field value or name mismatch in patch

Pause Pipeline

POST /pipelines/{name}/pause

Suspends event processing while keeping the source connection alive. No new events are consumed from the binlog/WAL. Resume restarts processing from exactly where it paused — no events are missed.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "paused",
  "spec": { ... }
}

Resume Pipeline

POST /pipelines/{name}/resume

Resumes a paused or stopped pipeline.

  • From paused — restarts event processing immediately; source connection was kept alive.
  • From stopped — reconnects to the source and replays from the last saved checkpoint; any events written to the binlog/WAL while stopped are replayed in order.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Stop Pipeline

POST /pipelines/{name}/stop

Gracefully stops a pipeline: flushes in-flight events, saves the binlog/WAL checkpoint, and disconnects from the source. The pipeline remains in the registry and can be resumed with POST /pipelines/{name}/resume or by issuing a PATCH with updated config.

Use stop (rather than delete) when you intend to restart the pipeline later — for example, before a planned maintenance window or when tuning config for a backlog drain.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "stopped",
  "spec": { ... }
}

Delete Pipeline

DELETE /pipelines/{name}

Permanently removes a pipeline from the runtime. The checkpoint is not preserved. Use stop first if you may want to restart the pipeline later.

Response: 204 No Content

Errors:

  • 404 Not Found - Pipeline doesn’t exist

Schema Management

List Database Schemas

GET /pipelines/{name}/schemas

Returns all tracked database schemas for a pipeline. These are the schemas loaded directly from the source database.

Response: 200 OK

[
  {
    "database": "shop",
    "table": "orders",
    "column_count": 5,
    "primary_key": ["id"],
    "fingerprint": "sha256:a1b2c3d4e5f6...",
    "registry_version": 2
  },
  {
    "database": "shop",
    "table": "customers",
    "column_count": 8,
    "primary_key": ["id"],
    "fingerprint": "sha256:f6e5d4c3b2a1...",
    "registry_version": 1
  }
]

Get Schema Details

GET /pipelines/{name}/schemas/{db}/{table}

Returns detailed schema information including all columns.

Response: 200 OK

{
  "database": "shop",
  "table": "orders",
  "columns": [
    {
      "name": "id",
      "type": "bigint(20) unsigned",
      "nullable": false,
      "default": null,
      "extra": "auto_increment"
    },
    {
      "name": "customer_id",
      "type": "bigint(20)",
      "nullable": false,
      "default": null
    }
  ],
  "primary_key": ["id"],
  "fingerprint": "sha256:a1b2c3d4..."
}

Schema Sensing

Schema sensing automatically infers schema structure from JSON event payloads. This is useful for sources that don’t provide schema metadata or for detecting schema evolution in JSON columns.

List Inferred Schemas

GET /pipelines/{name}/sensing/schemas

Returns all schemas inferred via sensing for a pipeline.

Response: 200 OK

[
  {
    "table": "orders",
    "fingerprint": "sha256:abc123...",
    "sequence": 3,
    "event_count": 1500,
    "stabilized": true,
    "first_seen": "2025-01-15T10:30:00Z",
    "last_seen": "2025-01-15T14:22:00Z"
  }
]
| Field | Description |
|---|---|
| table | Table name (or table:column for JSON column sensing) |
| fingerprint | SHA-256 content hash of the current schema |
| sequence | Monotonic version number (increments on evolution) |
| event_count | Total events observed |
| stabilized | Whether sampling has stopped because the structure is stable |
| first_seen | First observation timestamp |
| last_seen | Most recent observation timestamp |

Get Inferred Schema Details

GET /pipelines/{name}/sensing/schemas/{table}

Returns detailed inferred schema including all fields.

Response: 200 OK

{
  "table": "orders",
  "fingerprint": "sha256:abc123...",
  "sequence": 3,
  "event_count": 1500,
  "stabilized": true,
  "fields": [
    {
      "name": "id",
      "types": ["integer"],
      "nullable": false,
      "optional": false
    },
    {
      "name": "metadata",
      "types": ["object"],
      "nullable": true,
      "optional": false,
      "nested_field_count": 5
    },
    {
      "name": "tags",
      "types": ["array"],
      "nullable": false,
      "optional": true,
      "array_element_types": ["string"]
    }
  ],
  "first_seen": "2025-01-15T10:30:00Z",
  "last_seen": "2025-01-15T14:22:00Z"
}

Export JSON Schema

GET /pipelines/{name}/sensing/schemas/{table}/json-schema

Exports the inferred schema as a standard JSON Schema document.

Response: 200 OK

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "orders",
  "type": "object",
  "properties": {
    "id": { "type": "integer" },
    "metadata": { "type": ["object", "null"] },
    "tags": {
      "type": "array",
      "items": { "type": "string" }
    }
  },
  "required": ["id", "metadata"]
}

Get Sensing Cache Statistics

GET /pipelines/{name}/sensing/stats

Returns cache performance statistics for schema sensing.

Response: 200 OK

{
  "tables": [
    {
      "table": "orders",
      "cached_structures": 3,
      "max_cache_size": 100,
      "cache_hits": 1450,
      "cache_misses": 50
    }
  ],
  "total_cache_hits": 1450,
  "total_cache_misses": 50,
  "hit_rate": 0.9667
}

Drift Detection

Drift detection compares expected database schema against observed data patterns to detect mismatches, unexpected nulls, and type drift.

Get Drift Results

GET /pipelines/{name}/drift

Returns drift detection results for all tables in a pipeline.

Response: 200 OK

[
  {
    "table": "orders",
    "has_drift": true,
    "columns": [
      {
        "column": "amount",
        "expected_type": "decimal(10,2)",
        "observed_types": ["string"],
        "mismatch_count": 42,
        "examples": ["\"99.99\""]
      }
    ],
    "events_analyzed": 1500,
    "events_with_drift": 42
  }
]

Get Table Drift

GET /pipelines/{name}/drift/{table}

Returns drift detection results for a specific table.

Response: 200 OK

{
  "table": "orders",
  "has_drift": false,
  "columns": [],
  "events_analyzed": 1000,
  "events_with_drift": 0
}

Errors:

  • 404 Not Found - Table not found or no drift data available

Dead Letter Queue

See the DLQ page for full documentation.

Peek DLQ Entries

GET /pipelines/{name}/journal/dlq?limit=50&sink_id=kafka-primary&error_kind=serialization

Returns DLQ entries (oldest first). All query params are optional.

DLQ Count

GET /pipelines/{name}/journal/dlq/count

Response: 200 OK

{"count": 42}

Acknowledge DLQ Entries

POST /pipelines/{name}/journal/dlq/ack
Content-Type: application/json

{"up_to_seq": 42}

Permanently removes entries from the head up to the given sequence number.

Response: 200 OK

{"acked": 12}

Purge DLQ

DELETE /pipelines/{name}/journal/dlq

Response: 200 OK

{"purged": 42}

Checkpoint Inspection

Get Checkpoints

GET /pipelines/{name}/checkpoints

Returns per-sink checkpoint positions and ages.

Response: 200 OK

[
  {"sink_id": "kafka-primary", "position": {"file": "mysql-bin.000005", "pos": 12345}, "age_seconds": 0.3},
  {"sink_id": "redis-cache", "position": {"file": "mysql-bin.000005", "pos": 11000}, "age_seconds": 2.1}
]

System Endpoints

Log Level

GET /log-level

Returns the current RUST_LOG value.

Response: 200 OK

{"level": "deltaforge=info,sources=info,sinks=info,warn"}

Validate Config

POST /validate
Content-Type: application/json

Dry-run validation of a pipeline config without creating it.

Response: 200 OK — config is valid

{"valid": true, "pipeline": "orders-cdc", "source_type": "mysql", "sink_count": 2}

Response: 400 Bad Request — config has errors

{"valid": false, "error": "spec: missing field `processors` at line 7 column 3"}

Error Responses

All error responses return structured JSON:

{
  "code": "PIPELINE_NOT_FOUND",
  "message": "pipeline orders-cdc not found"
}
| Status Code | Code | Meaning |
|---|---|---|
| 400 Bad Request | PIPELINE_NAME_MISMATCH | Invalid request body or name mismatch |
| 404 Not Found | PIPELINE_NOT_FOUND | Resource doesn't exist |
| 409 Conflict | PIPELINE_ALREADY_EXISTS | Resource already exists |
| 500 Internal Server Error | INTERNAL_ERROR | Unexpected server error |

Pipelines

Each pipeline is created from a single PipelineSpec. The runtime spawns the source, processors, and sinks defined in the spec and coordinates them with batching and checkpointing.

  • 🔄 Live control: pause, resume, or stop pipelines through the REST API without redeploying.
  • 📦 Coordinated delivery: batching and commit policy keep sinks consistent even when multiple outputs are configured.

Lifecycle controls

The REST API addresses pipelines by metadata.name and returns PipeInfo records containing the live spec and status.

  • GET /health - liveness probe.
  • GET /ready - readiness with pipeline states.
  • GET /pipelines - list pipelines.
  • POST /pipelines - create from a full spec.
  • PATCH /pipelines/{name} - merge a partial spec (for example, adjust batch thresholds) and restart the pipeline.
  • POST /pipelines/{name}/pause - pause ingestion and coordination.
  • POST /pipelines/{name}/resume - resume a paused or stopped pipeline.
  • POST /pipelines/{name}/stop - stop a running pipeline.

Pausing halts both source ingestion and the coordinator. Resuming re-enables both ends so buffered events can drain cleanly.

Processors

Processors run in the declared order for each batch. The built-in processor type is JavaScript, powered by deno_core.

  • type: javascript
    • id: processor label.
    • inline: JS source. Export a processBatch(events) function that returns the transformed batch.
    • limits (optional): resource guardrails (cpu_ms, mem_mb, timeout_ms).

Batching

The coordinator builds batches using soft thresholds:

  • max_events: flush after this many events.
  • max_bytes: flush after the serialized size reaches this limit.
  • max_ms: flush after this much time has elapsed since the batch started.
  • respect_source_tx: when true, never split a single source transaction across batches.
  • max_inflight: cap the number of batches being processed concurrently.

Commit policy

When multiple sinks are configured, checkpoints can wait for different acknowledgement rules:

  • all: every sink must acknowledge.
  • required (default): only sinks marked required: true must acknowledge; others are best-effort.
  • quorum: checkpoint after at least quorum sinks acknowledge.

Sources

DeltaForge captures database changes through pluggable source connectors. Each source is configured under spec.source with a type field and a config object. Environment variables are expanded before parsing using ${VAR} syntax.

Supported Sources

| Source | Status | Description |
|---|---|---|
| mysql | ✅ Production | MySQL binlog CDC with GTID support |
| postgres | ✅ Production | PostgreSQL logical replication via pgoutput |
| turso | 🔧 Beta | Turso/libSQL CDC with multiple modes |

Common Behavior

All sources share these characteristics:

  • Checkpointing: Progress is automatically saved and resumed on restart
  • Schema tracking: Table schemas are loaded and fingerprinted for change detection
  • At-least-once delivery: Events may be redelivered after failures; sinks should be idempotent
  • Batching: Events are batched according to the pipeline’s batch configuration
  • Transaction boundaries: respect_source_tx: true (default) keeps source transactions intact

Source Interface

Sources implement a common trait that provides:

#![allow(unused)]
fn main() {
trait Source {
    fn checkpoint_key(&self) -> &str;
    async fn run(&self, tx: Sender<Event>, checkpoint_store: Arc<dyn CheckpointStore>) -> SourceHandle;
}
}

The returned SourceHandle supports pause/resume and graceful cancellation.

Adding Custom Sources

The source interface is pluggable. To add a new source:

  1. Implement the Source trait
  2. Add configuration parsing in deltaforge-config
  3. Register the source type in the pipeline builder

See existing sources for implementation patterns.

MySQL

MySQL source

DeltaForge tails the MySQL binlog to capture row-level changes with automatic checkpointing and schema tracking.

Prerequisites

MySQL Server Configuration

Ensure your MySQL server has binary logging enabled with row-based format:

-- Required server settings (my.cnf or SET GLOBAL)
log_bin = ON
binlog_format = ROW
binlog_row_image = FULL  -- Recommended for complete before-images

If binlog_row_image is not FULL, DeltaForge will warn at startup and before-images on UPDATE/DELETE events may be incomplete.

User Privileges

Create a dedicated replication user with the required grants:

-- Create user with mysql_native_password (required by binlog connector)
CREATE USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'your_password';

-- Replication privileges (required)
GRANT REPLICATION REPLICA, REPLICATION CLIENT ON *.* TO 'deltaforge'@'%';

-- Schema introspection (required for table discovery)
GRANT SELECT, SHOW VIEW ON your_database.* TO 'deltaforge'@'%';

FLUSH PRIVILEGES;

For capturing all databases, grant SELECT on *.* instead.

Configuration

Set spec.source.type to mysql and provide a config object:

| Field | Type | Required | Description |
|---|---|---|---|
| id | string | Yes | Unique identifier used for checkpoints, server_id derivation, and metrics |
| dsn | string | Yes | MySQL connection string with replication privileges |
| tables | array | No | Table patterns to capture; omit or leave empty to capture all user tables |

Table Patterns

The tables field supports flexible pattern matching using SQL LIKE syntax:

tables:
  - shop.orders          # exact match: database "shop", table "orders"
  - shop.order_%         # LIKE pattern: tables starting with "order_" in "shop"
  - analytics.*          # wildcard: all tables in "analytics" database
  - %.audit_log          # cross-database: "audit_log" table in any database
  # omit entirely to capture all user tables (excludes system schemas)

System schemas (mysql, information_schema, performance_schema, sys) are always excluded.
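The pattern matching above can be approximated in Python by translating the LIKE-style wildcards into a regex (an illustration only; for simplicity, `_` is treated literally rather than as the SQL LIKE single-character wildcard):

```python
import re

def like_to_regex(pattern):
    """Translate a table pattern (% and * as multi-char wildcards) into a regex."""
    parts = (re.escape(p) for p in re.split(r"[%*]", pattern))
    return re.compile("^" + ".*".join(parts) + "$")

def captures(pattern, fq_table):
    return bool(like_to_regex(pattern).match(fq_table))

print(captures("shop.order_%", "shop.order_items"))  # True
print(captures("%.audit_log", "crm.audit_log"))      # True
print(captures("shop.orders", "shop.orders_v2"))     # False (exact match only)
```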

Example

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.order_items

Resume Behavior

DeltaForge automatically checkpoints progress and resumes from the last position on restart. The resume strategy follows this priority:

  1. GTID: Preferred if the MySQL server has GTID enabled. Provides the most reliable resume across binlog rotations and failovers.
  2. File:position: Used when GTID is not available. Resumes from the exact binlog file and byte offset.
  3. Binlog tail: On first run with no checkpoint, starts from the current end of the binlog (no historical replay).

Checkpoints are stored using the id field as the key.

Snapshot (Initial Load)

DeltaForge can perform a consistent initial snapshot of existing table data before starting binlog streaming. This is the recommended approach for migrating existing tables without manual backfills.

How it works

DeltaForge opens all worker connections simultaneously, each with START TRANSACTION WITH CONSISTENT SNAPSHOT. The binlog position is captured after all workers have started - InnoDB guarantees every visible row was committed at or before that position, so CDC streaming from there has no gaps.

No FLUSH TABLES WITH READ LOCK or RELOAD privilege is required.

Additional privileges

-- Required for snapshot (SELECT is already needed for introspection)
GRANT SELECT ON your_database.* TO 'deltaforge'@'%';

Configuration

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
    snapshot:
      mode: initial           # initial | always | never (default: never)
      max_parallel_tables: 8  # tables snapshotted concurrently
      chunk_size: 10000       # rows per chunk for integer-PK tables
| Field | Default | Description |
|---|---|---|
| mode | never | initial: run once if no checkpoint exists; always: re-snapshot on every restart; never: skip |
| max_parallel_tables | 8 | Tables snapshotted concurrently |
| chunk_size | 10000 | Rows per range chunk (integer single-column PK tables only; others do a full scan) |

Snapshot events

Snapshot rows are emitted as Op::Read events (Debezium op: "r"), distinguishable from live CDC Op::Create events. The binlog position captured at snapshot time becomes the CDC resume point, so no rows are missed or duplicated.

Resume after interruption

If the snapshot is interrupted, DeltaForge resumes at table granularity on the next restart - already-completed tables are skipped.

Binlog retention safety

DeltaForge validates binlog retention before starting a snapshot and monitors it throughout. This prevents the silent failure mode where a long snapshot completes successfully but CDC startup then fails because the captured binlog position was purged.

Preflight checks (before any rows are read):

  • Fails hard if log_bin=0 or binlog_format != ROW
  • Estimates snapshot duration from table sizes and max_parallel_tables
  • Warns at ≥50% of binlog_expire_logs_seconds usage; HIGH RISK at ≥80%

During snapshot:

  • Background task polls SHOW BINARY LOGS every 30s
  • Cancels the snapshot immediately if the captured file is purged

After all tables complete:

  • Synchronous final check before writing finished=true
  • finished=true means the position is confirmed valid for CDC resume, not just that rows were emitted

If you see retention risk warnings, the recommended actions are:

  1. Increase binlog_expire_logs_seconds to cover the estimated snapshot duration
  2. Use a read replica as the snapshot source to avoid load on the primary

Server ID Handling

MySQL replication requires each replica to have a unique server_id. DeltaForge derives this automatically from the source id using a CRC32 hash:

server_id = 1 + (CRC32(id) % 4,000,000,000)

When running multiple DeltaForge instances against the same MySQL server, ensure each has a unique id to avoid server_id conflicts.
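The derivation above can be reproduced with a standard CRC32 (Python's zlib.crc32 implements the same polynomial):

```python
import zlib

def derive_server_id(source_id: str) -> int:
    """server_id = 1 + (CRC32(id) % 4,000,000,000), as described above."""
    return 1 + (zlib.crc32(source_id.encode("utf-8")) % 4_000_000_000)

# Deterministic per source id, always in [1, 4_000_000_000]
print(derive_server_id("orders-mysql"))
```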

Schema Tracking

DeltaForge has a built-in schema registry to track table schemas, per source. For MySQL source:

  • Schemas are preloaded at startup by querying INFORMATION_SCHEMA
  • Each schema is fingerprinted using SHA-256 for change detection
  • Events carry schema_version (fingerprint) and schema_sequence (monotonic counter)
  • Schema-to-checkpoint correlation enables reliable replay

Schema changes (DDL) trigger automatic reload of affected table schemas.

Timeouts and Heartbeats

| Behavior | Value | Description |
|---|---|---|
| Heartbeat interval | 15s | Server sends heartbeat if no events |
| Read timeout | 90s | Maximum wait for next binlog event |
| Inactivity timeout | 60s | Triggers reconnect if no data received |
| Connect timeout | 30s | Maximum time to establish connection |

These values are currently fixed. Reconnection uses exponential backoff with jitter.
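The reconnect behavior can be pictured with a small sketch of exponential backoff with full jitter (the base and cap values here are assumptions, not DeltaForge's actual constants):

```javascript
// Full-jitter exponential backoff: delay grows exponentially with the
// attempt number, capped, then a uniform random fraction is taken.
// baseMs and capMs are illustrative values.
function backoffDelayMs(attempt, baseMs = 500, capMs = 30000) {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.random() * ceiling; // uniform in [0, ceiling)
}
```

Jitter spreads reconnect attempts from many instances over time instead of having them retry in lockstep.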

Event Format

Each captured row change produces an event with:

  • op: insert, update, or delete
  • before: Previous row state (updates and deletes only, requires binlog_row_image = FULL)
  • after: New row state (inserts and updates only)
  • table: Fully qualified table name (database.table)
  • tx_id: GTID if available
  • checkpoint: Binlog position for resume
  • schema_version: Schema fingerprint
  • schema_sequence: Monotonic sequence for schema correlation

Troubleshooting

Connection Issues

If you see authentication errors mentioning mysql_native_password:

ALTER USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'password';

Missing Before-Images

If UPDATE/DELETE events have incomplete before data:

SET GLOBAL binlog_row_image = 'FULL';

Binlog Not Enabled

Check binary logging status:

SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW BINARY LOG STATUS;  -- or SHOW MASTER STATUS on older versions

Privilege Issues

Verify grants for your user:

SHOW GRANTS FOR 'deltaforge'@'%';

Required grants are REPLICATION SLAVE and REPLICATION CLIENT (MariaDB 10.5+ also accepts REPLICATION REPLICA as an alias for the former).

PostgreSQL

PostgreSQL source

DeltaForge captures row-level changes from PostgreSQL using logical replication with the pgoutput plugin.

Prerequisites

PostgreSQL Server Configuration

Enable logical replication in postgresql.conf:

# Required settings
wal_level = logical
max_replication_slots = 10    # At least 1 per DeltaForge pipeline
max_wal_senders = 10          # At least 1 per DeltaForge pipeline

Restart PostgreSQL after changing these settings.

User Privileges

Create a replication user with the required privileges:

-- Create user with replication capability
CREATE ROLE deltaforge WITH LOGIN REPLICATION PASSWORD 'your_password';

-- Grant connect access
GRANT CONNECT ON DATABASE your_database TO deltaforge;

-- Grant schema usage and table access for schema introspection
GRANT USAGE ON SCHEMA public TO deltaforge;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO deltaforge;

-- For automatic publication/slot creation (optional)
-- If you prefer manual setup, skip this and create them yourself
ALTER ROLE deltaforge SUPERUSER;  -- Or use manual setup below

pg_hba.conf

Ensure your pg_hba.conf allows replication connections:

# TYPE  DATABASE        USER            ADDRESS                 METHOD
host    replication     deltaforge      0.0.0.0/0               scram-sha-256
host    your_database   deltaforge      0.0.0.0/0               scram-sha-256

Replication Slot and Publication

DeltaForge can automatically create the replication slot and publication on first run. Alternatively, create them manually:

-- Create publication for specific tables
CREATE PUBLICATION my_pub FOR TABLE public.orders, public.order_items;

-- Or for all tables
CREATE PUBLICATION my_pub FOR ALL TABLES;

-- Create replication slot
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');

Replica Identity

For complete before-images on UPDATE and DELETE operations, set tables to REPLICA IDENTITY FULL:

ALTER TABLE public.orders REPLICA IDENTITY FULL;
ALTER TABLE public.order_items REPLICA IDENTITY FULL;

The replica identity modes compare as follows:

  • FULL: Complete row data in before-images
  • DEFAULT (primary key): Only primary key columns in before-images
  • NOTHING: No before-images at all

DeltaForge warns at startup if tables don’t have REPLICA IDENTITY FULL.
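To audit which tables still lack full before-images yourself, a catalog query along these lines can help:

```sql
-- List regular tables that do not yet have REPLICA IDENTITY FULL
-- (relreplident: d = default, n = nothing, f = full, i = index)
SELECT n.nspname AS schema_name, c.relname AS table_name, c.relreplident
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND c.relreplident <> 'f'
  AND n.nspname NOT IN ('pg_catalog', 'information_schema');
```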

Configuration

Set spec.source.type to postgres and provide a config object:

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| id | string | Yes | | Unique identifier for checkpoints and metrics |
| dsn | string | Yes | | PostgreSQL connection string |
| slot | string | Yes | | Replication slot name |
| publication | string | Yes | | Publication name |
| tables | array | Yes | | Table patterns to capture |
| start_position | string/object | No | earliest | Where to start when no checkpoint exists |

DSN Formats

DeltaForge accepts both URL-style and key=value DSN formats:

# URL style
dsn: "postgres://user:pass@localhost:5432/mydb"

# Key=value style
dsn: "host=localhost port=5432 user=deltaforge password=pass dbname=mydb"

Table Patterns

The tables field supports flexible pattern matching:

tables:
  - public.orders          # exact match: schema "public", table "orders"
  - public.order_%         # LIKE pattern: tables starting with "order_"
  - myschema.*             # wildcard: all tables in "myschema"
  - %.audit_log            # cross-schema: "audit_log" table in any schema
  - orders                 # defaults to public schema: "public.orders"

System schemas (pg_catalog, information_schema, pg_toast) are always excluded.

Start Position

Controls where replication begins when no checkpoint exists:

# Start from the earliest available position (slot's restart_lsn)
start_position: earliest

# Start from current WAL position (skip existing data)
start_position: latest

# Start from a specific LSN
start_position:
  lsn: "0/16B6C50"

Example

source:
  type: postgres
  config:
    id: orders-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables:
      - public.orders
      - public.order_items
    start_position: earliest

Resume Behavior

DeltaForge checkpoints progress using PostgreSQL’s LSN (Log Sequence Number):

  1. With checkpoint: Resumes from the stored LSN
  2. Without checkpoint: Uses the slot’s confirmed_flush_lsn or restart_lsn
  3. New slot: Starts from pg_current_wal_lsn() or the configured start_position

Checkpoints are stored using the id field as the key.

Snapshot (Initial Load)

DeltaForge performs a consistent initial snapshot using PostgreSQL’s exported snapshot mechanism before starting logical replication.

How it works

A coordinator connection exports a snapshot and captures the current WAL LSN in a single round trip. Worker connections each import the shared snapshot into their own REPEATABLE READ transaction - all workers see the same consistent DB state with no locks held on the source.

Tables with a single integer primary key use PK-range chunking. All others fall back to ctid page-range chunking.
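Conceptually, one ctid chunk corresponds to a bounded scan over a page range (the boundaries below are illustrative; since PostgreSQL 14 such predicates can be satisfied by TID range scans):

```sql
-- Read one page-range chunk of public.orders; DeltaForge picks the
-- actual page boundaries internally.
SELECT * FROM public.orders
WHERE ctid >= '(0,1)'::tid AND ctid < '(4096,1)'::tid;
```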

Configuration

source:
  type: postgres
  config:
    id: orders-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables:
      - public.orders
    snapshot:
      mode: initial           # initial | always | never (default: never)
      max_parallel_tables: 8  # tables snapshotted concurrently
      chunk_size: 10000       # rows per chunk for integer-PK tables

| Field | Default | Description |
|---|---|---|
| mode | never | initial: run once if no checkpoint exists; always: re-snapshot on every restart; never: skip |
| max_parallel_tables | 8 | Tables snapshotted concurrently |
| chunk_size | 10000 | Rows per range chunk (integer PK tables only; others use ctid chunking) |

Snapshot events

Snapshot rows are emitted as Op::Read events (Debezium op: "r"), distinguishable from live CDC Op::Create events. The WAL LSN captured at snapshot time becomes the CDC resume point - no rows are missed or duplicated.

Resume after interruption

If the snapshot is interrupted, DeltaForge resumes at table granularity on the next restart - already-completed tables are skipped.

WAL slot retention safety

DeltaForge validates replication slot health before starting a snapshot and monitors it throughout. This prevents the slot from being invalidated during a long snapshot, which would make the captured LSN unreachable for CDC resume.

Preflight checks (before any rows are read):

  • Fails hard if the slot does not exist or is already invalidated
  • Fails hard if wal_status=lost
  • Warns if wal_status=unreserved (WAL retention no longer guaranteed)
  • Estimates WAL generated during snapshot (~2× data size) against max_slot_wal_keep_size; warns at ≥50%, HIGH RISK at ≥80%

During snapshot:

  • Background task polls pg_replication_slots every 30s
  • Cancels immediately on slot invalidation or disappearance
  • Warns but continues on wal_status=unreserved

After all tables complete:

  • Synchronous final check before writing finished=true
  • finished=true means the position is confirmed valid for CDC resume, not just that rows were emitted

If you see WAL retention risk warnings:

ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
SELECT pg_reload_conf();

Type Handling

DeltaForge preserves PostgreSQL’s native type semantics:

| PostgreSQL Type | JSON Representation |
|---|---|
| boolean | true / false |
| integer, bigint | JSON number |
| real, double precision | JSON number |
| numeric | JSON string (preserves precision) |
| text, varchar | JSON string |
| json, jsonb | Parsed JSON object/array |
| bytea | {"_base64": "..."} |
| uuid | JSON string |
| timestamp, date, time | ISO 8601 string |
| Arrays (int[], text[], etc.) | JSON array |
| TOAST unchanged | {"_unchanged": true} |
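Putting these mappings together, a captured row with numeric, bytea, array, and unchanged TOAST columns might serialize as (values illustrative):

```json
{
  "after": {
    "id": 42,
    "total": "19.99",
    "avatar": { "_base64": "iVBORw0KGgo=" },
    "tags": ["vip", "new"],
    "description": { "_unchanged": true }
  }
}
```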

Event Format

Each captured row change produces an event with:

  • op: insert, update, delete, or truncate
  • before: Previous row state (updates and deletes, requires appropriate replica identity)
  • after: New row state (inserts and updates)
  • table: Fully qualified table name (schema.table)
  • tx_id: PostgreSQL transaction ID (xid)
  • checkpoint: LSN position for resume
  • schema_version: Schema fingerprint
  • schema_sequence: Monotonic sequence for schema correlation

WAL Management

Logical replication slots prevent WAL segments from being recycled until the consumer confirms receipt. To avoid disk space issues:

  1. Monitor slot lag: Check pg_replication_slots.restart_lsn vs pg_current_wal_lsn()
  2. Set retention limits: Configure max_slot_wal_keep_size (PostgreSQL 13+)
  3. Handle stale slots: Drop unused slots with pg_drop_replication_slot('slot_name')

-- Check slot status and lag
SELECT slot_name, 
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;

Troubleshooting

Connection Issues

If you see authentication errors:

-- Verify user has replication privilege
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'deltaforge';

-- Also check pg_hba.conf: there must be entries for both your database
-- and the "replication" keyword for this user

Missing Before-Images

If UPDATE/DELETE events have incomplete before data:

-- Check current replica identity
SELECT relname, relreplident 
FROM pg_class 
WHERE relname = 'your_table';
-- d = default, n = nothing, f = full, i = index

-- Set to FULL for complete before-images
ALTER TABLE your_table REPLICA IDENTITY FULL;

Slot/Publication Not Found

-- List existing publications
SELECT * FROM pg_publication;

-- List existing slots
SELECT * FROM pg_replication_slots;

-- Create if missing
CREATE PUBLICATION my_pub FOR TABLE public.orders;
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');

WAL Disk Usage Growing

-- Check slot lag
SELECT slot_name, 
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;

-- If slot is inactive and not needed, drop it
SELECT pg_drop_replication_slot('unused_slot');

Logical Replication Not Enabled

-- Check wal_level
SHOW wal_level;  -- Should be 'logical'

-- If not, update postgresql.conf and restart PostgreSQL
-- wal_level = logical

Turso Source

⚠️ STATUS: EXPERIMENTAL / PAUSED

The Turso source is not yet ready for production use. Native CDC in Turso/libSQL is still evolving and has limitations:

  • CDC is per-connection (only changes from the enabling connection are captured)
  • File locking prevents concurrent access
  • sqld Docker image doesn’t have CDC support yet

This documentation is retained for reference. The code exists but is not officially supported.


The Turso source captures changes from Turso and libSQL databases. It supports multiple CDC modes to work with different database configurations and Turso versions.

CDC Modes

DeltaForge supports four CDC modes for Turso/libSQL:

| Mode | Description | Requirements |
|---|---|---|
| native | Uses Turso's built-in CDC via turso_cdc table | Turso v0.1.2+ with CDC enabled |
| triggers | Shadow tables populated by database triggers | Standard SQLite/libSQL |
| polling | Tracks changes via rowid comparison | Any SQLite/libSQL (inserts only) |
| auto | Automatic fallback: native → triggers → polling | Any |

Native Mode

Native mode uses Turso’s built-in CDC capabilities. This is the most efficient mode when available.

Requirements:

  • Turso database with CDC enabled
  • Turso server v0.1.2 or later

How it works:

  1. Queries the turso_cdc system table for changes
  2. Uses bin_record_json_object() to extract row data as JSON
  3. Tracks position via change ID in checkpoints

Triggers Mode

Triggers mode uses shadow tables and database triggers to capture changes. This works with standard SQLite and libSQL without requiring native CDC support.

How it works:

  1. Creates shadow tables (_df_cdc_{table}) for each tracked table
  2. Installs INSERT/UPDATE/DELETE triggers that write to shadow tables
  3. Polls shadow tables for new change records
  4. Cleans up processed records periodically
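As an illustration of the technique (the actual DDL and shadow-table columns DeltaForge generates may differ), an insert trigger might look like:

```sql
-- Hypothetical sketch: record inserts on "users" into its shadow table
CREATE TRIGGER _df_cdc_users_insert AFTER INSERT ON users
BEGIN
  INSERT INTO _df_cdc_users (op, row_id, changed_at)
  VALUES ('insert', NEW.rowid, strftime('%s', 'now'));
END;
```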

Polling Mode

Polling mode uses rowid tracking to detect new rows. This is the simplest mode but only captures inserts (not updates or deletes).

How it works:

  1. Tracks the maximum rowid seen per table
  2. Queries for rows with rowid greater than last seen
  3. Emits insert events for new rows
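The polling query reduces to a rowid comparison; a conceptual version (parameter name and limit illustrative):

```sql
-- Fetch rows inserted since the last checkpointed rowid for this table
SELECT rowid, * FROM users
WHERE rowid > :last_seen_rowid
ORDER BY rowid
LIMIT 1000;
```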

Limitations:

  • Only captures INSERT operations
  • Cannot detect UPDATE or DELETE
  • Requires tables to have accessible rowid (not WITHOUT ROWID tables)

Auto Mode

Auto mode tries each CDC mode in order and uses the first one that works:

  1. Try native mode (check for turso_cdc table)
  2. Try triggers mode (check for existing CDC triggers)
  3. Fall back to polling mode

This is useful for deployments where the database capabilities may vary.

Configuration

source:
  type: turso
  config:
    id: turso-main
    url: "libsql://your-db.turso.io"
    auth_token: "${TURSO_AUTH_TOKEN}"
    tables: ["users", "orders", "order_items"]
    cdc_mode: auto
    poll_interval_ms: 1000
    native_cdc:
      level: data

Configuration Options

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| id | string | Yes | | Logical identifier for metrics and logging |
| url | string | Yes | | Database URL (libsql://, http://, or file path) |
| auth_token | string | No | | Authentication token for Turso cloud |
| tables | array | Yes | | Tables to track (supports wildcards) |
| cdc_mode | string | No | auto | CDC mode: native, triggers, polling, auto |
| poll_interval_ms | integer | No | 1000 | Polling interval in milliseconds |
| native_cdc.level | string | No | data | Native CDC level: binlog or data |

Table Patterns

The tables field supports patterns:

tables:
  - users              # Exact match
  - order%             # Prefix match (order, orders, order_items)
  - "*"                # All tables (excluding system tables)

System tables and DeltaForge infrastructure tables are automatically excluded:

  • sqlite_* — SQLite system tables
  • _df_* — DeltaForge CDC shadow tables
  • _litestream* — Litestream replication tables
  • _turso* — Turso internal tables
  • turso_cdc — Turso CDC system table

Native CDC Levels

When using native mode, you can choose the CDC level:

| Level | Description |
|---|---|
| data | Only row data changes (default, more efficient) |
| binlog | Full binlog-style events with additional metadata |

Examples

Local Development

source:
  type: turso
  config:
    id: local-dev
    url: "http://127.0.0.1:8080"
    tables: ["users", "orders"]
    cdc_mode: auto
    poll_interval_ms: 500

Turso Cloud

source:
  type: turso
  config:
    id: turso-prod
    url: "libsql://mydb-myorg.turso.io"
    auth_token: "${TURSO_AUTH_TOKEN}"
    tables: ["*"]
    cdc_mode: native
    poll_interval_ms: 1000

SQLite File (Polling Only)

source:
  type: turso
  config:
    id: sqlite-file
    url: "file:./data/myapp.db"
    tables: ["events", "audit_log"]
    cdc_mode: polling
    poll_interval_ms: 2000

Checkpoints

Turso checkpoints track position differently depending on the CDC mode:

{
  "mode": "native",
  "last_change_id": 12345,
  "table_positions": {}
}

For polling mode, positions are tracked per-table:

{
  "mode": "polling",
  "last_change_id": null,
  "table_positions": {
    "users": 1000,
    "orders": 2500
  }
}

Schema Loading

The Turso source includes a schema loader that:

  • Queries PRAGMA table_info() for column metadata
  • Detects SQLite type affinities (INTEGER, TEXT, REAL, BLOB, NUMERIC)
  • Identifies primary keys and autoincrement columns
  • Handles WITHOUT ROWID tables
  • Checks for existing CDC triggers

Schema information is available via the REST API at /pipelines/{name}/schemas.

Notes

  • WITHOUT ROWID tables: Polling mode cannot track WITHOUT ROWID tables. Use triggers or native mode instead.
  • Type affinity: SQLite uses type affinity rather than strict types. The schema loader maps declared types to SQLite affinities.
  • Trigger cleanup: In triggers mode, processed change records are cleaned up automatically based on checkpoint position.
  • Connection handling: The source maintains a single connection and reconnects automatically on failure.

Processors

Processors transform, filter, or otherwise act on each event batch between the source and sinks. They run in the order listed in spec.processors, and each receives the output of the previous one. A processor can modify events, filter them out, or add routing metadata.

Available Processors

| Processor | Description |
|---|---|
| javascript | Custom transformations using V8-powered JavaScript |
| outbox | Transactional outbox pattern - extracts payload, resolves topic, sets routing headers |
| flatten | Flatten nested JSON objects into combined top-level keys |
| filter | Drop events by op type, table pattern, or field value |

JavaScript

Run arbitrary JavaScript against each event batch. Uses the V8 engine via deno_core for near-native speed with configurable resource limits.

processors:
  - type: javascript
    id: enrich
    inline: |
      function processBatch(events) {
        return events.map(e => {
          e.tags = ["processed"];
          return e;
        });
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | (required) | Processor identifier |
| inline | string | (required) | JavaScript source code |
| limits.cpu_ms | int | 50 | CPU time limit per batch |
| limits.mem_mb | int | 128 | V8 heap memory limit |
| limits.timeout_ms | int | 500 | Wall-clock timeout per batch |

Performance

JavaScript processing adds a fixed overhead per batch plus linear scaling per event. Benchmarks show ~70K+ events/sec throughput with typical transforms — roughly 61× slower than native Rust processors, but sufficient for most workloads. If you need higher throughput, keep transform logic simple or move heavy processing downstream.

Tips

  • Return an empty array to drop all events in a batch.
  • Return multiple copies of an event to fan out.
  • JavaScript numbers are f64 — integer columns wider than 53 bits may lose precision. Use string representation for such values.
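For example, a single processBatch can combine dropping and fan-out (the _audit table suffix is illustrative):

```javascript
// Sketch: drop deletes, pass everything else through, and emit an extra
// audit copy of each update to a hypothetical "<table>_audit" table.
function processBatch(events) {
  const out = [];
  for (const e of events) {
    if (e.op === "delete") continue;                     // filter: drop
    out.push(e);                                         // pass through
    if (e.op === "update") {
      out.push({ ...e, table: e.table + "_audit" });     // fan-out copy
    }
  }
  return out;
}
```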

Outbox

The outbox processor transforms events captured by the outbox pattern into routed, sink-ready events. It extracts business fields from the raw outbox payload, resolves the destination topic, and passes through all non-outbox events unchanged.

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    default_topic: "events.unrouted"

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "outbox" | Processor identifier |
| tables | array | [] | Only process outbox events matching these patterns. Empty = all. |
| topic | string | | Topic template with ${field} placeholders (resolved against raw payload columns) |
| default_topic | string | | Fallback when template resolution fails |
| columns | object | (defaults below) | Field name mappings |
| additional_headers | map | {} | Forward extra payload fields as routing headers. Key = header name, value = column name. |
| raw_payload | bool | false | Deliver payload as-is, bypassing envelope wrapping |

Column Defaults

| Key | Default | Description |
|---|---|---|
| payload | "payload" | Field containing the event body |
| aggregate_type | "aggregate_type" | Domain aggregate type |
| aggregate_id | "aggregate_id" | Aggregate identifier |
| event_type | "event_type" | Domain event type |
| topic | "topic" | Optional explicit topic override in payload |

See the Outbox Pattern guide for source configuration, complete examples, and multi-outbox routing.

Flatten

Flattens nested JSON objects in event payloads into top-level keys joined by a configurable separator. Works on every object-valued field present on the event without assuming any particular envelope structure - CDC before/after, outbox business payloads, or any custom fields introduced by upstream processors are all handled uniformly.

processors:
  - type: flatten
    id: flat
    separator: "__"
    max_depth: 3
    on_collision: last
    empty_object: preserve
    lists: preserve
    empty_list: drop

Input:

{
  "after": {
    "order_id": "abc",
    "customer": {
      "id": 1,
      "address": { "city": "Berlin", "zip": "10115" }
    },
    "tags": ["vip"],
    "meta": {}
  }
}

Output (with defaults — separator: "__", empty_object: preserve, lists: preserve):

{
  "after": {
    "order_id": "abc",
    "customer__id": 1,
    "customer__address__city": "Berlin",
    "customer__address__zip": "10115",
    "tags": ["vip"],
    "meta": {}
  }
}

Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "flatten" | Processor identifier |
| separator | string | "__" | Separator inserted between path segments |
| max_depth | int | unlimited | Stop recursing at this depth; objects at the boundary are kept as opaque leaves |
| on_collision | string | last | What to do when two paths produce the same key: last, first, or error |
| empty_object | string | preserve | How to handle {} values: preserve, drop, or null |
| lists | string | preserve | How to handle array values: preserve (keep as-is) or index (expand to field__0, field__1, …) |
| empty_list | string | preserve | How to handle [] values: preserve, drop, or null |

max_depth in practice

max_depth counts nesting levels from the top of the payload object. Objects at the boundary are treated as opaque leaves rather than expanded further, still subject to empty_object policy.

# max_depth: 2

depth 0: customer               -> object, recurse
depth 1: customer__address      -> object, recurse
depth 2: customer__address__geo -> STOP, kept as leaf {"lat": 52.5, "lng": 13.4}

Without max_depth, a deeply nested or recursive payload could produce a large number of keys. Setting a limit is recommended for payloads with variable or unknown nesting depth.

Collision policy

A collision occurs when two input paths produce the same flattened key - typically when a payload already contains a pre-flattened key (e.g. "a__b": 1) alongside a nested object ("a": {"b": 2}).

  • last - the last path to write a key wins (default, never fails)
  • first - the first path to write a key wins, subsequent writes are ignored
  • error - the batch fails immediately, useful in strict pipelines where collisions indicate a schema problem

Working with outbox payloads

After the outbox processor runs, event.after holds the extracted business payload - there is no before. The flatten processor handles this naturally since it operates on whatever fields are present:

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
  - type: flatten
    id: flat
    separator: "."
    empty_list: drop

Envelope interaction

The flatten processor runs on the raw Event struct before sink delivery. Envelope wrapping happens inside the sink, after all processors have run. This means the envelope always wraps already-flattened data, no special configuration needed.

Source → [flatten processor] → Sink (envelope → bytes)

All envelope formats work as expected:

// Native
{ "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" }, "op": "c" }

// Debezium
{ "payload": { "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" }, "op": "c" } }

// CloudEvents
{ "specversion": "1.0", ..., "data": { "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" } } }

Outbox + raw_payload: true

When the outbox processor is configured with raw_payload: true, the sink delivers event.after directly, bypassing the envelope entirely. If the flatten processor runs after outbox, the raw payload delivered to the sink is the flattened object — which is the intended behavior for analytics sinks that can’t handle nested JSON.

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    raw_payload: true       # sink delivers event.after directly, no envelope
  - type: flatten
    id: flat
    separator: "__"
    empty_list: drop

The flatten processor runs second, so by the time the sink delivers the raw payload it is already flat.

Analytics sink example

When sending to column-oriented sinks (ClickHouse, BigQuery, S3 Parquet) that don’t handle nested JSON:

processors:
  - type: flatten
    id: flat
    separator: "__"
    lists: index          # expand arrays to indexed columns
    empty_object: drop    # remove sparse marker objects
    empty_list: drop      # remove empty arrays

Filter

Drops events that do not pass configured criteria. Each criterion is independent — omit any of them to skip that check entirely. An event must pass all configured checks to be forwarded.

processors:
  - type: filter
    id: only-active-orders
    ops: [create, update]
    tables:
      include: ["shop.orders"]
      exclude: ["*.tmp"]
    fields:
      - path: status
        op: eq
        value: "active"
      - path: total
        op: gte
        value: 100
    match: all

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "filter" | Processor identifier |
| ops | list | [] (all) | Operation types to keep: create, update, delete, read, truncate |
| tables.include | list | [] (all) | Table patterns to include. Uses AllowList glob syntax: db.table, shop.*, *.orders |
| tables.exclude | list | [] (none) | Table patterns to exclude. Applied after include; takes priority |
| fields | list | [] (skip) | Predicates evaluated against event.after |
| match | string | all | How to combine multiple field predicates: all (every predicate must pass) or any (at least one must pass) |

Field operators

The path field is a dot-separated path into event.after, e.g. "status" or "order.total".

| Op | value | Description |
|---|---|---|
| eq | scalar | Field equals value |
| ne | scalar | Field does not equal value |
| exists | | Field is present and non-null |
| not_exists | | Field is absent or null |
| gt / gte | number or string | Greater than / greater than or equal |
| lt / lte | number or string | Less than / less than or equal |
| in | array | Field value is one of the items in the array |
| not_in | array | Field value is not in the array |
| contains | scalar | String field contains the substring, or array field contains the element |
| changed | | Field value differs between event.before and event.after. Creates and deletes always pass (no pair to compare) |
| regex | string | String field matches the regex pattern. Compiled once at startup; invalid patterns fail pipeline initialization |
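The dot-path lookup into event.after can be sketched as a reduce over path segments (a hypothetical helper for illustration, not DeltaForge's actual code):

```javascript
// Resolve a dot-separated path like "order.total" against an object,
// returning undefined if any segment is missing or non-object.
function resolvePath(obj, path) {
  return path.split(".").reduce(
    (cur, seg) => (cur != null && typeof cur === "object" ? cur[seg] : undefined),
    obj
  );
}
```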

Notes

Numeric equality — eq and in compare integers and floats by value, so 42 and 42.0 are considered equal. This matters when events have passed through the JavaScript processor, which converts all numbers to f64.

not_in with a missing field — if the field is absent from the event, the event passes. Absence is not membership in any set.

regex on non-string fields — silently does not match. Use exists first if the field may be absent or non-string.

changed and the before image — the predicate reads event.before, which is only populated for update operations from sources configured with full row images. Verify your source has REPLICA IDENTITY FULL (PostgreSQL) or binlog_row_image = FULL (MySQL) before using changed in production.

Performance

The filter processor is pure Rust with no serialization overhead. Op and table checks are O(1) to O(patterns). Field predicate evaluation reads event.after directly. Regex patterns are compiled once at construction time. Put a filter early in the processor chain to reduce the batch size before heavier processors (JavaScript, flatten) run.

Examples

Drop deletes and snapshot reads:

- type: filter
  id: no-deletes
  ops: [create, update]

Alert stream - pass if status is failed OR retry count exhausted:

- type: filter
  id: alert-worthy
  match: any
  fields:
    - path: status
      op: eq
      value: "failed"
    - path: retry_count
      op: gte
      value: 3

Filter before enrichment to avoid paying JavaScript overhead on the full stream:

processors:
  - type: filter
    id: low-stock
    ops: [create, update]
    tables:
      include: ["inventory.products"]
    fields:
      - path: stock_qty
        op: lt
        value: 10
  - type: javascript
    id: enrich
    inline: |
      function processBatch(events) {
        return events.map(e => {
          e.after.alert_level = e.after.stock_qty === 0 ? "critical" : "warning";
          return e;
        });
      }

Only forward rows where a status column actually changed value (suppresses no-op updates):

- type: filter
  id: real-status-changes
  ops: [update]
  fields:
    - path: status
      op: changed

Processor Chain

Processors execute in order. Events flow through each processor sequentially:

Source → [Processor 1] → [Processor 2] → ... → Sinks

Each processor receives a Vec<Event> and returns a Vec<Event>. This means processors can:

  • Transform: Modify event fields in place
  • Filter: Return a subset of events (drop unwanted ones)
  • Fan-out: Return more events than received
  • Route: Set event.routing.topic to override sink defaults
  • Enrich: Add headers via event.routing.headers

Non-outbox events always pass through the outbox processor untouched, so it is safe to combine it with JavaScript processors in any order.
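For example, a JavaScript processor can set per-table routing through event.routing (the topic naming scheme here is illustrative):

```javascript
// Sketch: derive a sink topic from the table name, e.g.
// "shop.orders" -> "cdc.shop_orders".
function processBatch(events) {
  return events.map(e => {
    e.routing = e.routing || {};
    e.routing.topic = "cdc." + e.table.replace(".", "_");
    return e;
  });
}
```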

Adding Custom Processors

The processor interface is a simple trait:

#[async_trait]
pub trait Processor: Send + Sync {
    fn id(&self) -> &str;
    async fn process(&self, events: Vec<Event>) -> Result<Vec<Event>>;
}

To add a new processor:

  1. Implement the Processor trait in crates/processors
  2. Add a config variant to ProcessorCfg in deltaforge-config
  3. Register the build step in build_processors()

Sinks

Sinks receive batches from the coordinator after processors run. Each sink lives under spec.sinks and can be marked as required or best-effort via the required flag. Checkpoint behavior is governed by the pipeline’s commit policy.

Envelope and Encoding

All sinks support configurable envelope formats and wire encodings. See the Envelopes and Encodings page for detailed documentation.

| Option | Values | Default | Description |
|---|---|---|---|
| envelope | native, debezium, cloudevents | native | Output JSON structure |
| encoding | json, avro | json | Wire format (avro requires Schema Registry) |

Quick example:

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      envelope:
        type: cloudevents
        type_prefix: "com.example.cdc"
      encoding: json

Available Sinks

| Sink | Description |
|---|---|
| kafka | Kafka producer sink |
| nats | NATS JetStream sink |
| redis | Redis stream sink |
| http | HTTP/Webhook sink |

Multiple sinks in one pipeline

You can combine multiple sinks in one pipeline to fan out events to different destinations. However, multi-sink pipelines introduce complexity that requires careful consideration.

Why multiple sinks are challenging

Different performance characteristics: Kafka might handle 100K events/sec while a downstream HTTP webhook processes 100/sec. The slowest sink becomes the bottleneck for the entire pipeline.

Independent failure modes: Each sink can fail independently. Redis might be healthy while Kafka experiences broker failures. Without proper handling, a single sink failure could block the entire pipeline or cause data loss.

No distributed transactions: DeltaForge cannot atomically commit across heterogeneous systems. If Kafka succeeds but Redis fails mid-batch, you face a choice: retry Redis (risking duplicates in Kafka) or skip Redis (losing data there).

Checkpoint semantics: The checkpoint represents “how far we’ve processed from the source.” With multiple sinks, when is it safe to advance? After one sink succeeds? All of them? A majority?

Read the required and commit_policy sections below for options to manage these challenges.

The required flag

The required flag on each sink determines whether that sink must acknowledge successful delivery before the checkpoint advances:

sinks:
  - type: kafka
    config:
      id: primary-kafka
      required: true    # Must succeed for checkpoint to advance
      
  - type: redis
    config:
      id: cache-redis
      required: false   # Best-effort; failures don't block checkpoint

When required: true (default): The sink must acknowledge the batch before the checkpoint can advance. If this sink fails, the pipeline blocks and retries until it succeeds or the operator intervenes.

When required: false: The sink is best-effort. Failures are logged but don’t prevent the checkpoint from advancing. Use this for non-critical destinations where some data loss is acceptable.

Commit policy

The commit_policy works with the required flag to determine checkpoint behavior:

| Policy | Behavior |
|---|---|
| all | Every sink (regardless of required flag) must acknowledge |
| required | Only sinks with required: true must acknowledge (default) |
| quorum | At least N sinks must acknowledge |

commit_policy:
  mode: required   # Only wait for required sinks

sinks:
  - type: kafka
    config:
      required: true   # Checkpoint waits for this
  - type: redis  
    config:
      required: false  # Checkpoint doesn't wait for this
  - type: nats
    config:
      required: true   # Checkpoint waits for this
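
The interplay of commit_policy and the required flag can be modeled in a few lines. This is an illustrative sketch, not DeltaForge code:

```python
# Illustrative model (not DeltaForge internals) of checkpoint gating.
def checkpoint_can_advance(acked, sinks, mode="required", quorum=0):
    # acked: set of sink ids that acknowledged the batch
    # sinks: {sink_id: required_flag}
    if mode == "all":
        return set(sinks) <= acked
    if mode == "quorum":
        return len(acked) >= quorum
    # mode == "required": only sinks flagged required must acknowledge
    return {s for s, req in sinks.items() if req} <= acked

sinks = {"kafka": True, "redis": False, "nats": True}
# Both required sinks acked; a lagging redis does not block the checkpoint:
assert checkpoint_can_advance({"kafka", "nats"}, sinks)
# Under mode="all", the redis ack is needed too:
assert not checkpoint_can_advance({"kafka", "nats"}, sinks, mode="all")
```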

Per-sink independent checkpoints

Each sink maintains its own checkpoint, committed independently after successful delivery. This means:

  • Faster sinks are not held back by slower ones — each sink advances its own checkpoint
  • The source replays from the minimum checkpoint across all sinks, so a slow sink only causes replay for itself, not re-delivery to sinks that are already ahead
  • Adding a new sink to an existing pipeline triggers replay from the source’s earliest position for that sink only; existing sinks are unaffected
  • Removing a sink cleans up its checkpoint automatically on the next pipeline patch

This architecture avoids the common CDC pitfall where the slowest sink becomes a bottleneck for all other sinks.
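
To make the replay semantics concrete, here is a small sketch (hypothetical, not DeltaForge's actual implementation) of how per-sink checkpoints determine what each sink re-receives after a restart:

```python
# Illustrative model: the source resumes from the minimum checkpoint,
# but each sink skips events it has already acknowledged, so only the
# lagging sink actually sees replayed events.
def replay_plan(checkpoints, latest):
    resume = min(checkpoints.values())  # source position to resume from
    return {
        sink: list(range(max(ck, resume) + 1, latest + 1))
        for sink, ck in checkpoints.items()
    }

checkpoints = {"kafka": 1500, "nats": 1500, "redis": 900}
plan = replay_plan(checkpoints, latest=1500)
# redis re-receives positions 901..1500; kafka and nats receive nothing new
```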

Delivery guarantee tiers

| Sink | Guarantee | Mechanism | Consumer action |
|---|---|---|---|
| Kafka (exactly_once: true) | End-to-end exactly-once | Kafka transactions (two-phase commit) | Set isolation.level=read_committed |
| Kafka (exactly_once: false) | At-least-once (idempotent) | Retries deduped; crash-replay produces duplicates | Dedup by event ID |
| NATS JetStream | At-least-once + server dedup | Nats-Msg-Id header within duplicate_window | Configure duplicate_window |
| Redis Streams | At-least-once + consumer dedup | idempotency_key field in XADD payload | Check key before processing |
| HTTP/Webhook | At-least-once | Retry on 5xx/timeout; no server-side dedup | Consumer must be idempotent (use event id) |

“Exactly-once” means DeltaForge guarantees no duplicates without consumer cooperation. All other sinks are “at-least-once” with a stated dedup mechanism.

Practical patterns

Primary + secondary: One critical sink (Kafka for durability) marked required: true, with secondary sinks (e.g., Redis for caching, testing, or experimentation) marked required: false.

Quorum for redundancy: Three sinks with commit_policy.mode: quorum and quorum: 2. Checkpoint advances when any two succeed, providing fault tolerance.

All-or-nothing: Use commit_policy.mode: all when every destination is critical and you need the strongest consistency guarantee (at the cost of delivery throughput, since the slowest sink gates every checkpoint).
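
A quorum configuration might look like this (a sketch; the quorum count field follows the quorum: 2 notation used above — verify the exact key against the configuration reference):

```
commit_policy:
  mode: quorum
  quorum: 2          # checkpoint advances once any two sinks acknowledge

sinks:
  - type: kafka
    config:
      id: kafka-a
      brokers: ${KAFKA_BROKERS}
      topic: events
  - type: nats
    config:
      id: nats-b
      url: ${NATS_URL}
      subject: events
      stream: EVENTS
  - type: redis
    config:
      id: redis-c
      uri: ${REDIS_URI}
      stream: events
```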

Multi-format fan-out

For sending the same events to different consumers that expect different formats:

sinks:
  # Kafka Connect expects Debezium format
  - type: kafka
    config:
      id: connect-sink
      brokers: ${KAFKA_BROKERS}
      topic: connect-events
      envelope:
        type: debezium
      required: true

  # Lambda expects CloudEvents
  - type: kafka
    config:
      id: lambda-sink
      brokers: ${KAFKA_BROKERS}
      topic: lambda-events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.cdc"
      required: false

  # Analytics wants raw events
  - type: redis
    config:
      id: analytics-redis
      uri: ${REDIS_URI}
      stream: analytics
      envelope:
        type: native
      required: false

This allows each consumer to receive events in its preferred format without post-processing.
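
For orientation, here is an illustrative sketch of how the same event might be wrapped for the cloudevents envelope. It uses CloudEvents 1.0 attribute names; see the Envelopes and Encodings page for DeltaForge's exact field mapping:

```python
import uuid

# Illustrative only: wraps a change event using CloudEvents 1.0 attribute
# names; DeltaForge's real mapping is documented on the Envelopes page.
def to_cloudevent(event: dict, type_prefix: str) -> dict:
    return {
        "specversion": "1.0",
        "id": event.get("id", str(uuid.uuid4())),
        "source": event.get("source", "deltaforge"),
        "type": f"{type_prefix}.{event.get('op', 'change')}",
        "datacontenttype": "application/json",
        "data": event,  # the original event rides in the data attribute
    }

ce = to_cloudevent({"id": "evt-1", "op": "insert", "table": "orders"}, "com.acme.cdc")
# ce["type"] is "com.acme.cdc.insert"
```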

Redis

Redis sink

The Redis sink publishes events to a Redis Stream for real-time consumption with consumer groups.

When to use Redis

Redis Streams shine when you need low-latency event delivery with simple operational requirements and built-in consumer group support.

Real-world applications

| Use Case | Description |
|---|---|
| Real-time notifications | Push database changes instantly to WebSocket servers for live UI updates |
| Cache invalidation | Trigger cache eviction when source records change; keep Redis cache consistent |
| Session synchronization | Replicate user session changes across application instances in real-time |
| Rate limiting state | Stream counter updates for distributed rate limiting decisions |
| Live dashboards | Feed real-time metrics and KPIs to dashboard backends |
| Job queuing | Use CDC events to trigger background job processing with consumer groups |
| Feature flags | Propagate feature flag changes instantly across all application instances |

Pros and cons

| Pros | Cons |
|---|---|
| Ultra-low latency - Sub-millisecond publish; ideal for real-time apps | Memory-bound - All data in RAM; expensive for high-volume retention |
| Simple operations - Single binary, minimal configuration | Limited retention - Not designed for long-term event storage |
| Consumer groups - Built-in competing consumers with acknowledgements | Durability trade-offs - AOF/RDB persistence has limitations |
| Familiar tooling - redis-cli, widespread client library support | Single-threaded - CPU-bound for very high throughput |
| Versatile - Combine with caching, pub/sub, and data structures | No native replay - XRANGE exists but no offset management |
| Atomic operations - MULTI/EXEC for transactional guarantees | Cluster complexity - Sharding requires careful key design |

Configuration

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders.events
      required: true

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | | Sink identifier |
| uri | string | | Redis connection URI |
| stream | string | | Redis stream key |
| required | bool | true | Gates checkpoints |

Idempotency

Every XADD command includes an idempotency_key field in the stream entry payload. This key is deterministic — the same source event always produces the same key, even across replays.

Redis Streams does not deduplicate server-side (entry IDs are auto-generated), so consumers are responsible for dedup using this field. A simple approach:

# Check idempotency_key before processing (inside the consumer loop)
key = data[b"idempotency_key"]
if r.sismember("processed_keys", key):
    r.xack(stream, group, msg_id)  # Skip duplicate
    continue
process(event)
r.sadd("processed_keys", key)

This makes Redis a Tier 2 delivery guarantee: at-least-once with idempotency keys for consumer-side dedup.
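
The exact key format is internal to DeltaForge, but a key of this kind can be modeled as a stable hash over the event's source, transaction, and row identity (illustrative only):

```python
import hashlib

def idempotency_key(source: str, txn_id: str, row_id: str) -> str:
    # Hypothetical derivation: any stable hash of (source, txn, row) works,
    # because the same source event must always map to the same key.
    raw = f"{source}:{txn_id}:{row_id}".encode()
    return hashlib.sha256(raw).hexdigest()

# Replays produce the same key, so consumer-side dedup catches them:
assert idempotency_key("db1", "tx42", "orders:7") == idempotency_key("db1", "tx42", "orders:7")
```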

Consuming events

# Create consumer group
redis-cli XGROUP CREATE orders.events mygroup $ MKSTREAM

# Read as consumer (blocking)
redis-cli XREADGROUP GROUP mygroup consumer1 BLOCK 5000 COUNT 10 STREAMS orders.events >

# Acknowledge processing
redis-cli XACK orders.events mygroup 1234567890123-0

# Check pending (unacknowledged) messages
redis-cli XPENDING orders.events mygroup

Simple subscription

# Read entries from the beginning
redis-cli XREAD COUNT 10 STREAMS orders.events 0-0

# Block for new entries
redis-cli XREAD BLOCK 5000 STREAMS orders.events $

Go consumer example

import (
    "context"
    "encoding/json"
    "time"

    "github.com/redis/go-redis/v9"
)

ctx := context.Background()
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

// Create consumer group (once)
rdb.XGroupCreateMkStream(ctx, "orders.events", "mygroup", "0")

for {
    streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    "mygroup",
        Consumer: "worker1",
        Streams:  []string{"orders.events", ">"},
        Count:    10,
        Block:    5 * time.Second,
    }).Result()
    
    if err != nil {
        continue
    }
    
    for _, stream := range streams {
        for _, msg := range stream.Messages {
            var event Event
            json.Unmarshal([]byte(msg.Values["event"].(string)), &event)
            process(event)
            rdb.XAck(ctx, "orders.events", "mygroup", msg.ID)
        }
    }
}

Rust consumer example

use redis::AsyncCommands;
use redis::streams::{StreamReadOptions, StreamReadReply};

let client = redis::Client::open("redis://localhost:6379")?;
let mut con = client.get_async_connection().await?;

// Create consumer group (ignore the error if the group already exists)
let _: () = redis::cmd("XGROUP")
    .arg("CREATE").arg("orders.events").arg("mygroup").arg("0").arg("MKSTREAM")
    .query_async(&mut con).await.unwrap_or(());

loop {
    let reply: StreamReadReply = con.xread_options(
        &["orders.events"],
        &[">"],
        &StreamReadOptions::default()
            .group("mygroup", "worker1")
            .count(10)
            .block(5000),
    ).await?;

    for stream in reply.keys {
        for msg in stream.ids {
            let payload: String = redis::from_redis_value(&msg.map["event"])?;
            let event: Event = serde_json::from_str(&payload)?;
            process(event);
            let _: () = con.xack("orders.events", "mygroup", &[&msg.id]).await?;
        }
    }
}

Python consumer example

import redis
import json

r = redis.Redis.from_url("redis://localhost:6379")

# Create consumer group (once)
try:
    r.xgroup_create("orders.events", "mygroup", id="0", mkstream=True)
except redis.ResponseError:
    pass  # Group already exists

# Consume events
while True:
    events = r.xreadgroup("mygroup", "worker1", {"orders.events": ">"}, count=10, block=5000)
    for stream, messages in events:
        for msg_id, data in messages:
            event = json.loads(data[b"event"])
            process(event)
            r.xack("orders.events", "mygroup", msg_id)

Failure modes

| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore Redis; check network |
| Authentication failure | NOAUTH / WRONGPASS | Fails fast, no retry | Fix auth details in URI |
| OOM (Out of Memory) | OOM command not allowed | Fails batch; retries | Increase maxmemory; enable eviction or trim streams |
| Stream doesn't exist | Auto-created by XADD | No failure | N/A (XADD creates stream) |
| Connection timeout | Command hangs | Timeout after configured duration | Check network; increase timeout |
| Cluster MOVED/ASK | Redirect errors | Automatic redirect (if cluster mode) | Ensure cluster client configured |
| Replication lag | Writes to replica fail | Fails with READONLY | Write to master only |
| Max stream length | If MAXLEN enforced | Oldest entries trimmed | Expected behavior; not a failure |
| Network partition | Intermittent timeouts | Retries; may have gaps | Restore network |

Failure scenarios and data guarantees

Redis OOM during batch delivery

  1. DeltaForge sends a batch of 100 events
  2. 50 events are written, then Redis hits maxmemory
  3. The batch fails atomically (all or nothing per batch)
  4. DeltaForge retries the entire batch
  5. If OOM persists: the batch is blocked until memory is available
  6. The checkpoint is saved only after ALL events are acknowledged

DeltaForge crash after XADD, before checkpoint

  1. Batch written to Redis stream successfully
  2. DeltaForge crashes before saving checkpoint
  3. On restart: replays from last checkpoint
  4. Result: Duplicate events in stream (at-least-once)
  5. Consumer must handle idempotently (check event.id)

Redis failover (Sentinel/Cluster)

  1. Master fails, Sentinel promotes replica
  2. In-flight XADD may fail with connection error
  3. DeltaForge reconnects to new master
  4. Retries failed batch
  5. Possible duplicates if original write succeeded

Handling duplicates in consumers

# Idempotent consumer using event ID
processed_ids = set()  # Or use Redis SET for distributed dedup

for msg_id, data in messages:
    event = json.loads(data[b"event"])
    event_id = event["id"]
    
    if event_id in processed_ids:
        r.xack("orders.events", "mygroup", msg_id)
        continue  # Skip duplicate
    
    process(event)
    processed_ids.add(event_id)
    r.xack("orders.events", "mygroup", msg_id)

Monitoring

DeltaForge exposes these metrics for Redis sink monitoring:

# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink}      # Events delivered
deltaforge_sink_batch_total{pipeline,sink}       # Batches delivered  
deltaforge_sink_latency_seconds{pipeline,sink}   # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"}  # Stage timing

For Redis server visibility, use Redis’s built-in monitoring:

# Monitor commands in real-time
redis-cli MONITOR

# Get server stats
redis-cli INFO stats

# Check memory usage
redis-cli INFO memory

# Stream-specific info
redis-cli XINFO STREAM orders.events
redis-cli XINFO GROUPS orders.events

Stream management

# Check stream length
redis-cli XLEN orders.events

# Trim to last 10000 entries (approximate)
redis-cli XTRIM orders.events MAXLEN ~ 10000

# Trim to exact length
redis-cli XTRIM orders.events MAXLEN 10000

# View consumer group info
redis-cli XINFO GROUPS orders.events

# Check pending messages
redis-cli XPENDING orders.events mygroup

# Claim stuck messages (after 60 seconds)
redis-cli XCLAIM orders.events mygroup worker2 60000 <message-id>

# Delete processed messages (careful!)
redis-cli XDEL orders.events <message-id>

Notes

  • Redis Streams provide at-least-once delivery with consumer group acknowledgements
  • Use MAXLEN ~ trimming to prevent unbounded memory growth (approximate is faster)
  • Consider Redis Cluster for horizontal scaling with multiple streams
  • Combine with Redis pub/sub for fan-out to ephemeral subscribers
  • For durability, enable AOF persistence with appendfsync everysec or always
  • Monitor memory usage closely; Redis will reject writes when maxmemory is reached

Kafka

Kafka sink

The Kafka sink publishes batches to a Kafka topic using rdkafka.

When to use Kafka

Kafka excels as the backbone for event-driven architectures where durability, ordering, and replay capabilities are critical.

Real-world applications

| Use Case | Description |
|---|---|
| Event sourcing | Store all state changes as an immutable log; rebuild application state by replaying events |
| Microservices integration | Decouple services with async messaging; each service consumes relevant topics |
| Real-time analytics pipelines | Feed CDC events to Spark, Flink, or ksqlDB for streaming transformations |
| Data lake ingestion | Stream database changes to S3/HDFS via Kafka Connect for analytics and ML |
| Audit logging | Capture every database mutation for compliance, debugging, and forensics |
| Cross-datacenter replication | Use MirrorMaker 2 to replicate topics across regions for DR |

Pros and cons

| Pros | Cons |
|---|---|
| Durability - Configurable replication ensures no data loss | Operational complexity - Requires ZooKeeper/KRaft, careful tuning |
| Ordering guarantees - Per-partition ordering with consumer groups | Latency - Batching and replication add milliseconds of delay |
| Replay capability - Configurable retention allows reprocessing | Resource intensive - High disk I/O and memory requirements |
| Ecosystem - Connect, Streams, Schema Registry, ksqlDB | Learning curve - Partitioning, offsets, consumer groups to master |
| Throughput - Handles millions of messages per second | Cold start - Cluster setup and topic configuration overhead |
| Exactly-once semantics - Transactions for critical workloads | Cost - Managed services can be expensive at scale |

Configuration

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      required: true
      exactly_once: false
      client_conf:
        message.timeout.ms: "5000"
        acks: "all"

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | | Sink identifier |
| brokers | string | | Comma-separated broker list |
| topic | string | | Destination topic |
| required | bool | true | Gates checkpoints |
| exactly_once | bool | false | Enable EOS semantics |
| client_conf | map | {} | librdkafka overrides |

Exactly-once semantics

When exactly_once: true, the Kafka sink uses a transactional producer. Each batch is wrapped in a Kafka transaction (begin_transaction / commit_transaction), so either all events in the batch are committed atomically or none are.

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      exactly_once: true

How it works

  1. DeltaForge assigns a stable transactional.id per pipeline-sink pair (deltaforge-{pipeline}-{sink_id})
  2. On startup, init_transactions() registers with the broker and fences any zombie producer from a previous instance
  3. Each batch: begin_transaction() → produce messages → commit_transaction()
  4. If commit fails, the transaction is aborted and the batch retried
  5. Consumers using isolation.level=read_committed only see committed batches

Requirements

  • Kafka 2.5+ (transaction support)
  • isolation.level=read_committed on consumers (otherwise they see uncommitted messages)
  • Only one DeltaForge instance per pipeline at a time (the broker will fence duplicates)
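
As a sketch, the standard console consumer can be pointed at the transactional topic with the required isolation level (flags shown for the stock Kafka CLI tooling):

```
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --consumer-property isolation.level=read_committed
```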

Transactional overrides

When exactly_once: true, DeltaForge automatically configures:

| Setting | Value | Why |
|---|---|---|
| transactional.id | deltaforge-{pipeline}-{sink_id} | Broker fencing |
| transaction.timeout.ms | 60000 | Max transaction duration |
| enable.idempotence | true | Required for transactions |
| acks | all | Required for transactions |
| message.timeout.ms | 30000 | Must be ≤ transaction.timeout.ms |
| delivery.timeout.ms | 30000 | Must be ≤ transaction.timeout.ms |

You do not need to set these in client_conf — they are applied automatically and cannot be overridden.

Fatal errors

If the broker fences the producer (another instance started with the same transactional.id), DeltaForge treats this as a fatal error and stops the pipeline. This is not retryable — resolve the duplicate instance before restarting.

Performance impact

Transactions add ~1-3ms overhead per batch for the two-phase commit. With properly sized batches (max_events=16000, max_bytes=16MB), throughput impact is ~7-11%. See the Performance guide for tuning details and benchmark results.

For non-transactional producers, these client_conf settings are recommended:

| Setting | Recommended | Description |
|---|---|---|
| acks | all | Wait for all replicas for durability |
| message.timeout.ms | 30000 | Total time to deliver a message |
| retries | 2147483647 | Retry indefinitely (with backoff) |
| enable.idempotence | true | Prevent duplicates on retry |
| compression.type | lz4 | Balance between CPU and bandwidth |

Consuming events

Kafka CLI

# Consume from beginning
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning

# Consume with consumer group
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --group deltaforge-consumers

Go consumer example

config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest

group, _ := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)

for {
    err := group.Consume(ctx, []string{"orders"}, &handler{})
    if err != nil {
        log.Printf("Consumer error: %v", err)
    }
}

type handler struct{}

// Setup and Cleanup are required by the sarama.ConsumerGroupHandler interface
func (h *handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        var event Event
        json.Unmarshal(msg.Value, &event)
        process(event)
        session.MarkMessage(msg, "")
    }
    return nil
}

Rust consumer example

use rdkafka::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::Message;

let consumer: StreamConsumer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("group.id", "my-group")
    .set("auto.offset.reset", "earliest")
    .create()?;

consumer.subscribe(&["orders"])?;

loop {
    match consumer.recv().await {
        Ok(msg) => {
            // Skip messages with a missing or non-UTF-8 payload
            if let Some(Ok(payload)) = msg.payload_view::<str>() {
                let event: Event = serde_json::from_str(payload)?;
                process(event);
                consumer.commit_message(&msg, CommitMode::Async)?;
            }
        }
        Err(e) => eprintln!("Kafka error: {}", e),
    }
}

Python consumer example

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # commit manually after processing
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    event = message.value
    process(event)
    consumer.commit()

Failure modes

| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Broker unavailable | Connection refused, timeout | Retries with backoff; blocks checkpoint | Restore broker; check network |
| Topic not found | UnknownTopicOrPartition | Fails batch; retries | Create topic or enable auto-create |
| Authentication failure | SaslAuthenticationFailed | Fails fast, no retry | Fix credentials in config |
| Authorization failure | TopicAuthorizationFailed | Fails fast, no retry | Grant ACLs for producer |
| Message too large | MessageSizeTooLarge | Fails message permanently | Increase message.max.bytes or filter large events |
| Leader election | NotLeaderForPartition | Automatic retry after metadata refresh | Wait for election; usually transient |
| Disk full | KafkaStorageException | Retries indefinitely | Add disk space; purge old segments |
| Network partition | Timeouts, partial failures | Retries; may produce duplicates | Restore network; idempotence prevents dups |
| Producer fenced | ProducerFenced error | Fatal — pipeline stops immediately | Ensure only one instance per pipeline; restart after resolving |
| Transaction timeout | transaction.timeout.ms exceeded | Transaction aborted; batch retried | Increase timeout or reduce batch size |
| SSL/TLS errors | Handshake failures | Fails fast | Fix certificates, verify truststore |

Failure scenarios and data guarantees

Broker failure during batch delivery

  1. DeltaForge sends batch of 100 events
  2. 50 events delivered, broker crashes
  3. rdkafka detects failure, retries remaining 50
  4. If idempotence enabled: no duplicates
  5. If not: possible duplicates of events near failure point
  6. Checkpoint only saved after ALL events acknowledged

DeltaForge crash after Kafka ack, before checkpoint

  1. Batch delivered to Kafka successfully
  2. DeltaForge crashes before saving checkpoint
  3. On restart: replays from last checkpoint
  4. Without exactly_once: duplicate events in Kafka (at-least-once); consumer must handle idempotently
  5. With exactly_once: replayed batch gets a new transaction; consumers with read_committed see duplicates but each batch is atomic

Monitoring recommendations

DeltaForge exposes these metrics for Kafka sink monitoring:

# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink}      # Events delivered
deltaforge_sink_batch_total{pipeline,sink}       # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink}   # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"}  # Stage timing

For deeper Kafka broker visibility, monitor your Kafka cluster directly:

  • Broker metrics via JMX or Kafka’s built-in metrics
  • Consumer lag via kafka-consumer-groups.sh
  • Topic throughput via broker dashboards

Note: Internal rdkafka producer statistics (message queues, broker RTT, etc.) are not currently exposed by DeltaForge. This is a potential future enhancement.

Notes

  • Combine Kafka with other sinks to fan out data; use commit policy to control checkpoint behavior
  • For exactly-once semantics, set exactly_once: true and ensure your Kafka cluster supports transactions (2.5+)
  • With exactly_once: false (default), idempotent production is still enabled — duplicates are prevented during retries but not across DeltaForge restarts
  • Adjust client_conf for durability (acks=all) or performance based on your requirements
  • Consider partitioning strategy for ordering guarantees within partitions

NATS

NATS sink

The NATS sink publishes events to a NATS JetStream stream for durable, at-least-once delivery.

When to use NATS

NATS JetStream is ideal when you need a lightweight, high-performance messaging system with persistence, without the operational overhead of Kafka.

Real-world applications

| Use Case | Description |
|---|---|
| Edge computing | Lightweight footprint perfect for IoT gateways and edge nodes syncing to cloud |
| Microservices mesh | Request-reply and pub/sub patterns with automatic load balancing |
| Multi-cloud sync | Leaf nodes and superclusters for seamless cross-cloud data replication |
| Kubernetes-native events | NATS Operator for cloud-native deployment; sidecar-friendly architecture |
| Real-time gaming | Low-latency state synchronization for multiplayer game servers |
| Financial data feeds | Stream market data with subject-based routing and wildcards |
| Command and control | Distribute configuration changes and commands to distributed systems |

Pros and cons

| Pros | Cons |
|---|---|
| Lightweight - Single binary ~20MB; minimal resource footprint | Smaller ecosystem - Fewer connectors and integrations than Kafka |
| Simple operations - Zero external dependencies; easy clustering | Younger persistence - JetStream newer than Kafka's battle-tested log |
| Low latency - Sub-millisecond message delivery | Community size - Smaller community than Kafka or Redis |
| Flexible patterns - Pub/sub, queues, request-reply, streams | Tooling maturity - Fewer monitoring and management tools |
| Subject hierarchy - Powerful wildcard routing (orders.>, *.events) | Learning curve - JetStream concepts differ from traditional queues |
| Multi-tenancy - Built-in accounts and security isolation | Less enterprise adoption - Fewer case studies at massive scale |
| Cloud-native - Designed for Kubernetes and distributed systems | |

Configuration

sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      required: true
      send_timeout_secs: 5
      batch_timeout_secs: 30

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | | Sink identifier |
| url | string | | NATS server URL |
| subject | string | | Subject to publish to |
| stream | string | | JetStream stream name |
| required | bool | true | Gates checkpoints |
| send_timeout_secs | int | 5 | Publish timeout |
| batch_timeout_secs | int | 30 | Batch timeout |
| connect_timeout_secs | int | 10 | Connection timeout |

Authentication options

# Credentials file
credentials_file: /etc/nats/creds/user.creds

# Username/password
username: ${NATS_USER}
password: ${NATS_PASSWORD}

# Token
token: ${NATS_TOKEN}

Deduplication

DeltaForge sets the Nats-Msg-Id header on every published message using a deterministic idempotency key derived from the event’s source, transaction, and row identity. JetStream uses this header for server-side deduplication within the stream’s duplicate_window.

This means that if DeltaForge replays a batch after a crash, duplicate messages are automatically discarded by the server; no consumer-side dedup is needed within the dedup window.

Configure duplicate_window on the stream to match your maximum expected replay window:

nats stream add ORDERS \
  --subjects "orders.>" \
  --duplicate-window 5m    # Dedup messages replayed within 5 minutes

The default duplicate_window is 2 minutes. Increase this if DeltaForge might be down longer before restarting.

JetStream setup

Before using the NATS sink with JetStream, create a stream that captures your subject:

# Using NATS CLI
nats stream add ORDERS \
  --subjects "orders.>" \
  --retention limits \
  --storage file \
  --replicas 3 \
  --max-age 7d

# Verify stream
nats stream info ORDERS

Consuming events

NATS CLI

# Subscribe to subject (ephemeral)
nats sub "orders.>"

# Create durable consumer
nats consumer add ORDERS orders-processor \
  --pull \
  --ack explicit \
  --deliver all \
  --max-deliver 3 \
  --filter "orders.events"

# Consume messages
nats consumer next ORDERS orders-processor --count 10

Go consumer example

nc, _ := nats.Connect("nats://localhost:4222")
js, _ := nc.JetStream()

// Create or bind to consumer
// The durable name is passed positionally; adding nats.Durable() as well
// would error with "option Durable set more than once"
sub, _ := js.PullSubscribe("orders.events", "orders-processor",
    nats.AckExplicit(),
)

for {
    msgs, _ := sub.Fetch(10, nats.MaxWait(5*time.Second))
    for _, msg := range msgs {
        var event Event
        json.Unmarshal(msg.Data, &event)
        process(event)
        msg.Ack()
    }
}

Rust consumer example

use async_nats::jetstream;
use futures::StreamExt;

let client = async_nats::connect("nats://localhost:4222").await?;
let js = jetstream::new(client);

let stream = js.get_stream("ORDERS").await?;
let consumer: jetstream::consumer::PullConsumer =
    stream.get_consumer("orders-processor").await?;

let mut messages = consumer.messages().await?;
while let Some(msg) = messages.next().await {
    let msg = msg?;
    let event: Event = serde_json::from_slice(&msg.payload)?;
    process(event);
    msg.ack().await?;
}

Monitoring

DeltaForge exposes these metrics for NATS sink monitoring:

# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink}      # Events delivered
deltaforge_sink_batch_total{pipeline,sink}       # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink}   # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"}  # Stage timing

For NATS server visibility, use the NATS CLI or monitoring endpoint:

# Server info
nats server info

# JetStream account info
nats account info

# Stream statistics
nats stream info ORDERS

# Consumer statistics  
nats consumer info ORDERS orders-processor

# Real-time event monitoring
nats events

NATS also exposes a monitoring endpoint (default :8222) with JSON stats:

  • http://localhost:8222/varz - General server stats
  • http://localhost:8222/jsz - JetStream stats
  • http://localhost:8222/connz - Connection stats

Subject design patterns

| Pattern | Example | Use Case |
|---|---|---|
| Hierarchical | orders.us.created | Regional routing |
| Wildcard single | orders.*.created | Any region, specific event |
| Wildcard multi | orders.> | All order events |
| Versioned | v1.orders.events | API versioning |
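
To make the wildcard semantics concrete, here is a small matcher sketch (illustrative; NATS servers implement this natively): * matches exactly one token, while > matches one or more trailing tokens.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    # '*' matches exactly one token; '>' matches one or more trailing tokens.
    pat, sub = pattern.split("."), subject.split(".")
    for i, token in enumerate(pat):
        if token == ">":
            return len(sub) > i  # at least one token must remain
        if i >= len(sub):
            return False
        if token != "*" and token != sub[i]:
            return False
    return len(pat) == len(sub)

assert subject_matches("orders.*.created", "orders.us.created")
assert subject_matches("orders.>", "orders.us.created")
assert not subject_matches("orders.>", "orders")  # '>' needs a trailing token
```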

Failure modes

| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore NATS; check network |
| Stream not found | stream not found error | Fails batch; no retry | Create stream or remove stream config |
| Authentication failure | authorization violation | Fails fast, no retry | Fix credentials |
| Subject mismatch | no responders (core NATS) | Fails if no subscribers | Add subscribers or use JetStream |
| JetStream disabled | jetstream not enabled | Fails fast | Enable JetStream on server |
| Storage full | insufficient resources | Retries; eventually fails | Add storage; adjust retention |
| Message too large | message size exceeds maximum | Fails message permanently | Increase max_payload or filter large events |
| Cluster partition | Intermittent failures | Retries with backoff | Restore network; wait for quorum |
| Slow consumer | Publish backpressure | Slows down; may timeout | Scale consumers; increase buffer |
| TLS errors | Handshake failures | Fails fast | Fix certificates |

Failure scenarios and data guarantees

NATS server restart during batch delivery

  1. DeltaForge sends batch of 100 events
  2. 50 events published, server restarts
  3. async_nats detects disconnect, starts reconnecting
  4. After reconnect, DeltaForge retries remaining 50
  5. JetStream deduplication prevents duplicates (if enabled)
  6. Checkpoint only saved after ALL events acknowledged

DeltaForge crash after JetStream ack, before checkpoint

  1. Batch published to JetStream successfully
  2. DeltaForge crashes before saving checkpoint
  3. On restart: replays from last checkpoint
  4. Result: Duplicate events in stream (at-least-once)
  5. Consumer must handle idempotently (check event.id)

Stream storage exhausted

  1. JetStream stream hits max_bytes or max_msgs limit
  2. With discard: old → oldest messages removed, publish succeeds
  3. With discard: new → publish rejected
  4. DeltaForge retries on rejection
  5. Resolution: Increase limits or enable discard: old

JetStream acknowledgement levels

# Stream configuration affects durability
nats stream add ORDERS \
  --replicas 3 \           # R=3 for production
  --retention limits \     # or 'workqueue' for single consumer
  --discard old \          # Remove oldest when full
  --max-age 7d \           # Auto-expire after 7 days
  --storage file           # Persistent (vs memory)

| Replicas | Guarantee | Use case |
| --- | --- | --- |
| R=1 | Single node; lost if node fails | Development, non-critical |
| R=3 | Survives 1 node failure | Production default |
| R=5 | Survives 2 node failures | Critical data |

Handling duplicates in consumers

// Use event ID for idempotency
processedIDs := make(map[string]bool)  // Or use Redis/DB

for _, msg := range msgs {
    var event Event
    json.Unmarshal(msg.Data, &event)
    
    if processedIDs[event.ID] {
        msg.Ack()  // Already processed
        continue
    }
    
    if err := process(event); err == nil {
        processedIDs[event.ID] = true
    }
    msg.Ack()
}

Notes

  • When stream is specified, the sink verifies the stream exists at connection time
  • Without stream, events are published to core NATS (no persistence guarantees)
  • Connection pooling ensures efficient reuse across batches
  • Use replicated streams (--replicas 3) for production durability
  • Combine with other sinks to fan out data; use commit policy to control checkpoint behavior
  • JetStream provides at-least-once delivery with server-side deduplication via Nats-Msg-Id within the configured duplicate_window

HTTP/Webhook sink

The HTTP sink delivers CDC events via HTTP POST (or PUT) to any URL — internal services, serverless functions, third-party APIs, webhooks.

When to use HTTP

HTTP is the universal integration point. Use it when:

  • Your consumer doesn’t run Kafka/NATS/Redis
  • You need to call a REST API or webhook on every database change
  • You want the simplest possible setup (DeltaForge + any HTTP server)
  • You’re integrating with serverless functions (AWS Lambda, Cloud Functions)

Pros and cons

| Pros | Cons |
| --- | --- |
| Works with any HTTP server | Higher latency than message queues |
| No infrastructure dependencies | No built-in replay (consumer must be idempotent) |
| Simple auth (Bearer, Basic, headers) | No consumer groups or partitioning |
| URL templates for per-table routing | One request per event (unless batch mode) |

Configuration

sinks:
  - type: http
    config:
      id: my-webhook
      url: "https://api.example.com/events"
      method: POST
      headers:
        Authorization: "Bearer ${API_TOKEN}"
        X-Source: deltaforge
      batch_mode: false
      required: true
      send_timeout_secs: 10

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | - | Sink identifier |
| url | string | - | Target URL (supports ${path} templates) |
| method | string | POST | HTTP method (POST or PUT) |
| headers | map | {} | Static headers (values support ${ENV_VAR} expansion) |
| batch_mode | bool | false | Send JSON array instead of per-event requests |
| required | bool | true | Gates checkpoints |
| send_timeout_secs | int | 10 | Per-request timeout |
| batch_timeout_secs | int | 30 | Batch timeout |
| connect_timeout_secs | int | 5 | TCP connection timeout |

Authentication

All auth is handled via the headers map. Values support ${ENV_VAR} shell expansion.

# Bearer token
headers:
  Authorization: "Bearer ${API_TOKEN}"

# Basic auth
headers:
  Authorization: "Basic ${BASIC_AUTH_B64}"

# Custom API key
headers:
  X-API-Key: "${MY_API_KEY}"

# HMAC signature (computed externally, injected via env)
headers:
  X-Signature: "${WEBHOOK_SIGNATURE}"
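Because DeltaForge injects a fixed signature string from the environment rather than computing a per-request HMAC, the receiver verifies by comparing against the same pre-shared value. A minimal receiver-side sketch (the `X-Signature` header name follows the example above; how the secret is distributed is up to you):

```python
import hmac

def verify_static_signature(received: str, expected: str) -> bool:
    """Constant-time comparison of the X-Signature header against the
    pre-shared value. hmac.compare_digest avoids timing side channels."""
    return hmac.compare_digest(received.encode(), expected.encode())
```

If you need per-request signatures, put a proxy in front of the endpoint or verify at a different layer; the sink itself only sends static header values.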

URL templates

Route events to different URLs based on event fields:

# Per-table endpoint
url: "https://api.example.com/cdc/${source.table}"
# → https://api.example.com/cdc/orders
# → https://api.example.com/cdc/customers

# Per-database endpoint
url: "https://${source.db}.api.internal/events"

# Static URL (most common)
url: "https://api.example.com/webhook"

Batch mode

By default, the sink sends one HTTP request per event. Enable batch_mode: true to send a JSON array of events in a single request:

# Per-event mode (default): one POST per event
batch_mode: false
# Body: {"id": "...", "op": "c", "after": {...}}

# Batch mode: one POST with JSON array
batch_mode: true
# Body: [{"id": "...", "op": "c", ...}, {"id": "...", "op": "u", ...}]

Batch mode reduces HTTP overhead but means the consumer must handle arrays.
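A consumer that needs to tolerate both modes can normalize the request body to a list before processing. A small framework-agnostic sketch:

```python
import json

def parse_events(body: bytes) -> list:
    """Accept either a single event object (per-event mode) or a
    JSON array of events (batch mode), and always return a list."""
    parsed = json.loads(body)
    return parsed if isinstance(parsed, list) else [parsed]
```

With this in place, the handler loops over `parse_events(request.body)` and works unchanged whichever mode the sink is configured with.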

Retry behavior

| Condition | Behavior |
| --- | --- |
| 2xx response | Success |
| 408, 429 | Retry with backoff (100ms → 10s, 3 attempts) |
| 5xx | Retry with backoff |
| Connection error | Retry with backoff |
| Timeout | Retry with backoff |
| 401, 403 | Auth error — fail immediately, no retry |
| Other 4xx | Permanent failure — fail batch |

Failure modes

| Failure | Symptoms | DeltaForge behavior | Resolution |
| --- | --- | --- | --- |
| Endpoint unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore endpoint |
| Authentication failure | 401/403 response | Fails fast, no retry | Fix credentials in headers |
| Rate limited | 429 response | Retries with backoff | Reduce throughput or increase rate limit |
| Timeout | Request exceeds send_timeout_secs | Retries | Increase timeout or fix slow endpoint |
| URL template error | Template resolves to empty | Event → DLQ (if enabled) | Fix template or event data |

Consuming events

Node.js / Express

app.post('/webhook', (req, res) => {
  const event = req.body;
  console.log(`${event.op} on ${event.source.table}: ${JSON.stringify(event.after)}`);
  res.sendStatus(200);
});

Python / Flask

@app.route('/webhook', methods=['POST'])
def webhook():
    event = request.json
    process(event)
    return '', 200

Go

http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
    var event Event
    json.NewDecoder(r.Body).Decode(&event)
    process(event)
    w.WriteHeader(http.StatusOK)
})

Notes

  • Connection pooling is automatic — reqwest reuses TCP connections to the same host
  • The Content-Type header is set to application/json by default
  • At-least-once delivery: on crash, events may be re-sent. Consumers should be idempotent.
  • For per-event dedup, use the id field in the event payload (UUID v7, stable across replays)
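Building on the last note, an idempotent consumer keyed on the stable event id might look like this (the in-memory set is a stand-in for Redis or a database unique constraint):

```python
processed: set = set()  # in production: Redis SETNX or a DB unique index

def handle(event: dict) -> bool:
    """Apply an event's side effects at most once, keyed on its UUID v7 id.
    Returns True if processed, False if the event was a replayed duplicate."""
    event_id = event["id"]
    if event_id in processed:
        return False  # already seen on a previous delivery
    # ... apply side effects here ...
    processed.add(event_id)
    return True
```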

Envelopes and Encodings

DeltaForge supports configurable envelope formats and wire encodings for sink output. This allows you to match the output format expected by your downstream consumers without forcing code changes on them.

Overview

Every CDC event flows through two stages before being written to a sink:

Event -> Envelope (structure) -> Encoding (bytes) -> Sink
  • Envelope: Controls the JSON structure of the output (what fields exist, how they’re nested)
  • Encoding: Controls the wire format (JSON bytes, Avro binary with Schema Registry)

Envelope Formats

Native (default)

The native envelope serializes events directly with minimal overhead. This is DeltaForge’s own format, optimized for efficiency and practical use cases.

Note

The native envelope format may evolve over time as we adapt to user needs and optimize for the lowest possible overhead. If you need a stable, standardized format, consider using debezium or cloudevents envelopes which follow their respective established specifications.

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      envelope:
        type: native

Output:

{
  "before": null,
  "after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
  "source": {
    "version": "0.1.0",
    "connector": "mysql",
    "name": "orders-db",
    "ts_ms": 1700000000000,
    "db": "shop",
    "table": "customers",
    "server_id": 1,
    "file": "mysql-bin.000003",
    "pos": 12345
  },
  "op": "c",
  "ts_ms": 1700000000000
}

When to use:

  • Maximum performance with lowest overhead
  • Custom consumers that parse the payload directly
  • When format stability is less important than efficiency
  • Internal systems where you control both producer and consumer

Debezium

The Debezium envelope wraps the event in a {"schema": null, "payload": ...} structure, following the Debezium event format specification.

This uses schemaless mode (schema: null), which is equivalent to Debezium’s JsonConverter with schemas.enable=false. This is the recommended configuration for most production deployments as it avoids the overhead of inline schemas.

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      envelope:
        type: debezium

Output:

{
  "schema": null,
  "payload": {
    "before": null,
    "after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
    "source": {
      "version": "0.1.0",
      "connector": "mysql",
      "name": "orders-db",
      "ts_ms": 1700000000000,
      "db": "shop",
      "table": "customers"
    },
    "op": "c",
    "ts_ms": 1700000000000
  }
}

When to use:

  • Kafka Connect consumers expecting full Debezium format
  • Existing Debezium-based pipelines you’re migrating from
  • Tools that specifically parse the payload wrapper
  • When you need a stable, well-documented format with broad ecosystem support

Note: When using Avro encoding with Schema Registry, schema handling is at the encoding layer — schema IDs are embedded in the Confluent wire format.

CloudEvents

The CloudEvents envelope restructures events to the CloudEvents 1.0 specification, a CNCF project that defines a vendor-neutral format for event data. This format strictly follows the CloudEvents spec and is guaranteed to remain compliant.

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      envelope:
        type: cloudevents
        type_prefix: "com.example.cdc"

Output:

{
  "specversion": "1.0",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "source": "deltaforge/orders-db/shop.customers",
  "type": "com.example.cdc.created",
  "time": "2024-01-15T10:30:00.000Z",
  "datacontenttype": "application/json",
  "subject": "shop.customers",
  "data": {
    "before": null,
    "after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
    "op": "c"
  }
}

The type field is constructed from your type_prefix plus the operation:

  • com.example.cdc.created (INSERT)
  • com.example.cdc.updated (UPDATE)
  • com.example.cdc.deleted (DELETE)
  • com.example.cdc.snapshot (READ/snapshot)
  • com.example.cdc.truncated (TRUNCATE)
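Consumers typically dispatch on the type suffix. A sketch assuming the com.example.cdc prefix from above (the handler names are illustrative, not part of DeltaForge):

```python
def dispatch(event: dict, prefix: str = "com.example.cdc") -> str:
    """Map a CloudEvents type back to a handler for the CDC operation it encodes."""
    suffix = event["type"].removeprefix(prefix + ".")
    handlers = {
        "created": "insert_row",
        "updated": "update_row",
        "deleted": "delete_row",
        "snapshot": "upsert_row",
        "truncated": "truncate_table",
    }
    return handlers.get(suffix, "ignore")
```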

When to use:

  • AWS EventBridge, Azure Event Grid, or other CloudEvents-native platforms
  • Serverless architectures (Lambda, Cloud Functions)
  • Event-driven microservices using CloudEvents SDKs
  • Standardized event routing based on type field
  • When you need a vendor-neutral, CNCF-backed standard format

Wire Encodings

JSON (default)

Standard UTF-8 JSON encoding. Human-readable and widely supported.

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      encoding: json

Content-Type: application/json

When to use:

  • Development and debugging
  • Consumers that expect JSON
  • When human readability matters
  • Most use cases (good default)

Avro (with Schema Registry)

Avro encoding produces compact binary payloads using the Confluent wire format:

[0x00][4-byte schema ID (big-endian)][Avro binary payload]

This format is natively understood by Kafka Connect, ksqlDB, Apache Flink, and any Confluent-compatible consumer.
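For consumers without a Confluent deserializer, the frame is easy to split by hand; a minimal sketch:

```python
import struct

def split_confluent_frame(payload: bytes):
    """Split a Confluent-framed message into (schema_id, avro_body).
    Frame layout: magic byte 0x00, 4-byte big-endian schema ID, Avro binary."""
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("not a Confluent-framed message")
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id, payload[5:]
```

The returned schema ID is then looked up in the Schema Registry to fetch the writer schema for decoding the Avro body.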

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      encoding:
        type: avro
        schema_registry_url: "http://schema-registry:8081"
        subject_strategy: topic_name   # default

Content-Type: application/avro

Configuration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| schema_registry_url | string | - | Confluent-compatible Schema Registry URL |
| subject_strategy | string | topic_name | Subject naming strategy (see below) |
| username | string | - | Basic auth username for Schema Registry |
| password | string | - | Basic auth password for Schema Registry |
| unsigned_bigint_mode | string | string | How to map MySQL BIGINT UNSIGNED (string or long) |
| enum_mode | string | string | How to map ENUM types (string or enum) |
| naive_timestamp_mode | string | string | How to map naive timestamps (string or timestamp) |

Subject naming strategies

| Strategy | Subject pattern | Use case |
| --- | --- | --- |
| topic_name | {topic}-value | One schema per Kafka topic (default, most common) |
| record_name | {record_name} | One schema per record type, shared across topics |
| topic_record_name | {topic}-{record_name} | Per-topic, per-record schema |

Schema source

When source DDL is available (MySQL INFORMATION_SCHEMA, PostgreSQL pg_catalog), DeltaForge derives precise Avro schemas from the actual column types and nullability. This is the recommended path for production.

When DDL is not available (e.g., processor-created synthetic events), DeltaForge falls back to inferring the Avro schema from the JSON event structure. This is less precise (no distinction between int/bigint, all fields nullable).

Schema IDs are cached per subject — only the first event per table triggers a Schema Registry HTTP call.

Type conversion policies

DeltaForge defaults to safe type mappings that prioritize correctness over convenience:

| Source type | Default Avro type | Why | Override |
| --- | --- | --- | --- |
| MySQL BIGINT UNSIGNED | string | Values ≥ 2^63 overflow Avro long | unsigned_bigint_mode: long |
| MySQL/PG ENUM | string | Avro enum symbol changes break compatibility | enum_mode: enum |
| MySQL DATETIME | string (ISO-8601) | Not a UTC instant — Avro timestamp-millis is semantically wrong | naive_timestamp_mode: timestamp |
| PG timestamp (no tz) | string (ISO-8601) | Same as above — naive local time, not an instant | naive_timestamp_mode: timestamp |
| MySQL TIMESTAMP | timestamp-millis | Stored as UTC — safe to use logical type | - |
| PG timestamptz | timestamp-micros | Stored as UTC — safe to use logical type | - |
| DECIMAL(p,s) | decimal logical type | Uses exact precision/scale from DDL | - |
| PG numeric (no precision) | string | Unbounded precision can't be expressed in Avro decimal | - |

Full type mapping tables for MySQL and PostgreSQL are documented in the Avro Schema Registry RFC.

Schema evolution

When the source table schema changes (column added/removed/altered), DeltaForge derives a new Avro schema and registers it as a new version. The Schema Registry’s compatibility rules control acceptance:

  • Default: BACKWARD — new schema can read old data (consumers can upgrade first)
  • DeltaForge respects existing subject compatibility settings in the SR

Schema Registry failure handling

| Condition | Behavior |
| --- | --- |
| SR unavailable, schema cached | Continue encoding with cached schema ID. Metric: deltaforge_avro_sr_cache_fallback_total |
| SR unavailable, no cache | Fail the batch — cannot encode without a schema ID |
| SR rejects new schema (compatibility) | Try encoding with cached schema; if encoding fails → DLQ |

When to use

  • Kafka Connect sinks expecting Avro (JDBC Sink, S3 Sink, Elasticsearch Sink)
  • ksqlDB streams and tables
  • Apache Flink CDC consumers
  • When you need compact binary payloads (~40-60% smaller than JSON)
  • When you want schema evolution enforcement via Schema Registry compatibility rules
  • Production pipelines where schema governance matters

Configuration Examples

Kafka with CloudEvents

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: order-events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.orders"
      encoding: json
      required: true

Redis with Debezium envelope

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: debezium
      encoding: json

NATS with native envelope

sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      envelope:
        type: native
      encoding: json

Kafka with Avro encoding

sinks:
  - type: kafka
    config:
      id: events-avro
      brokers: ${KAFKA_BROKERS}
      topic: cdc-events
      envelope:
        type: debezium
      encoding:
        type: avro
        schema_registry_url: "http://schema-registry:8081"
        subject_strategy: topic_name
      required: true

Multi-sink with different formats

Different consumers may expect different formats. Configure each sink independently:

sinks:
  # Kafka Connect expects Debezium format
  - type: kafka
    config:
      id: connect-sink
      brokers: ${KAFKA_BROKERS}
      topic: connect-events
      envelope:
        type: debezium
      required: true

  # Lambda expects CloudEvents
  - type: kafka
    config:
      id: lambda-sink
      brokers: ${KAFKA_BROKERS}
      topic: lambda-events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.cdc"
      required: false

  # Analytics wants raw events
  - type: redis
    config:
      id: analytics-redis
      uri: ${REDIS_URI}
      stream: analytics
      envelope:
        type: native

Operation Mapping

DeltaForge uses Debezium-compatible operation codes:

| Operation | Code | Description |
| --- | --- | --- |
| Create/Insert | c | New row inserted |
| Update | u | Existing row modified |
| Delete | d | Row deleted |
| Read | r | Snapshot read (initial load) |
| Truncate | t | Table truncated |

These codes appear in the op field regardless of envelope format.

Performance Considerations

| Envelope | Overhead | Format stability | Use case |
| --- | --- | --- | --- |
| Native | Baseline (minimal) | May evolve | High-throughput, internal systems |
| Debezium | ~14 bytes | Stable (follows Debezium spec) | Kafka Connect, Debezium ecosystem |
| CloudEvents | ~150-200 bytes | Stable (follows CNCF spec) | Serverless, event-driven architectures |

| Encoding | Size vs JSON | CPU cost | Schema governance |
| --- | --- | --- | --- |
| JSON | Baseline | Lowest | None |
| Avro | ~40-60% smaller | Moderate (schema lookup + binary encoding) | Schema Registry enforced |

The native envelope is recommended for maximum throughput when you control both ends of the pipeline. For interoperability with external systems or when format stability is critical, use debezium or cloudevents.

Defaults

If not specified, sinks use:

  • Envelope: native
  • Encoding: json

The native envelope provides the lowest overhead for high-throughput scenarios. If you need format stability guarantees, use debezium or cloudevents which adhere to their respective established specifications.

For Kafka pipelines where schema governance and compact payloads matter, use avro encoding with the Debezium envelope — this is the standard pattern for production Kafka Connect integration.

Dynamic Routing

Dynamic routing controls where each CDC event is delivered - which Kafka topic, Redis stream, or NATS subject receives it. By default, all events go to the single destination configured in the sink (static routing). With dynamic routing, events can be split across destinations based on event content or other attributes of the event and pipeline.

Overview

There are two routing mechanisms, and they compose naturally:

  1. Template strings in sink config - resolve per-event from event fields
  2. JavaScript ev.route() - programmatic per-event routing in processors

When both are used, ev.route() overrides take highest priority, then template resolution, then the static config value.

Template Routing

Replace static topic/stream/subject strings with templates containing ${...} variables. Templates are compiled once at startup and resolved per-event with zero regex overhead.

Kafka

sinks:
  - type: kafka
    config:
      id: kafka-routed
      brokers: ${KAFKA_BROKERS}
      topic: "cdc.${source.db}.${source.table}"
      key: "${after.customer_id}"
      envelope:
        type: debezium

Events from shop.orders -> topic cdc.shop.orders, partitioned by customer_id.

Redis

sinks:
  - type: redis
    config:
      id: redis-routed
      uri: ${REDIS_URI}
      stream: "events:${source.table}"
      key: "${after.id}"

Events from orders -> stream events:orders. The key value appears as the df-key field in each stream entry.

NATS

sinks:
  - type: nats
    config:
      id: nats-routed
      url: ${NATS_URL}
      subject: "cdc.${source.db}.${source.table}"
      key: "${after.id}"
      stream: CDC

Events from shop.orders -> subject cdc.shop.orders. The key value appears as the df-key NATS header.

Available Variables

| Variable | Description | Example value |
| --- | --- | --- |
| ${source.table} | Table name | orders |
| ${source.db} | Database name | shop |
| ${source.schema} | Schema name (PostgreSQL) | public |
| ${source.connector} | Source type | mysql |
| ${op} | Operation code | c, u, d, r, t |
| ${after.<field>} | Field from after image | 42, cust-abc |
| ${before.<field>} | Field from before image | old-value |
| ${tenant_id} | Pipeline tenant ID | acme |

Missing fields resolve to an empty string. A warning is logged once per unique template, not per event.

Static strings (no ${...}) are detected at parse time and have zero overhead on the hot path - no allocation, no resolution.

Env Vars vs Templates

Both use ${...} syntax. The config loader expands environment variables first. Unknown variables pass through as templates for runtime resolution:

brokers: ${KAFKA_BROKERS}      # env var - expanded at load time
topic: "cdc.${source.table}"   # template - passed through to runtime
key: "${after.customer_id}"    # template - resolved per-event
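The two-phase behavior can be pictured as a load-time pass that substitutes known environment variables and leaves everything else intact for per-event resolution. A sketch, not DeltaForge's actual loader:

```python
import re

VAR = re.compile(r"\$\{([^}]+)\}")

def expand_env(value: str, env: dict) -> str:
    """Load-time pass: substitute variables found in the environment,
    leave unknown ones (e.g. ${source.table}) untouched as templates."""
    return VAR.sub(lambda m: env.get(m.group(1), m.group(0)), value)
```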

JavaScript Routing

For routing logic that goes beyond field substitution, use ev.route() in a JavaScript processor. This lets you make conditional routing decisions based on event content.

processors:
  - type: javascript
    id: smart-router
    inline: |
      function processBatch(events) {
        for (const ev of events) {
          if (!ev.after) continue;

          if (ev.after.total_amount > 10000) {
            ev.route({
              topic: "orders.priority",
              key: String(ev.after.customer_id),
              headers: {
                "x-tier": "high-value",
                "x-amount": String(ev.after.total_amount)
              }
            });
          } else {
            ev.route({
              topic: "orders.standard",
              key: String(ev.after.customer_id)
            });
          }
        }
        return events;
      }

ev.route() fields

| Field | Type | Description |
| --- | --- | --- |
| topic | string | Override destination (topic, stream, or subject) |
| key | string | Override message/partition key |
| headers | object | Key-value pairs added to the message |

All fields are optional. Only set fields override; omitted fields fall through to config templates or static values.

Calling ev.route() replaces any previous routing on that event - it does not merge.

How headers are delivered

| Sink | Key delivery | Header delivery |
| --- | --- | --- |
| Kafka | Kafka message key | Kafka message headers |
| Redis | df-key field in stream entry | df-headers field (JSON) |
| NATS | df-key NATS header | Individual NATS headers |

Resolution Order

For each event, the destination is resolved in priority order:

ev.route() override  →  config template  →  static config value

Specifically:

  1. If the event has routing.topic set (via ev.route() or programmatically), use it
  2. If the sink config contains a template (has ${...}), resolve it from event fields
  3. Otherwise, use the static config string

The same order applies independently to key and headers.
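The precedence chain can be sketched as follows (illustrative only; the inline template lookup is a simplified stand-in for DeltaForge's compiled templates):

```python
import re

def resolve_topic(event: dict, config_topic: str) -> str:
    """Resolve a destination in priority order: ev.route() override,
    then config template, then the static config value."""
    override = event.get("routing", {}).get("topic")
    if override:
        return override
    if "${" in config_topic:
        # Resolve ${dotted.path} variables against the event; missing -> "".
        def lookup(m):
            node = event
            for part in m.group(1).split("."):
                node = node.get(part, {}) if isinstance(node, dict) else {}
            return "" if isinstance(node, dict) else str(node)
        return re.sub(r"\$\{([^}]+)\}", lookup, config_topic)
    return config_topic
```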

Examples

Complete example configurations are available in the DeltaForge repository.

Outbox Pattern

The transactional outbox pattern guarantees that domain events are published whenever the corresponding database change is committed - no two-phase commit required. DeltaForge supports this natively for both MySQL and PostgreSQL with zero application-side polling.

How It Works

┌─────────────────┐     ┌──────────────┐     ┌──────────────────┐     ┌──────┐
│  Application    │────>│   Database   │────>│   DeltaForge     │────>│ Sink │
│  (writes data   │     │  (outbox     │     │  (captures +     │     │      │
│   + outbox msg) │     │   table/WAL) │     │   transforms)    │     │      │
└─────────────────┘     └──────────────┘     └──────────────────┘     └──────┘
  1. Your application writes business data and an outbox message in the same transaction.
  2. DeltaForge captures the outbox event through the database’s native replication stream.
  3. The OutboxProcessor extracts the payload, resolves the destination topic, and sets routing headers.
  4. The transformed event flows to sinks like any other CDC event.

Because the outbox write is part of the application transaction, the event is guaranteed to be published if and only if the transaction commits.

Source Configuration

Each database uses its native mechanism — there is nothing to install or poll.

PostgreSQL — WAL Messages

PostgreSQL uses pg_logical_emit_message() to write messages directly into the WAL. No table is needed.

-- In your application transaction:
BEGIN;
INSERT INTO orders (id, total) VALUES (42, 99.99);
SELECT pg_logical_emit_message(
  true,     -- transactional: tied to the enclosing TX
  'outbox', -- prefix: matched by DeltaForge
  '{"aggregate_type":"Order","aggregate_id":"42","event_type":"OrderCreated","payload":{"total":99.99}}'
);
COMMIT;

Source config:

source:
  type: postgres
  config:
    id: orders-pg
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables: [public.orders]
    outbox:
      prefixes: [outbox]

| Field | Type | Description |
| --- | --- | --- |
| outbox.prefixes | array | WAL message prefixes to capture. Supports glob patterns: outbox (exact), outbox_% (prefix match), * (all). |

The message prefix becomes source.table on the event, so processors can filter by prefix when multiple outbox channels share a pipeline.

MySQL — Table Inserts

MySQL uses a regular table. For production, use the BLACKHOLE storage engine so rows are written to the binlog but never stored on disk.

-- Create outbox table (BLACKHOLE = no disk storage, binlog only)
CREATE TABLE outbox (
  id        INT AUTO_INCREMENT PRIMARY KEY,
  aggregate_type VARCHAR(64),
  aggregate_id   VARCHAR(64),
  event_type     VARCHAR(64),
  payload        JSON
) ENGINE=BLACKHOLE;
-- In your application transaction:
BEGIN;
INSERT INTO orders (id, total) VALUES (42, 99.99);
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', '42', 'OrderCreated', '{"total": 99.99}');
COMMIT;

Source config:

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.outbox
    outbox:
      tables: ["shop.outbox"]

| Field | Type | Description |
| --- | --- | --- |
| outbox.tables | array | Table patterns to tag as outbox events. Supports globs: shop.outbox (exact), *.outbox (any database), shop.outbox_% (prefix). |

Note: The outbox table must be included in the source’s tables list so DeltaForge subscribes to its binlog events. Only INSERTs are captured - UPDATE and DELETE on the outbox table are ignored.

Outbox Processor

The OutboxProcessor transforms raw outbox events into routed, sink-ready events. Add it to your processors list:

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    default_topic: "events.unrouted"

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | "outbox" | Processor identifier |
| tables | array | [] | Filter: only process outbox events matching these patterns. Empty = all outbox events. |
| topic | string | - | Topic template resolved against the raw payload using ${field} placeholders |
| default_topic | string | - | Fallback topic when template resolution fails and no topic column exists |
| columns | object | (see below) | Column name mappings for extracting outbox fields |
| additional_headers | map | {} | Forward extra payload fields as routing headers. Key = header name, value = column name. |
| raw_payload | bool | false | When true, deliver the extracted payload as-is to sinks, bypassing envelope wrapping (native/debezium/cloudevents). Metadata is still available via routing headers. |
| key | string | - | Key template resolved against raw payload. Sets routing.key for sink partitioning. Default: aggregate_id value. |
| strict | bool | false | When true, fail the batch if required fields are missing (topic, payload, aggregate_type, aggregate_id, event_type). When false, missing fields are silently skipped. |

Column Mappings

Column mappings control header extraction and payload rewriting - they tell the processor which fields correspond to aggregate_type, aggregate_id, etc. for setting df-* headers. The topic template resolves directly against the raw payload, so you reference your actual column names there.

| Column | Default | Header | Description |
| --- | --- | --- | --- |
| payload | "payload" | - | Event body. Extracted and promoted to event.after. |
| aggregate_type | "aggregate_type" | df-aggregate-type | Aggregate root type (e.g. Order). |
| aggregate_id | "aggregate_id" | df-aggregate-id | Aggregate root ID. Also used as default routing key. |
| event_type | "event_type" | df-event-type | Domain event type (e.g. OrderCreated). |
| topic | "topic" | - | Per-row topic override (used when template is absent). |
| event_id | "id" | df-event-id | Event identity for idempotency/dedup. |

If your outbox payload uses non-default field names, override them:

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    columns:
      payload: data           # default: "payload"
      aggregate_type: type    # default: "aggregate_type"
      aggregate_id: key       # default: "aggregate_id"
      event_type: action      # default: "event_type"
      topic: destination      # default: "topic"
      event_id: uuid          # default: "id"

Additional Headers

Forward arbitrary payload fields as routing headers. This is useful when migrating from Debezium’s table.fields.additional.placement or when downstream consumers need extra metadata:

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    additional_headers:
      x-trace-id: trace_id
      x-correlation-id: correlation_id
      x-source-region: region

Each key becomes a header name, each value is the column name in the outbox payload. Missing columns are silently skipped - no error if a row doesn’t contain the field.

Typed Extraction

Header values are extracted as strings regardless of the source JSON type. Numeric IDs, booleans, and string values are all stringified automatically:

| JSON value | Header value |
| --- | --- |
| "abc-123" | abc-123 |
| 42 | 42 |
| true | true |
| null / missing | (skipped) |
| {} / [] | (skipped) |
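The extraction rules above can be sketched as follows (note that booleans must be checked before numbers, since bool is an int subtype in Python):

```python
def header_value(value):
    """Stringify a scalar payload field for use as a header value.
    Returns None for null/missing values, objects, and arrays (skipped)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (str, int, float)):
        return str(value)
    return None  # null, objects, arrays: skip the header entirely
```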

What the Processor Does

  1. Identifies outbox events by the __outbox sentinel on source.schema (set by the source).
  2. Extracts aggregate_type, aggregate_id, event_type, and payload from event.after.
  3. Resolves the topic using a three-step cascade:
    • Template resolved against the raw payload (e.g. ${domain}.${action} — use your actual column names)
    • Column value (a topic field in the payload, configurable via columns.topic)
    • default_topic fallback
  4. Rewrites event.after to just the payload content.
  5. Sets routing headers: df-event-id, df-aggregate-type, df-aggregate-id, df-event-type, plus any additional_headers mappings.
  6. Sets routing key using the key template (or falls back to aggregate_id).
  7. Marks raw delivery if raw_payload: true — sinks serialize event.after directly, skipping envelope wrapping.
  8. Clears the __outbox sentinel so the event looks like a normal CDC event to sinks.
  9. Drops non-INSERT outbox events (UPDATE/DELETE on the outbox table are meaningless).
  10. Validates in strict mode (strict: true): fails the batch with an error if any required field is missing (topic, payload, aggregate_type, aggregate_id, event_type). The error names the missing fields so operators can fix the schema. In lenient mode (default), missing fields are silently skipped.
  11. Passes through all non-outbox events unchanged.
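The topic cascade in step 3 can be sketched as follows (illustrative; field names follow the defaults documented above):

```python
import re

def resolve_outbox_topic(payload: dict, template,
                         topic_column: str = "topic",
                         default_topic=None):
    """Three-step cascade: template against the raw payload, then the
    per-row topic column, then the default_topic fallback."""
    if template:
        missing = False
        def sub(m):
            nonlocal missing
            v = payload.get(m.group(1))
            if v is None:
                missing = True
                return ""
            return str(v)
        resolved = re.sub(r"\$\{([^}]+)\}", sub, template)
        if not missing:
            return resolved  # template fully resolved
    if topic_column in payload:
        return str(payload[topic_column])  # per-row override
    return default_topic
```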

Multi-Outbox Routing

When a source captures multiple outbox channels, use the tables filter to scope each processor:

processors:
  - type: outbox
    tables: [orders_outbox]
    topic: "orders.${event_type}"

  - type: outbox
    tables: [payments_outbox]
    topic: "payments.${event_type}"
    columns:
      payload: data

Complete Examples

PostgreSQL → Kafka

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: order-events
  tenant: acme
spec:
  source:
    type: postgres
    config:
      id: pg1
      dsn: ${POSTGRES_DSN}
      publication: orders_pub
      slot: orders_slot
      tables: [public.orders]
      outbox:
        prefixes: [outbox]

  processors:
    - type: outbox
      topic: "${aggregate_type}.${event_type}"
      default_topic: "events.unrouted"
      raw_payload: true

  sinks:
    - type: kafka
      config:
        id: k1
        brokers: ${KAFKA_BROKERS}
        topic: "events.fallback"

The raw_payload: true means outbox events hit the wire as the extracted payload JSON. Regular CDC events (from public.orders) still use the sink’s configured envelope.

MySQL → Kafka (Multi-Outbox)

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: shop-events
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: m1
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.orders_outbox
        - shop.payments_outbox
      outbox:
        tables: ["shop.*_outbox"]

  processors:
    - type: outbox
      tables: [orders_outbox]
      topic: "orders.${event_type}"
      raw_payload: true

    - type: outbox
      tables: [payments_outbox]
      topic: "payments.${event_type}"
      raw_payload: true

  sinks:
    - type: kafka
      config:
        id: k1
        brokers: ${KAFKA_BROKERS}
        topic: "events.default"

Migrating from Debezium

If you’re using Debezium’s outbox event router with a custom schema, DeltaForge’s column mappings and additional_headers map directly. For example, given this Debezium-style outbox table:

CREATE TABLE outbox_events (
  id UUID PRIMARY KEY,
  aggregatetype VARCHAR(64),
  aggregateid VARCHAR(64),
  type VARCHAR(64),
  payload JSONB,
  traceid VARCHAR(64),
  tenant VARCHAR(32)
);

Debezium config:

transforms.outbox.table.field.event.id=id
transforms.outbox.table.field.event.key=aggregateid
transforms.outbox.table.field.event.type=type
transforms.outbox.table.field.event.payload=payload
transforms.outbox.table.fields.additional.placement=traceid:header,tenant:header

DeltaForge equivalent:

processors:
  - type: outbox
    topic: "${aggregatetype}.${type}"
    key: "${aggregateid}"
    raw_payload: true
    columns:
      event_id: id
      aggregate_type: aggregatetype
      aggregate_id: aggregateid
      event_type: type
      payload: payload
    additional_headers:
      x-trace-id: traceid
      x-tenant: tenant

Key differences from Debezium:

  • Topic template references raw column names directly — ${aggregatetype}, not ${aggregate_type}
  • raw_payload: true delivers the extracted payload as-is — same behavior as Debezium’s outbox event router
  • Column mappings only affect header extraction (df-* headers) and payload rewriting
  • additional_headers replaces table.fields.additional.placement
  • No SMT chain — everything is in one processor config

Observability

The outbox processor emits Prometheus-compatible metrics:

| Metric | Labels | Description |
|---|---|---|
| deltaforge_outbox_transformed_total | - | Events successfully transformed |
| deltaforge_outbox_dropped_total | reason | Events dropped or rejected |

Drop reasons:

| Reason | Meaning |
|---|---|
| non_insert | UPDATE/DELETE on outbox table (expected, harmless) |
| non_object | event.after is not a JSON object |
| null_payload | event.after is null |
| strict_missing_fields | Strict mode: required field missing (batch fails) |

In strict mode, strict_missing_fields increments the counter and returns an error that halts the batch - the pipeline will not silently lose events.

Tips

  • PostgreSQL WAL messages are lightweight. No table, no index, no vacuum - just a single WAL entry per message. Prefer this over table-based outbox when using PostgreSQL.
  • MySQL BLACKHOLE engine avoids storing outbox rows on disk. The row is written to the binlog and immediately discarded. Use it in production to avoid unbounded table growth.
  • Outbox events coexist with normal CDC. The processor passes through all non-outbox events untouched, so you can mix regular table capture and outbox in the same pipeline.
  • Topic templates use ${field} syntax, same as dynamic routing. The template resolves directly against the raw outbox payload columns - use your actual column names like ${domain}.${action}, no remapping needed.
  • At-least-once delivery applies to outbox events just like regular CDC events. Downstream consumers should be idempotent - use the df-event-id header for idempotency, or aggregate_id + event_type as a composite dedup key.
  • Malformed events are logged and dropped by default. Non-INSERT operations, null payloads, and non-object payloads produce a WARN-level log with the table name and reason. They do not propagate to sinks. Enable strict: true to fail the batch instead of dropping - this ensures operators are alerted to schema issues before events are lost.

Guarantees & Correctness

This page defines DeltaForge’s data delivery guarantees, ordering model, transaction semantics, failure handling, and operational boundaries. Every claim here is backed by the implementation — no aspirational statements.

Delivery Guarantees

Per-sink delivery tiers

| Sink | Delivery guarantee | Dedup mechanism | Consumer action required |
|---|---|---|---|
| Kafka (exactly_once: true) | End-to-end exactly-once | Kafka two-phase commit per batch | Set isolation.level=read_committed |
| Kafka (default) | At-least-once (idempotent producer) | Retries are deduped by rdkafka; crash-replay produces duplicates | Dedup by event ID or idempotency key |
| NATS JetStream | At-least-once + server-side dedup | Nats-Msg-Id header within duplicate_window | Configure duplicate_window on stream |
| Redis Streams | At-least-once + consumer-side dedup | idempotency_key field in XADD payload | Check idempotency_key before processing |

Terminology rule: “exactly-once” is used only when DeltaForge guarantees no duplicates without consumer cooperation. All other sinks are “at-least-once” with a stated dedup mechanism. This distinction matters — calling NATS or Redis “exactly-once” would be misleading because dedup depends on server configuration or consumer behavior outside DeltaForge’s control.

What “at-least-once” means

  • No data loss: every event from the source is delivered to the sink at least once. Checkpoints are saved only after the sink acknowledges delivery — never before.
  • Duplicates on crash recovery: if DeltaForge crashes after delivering a batch but before saving the checkpoint, that batch is replayed on restart. Consumers must handle duplicates (see Consumer Guidance below).
  • No silent drops: events are never discarded. If delivery fails, the batch is retried with exponential backoff until it succeeds or a fatal error stops the pipeline.

What “exactly-once” means (Kafka)

With exactly_once: true, each batch is wrapped in a Kafka transaction (begin_transaction / commit_transaction). Consumers using isolation.level=read_committed only see committed batches — no partial deliveries. If a transaction fails, it is aborted and retried from the same checkpoint position.

Exactly-once overhead is ~7-11% with properly tuned batch sizes. See the Performance guide for benchmark details.

Ordering Model

Within a source

Events are emitted in the source’s native order:

  • MySQL: binlog file + position order. WriteRowsEvent batches preserve row order within each binlog event.
  • PostgreSQL: LSN (Log Sequence Number) order. One WAL message per row change.
  • Turso: change_id order (monotonically increasing).

DeltaForge does not reorder events. The source order is preserved through the pipeline.

Within a batch

All events in a batch maintain their source order. The delivery task processes batches in FIFO order from a bounded channel — no reordering between batches.

Per-primary-key ordering (the core guarantee)

DeltaForge guarantees per-primary-key ordering within a table under non-sharded operation. This means: for any single row identified by its primary key, all changes (INSERT, UPDATE, DELETE) are delivered to the sink in the exact order they occurred in the source database.

For Kafka specifically: the default message key is the serialized primary key, so events for the same row always go to the same partition and arrive in order. With dynamic routing (key template), ordering follows the resolved key — events with the same key are ordered; different keys may land in different partitions.
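A toy model of why this holds: any deterministic partitioner maps equal keys to equal partitions, and Kafka preserves order within a partition. `DefaultHasher` below stands in for the producer's actual partitioner, so the specific partition numbers are illustrative only:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// A deterministic hash of the message key always selects the same partition.
// DefaultHasher is a stand-in for the producer's real partitioner.
fn partition_for(key: &[u8], num_partitions: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % num_partitions
}

fn main() {
    let pk = b"orders:42"; // serialized primary key used as the message key
    let p1 = partition_for(pk, 12);
    let p2 = partition_for(pk, 12);
    assert_eq!(p1, p2); // every change to row 42 lands on the same partition
}
```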

Cross-table ordering

There is no global ordering across tables. Events from different tables may be interleaved across batches. This is by design — enforcing global ordering would require single-threaded delivery, which would cap throughput.

However, when batch.respect_source_tx: true (the default), all rows from a single database transaction are kept in the same batch (see Transaction Boundaries below). This preserves causal ordering within a transaction.

Ordering under retries

When a batch delivery fails and is retried, the batch is re-delivered as a unit in the same order. No reordering occurs within or across retries. The max_inflight=1 setting (default) ensures strict ordering; with max_inflight > 1, batches are still delivered in FIFO order by the single-threaded delivery task.

Cross-sink ordering

All sinks receive the same batch simultaneously. The relative order of events is identical across all sinks.

Transaction Boundaries

How it works

When batch.respect_source_tx: true (the default), the coordinator checks each event’s tx_end flag before splitting a batch:

  • MySQL: tx_end is set on the last row of each XID (transaction commit) event.
  • PostgreSQL: tx_end is set on the COMMIT WAL record.
  • Turso: each change is its own transaction (tx_end always true).

The batch accumulator will not split a batch at a point that would separate rows from the same transaction. If the batch limit (max_events or max_bytes) is reached mid-transaction, the batch grows beyond the limit to include all remaining rows in that transaction.
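The splitting rule can be sketched as follows; `Ev` and `split_batches` are simplified stand-ins for the real accumulator, showing only the tx_end check:

```rust
// Minimal sketch of the accumulator rule: a batch may only close at an event
// whose tx_end flag is set, even if max_events was already exceeded.
struct Ev { tx_end: bool }

fn split_batches(events: &[Ev], max_events: usize) -> Vec<usize> {
    // Returns the length of each emitted batch.
    let mut sizes = Vec::new();
    let mut current = 0usize;
    for ev in events {
        current += 1;
        // Close the batch only when the limit is reached AND we are at a
        // transaction boundary; otherwise keep growing past the limit.
        if current >= max_events && ev.tx_end {
            sizes.push(current);
            current = 0;
        }
    }
    if current > 0 {
        sizes.push(current);
    }
    sizes
}

fn main() {
    // A 5-row transaction with max_events = 3: the batch grows to 5.
    let tx = [false, false, false, false, true].map(|tx_end| Ev { tx_end });
    assert_eq!(split_batches(&tx, 3), vec![5]);
}
```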

What this guarantees

  • All rows from one database transaction appear in the same batch.
  • Each batch is delivered atomically to each sink (all events in the batch succeed or fail together).
  • With Kafka exactly_once: true, the entire batch is committed as a single Kafka transaction — consumers see all rows from the DB transaction atomically.
  • Cross-table transactions: a transaction spanning tables A and B is emitted as a single batch containing events for both tables, tagged with the same tx_id. The batch is delivered atomically. This is stronger than “tagged but not grouped” — all events from one DB transaction are in one batch and delivered as a unit.

Precise transaction semantics

To avoid ambiguity, here is exactly what DeltaForge guarantees about transactions:

  • Events from one source transaction are emitted contiguously within a single batch.
  • Multi-table transactions preserve commit grouping — all rows from tables A and B in one DB transaction appear in the same batch.
  • Within a single sink, events from one transaction are delivered atomically (the batch succeeds or fails as a unit).
  • Across heterogeneous sinks, DeltaForge does not guarantee atomic commit. Kafka may commit a transaction while Redis is still retrying. Each sink’s checkpoint tracks its own progress independently.
  • Retries do not break transaction grouping — a retried batch contains the same events in the same order.
  • Under non-sharded operation, no sink may observe partial progress within a source transaction (the batch is the commit unit).

Edge cases

  • A single database transaction that exceeds max_events or max_bytes is still kept in one batch. The limits are exceeded rather than the transaction being split.
  • With respect_source_tx: false, batches are split purely by size/time limits regardless of transaction boundaries. Cross-table transaction atomicity is not preserved in this mode.

Failure Isolation

Per-sink independence

All sinks deliver concurrently. One sink’s failure does not block other sinks:

  1. The coordinator dispatches the same batch to all sinks simultaneously.
  2. Each sink’s delivery result is collected independently.
  3. Only sinks that delivered successfully get their checkpoints advanced.
  4. Failed sinks remain at their prior checkpoint position — they will receive the same batch again on retry or restart.

Required vs. optional sinks

Each sink is marked required: true (default) or required: false:

  • Required: must succeed for the pipeline to consider the batch delivered. If a required sink fails, no checkpoint advances for any sink.
  • Optional (best-effort): failures are logged but don’t prevent the pipeline from advancing. Optional sinks that fail will catch up on restart via replay from their own checkpoint.

Commit policy

The commit policy determines when checkpoints advance:

| Policy | Behavior |
|---|---|
| required (default) | All required: true sinks must acknowledge |
| all | Every sink (required and optional) must acknowledge |
| quorum | At least N sinks must acknowledge |

The policy is checked before any checkpoint is committed. If the policy isn’t satisfied, no sink advances — this prevents optional sinks from getting ahead of failed required sinks.
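A minimal sketch of this gate, using hypothetical types (`CommitPolicy`, `SinkResult`) that mirror the three documented policies:

```rust
// Sketch of the commit-policy gate: checkpoints advance only if the policy
// is satisfied by the set of per-sink delivery results.
enum CommitPolicy { Required, All, Quorum(usize) }

struct SinkResult { required: bool, ok: bool }

fn policy_satisfied(policy: &CommitPolicy, results: &[SinkResult]) -> bool {
    match policy {
        CommitPolicy::Required => results.iter().filter(|r| r.required).all(|r| r.ok),
        CommitPolicy::All => results.iter().all(|r| r.ok),
        CommitPolicy::Quorum(n) => results.iter().filter(|r| r.ok).count() >= *n,
    }
}

fn main() {
    let results = [
        SinkResult { required: true, ok: true },   // required kafka sink delivered
        SinkResult { required: false, ok: false }, // optional mirror failed
    ];
    assert!(policy_satisfied(&CommitPolicy::Required, &results));
    assert!(!policy_satisfied(&CommitPolicy::All, &results));
    assert!(policy_satisfied(&CommitPolicy::Quorum(1), &results));
}
```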

Per-sink checkpoints

Each sink maintains its own checkpoint key ({source_id}::sink::{sink_id}). On restart, the source replays from the minimum checkpoint across all sinks. This means:

  • A fast sink is never held back by a slow one during normal operation.
  • A slow or failed sink only causes replay for itself, not re-delivery to sinks that are already ahead.
  • Adding a new sink triggers replay from the source’s earliest available position for that sink only.
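The restart rule reduces to a minimum over the per-sink checkpoint values. The sketch below uses illustrative `{source_id}::sink::{sink_id}` keys and plain integer positions:

```rust
use std::collections::HashMap;

// Sketch: on restart the source resumes from the minimum checkpoint across
// all per-sink keys, so no sink misses events.
fn resume_position(checkpoints: &HashMap<String, u64>) -> Option<u64> {
    checkpoints.values().copied().min()
}

fn main() {
    let mut cps = HashMap::new();
    cps.insert("pg1::sink::k1".to_string(), 4200);     // fast sink
    cps.insert("pg1::sink::redis1".to_string(), 3900); // lagging sink
    // Resume at 3900; k1 will see events 3900..4200 again (duplicates).
    assert_eq!(resume_position(&cps), Some(3900));
}
```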

Fatal errors

Some errors are unrecoverable and stop the pipeline immediately:

  • Kafka ProducerFenced: another producer instance started with the same transactional.id. The broker fences the old producer permanently.
  • Permanent auth revocation: credentials are invalid and retrying won’t help.

Fatal errors return SinkError::Fatal and are not retried. The pipeline stops and requires operator intervention.

Error Classification & Retry

Retry behavior by sink

All sinks use exponential backoff with jitter. The classification determines whether an error is retried:

Kafka:

| Error | Classification | Behavior |
|---|---|---|
| Queue full | Retryable | Backoff, retry (100ms base, 10s max, 3 attempts) |
| Message timeout | Retryable | Backoff, retry |
| Broker connection failure | Retryable | Backoff, retry |
| Authentication failure | Non-retryable | Fail immediately |
| Message too large | Non-retryable | Fail immediately |
| Producer fenced | Fatal | Pipeline stops |
| Transaction commit failure (fatal) | Fatal | Pipeline stops |

NATS:

| Error | Classification | Behavior |
|---|---|---|
| Connection failure | Retryable | Backoff, retry (50ms base, 5s max, 3 attempts) |
| Publish timeout | Retryable | Backoff, retry |
| Authentication failure | Non-retryable | Fail immediately |
| No responders | Non-retryable | Fail immediately |

Redis:

| Error | Classification | Behavior |
|---|---|---|
| Connection failure | Retryable | Backoff, retry (50ms base, 5s max, 3 attempts) |
| Command timeout | Retryable | Backoff, retry |
| NOAUTH / WRONGPASS | Non-retryable | Fail immediately |
| Permission denied | Non-retryable | Fail immediately |
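The retry schedule shared by the sinks can be sketched as capped exponential backoff with jitter. The function below illustrates the documented base/max values; it is not the sink code itself, and the jitter strategy (keeping 50-100% of the capped delay) is an assumption:

```rust
use std::time::Duration;

// Sketch of the retry schedule: exponential backoff capped at `max`, with
// jitter drawn from a caller-supplied fraction in [0, 1]. The 100ms/10s
// values match the Kafka sink; NATS and Redis use 50ms/5s.
fn backoff(attempt: u32, base: Duration, max: Duration, jitter: f64) -> Duration {
    let exp = base.saturating_mul(2u32.saturating_pow(attempt));
    let capped = exp.min(max);
    capped.mul_f64(0.5 + 0.5 * jitter) // 50-100% of the capped delay
}

fn main() {
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(10);
    // With jitter fraction 1.0: 100ms, 200ms, 400ms for attempts 0, 1, 2.
    assert_eq!(backoff(0, base, max, 1.0), Duration::from_millis(100));
    assert_eq!(backoff(1, base, max, 1.0), Duration::from_millis(200));
    // Large attempt counts are capped at max.
    assert_eq!(backoff(20, base, max, 1.0), max);
}
```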

After retry exhaustion

If all retry attempts fail for a retryable error, the error is propagated to the coordinator. The coordinator’s behavior depends on the commit policy:

  • Required sink: the batch is not committed, and the pipeline will retry the entire batch on the next cycle.
  • Optional sink: the failure is logged, and the pipeline continues with other sinks.

Checkpoint Semantics

When checkpoints are saved

The checkpoint commit follows a strict sequence:

1. Accumulate events from source into a batch
2. Run processors (transform, filter)
3. Deliver batch to ALL sinks concurrently
4. Check commit policy (required/all/quorum)
5. Commit per-sink checkpoints (only for successful sinks)

Key invariant: a checkpoint is saved only after the sink has acknowledged delivery AND the commit policy is satisfied. This is the foundation of at-least-once delivery.

On crash recovery

  1. DeltaForge reads per-sink checkpoints from the checkpoint store.
  2. The source resumes from the minimum checkpoint across all sinks.
  3. Sinks that were already ahead of the minimum position receive duplicate events — they must handle these idempotently (or use exactly-once mode).
  4. Sinks that were behind receive their missing events.

Checkpoint storage

Checkpoints are stored in SQLite (default) with WAL mode and synchronous=NORMAL for durability. The checkpoint store survives SIGKILL — no graceful shutdown required for checkpoint safety.

Backpressure

DeltaForge implements end-to-end backpressure without dropping events:

Source → [event channel] → Accumulator → [batch channel (max_inflight)] → Delivery → Sinks

  1. Sink slow: delivery task blocks waiting for sink acknowledgement.
  2. Batch channel full: accumulator blocks waiting to enqueue the next batch (bounded by max_inflight).
  3. Event channel full: source blocks waiting to enqueue the next event.
  4. Source slows: the database connection idles until the channel has capacity.

No events are dropped at any stage. Backpressure propagates from the slowest sink all the way back to the source connection.

max_inflight controls the pipeline depth: higher values allow overlapping batch building with delivery (better throughput), lower values reduce memory usage and latency.
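This blocking behavior is exactly what a bounded channel gives you, which makes it easy to sketch: in the toy pipeline below the producer can never get more than `max_inflight` batches ahead of the slow consumer, and nothing is dropped or reordered.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

/// Push `n` batches through a bounded channel of depth `max_inflight`
/// and deliver them on a deliberately slow consumer.
fn run_pipeline(n: u32, max_inflight: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(max_inflight);
    let producer = thread::spawn(move || {
        for batch in 0..n {
            tx.send(batch).unwrap(); // blocks while the channel is full
        }
    });
    let mut delivered = Vec::new();
    for batch in rx {
        thread::sleep(Duration::from_millis(5)); // slow sink
        delivered.push(batch);
    }
    producer.join().unwrap();
    delivered
}

fn main() {
    // FIFO order, nothing dropped, however slow the consumer is.
    assert_eq!(run_pipeline(4, 1), vec![0, 1, 2, 3]);
}
```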

Consumer Guidance

Idempotency key

Every event has a deterministic idempotency key in the format:

{tenant}|{db}.{table}|{tx_id}|{event_id}

This key is identical across replays — the same source event always produces the same key.
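A consumer-side sketch of building this key and using it for dedup (the field values are illustrative):

```rust
use std::collections::HashSet;

// Build the documented key: {tenant}|{db}.{table}|{tx_id}|{event_id}
fn idempotency_key(tenant: &str, db: &str, table: &str, tx_id: u64, event_id: &str) -> String {
    format!("{tenant}|{db}.{table}|{tx_id}|{event_id}")
}

fn main() {
    let mut seen: HashSet<String> = HashSet::new();
    let key = idempotency_key("acme", "shop", "orders", 991, "evt-1");
    assert!(seen.insert(key.clone())); // first delivery: process it
    assert!(!seen.insert(key));        // replayed duplicate: skip it
}
```

A real consumer would back `seen` with durable storage sized to the dedup window discussed below, not an in-memory set.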

  • Kafka with exactly_once: true: set isolation.level=read_committed on consumers. No application-level dedup needed.
  • Kafka without exactly_once: use the event’s id field (UUID v7) or the idempotency key to detect duplicates.
  • NATS JetStream: server-side dedup via Nats-Msg-Id header. Configure duplicate_window on the stream to cover your maximum expected downtime (default: 2 minutes).
  • Redis Streams: check the idempotency_key field in the stream entry before processing. Use a Redis SET or application-level tracking to remember processed keys.

Dedup window

How long should consumers remember processed event IDs? Match your maximum expected DeltaForge downtime:

| Scenario | Recommended window |
|---|---|
| Normal operation (no crashes) | No dedup needed (at-most-once per run) |
| Planned restarts | 5 minutes |
| Unplanned crashes with auto-restart | 15-30 minutes |
| Disaster recovery | Match your RPO |

Correctness Test Matrix

Every guarantee is backed by a test. This matrix maps guarantees to their verification:

| Guarantee | Test | Type | Status |
|---|---|---|---|
| No data loss (at-least-once) | crash_recovery chaos scenario | Chaos | Exists |
| Kafka end-to-end exactly-once | exactly_once chaos scenario + kafka_sink_exactly_once_* | Chaos + Integration | Exists |
| Producer fencing detection | kafka_sink_exactly_once_producer_fencing | Integration | Exists |
| Per-primary-key ordering | Events keyed by PK → same Kafka partition | By design | Verified via Kafka partition assignment |
| Transaction boundary preservation | respect_source_tx + check_and_split coordinator logic | Unit | Exists |
| Per-sink checkpoint independence | test_per_sink_checkpoint_only_advances_on_success | Unit | Exists |
| Per-sink checkpoint legacy fallback | per_sink_proxy_falls_back_to_legacy_key | Unit | Exists |
| Commit policy gate before checkpoint | test_per_sink_checkpoint_only_advances_on_success | Unit | Exists |
| DLQ routes per-event failures | test_dlq_routes_failed_events_and_pipeline_continues | Unit | Exists |
| DLQ all-fail batch | test_dlq_all_events_fail_no_send | Unit | Exists |
| DLQ overflow (drop_oldest) | dlq::overflow_drop_oldest | Unit | Exists |
| DLQ overflow (reject) | dlq::overflow_reject_drops_new | Unit | Exists |
| DLQ overflow (block) | dlq::overflow_block_waits_for_ack | Unit | Exists |
| DLQ cleanup expired | dlq::cleanup_expired_removes_old_entries | Unit | Exists |
| Partial batch timer flush | test_partial_batch_flushed_by_timer | Unit | Exists |
| Network partition recovery | network_partition chaos scenario | Chaos | Exists |
| Sink outage recovery | sink_outage chaos scenario | Chaos | Exists |
| Schema drift handling | schema_drift chaos scenario | Chaos | Exists |
| MySQL failover detection | failover chaos scenario | Chaos | Exists |
| Postgres failover detection | pg_failover chaos scenario | Chaos | Exists |
| Binlog purge detection | binlog_purge chaos scenario | Chaos | Exists |
| Replication slot drop detection | slot_dropped chaos scenario | Chaos | Exists |
| NATS dedup within window | Verify Nats-Msg-Id prevents duplicates | Integration | Planned |
| Redis idempotency key | Verify consumer-side dedup via key | Integration | Planned |
| Snapshot → CDC handoff | No gaps or duplicates at boundary | Integration | Planned |

Limitations

These are not guaranteed and are documented honestly:

  • No cross-table global ordering — events from different tables may be interleaved. This is by design; enforcing global order would require single-threaded delivery and cap throughput. Use respect_source_tx: true to preserve ordering within database transactions.
  • No stateful stream processing — DeltaForge does not support joins, aggregations, or windowing. For stateful processing, consume DeltaForge’s output with Apache Flink, ksqlDB, or Kafka Streams.
  • Dead letter queue — when journal.enabled: true, poison events (serialization/routing failures) are routed to a DLQ instead of blocking the pipeline. Without DLQ enabled, a single bad event will still block. See the DLQ page.
  • No schema registry integration (yet) — schema sensing detects structural drift and can halt on breaking changes, but there is no Confluent Schema Registry or Avro/Protobuf encoding support. Planned for a future release.
  • Snapshot consistency — initial snapshots use lock-free parallel reads. The snapshot is eventually consistent with the CDC stream; there may be a brief overlap period where both snapshot rows and CDC events for the same row are delivered. Consumers should use the event timestamp or idempotency key to resolve.

Dead Letter Queue

The Dead Letter Queue (DLQ) routes poison events — events that fail serialization, exceed size limits, or have invalid routing — to a durable queue instead of blocking the pipeline.

How it works

  1. The coordinator dispatches a batch to each sink
  2. Each sink pre-serializes events individually. If an event fails serialization or routing, it is flagged as a DLQ failure instead of failing the entire batch
  3. The remaining (healthy) events are sent to the sink normally
  4. Failed events are written to the DLQ queue with error context
  5. The pipeline continues — one bad event does not block thousands of good ones

Only per-event attributable failures go to the DLQ:

| Error | DLQ eligible | Why |
|---|---|---|
| Serialization failure | Yes | Event can’t be encoded — specific to this event’s data |
| Routing failure | Yes | Template resolves to empty/invalid for this event’s fields |
| Message too large | Yes | This specific event exceeds the sink’s max message size |
| Connection failure | No | Entire sink is down — not caused by one event |
| Auth failure | No | Credentials invalid — affects all events |
| Timeout / backpressure | No | Transient — will resolve with retry |
| Producer fenced | No | Fatal — pipeline stops |

Configuration

DLQ is opt-in. It is configured under journal in the pipeline spec — the journal is DeltaForge’s internal event storage system that backs the DLQ (and will support replay in a future release). Enabling the journal with a dlq stream activates per-event failure routing:

spec:
  journal:
    enabled: true
    max_event_bytes: 262144      # 256KB — truncate larger payloads
    dlq:
      max_entries: 10000         # bounded queue size
      max_age_secs: 604800       # 7 days — auto-purge older entries
      overflow_policy: drop_oldest

Overflow policies

When the DLQ reaches max_entries:

| Policy | Behavior |
|---|---|
| drop_oldest (default) | Evict the oldest DLQ entry to make room for the new one. Most recent failures are usually most valuable for investigation. |
| reject | Reject the new DLQ entry. The failed event is lost — it is not stored in the DLQ, not retried, and not delivered to the sink. The pipeline continues processing other events normally. An error is logged and deltaforge_dlq_rejected_total is incremented. |
| block | Block the pipeline until space is available. No events (good or bad) are processed until the operator acks DLQ entries via the REST API. Visible as degraded on the health endpoint. |
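The drop_oldest and reject policies can be sketched with a bounded deque (block is omitted since it depends on operator acks):

```rust
use std::collections::VecDeque;

// Sketch of drop_oldest vs reject on a bounded DLQ.
enum Overflow { DropOldest, Reject }

// Returns true if the entry was stored.
fn push_dlq(q: &mut VecDeque<String>, max: usize, entry: String, policy: &Overflow) -> bool {
    if q.len() < max {
        q.push_back(entry);
        return true;
    }
    match policy {
        Overflow::DropOldest => {
            q.pop_front(); // evict oldest to make room
            q.push_back(entry);
            true
        }
        Overflow::Reject => false, // new entry is lost; caller logs and counts it
    }
}

fn main() {
    let mut q: VecDeque<String> = VecDeque::from(vec!["a".into(), "b".into()]);
    // drop_oldest: "a" is evicted, "c" is stored.
    assert!(push_dlq(&mut q, 2, "c".into(), &Overflow::DropOldest));
    assert_eq!(q, VecDeque::from(vec!["b".to_string(), "c".to_string()]));
    // reject: "d" is refused and lost.
    assert!(!push_dlq(&mut q, 2, "d".into(), &Overflow::Reject));
}
```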

Payload truncation

Events that caused MessageTooLarge may also be too large for the DLQ. If the event payload exceeds max_event_bytes (default 256KB), the before and after fields are truncated and payload_truncated: true is set. All event metadata (source, table, op, id, timestamp) is always preserved.

REST API

| Endpoint | Method | Description |
|---|---|---|
| /pipelines/{name}/journal/dlq | GET | Peek entries (oldest first). Params: ?limit=50&sink_id=...&error_kind=... |
| /pipelines/{name}/journal/dlq/count | GET | Count of unacked entries |
| /pipelines/{name}/journal/dlq/ack | POST | Dismiss (remove) entries up to seq. Body: {"up_to_seq": 42}. Dismissed entries are permanently deleted — they are not retried or reprocessed. |
| /pipelines/{name}/journal/dlq | DELETE | Purge all entries |

Filters (sink_id, error_kind) affect listing only. Ack is always cumulative from the queue head — it removes all entries up to up_to_seq, regardless of filters used when viewing.

Example: inspect DLQ entries

# Peek the first 10 DLQ entries
curl -s "http://localhost:8080/pipelines/my-pipeline/journal/dlq?limit=10" | jq .

# Filter by sink
curl -s "http://localhost:8080/pipelines/my-pipeline/journal/dlq?sink_id=kafka-primary&limit=5"

# Check DLQ size
curl -s http://localhost:8080/pipelines/my-pipeline/journal/dlq/count
# {"count": 42}

# Ack (remove) entries up to sequence 100
curl -s -X POST http://localhost:8080/pipelines/my-pipeline/journal/dlq/ack \
  -H "Content-Type: application/json" \
  -d '{"up_to_seq": 100}'
# {"acked": 12}

# Purge all entries
curl -s -X DELETE http://localhost:8080/pipelines/my-pipeline/journal/dlq
# {"purged": 42}

Example DLQ entry

{
  "seq": 42,
  "timestamp": 1743350400,
  "pipeline": "orders-pipeline",
  "stream": "dlq",
  "event_id": "01961234-5678-7abc-def0-123456789abc",
  "source_cursor": {"file": "mysql-bin.000005", "pos": 12345},
  "payload_truncated": false,
  "event": {
    "id": "01961234-5678-7abc-def0-123456789abc",
    "source": {"db": "orders", "table": "events"},
    "op": "c",
    "after": {"id": 99, "metadata": "<invalid bytes>"}
  },
  "meta": {
    "sink_id": "kafka-primary",
    "error_kind": "serialization error",
    "error_message": "failed to serialize field 'metadata': invalid UTF-8 sequence",
    "attempts": 1
  }
}

Metrics

| Metric | Type | Labels | Purpose |
|---|---|---|---|
| deltaforge_dlq_events_total | counter | pipeline, sink, error_kind | Events routed to DLQ |
| deltaforge_dlq_entries | gauge | pipeline | Current unacked entries |
| deltaforge_dlq_evicted_total | counter | pipeline | Evicted by drop_oldest |
| deltaforge_dlq_rejected_total | counter | pipeline | Rejected by reject policy |
| deltaforge_dlq_write_failures_total | counter | pipeline | DLQ storage write failures |
| deltaforge_dlq_saturation_ratio | gauge | pipeline | Current / max_entries (0.0-1.0) |

Health signals:

  • Warning log at 80% saturation
  • Error log at 95% saturation

Storage

The DLQ is built on DeltaForge’s existing StorageBackend queue primitives (queue_push, queue_peek, queue_ack). It automatically uses whatever storage backend your pipeline is configured with (SQLite, PostgreSQL, or memory). No additional infrastructure is needed.

Background Cleanup

When max_age_secs is configured (default: 7 days), a background task runs every 60 seconds and removes entries older than the threshold. Age is calculated from insertion time, not source event time. A best-effort startup cleanup pass (bounded to 5 seconds) runs when the pipeline starts.

Cleanup and overflow eviction are independent — drop_oldest may remove entries before age-based cleanup fires.

Operator Workflow

DLQ events are not retried automatically. The intended workflow is:

  1. Alert: monitor deltaforge_dlq_events_total or deltaforge_dlq_entries in Grafana
  2. Inspect: GET /pipelines/{name}/journal/dlq?sink_id=... to see what failed and why
  3. Fix: resolve the root cause (schema mismatch, oversized column, broken routing template)
  4. Dismiss: POST /pipelines/{name}/journal/dlq/ack to remove reviewed entries — dismissed entries are permanently deleted

Future versions may add a retry endpoint (POST .../dlq/retry) that re-injects events into the pipeline, but this raises ordering and idempotency concerns and is deferred.

Limitations

  • DLQ is per-pipeline, not per-sink. Use the sink_id filter to view entries for a specific sink.
  • The block overflow policy blocks the entire pipeline, not just the failing sink.

Architecture

This document describes DeltaForge’s internal architecture, design decisions, and how the major components interact.

Design Principles

Source-Owned Semantics

DeltaForge avoids imposing a universal data model on all sources. Instead, each database source defines and owns its schema semantics:

  • MySQL captures MySQL-specific types, collations, and engine information
  • PostgreSQL captures PostgreSQL-specific types, OIDs, and replica identity
  • Future sources (MongoDB, ClickHouse, TiDB) will capture their native semantics

This approach means downstream consumers receive schemas that accurately reflect the source database rather than a lowest-common-denominator normalization.

Delivery Guarantees First

The checkpoint system is designed around a single invariant:

Checkpoints are only saved after events have been successfully delivered.

This ordering guarantees at-least-once delivery. A crash between checkpoint and delivery would lose events; DeltaForge prevents this by always checkpointing after sink acknowledgment.

Configuration Over Code

Pipelines are defined declaratively in YAML. This enables:

  • Version-controlled pipeline definitions
  • Environment-specific configuration via variable expansion
  • Rapid iteration without recompilation

Component Overview

┌─────────────────────────────────────────────────────────────────┐
│                        DeltaForge Runtime                       │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│   Sources   │   Schema    │ Coordinator │    Sinks    │ Control │
│             │  Registry   │  + Batch    │             │  Plane  │
├─────────────┼─────────────┼─────────────┼─────────────┼─────────┤
│ MySQL       │ Durable     │ Batching    │ Kafka       │ REST API│
│ PostgreSQL  │ Schema      │ Commit      │ Redis       │ Metrics │
│             │ Registry    │ Policy      │ NATS        │ Health  │
└─────────────┴──────┬──────┴─────────────┴─────────────┴─────────┘
                     │
          ┌──────────┴──────────┐
          │   Storage Backend   │
          │  (SQLite / PG /     │
          │   Memory)           │
          ├─────────────────────┤
          │ KV · Log · Slot     │
          │ Queue               │
          └─────────────────────┘

Data Flow

Event Lifecycle

1. Source reads from database log (binlog/WAL)
        │
        ▼
2. Schema loader maps table_id to schema
        │
        ▼
3. Event constructed with before/after images
        │
        ▼
4. Event sent to coordinator via channel
        │
        ▼
5. Coordinator batches events
        │
        ▼
6. Processors transform batch (JavaScript)
        │
        ▼
7. Sinks deliver batch concurrently
        │
        ▼
8. Commit policy evaluated
        │
        ▼
9. Checkpoint saved (if policy satisfied)

Event Structure

Every CDC event shares a common structure:

pub struct Event {
    pub source_id: String,          // Source identifier
    pub database: String,           // Database name
    pub table: String,              // Table name
    pub op: Op,                     // Insert, Update, Delete, Ddl
    pub tx_id: Option<u64>,         // Source transaction ID
    pub before: Option<Value>,      // Previous row state
    pub after: Option<Value>,       // New row state
    pub schema_version: Option<String>,  // Schema fingerprint
    pub schema_sequence: Option<u64>,    // For replay lookups
    pub ddl: Option<Value>,         // DDL payload if op == Ddl
    pub timestamp: DateTime<Utc>,   // Event timestamp
    pub checkpoint: Option<CheckpointMeta>,  // Position info
    pub size_bytes: usize,          // For batching
}

Schema Registry

Role

The schema registry serves three purposes:

  1. Map table IDs to schemas: Binlog events reference tables by ID; the registry resolves these to full schema metadata
  2. Detect schema changes: Fingerprint comparison identifies when DDL has modified a table
  3. Enable replay: Sequence numbers correlate events with the schema active when they were produced

Schema Sensing

At startup, the schema loader auto-discovers tables via pattern expansion and loads their schemas from the live database catalog before any CDC events arrive.

Pattern expansion supports wildcards:

| Pattern | Matches |
|---|---|
| `db.table` | Exact table |
| `db.*` | All tables in database |
| `db.prefix%` | Tables matching prefix |
| `%.table` | Table in any database |

DDL detection works through cache invalidation. When the binlog delivers a QueryEvent (DDL), the affected database’s cache is cleared. On the next row event for that table, the schema is re-fetched from INFORMATION_SCHEMA, fingerprinted, and registered as a new version. No separate DDL history log is maintained — the live catalog is always the source of truth.

Failover reconciliation (verifying schemas against a new primary after server identity change) is planned but not yet implemented.

Schema versions are persisted via DurableSchemaRegistry, which uses the StorageBackend Log primitive. On startup the log is replayed to populate an in-memory cache, so hot-path reads have the same performance as the previous in-memory implementation. Cold-start reconstruction is always possible from the log alone.

Schema Registration Flow

1. Schema loader fetches from INFORMATION_SCHEMA
        │
        ▼
2. Compute fingerprint (SHA-256 of structure)
        │
        ▼
3. Check registry for existing schema with same fingerprint
        │
        ├── Found: Return existing version (idempotent)
        │
        └── Not found: Allocate new version number
                │
                ▼
4. Store with: version, fingerprint, JSON, timestamp, sequence, checkpoint

Sequence Numbers

The registry maintains a global monotonic counter. Each schema version receives a sequence number at registration. Events carry this sequence, enabling accurate schema lookup during replay:

Timeline:
─────────────────────────────────────────────────────────────►
     │              │                    │
Schema v1      Schema v2           Schema v3
(seq=1)        (seq=15)            (seq=42)
     │              │                    │
     └──events 1-14─┘──events 15-41─────┘──events 42+──►

Replay at seq=20: Use schema v2 (registered at seq=15, before seq=42)
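
The lookup behind this timeline can be sketched as a search for the latest schema whose registration sequence does not exceed the event's sequence. The following is an illustrative stand-in, not DeltaForge's actual implementation:

```rust
/// Given schema versions as (sequence, version) pairs sorted by
/// ascending sequence, return the version active at `event_seq`.
fn schema_at_sequence(versions: &[(u64, u32)], event_seq: u64) -> Option<u32> {
    versions
        .iter()
        .take_while(|(seq, _)| *seq <= event_seq)
        .last()
        .map(|(_, version)| *version)
}

fn main() {
    // Timeline from above: v1 at seq=1, v2 at seq=15, v3 at seq=42.
    let versions = [(1, 1), (15, 2), (42, 3)];
    // Replay at seq=20 resolves to schema v2.
    println!("{:?}", schema_at_sequence(&versions, 20));
}
```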

Checkpoint Store

Timing Guarantee

The checkpoint is saved only after sinks acknowledge delivery:

┌────────┐   events   ┌────────┐   ack    ┌────────────┐
│ Source │ ─────────▶ │  Sink  │ ───────▶ │ Checkpoint │
└────────┘            └────────┘          │   Store    │
                                          └────────────┘

If the process crashes after sending to sink but before checkpoint, events will be replayed. This is the “at-least-once” guarantee — duplicates are possible, but loss is not.

Storage Backends

Checkpoints are stored via BackendCheckpointStore, a thin adapter over the StorageBackend KV primitive. See Storage for backend configuration and the full namespace map.

| Backend | Persistence | Use Case |
|---|---|---|
| `SqliteStorageBackend` | SQLite file | Single-instance production |
| `MemoryStorageBackend` | None | Testing, ephemeral deployments |
| `PostgresStorageBackend` | External DB | HA, multi-instance |

Checkpoint-Schema Correlation

When registering schemas, the current checkpoint can be attached:

registry.register_with_checkpoint(
    tenant, db, table,
    &fingerprint,
    &schema_json,
    Some(&checkpoint_bytes),  // Current binlog position
).await?;

This creates a link between schema versions and source positions, enabling accurate schema lookup during replay.

Coordinator

The coordinator orchestrates event flow between source and sinks:

Batching

Events are accumulated until a threshold triggers flush:

  • max_events: Event count limit
  • max_bytes: Total serialized size limit
  • max_ms: Time since batch started
  • respect_source_tx: Never split source transactions
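
The threshold checks can be sketched as a simple predicate. This is illustrative (hypothetical names, and the `respect_source_tx` transaction-boundary handling is omitted), not DeltaForge's actual coordinator code:

```rust
use std::time::Duration;

/// Flush when any threshold is reached: event count, byte size,
/// or batch age. Names mirror the config options above.
fn should_flush(
    events: usize,
    bytes: usize,
    age: Duration,
    max_events: usize,
    max_bytes: usize,
    max_ms: u64,
) -> bool {
    events >= max_events || bytes >= max_bytes || age >= Duration::from_millis(max_ms)
}

fn main() {
    // Limits: 500 events, 1 MiB, 200 ms. No threshold hit yet.
    let hit = should_flush(10, 4096, Duration::from_millis(120), 500, 1 << 20, 200);
    println!("{}", hit);
}
```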

Commit Policy

When multiple sinks are configured, the commit policy determines when the checkpoint advances:

match policy {
    All => required_acks == total_sinks,
    Required => required_acks == sinks.filter(|s| s.required).count(),
    Quorum(n) => required_acks >= n,
}
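
The match above is pseudocode. A runnable sketch of the same decision, with illustrative names that are not DeltaForge's actual API:

```rust
enum CommitPolicy {
    All,
    Required,
    Quorum(usize),
}

struct SinkAck {
    required: bool,
    acked: bool,
}

/// Decide whether the checkpoint may advance for this batch.
fn may_commit(policy: &CommitPolicy, sinks: &[SinkAck]) -> bool {
    let acked = sinks.iter().filter(|s| s.acked).count();
    match policy {
        CommitPolicy::All => acked == sinks.len(),
        CommitPolicy::Required => sinks.iter().filter(|s| s.required).all(|s| s.acked),
        CommitPolicy::Quorum(n) => acked >= *n,
    }
}

fn main() {
    let sinks = [
        SinkAck { required: true, acked: true },   // e.g. a mandatory Kafka sink
        SinkAck { required: false, acked: false }, // e.g. an optional HTTP sink
    ];
    println!("{}", may_commit(&CommitPolicy::Required, &sinks)); // required sink acked
    println!("{}", may_commit(&CommitPolicy::All, &sinks));      // one sink missing
}
```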

Processor Pipeline

Processors run in declared order, transforming batches:

events ──▶ Processor 1 ──▶ Processor 2 ──▶ ... ──▶ transformed events

Each processor can filter, transform, or enrich events. The JavaScript processor uses deno_core for sandboxed execution.

Hot Paths

Critical performance paths have been optimized:

  1. Event construction - Minimal allocations, reuse buffers
  2. Checkpoint serialization - Opaque bytes avoid repeated JSON encoding
  3. Sink delivery - Batch operations reduce round trips
  4. Schema lookup - In-memory cache with stable fingerprints

Benchmarking

Performance is tracked via:

  • Micro-benchmarks for specific operations
  • End-to-end benchmarks using the Coordinator component
  • Regression detection in CI

Future Architecture

Planned enhancements:

  • Initial snapshot/backfill using the Slot primitive for cursor tracking
  • Event store: time-based replay and schema evolution using the Log primitive
  • Distributed coordination: leader election via the Slot primitive with TTL-based leases
  • Additional sources: MongoDB, SQL Server, TiDB
  • PostgreSQL storage validation: chaos/recovery testing to bring it to production parity with SQLite

Checkpoints

Checkpoints record pipeline progress so ingestion can resume from the last successfully delivered position. DeltaForge guarantees at-least-once delivery by saving checkpoints only after events have been acknowledged by sinks.

Core Guarantee: At-Least-Once Delivery

Checkpoints are only saved after events have been successfully delivered to sinks.

If a checkpoint were saved before delivery, a crash between those two points would silently lose events. DeltaForge prevents this by always checkpointing after sink acknowledgment.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Source    │────▶│  Processor  │────▶│    Sink     │────▶│ Checkpoint  │
│   (read)    │     │ (transform) │     │  (deliver)  │     │   (save)    │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘

On crash: events since the last checkpoint are replayed. Consumers should be idempotent or use deduplication.
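
One way a consumer can absorb replayed duplicates is to track event identities it has already processed. A minimal sketch, assuming a hypothetical key built from the source ID and binlog position (a real consumer would use a bounded or persistent store):

```rust
use std::collections::HashSet;

/// Consumer-side dedup: skip events whose identity has been seen.
struct Dedup {
    seen: HashSet<String>,
}

impl Dedup {
    fn new() -> Self {
        Dedup { seen: HashSet::new() }
    }

    /// Returns true if the event is new and should be processed.
    fn accept(&mut self, event_key: &str) -> bool {
        self.seen.insert(event_key.to_string())
    }
}

fn main() {
    let mut dedup = Dedup::new();
    // First delivery is processed; the post-crash replay is skipped.
    println!("{}", dedup.accept("mysql-pipe1:binlog.000042:12345"));
    println!("{}", dedup.accept("mysql-pipe1:binlog.000042:12345"));
}
```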

Storage

Checkpoints are stored via the unified StorageBackend - see Storage for backend configuration. All pipelines share the same storage backend; each pipeline’s checkpoint is keyed by its source ID.

| Backend | Persistence | Use Case |
|---|---|---|
| `SqliteStorageBackend` | SQLite file on disk | Single-instance production |
| `MemoryStorageBackend` | None (lost on restart) | Testing |
| `PostgresStorageBackend` | External database | HA, multi-instance |

Checkpoint Contents

MySQL

Tracks binlog position:

file:     binlog.000042
pos:      12345
gtid_set: (optional, if GTID replication is enabled)

PostgreSQL

Tracks replication stream LSN:

lsn:    0/1A2B3C4D
tx_id:  (optional)

Checkpoint-Schema Correlation

When a schema change is detected, the current checkpoint position is recorded alongside the new schema version. This ensures that during replay, each event is interpreted with the schema that was active when it was produced - even if the table has since been altered.

Commit Policy

When multiple sinks are configured, the commit policy controls when the checkpoint advances:

| Policy | Behaviour |
|---|---|
| `all` | All sinks must acknowledge |
| `required` | Only sinks marked `required: true` must acknowledge |
| `quorum(n)` | At least n sinks must acknowledge |

Set required: true only on sinks where delivery is mandatory for correctness. Optional sinks can fail without blocking the checkpoint.

Operations

Inspecting Checkpoints

sqlite3 ./data/deltaforge.db \
  "SELECT key, length(value) FROM kv WHERE namespace = 'checkpoints';"

Resetting a Pipeline

To force a pipeline to re-read from the beginning, delete its checkpoint:

# Via API
curl -X DELETE http://localhost:8080/pipelines/{name}/checkpoint

# Directly in SQLite
sqlite3 ./data/deltaforge.db \
  "DELETE FROM kv WHERE namespace = 'checkpoints' AND key = '{source-id}';"

Best Practices

  • Back up deltaforge.db regularly - it contains both checkpoints and schema history
  • Monitor checkpoint lag to detect stuck pipelines
  • Use smaller batch sizes for more frequent checkpoints at the cost of throughput
  • Test recovery by killing the process and verifying no events are lost or duplicated

Storage

DeltaForge uses a unified storage layer that backs all runtime state: checkpoints, schema registry, FSM state, leases, and quarantine queues. A single StorageBackend instance is shared across subsystems, so there is one operational concern instead of many.

Backends

| Backend | Persistence | Use Case |
|---|---|---|
| `MemoryStorageBackend` | None (lost on restart) | Testing, ephemeral deployments |
| `SqliteStorageBackend` | SQLite file on disk | Single-instance production |
| `PostgresStorageBackend` | External database | HA, multi-instance deployments |

Configuration

The backend is selected via CLI flags:

# SQLite (default for production)
deltaforge --config pipeline.yaml --storage-backend sqlite --storage-path ./data/deltaforge.db

# In-memory (testing only — all state lost on restart)
deltaforge --config pipeline.yaml --storage-backend memory

Or via the config file:

storage:
  backend: sqlite
  path: ./data/deltaforge.db

# or:
storage:
  backend: postgres
  dsn: "host=localhost dbname=deltaforge_storage user=df password=secret"

The SQLite backend creates the database file and parent directories automatically on first start. The PostgreSQL backend runs schema migrations on first connect - no manual table creation is needed.

PostgreSQL backend is implemented and available under the postgres feature flag, but has not yet received the same chaos/recovery validation as the SQLite backend. Treat it as beta for production use.

Primitives

The StorageBackend trait exposes four primitives. All operations are namespaced - different subsystems share the same backend without key collisions.

KV (Key-Value with optional TTL)

General-purpose persistent key-value store. Used for checkpoints, FSM state, and leases.

backend.kv_put("checkpoints", "mysql-pipe1", &bytes).await?;
backend.kv_put_with_ttl("leases", "pipe1", b"alive", ttl_secs).await?;
backend.kv_get("checkpoints", "mysql-pipe1").await?;
backend.kv_delete("checkpoints", "mysql-pipe1").await?;
backend.kv_list("checkpoints", Some("mysql-")).await?; // prefix filter

Log (Append-only, globally sequenced)

Immutable append-only log with a global monotonic sequence counter. Used for the schema registry. The sequence is global across all keys in the namespace - interleaved appends to different keys produce strictly increasing sequence numbers.

let seq = backend.log_append("schemas", "acme/shop/orders", &entry).await?;
let all = backend.log_list("schemas", "acme/shop/orders").await?;
let tail = backend.log_since("schemas", "acme/shop/orders", since_seq).await?;
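
The global-sequence behaviour can be illustrated with a minimal in-memory stand-in (this mock is for illustration only, not the real backend):

```rust
use std::collections::HashMap;

/// Mock of the Log primitive: one global, monotonically increasing
/// sequence counter shared by every key in the namespace.
struct MockLog {
    next_seq: u64,
    entries: HashMap<String, Vec<(u64, Vec<u8>)>>,
}

impl MockLog {
    fn new() -> Self {
        MockLog { next_seq: 1, entries: HashMap::new() }
    }

    /// Append an entry under `key` and return its global sequence.
    fn append(&mut self, key: &str, entry: &[u8]) -> u64 {
        let seq = self.next_seq;
        self.next_seq += 1;
        self.entries
            .entry(key.to_string())
            .or_default()
            .push((seq, entry.to_vec()));
        seq
    }
}

fn main() {
    let mut log = MockLog::new();
    // Interleaved appends to different keys still receive
    // strictly increasing sequence numbers.
    let a = log.append("acme/shop/orders", b"v1");
    let b = log.append("acme/shop/customers", b"v1");
    let c = log.append("acme/shop/orders", b"v2");
    println!("{} {} {}", a, b, c);
}
```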

Slot (Versioned, compare-and-swap)

Single versioned value with CAS semantics. Used for snapshot cursors and leader election. Concurrent CAS operations with the same expected version are serialized - exactly one wins.

let ver = backend.slot_upsert("snapshots", "pipe/orders", &state).await?;
let won = backend.slot_cas("snapshots", "pipe/orders", ver, &new_state).await?;
let cur = backend.slot_get("snapshots", "pipe/orders").await?; // (version, bytes)
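
The compare-and-swap semantics can be shown with an in-memory stand-in (illustrative only, and without the concurrency machinery a real backend needs):

```rust
/// Mock of the Slot primitive: a single versioned value with CAS.
struct MockSlot {
    version: u64,
    value: Vec<u8>,
}

impl MockSlot {
    /// Unconditionally replace the value; returns the new version.
    fn upsert(&mut self, value: &[u8]) -> u64 {
        self.version += 1;
        self.value = value.to_vec();
        self.version
    }

    /// Replace the value only if `expected` matches the current version.
    fn cas(&mut self, expected: u64, value: &[u8]) -> bool {
        if self.version == expected {
            self.upsert(value);
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut slot = MockSlot { version: 0, value: Vec::new() };
    let ver = slot.upsert(b"cursor-1");
    println!("{}", slot.cas(ver, b"cursor-2"));     // version matched: wins
    println!("{}", slot.cas(ver, b"cursor-stale")); // version moved on: loses
}
```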

Queue (Ordered, ack-based)

Ordered queue with explicit acknowledgement. Used for quarantine and DLQ. Items are not removed until acked; oldest items can be dropped when the queue is full.

let id = backend.queue_push("quarantine", "pipe/orders", &event_bytes).await?;
let items = backend.queue_peek("quarantine", "pipe/orders", 10).await?;
backend.queue_ack("quarantine", "pipe/orders", id).await?;
backend.queue_drop_oldest("quarantine", "pipe/orders", n).await?;
let len = backend.queue_len("quarantine", "pipe/orders").await?;

Namespace Map

Each subsystem owns a fixed namespace. Keys within a namespace never collide across subsystems.

| Namespace | Primitive | Used by | Key format |
|---|---|---|---|
| `checkpoints` | KV | BackendCheckpointStore | `{source-id}` |
| `schemas` | Log | DurableSchemaRegistry | `{tenant}/{db}/{table}` |
| `schemas` | KV | DurableSchemaRegistry | `{tenant}/{db}/{table}` (index) |
| `snapshots` | Slot | Snapshot/backfill (planned) | `{pipeline}/{table}` |
| `fsm` | KV | FSM state (planned) | `{pipeline}` |
| `leases` | KV + TTL | Leader election (planned) | `{pipeline}` |
| `quarantine` | Queue | Quarantine (planned) | `{pipeline}/{table}` |
| `dlq` | Queue | Dead-letter queue (planned) | `{pipeline}` |

Adapters

Existing pipeline code uses higher-level interfaces - it never touches the StorageBackend directly.

BackendCheckpointStore

Implements the CheckpointStore trait on top of KV. Drop-in replacement for the old FileCheckpointStore and SqliteCheckpointStore.

let store = BackendCheckpointStore::new(Arc::clone(&backend));
// CheckpointStore trait methods work unchanged
store.put_raw("mysql-pipe1", &bytes).await?;
store.get_raw("mysql-pipe1").await?;

DurableSchemaRegistry

Implements the schema registry on top of the Log primitive. On startup it replays the log to populate an in-memory cache, so hot-path reads are identical in performance to the old InMemoryRegistry. Writes go through the log, enabling cold-start reconstruction.

// Production: async construction with log replay
let registry = DurableSchemaRegistry::new(Arc::clone(&backend)).await?;

// Tests: sync construction, no replay (MemoryStorageBackend, empty log)
let registry = DurableSchemaRegistry::for_testing();

PipelineManager Wiring

PipelineManager::with_backend() wires both subsystems from a single backend:

let backend: ArcStorageBackend = SqliteStorageBackend::open("./data/deltaforge.db")?;
let manager = PipelineManager::with_backend(backend).await?;

Internally this creates a BackendCheckpointStore and a DurableSchemaRegistry from the same backend instance. All pipelines share the same storage file.

Schema Registry

DeltaForge’s schema registry tracks table schemas across time, enabling accurate event interpretation during replay and providing change detection for DDL operations.

Design Philosophy: Source-Owned Schemas

DeltaForge takes a fundamentally different approach to schema handling than many CDC tools. Rather than normalizing all database schemas into a universal type system, each source defines and owns its schema semantics.

This means:

  • MySQL schemas capture MySQL semantics - column types like bigint(20) unsigned, varchar(255), and json are preserved exactly as MySQL defines them
  • PostgreSQL schemas capture PostgreSQL semantics - arrays, custom types, and pg-specific attributes remain intact
  • No lossy normalization - you don’t lose precision or database-specific information by forcing everything into a common format

This design avoids the maintenance burden of keeping a universal type system synchronized across all databases, and it ensures that downstream consumers receive schemas that accurately reflect the source database’s capabilities and constraints.

The SourceSchema Trait

Every CDC source implements the SourceSchema trait, which provides a common interface for fingerprinting and column access while allowing source-specific schema representations:

pub trait SourceSchema: Serialize + DeserializeOwned + Send + Sync {
    /// Source type identifier (e.g., "mysql", "postgres", "mongodb").
    fn source_kind(&self) -> &'static str;

    /// Content-addressable fingerprint for change detection.
    /// Two schemas with the same fingerprint are identical.
    fn fingerprint(&self) -> String;

    /// Column/field names in ordinal order.
    fn column_names(&self) -> Vec<&str>;

    /// Primary key column names.
    fn primary_key(&self) -> Vec<&str>;

    /// Human-readable description.
    fn describe(&self) -> String;
}

Fingerprinting

Schema fingerprints use SHA-256 hashing over JSON-serialized content to provide:

  • Stability - the same schema always produces the same fingerprint
  • Change detection - any structural change produces a different fingerprint
  • Content-addressability - fingerprints can be used as cache keys or deduplication identifiers

pub fn compute_fingerprint<T: Serialize>(value: &T) -> String {
    let json = serde_json::to_vec(value).unwrap_or_default();
    let hash = Sha256::digest(&json);
    format!("sha256:{}", hex::encode(hash))
}

The fingerprint only includes structurally significant fields. For MySQL, this means columns and primary key are included, but engine and charset are excluded since they don’t affect how CDC events should be interpreted.
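
This "significant fields only" property can be demonstrated with a runnable sketch. Here std's `DefaultHasher` stands in for SHA-256 (it is deterministic within one process run, which is all the demonstration needs), and the key point is that engine and charset are simply not inputs to the hash:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative stand-in for the real SHA-256 fingerprint: only the
/// columns and primary key feed the hash, so engine/charset changes
/// cannot alter the result.
fn fingerprint(columns: &[(&str, &str)], primary_key: &[&str]) -> u64 {
    let mut hasher = DefaultHasher::new();
    columns.hash(&mut hasher);
    primary_key.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let cols = [("id", "bigint"), ("name", "varchar(255)")];
    // engine and charset are not inputs, so DDL that only changes
    // them leaves the fingerprint unchanged by construction.
    let fp = fingerprint(&cols, &["id"]);
    // Dropping a column is a structural change: different fingerprint.
    let fp_dropped = fingerprint(&cols[..1], &["id"]);
    println!("{}", fp != fp_dropped);
}
```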

MySQL Schema Implementation

The MySqlTableSchema struct captures comprehensive MySQL table metadata:

pub struct MySqlTableSchema {
    /// Columns in ordinal order
    pub columns: Vec<MySqlColumn>,

    /// Primary key column names
    pub primary_key: Vec<String>,

    /// Storage engine (InnoDB, MyISAM, etc.)
    pub engine: Option<String>,

    /// Default charset
    pub charset: Option<String>,

    /// Default collation
    pub collation: Option<String>,
}

pub struct MySqlColumn {
    pub name: String,
    pub column_type: String,      // e.g., "bigint(20) unsigned"
    pub data_type: String,        // e.g., "bigint"
    pub nullable: bool,
    pub ordinal_position: u32,
    pub default_value: Option<String>,
    pub extra: Option<String>,    // e.g., "auto_increment"
    pub comment: Option<String>,
    pub char_max_length: Option<i64>,
    pub numeric_precision: Option<i64>,
    pub numeric_scale: Option<i64>,
}

Schema information is fetched from INFORMATION_SCHEMA at startup and cached for the pipeline’s lifetime.

Schema Registry Architecture

The schema registry serves three core functions:

  1. Version tracking - maintains a history of schema versions per table
  2. Change detection - compares fingerprints to detect DDL changes
  3. Replay correlation - associates schemas with checkpoint positions for accurate replay

Schema Versions

Each registered schema version includes:

| Field | Description |
|---|---|
| `version` | Per-table version number (starts at 1) |
| `hash` | Content fingerprint for deduplication |
| `schema_json` | Full schema as JSON |
| `registered_at` | Registration timestamp |
| `sequence` | Global monotonic sequence number |
| `checkpoint` | Source checkpoint at registration time |

Sequence Numbers for Replay

The registry maintains a global monotonic sequence counter. When a schema is registered, it receives the next sequence number. Events carry this sequence number, enabling the replay engine to look up the correct schema version:

// During replay: find schema active at event's sequence
let schema = registry.get_at_sequence(tenant, db, table, event.schema_sequence);

This ensures events are always interpreted with the schema that was active when they were produced, even if the table structure has since changed.

Checkpoint Correlation

Schemas can be registered with an associated checkpoint, creating a correlation between schema versions and source positions:

registry.register_with_checkpoint(
    tenant, db, table,
    &fingerprint,
    &schema_json,
    Some(checkpoint_bytes),  // Optional: binlog position when schema was observed
).await?;

This correlation supports scenarios like:

  • Replaying events from a specific checkpoint with the correct schema
  • Determining which schema was active at a particular binlog position
  • Rolling back schema state along with checkpoint rollback

Schema Loader

The MySqlSchemaLoader handles schema discovery and caching:

Pattern Expansion

Tables are specified using patterns that support wildcards:

| Pattern | Description |
|---|---|
| `db.table` | Exact match |
| `db.*` | All tables in database |
| `db.prefix%` | Tables starting with prefix |
| `%.table` | Table in any database |
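
The matching rules can be sketched in a few lines. These helper names are hypothetical, not DeltaForge's actual loader code:

```rust
/// Match one `db` or `table` segment: `*` and a bare `%` match anything,
/// `prefix%` matches by prefix, otherwise require an exact match.
fn segment_matches(pattern: &str, name: &str) -> bool {
    if pattern == "*" || pattern == "%" {
        return true;
    }
    match pattern.find('%') {
        None => pattern == name,
        Some(i) => name.starts_with(&pattern[..i]) && name.ends_with(&pattern[i + 1..]),
    }
}

/// A `db.table` pattern matches when both segments match.
fn pattern_matches(pattern: &str, db: &str, table: &str) -> bool {
    match pattern.split_once('.') {
        Some((p_db, p_table)) => segment_matches(p_db, db) && segment_matches(p_table, table),
        None => false,
    }
}

fn main() {
    println!("{}", pattern_matches("shop.order_%", "shop", "order_items"));
    println!("{}", pattern_matches("%.audit", "billing", "audit"));
    println!("{}", pattern_matches("shop.*", "shop", "orders"));
}
```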

Preloading

At startup, the loader expands patterns and preloads all matching schemas:

let schema_loader = MySqlSchemaLoader::new(dsn, registry, tenant);
let tracked_tables = schema_loader.preload(&["shop.orders", "shop.order_%"]).await?;

This ensures schemas are available before the first CDC event arrives.

Caching

Loaded schemas are cached to avoid repeated INFORMATION_SCHEMA queries:

// Fast path: return cached schema
if let Some(cached) = cache.get(&(db, table)) {
    return Ok(cached.clone());
}

// Slow path: fetch from database, register, cache
let schema = fetch_schema(db, table).await?;
let version = registry.register(...).await?;
cache.insert((db, table), loaded_schema);

DDL Handling

When the binlog contains DDL events, the schema loader responds:

| DDL Type | Action |
|---|---|
| `CREATE TABLE` | Schema loaded on first row event |
| `ALTER TABLE` | Cache invalidated, reloaded on next row |
| `DROP TABLE` | Cache entry removed |
| `TRUNCATE` | No schema change |
| `RENAME TABLE` | Old removed, new loaded on first row |

DDL detection uses the QueryEvent type in the binlog. On DDL, the entire database’s schema cache is invalidated since MySQL doesn’t always specify the exact table in DDL events.
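
The invalidate-then-refetch flow can be sketched with a toy cache (illustrative only; the fetch here is a placeholder for the real INFORMATION_SCHEMA query):

```rust
use std::collections::HashMap;

/// Toy schema cache keyed by (db, table).
struct SchemaCache {
    schemas: HashMap<(String, String), String>,
}

impl SchemaCache {
    /// On a QueryEvent (DDL), drop every cached schema for the db —
    /// MySQL doesn't always name the exact table in DDL events.
    fn on_ddl(&mut self, db: &str) {
        self.schemas.retain(|(d, _), _| d != db);
    }

    /// On a row event: cache hit, or re-fetch (placeholder) and cache.
    fn get_or_fetch(&mut self, db: &str, table: &str) -> &String {
        self.schemas
            .entry((db.to_string(), table.to_string()))
            .or_insert_with(|| format!("schema fetched for {db}.{table}"))
    }
}

fn main() {
    let mut cache = SchemaCache { schemas: HashMap::new() };
    cache.get_or_fetch("shop", "orders");
    cache.on_ddl("shop"); // ALTER TABLE observed in the binlog
    // Cache is now empty; the next row event triggers a re-fetch.
    println!("{}", cache.schemas.len());
}
```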

API Endpoints

Reload Schemas

Force reload schemas from the database:

curl -X POST http://localhost:8080/pipelines/{name}/schemas/reload

This clears the cache and re-fetches schemas for all tracked tables.

List Cached Schemas

View currently cached schemas:

curl http://localhost:8080/pipelines/{name}/schemas

Limitations

  • Persistence maturity - Schema versions are persisted via DurableSchemaRegistry on the shared storage backend; the PostgreSQL backend has not yet received full chaos/recovery validation
  • No cross-pipeline sharing - Each pipeline maintains its own registry instance
  • Pattern expansion at startup - New tables matching patterns require pipeline restart or reload

Best Practices

  1. Use explicit table patterns in production to avoid accidentally capturing unwanted tables
  2. Monitor schema reload times - slow reloads may indicate overly broad patterns
  3. Trigger schema reload after DDL if your deployment process modifies schemas
  4. Include schema version in downstream events for consumers that need schema evolution awareness

Troubleshooting

Unknown table_id Errors

WARN write_rows for unknown table_id, table_id=42

The binlog contains row events for a table not in the table_map. This happens when:

  • A table was created after the CDC stream started
  • Table patterns don’t match the table

Solution: Trigger a schema reload via the REST API.

Schema Fetch Returned 0 Columns

WARN schema fetch returned 0 columns, db=shop, table=orders

Usually indicates:

  • Table doesn’t exist
  • MySQL user lacks SELECT privilege on INFORMATION_SCHEMA
  • Table was dropped between detection and schema load

Slow Schema Loading

WARN slow schema fetch, db=shop, table=orders, ms=350

Consider:

  • Narrowing table patterns to reduce the number of tables
  • Using exact table names instead of wildcards
  • Verifying network latency to the MySQL server

Confluent Schema Registry Integration

DeltaForge supports the Confluent Schema Registry as the external schema store for Avro encoding. This is separate from DeltaForge’s internal schema registry (described above) which tracks source table schemas.

How it works

When a sink is configured with encoding: { type: avro, schema_registry_url: "..." }:

  1. Schema derivation: DeltaForge derives an Avro schema from the first event’s JSON structure
  2. Registration: The schema is registered with the Confluent Schema Registry (idempotent — same schema returns the same ID)
  3. Caching: The schema ID is cached per subject, so subsequent events skip the HTTP call
  4. Wire format: Events are encoded as [0x00][schema_id:4][avro_binary] (Confluent wire format)

Relationship to internal schema registry

| Aspect | DeltaForge Internal Registry | Confluent Schema Registry |
|---|---|---|
| Purpose | Track source table schema evolution | Store Avro schemas for consumers |
| Schemas | Source-owned (MySQL/Postgres types) | Avro schemas (derived from event structure) |
| Storage | Embedded (SQLite/Postgres) | External HTTP service |
| Used by | Schema sensing, replay correlation | Kafka Connect, ksqlDB, Flink |

The two registries serve complementary purposes — the internal registry tracks what the source looks like, while the Confluent registry stores what consumers need to decode the wire format.

Configuration

See the Envelopes and Encodings page for full Avro encoding configuration, including subject naming strategies and authentication.

Schema Sensing

Schema sensing automatically infers and tracks schema structure from JSON event payloads. This complements the schema registry by discovering schema from data rather than database metadata.

When to Use Schema Sensing

Schema sensing is useful when:

  • Source doesn’t provide schema: Some sources emit JSON without metadata
  • JSON columns: Database JSON/JSONB columns have dynamic structure
  • Schema evolution tracking: Detect when payload structure changes over time
  • Downstream integration: Generate JSON Schema for consumers
  • Dynamic map keys: Session IDs, trace IDs, or other high-cardinality keys in JSON

How It Works

┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
│    Event     │────▶│  Schema Sensor  │────▶│  Inferred Schema │
│   Payload    │     │   (sampling)    │     │   + Fingerprint  │
└──────────────┘     └─────────────────┘     └──────────────────┘
                              │
                              ▼
                     ┌─────────────────┐
                     │ Structure Cache │
                     │ + HC Classifier │
                     └─────────────────┘

  1. Observation: Events flow through the sensor during batch processing
  2. Sampling: Not every event is fully analyzed (configurable rate)
  3. Deep inspection: Nested JSON structures are recursively analyzed
  4. High-cardinality detection: Dynamic map keys are classified and normalized
  5. Fingerprinting: Schema changes are detected via SHA-256 fingerprints
  6. Caching: Repeated structures skip full analysis for performance
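
The warmup-then-sample gate in step 2 can be sketched as a small predicate (hypothetical helper name, using the defaults from the configuration section: 50 warmup events, then 1 in 5):

```rust
/// Analyze every event during warmup, then one in every
/// `sample_rate` events afterwards.
fn should_analyze(event_count: u64, warmup_events: u64, sample_rate: u64) -> bool {
    event_count <= warmup_events || event_count % sample_rate == 0
}

fn main() {
    println!("{}", should_analyze(10, 50, 5)); // warmup: always analyze
    println!("{}", should_analyze(53, 50, 5)); // post-warmup, skipped
    println!("{}", should_analyze(55, 50, 5)); // post-warmup, 1-in-5 sample
}
```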

High-Cardinality Key Handling

JSON payloads often contain dynamic keys like session IDs, trace IDs, or user-generated identifiers:

{
  "id": 1,
  "sessions": {
    "sess_abc123": {"user_id": 42, "started_at": 1700000000},
    "sess_xyz789": {"user_id": 43, "started_at": 1700000001}
  }
}

Without special handling, each unique key (sess_abc123, sess_xyz789) triggers a “schema evolution” event, causing:

  • 0% cache hit rate
  • Constant false evolution alerts
  • Unbounded schema growth

How It Works

DeltaForge uses probabilistic data structures (HyperLogLog, SpaceSaving) to classify fields:

| Classification | Description | Example |
|---|---|---|
| Stable fields | Appear in most events | `id`, `type`, `timestamp` |
| Dynamic fields | Unique per event, high cardinality | `sess_*`, `trace_*`, `uuid_*` |

When dynamic fields are detected, the schema sensor:

  1. Normalizes keys: Replaces sess_abc123 with <dynamic> placeholder
  2. Uses adaptive hashing: Structure cache ignores dynamic key names
  3. Produces stable fingerprints: Same schema despite different keys
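
The classify-and-normalize steps can be illustrated with exact counts instead of the probabilistic sketches the real sensor uses (HyperLogLog/SpaceSaving). A hedged sketch:

```rust
use std::collections::BTreeMap;

/// Keep keys seen in at least `stable_threshold` of events; replace
/// everything else with a `<dynamic>` placeholder so structurally
/// identical payloads produce the same key set (and fingerprint).
fn normalize_keys(
    key_counts: &BTreeMap<String, u64>,
    total_events: u64,
    stable_threshold: f64,
) -> Vec<String> {
    let mut keys: Vec<String> = key_counts
        .iter()
        .map(|(key, seen)| {
            if *seen as f64 / total_events as f64 >= stable_threshold {
                key.clone()
            } else {
                "<dynamic>".to_string()
            }
        })
        .collect();
    keys.sort();
    keys.dedup(); // all dynamic keys collapse into one entry
    keys
}

fn main() {
    // From the sessions example: `id` appears in every event,
    // each session key appears once.
    let counts = BTreeMap::from([
        ("id".to_string(), 100),
        ("sess_abc123".to_string(), 1),
        ("sess_xyz789".to_string(), 1),
    ]);
    println!("{:?}", normalize_keys(&counts, 100, 0.5));
}
```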

Results

| Scenario | Without HC | With HC |
|---|---|---|
| Nested dynamic keys | 100% evolution rate | <1% evolution rate |
| Top-level dynamic keys | 0% cache hits | >99% cache hits |
| Stable structs | Baseline | ~20% overhead during warmup, then ~0% |

Configuration

Example

spec:
  schema_sensing:
    enabled: true
    
    deep_inspect:
      enabled: true
      max_depth: 3
      max_sample_size: 500
    
    sampling:
      warmup_events: 50
      sample_rate: 5
      structure_cache: true
      structure_cache_size: 50
    
    high_cardinality:
      enabled: true
      min_events: 100
      stable_threshold: 0.5
      min_dynamic_fields: 5
      confidence_threshold: 0.7
      reevaluate_interval: 10000

Options

| Field | Type | Default | Description |
|---|---|---|---|
| `enabled` | bool | false | Enable schema sensing |
| **`deep_inspect`** | | | |
| `enabled` | bool | false | Inspect nested JSON |
| `max_depth` | int | 3 | Max nesting depth |
| `max_sample_size` | int | 500 | Max events for deep analysis |
| **`sampling`** | | | |
| `warmup_events` | int | 50 | Full analysis before sampling |
| `sample_rate` | int | 5 | After warmup, analyze 1 in N |
| `structure_cache` | bool | true | Cache structure hashes |
| `structure_cache_size` | int | 50 | Max cached per table |
| **`high_cardinality`** | | | |
| `enabled` | bool | true | Detect dynamic map keys |
| `min_events` | int | 100 | Events before classification |
| `stable_threshold` | float | 0.5 | Frequency for stable fields |
| `min_dynamic_fields` | int | 5 | Min unique for map detection |
| `confidence_threshold` | float | 0.7 | Required confidence |
| `reevaluate_interval` | int | 10000 | Re-check interval (0=never) |

Inferred Types

Schema sensing infers these JSON types:

| Type | Description |
|---|---|
| `null` | JSON null value |
| `boolean` | true/false |
| `integer` | Whole numbers |
| `number` | Floating point numbers |
| `string` | Text values |
| `array` | JSON arrays (element types tracked) |
| `object` | Nested objects (fields recursively analyzed) |

For fields with varying types across events, all observed types are recorded.

Schema Evolution

When schema structure changes, the sensor:

  1. Detects change: Fingerprint differs from previous version
  2. Increments sequence: Monotonic version number increases
  3. Logs evolution: Emits structured log with old/new fingerprints
  4. Updates cache: New structure becomes current

Evolution events are available via the REST API and can trigger alerts.

Stabilization

After observing enough events, a schema “stabilizes”:

  • Warmup phase completes
  • Structure stops changing
  • Sampling rate takes effect
  • Cache hit rate increases

Stabilized schemas have stabilized: true in API responses.

API Access

List Inferred Schemas

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas

Get Schema Details

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders

Export as JSON Schema

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/json-schema

Cache Statistics

curl http://localhost:8080/pipelines/my-pipeline/sensing/stats

Dynamic Map Classifications

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/classifications

Response:

{
  "table": "orders",
  "paths": {
    "": {"stable_fields": ["id", "type", "timestamp"], "has_dynamic_fields": false},
    "sessions": {"stable_fields": [], "has_dynamic_fields": true, "unique_keys": 1523},
    "metadata": {"stable_fields": ["version"], "has_dynamic_fields": true, "unique_keys": 42}
  }
}

Drift Detection

Schema sensing integrates with drift detection to compare:

  • Expected schema: From database metadata (schema registry)
  • Observed schema: From event payloads (schema sensing)

When mismatches occur, drift events are recorded:

| Drift Type | Description |
|---|---|
| `unexpected_null` | Non-nullable column has null values |
| `type_mismatch` | Observed type differs from declared type |
| `undeclared_column` | Field in data not in schema |
| `missing_column` | Schema field never seen in data |
| `json_structure_change` | JSON column structure changed |

Access drift data via:

curl http://localhost:8080/pipelines/my-pipeline/drift

Performance Considerations

Sampling Tradeoffs

| Setting | Effect |
|---|---|
| Higher `warmup_events` | Better initial accuracy, slower stabilization |
| Higher `sample_rate` | Lower CPU usage, slower evolution detection |
| Larger `structure_cache_size` | More memory, better hit rate |

High-throughput pipelines (>10k events/sec):

sampling:
  warmup_events: 100
  sample_rate: 10
  structure_cache: true
  structure_cache_size: 100
high_cardinality:
  enabled: true
  min_events: 200

Schema evolution monitoring:

sampling:
  warmup_events: 25
  sample_rate: 2
  structure_cache: true
high_cardinality:
  enabled: true
  min_events: 50

Payloads with dynamic keys (session stores, feature flags):

sampling:
  structure_cache: true
  structure_cache_size: 50
high_cardinality:
  enabled: true
  min_events: 100
  min_dynamic_fields: 3
  stable_threshold: 0.5

Development/debugging:

sampling:
  warmup_events: 10
  sample_rate: 1  # Analyze every event
high_cardinality:
  enabled: true
  min_events: 20  # Faster classification

Example: JSON Column Sensing

For tables with JSON columns, sensing reveals the internal structure:

# Database schema shows: metadata JSON
# Sensing reveals:
fields:
  - name: "metadata.user_agent"
    types: ["string"]
    nullable: false
  - name: "metadata.ip_address"
    types: ["string"]
    nullable: true
  - name: "metadata.tags"
    types: ["array"]
    array_element_types: ["string"]

This enables downstream consumers to understand JSON column structure without manual documentation.
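The dotted-path names above can be produced by recursively flattening the JSON value. A minimal sketch of the idea (illustrative only, not DeltaForge's actual code):

```python
import json

def flatten_types(value, prefix="", out=None):
    """Record a dotted path -> JSON type name for each leaf value."""
    if out is None:
        out = {}
    if isinstance(value, dict):
        for key, child in value.items():
            path = f"{prefix}.{key}" if prefix else key
            flatten_types(child, path, out)
    elif isinstance(value, list):
        out[prefix] = "array"
    elif value is None:
        out[prefix] = "null"
    elif isinstance(value, bool):  # check before int: bool is a subtype of int
        out[prefix] = "boolean"
    elif isinstance(value, (int, float)):
        out[prefix] = "number"
    else:
        out[prefix] = "string"
    return out

row = json.loads('{"user_agent": "curl/8.0", "tags": ["a", "b"], "ip_address": null}')
result = flatten_types(row, prefix="metadata")
# {"metadata.user_agent": "string", "metadata.tags": "array", "metadata.ip_address": "null"}
```

Observed nullability then follows from whether a path ever maps to "null" (or is absent) across sampled events.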

Metrics

Schema sensing emits these Prometheus metrics:

| Metric | Type | Labels | Description |
|---|---|---|---|
| deltaforge_schema_events_total | Counter | table | Total events observed |
| deltaforge_schema_cache_hits_total | Counter | table | Structure cache hits |
| deltaforge_schema_cache_misses_total | Counter | table | Structure cache misses |
| deltaforge_schema_evolutions_total | Counter | table | Schema evolutions detected |
| deltaforge_schema_tables_total | Gauge | - | Tables with detected schemas |
| deltaforge_schema_dynamic_maps_total | Gauge | - | Paths classified as dynamic maps |
| deltaforge_schema_sensing_seconds | Histogram | table | Per-event sensing latency |

Example Queries

# Cache hit rate per table
sum(rate(deltaforge_schema_cache_hits_total[5m])) by (table)
/
sum(rate(deltaforge_schema_events_total[5m])) by (table)

# Schema evolution rate (should be near zero after warmup)
sum(rate(deltaforge_schema_evolutions_total[5m])) by (table)

# P99 sensing latency
histogram_quantile(0.99, rate(deltaforge_schema_sensing_seconds_bucket[5m]))

Failover Handling

DeltaForge detects database failover automatically and resumes streaming on the new primary without operator intervention. This page explains how detection works, what happens during reconciliation, and how to configure behaviour when the new primary has a different schema.

How Detection Works

Every time the source reconnects - at startup or after a transient error - it queries the server’s stable identity:

  • MySQL: @@server_uuid from performance_schema.replication_group_members
  • PostgreSQL: system_identifier from pg_control_system()

The result is compared against the value stored in DeltaForge’s storage backend. Three outcomes are possible:

| Result | Meaning | Action |
|---|---|---|
| FirstSeen | No identity stored yet | Store and continue |
| Same | Same server as before | Verify checkpoint GTID is still reachable, then continue |
| Changed | Server identity differs | Run failover reconciliation |

Identity is written to the durable storage backend (SQLite or PostgreSQL), so it survives process restarts and is correctly preserved across pipeline reloads.
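The three outcomes can be summarized in a short sketch (illustrative only; the real logic lives in the Rust source):

```python
def check_server_identity(stored, observed):
    """Compare the persisted server identity against the one reported
    on reconnect. `stored` is None when no identity has been saved yet."""
    if stored is None:
        return "FirstSeen"   # store `observed` and continue
    if stored == observed:
        return "Same"        # verify the checkpoint is still reachable
    return "Changed"         # run failover reconciliation

assert check_server_identity(None, "uuid-b") == "FirstSeen"
assert check_server_identity("uuid-a", "uuid-a") == "Same"
assert check_server_identity("uuid-a", "uuid-b") == "Changed"
```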

What Happens During Failover

When a Changed identity is detected, DeltaForge runs reconciliation before allowing any events to flow. Reconciliation is idempotent - if the process dies mid-run, it will re-execute correctly on the next startup.

1. Position reachability check

DeltaForge verifies that the checkpoint position from the old primary still exists on the new primary:

  • MySQL: checks whether the GTID set from the last checkpoint is present in B’s executed GTID history or purged range
  • PostgreSQL: checks whether the replication slot’s confirmed_flush_lsn is reachable

If the position is confirmed lost, the source stops immediately with an error and /health returns 503. This covers two distinct cases:

  • Server changed (Changed): B’s GTID history does not contain A’s checkpoint (e.g. B was a lagging replica).
  • Same server, history wiped (Same): RESET BINARY LOGS AND GTIDS was run on the same server, clearing all GTID state without changing the server UUID. DeltaForge detects this on the first reconnect by checking GTID_SUBSET(checkpoint, @@gtid_executed).

In both cases the error message is:

position lost: <reason>. Re-snapshot required.

Silently skipping data is worse than halting. Restart the pipeline with a fresh snapshot to recover.

If reachability cannot be determined (e.g. the health query fails transiently), DeltaForge logs a warning and continues — it does not halt on uncertainty.
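When diagnosing a suspected failover by hand, the same MySQL reachability check can be run directly. The GTID set below is a placeholder for your actual checkpoint value:

```sql
-- Returns 1 if the checkpoint GTID set is contained in the server's
-- executed history, i.e. the position is still reachable on this server.
SELECT GTID_SUBSET('3e11fa47-71ca-11e1-9e33-c80aa9429562:1-100',
                   @@GLOBAL.gtid_executed) AS reachable;
```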

2. Schema drift detection

DeltaForge compares the schema last registered from the old primary against the live catalog on the new primary. Any column additions, removals, or renames are recorded as a ReconcileRecord in the storage backend.

If drift is found, the schema cache is invalidated so the next row event triggers a fresh load with the correct column mapping.

3. Resume

After reconciliation, DeltaForge stores B’s identity and resumes streaming. The first events from B use the updated schema.

Position Adjustment

A subtle but critical detail: simply reconnecting at A’s checkpoint position can cause data loss on its own, before reconciliation even runs.

MySQL: B rejects A’s GTID set at the protocol level with “purged required binary logs”. DeltaForge detects the identity change before opening the binlog stream, resolves B’s current binlog tail via SHOW BINARY LOG STATUS, and connects there instead. A’s original GTID checkpoint is preserved separately for the reachability check.

PostgreSQL: START_REPLICATION at A’s LSN immediately advances the slot’s confirmed_flush_lsn to max(A_checkpoint, slot_lsn). If B’s slot was created at an LSN behind A’s checkpoint, any changes B committed in that gap are permanently discarded - even if you reconnect at the correct LSN afterwards. DeltaForge detects the identity change before opening the replication stream and fetches the slot’s actual confirmed_flush_lsn to use as the start position instead.

In both cases the original checkpoint is preserved for the reachability check, separate from the adjusted streaming position.

Schema Drift Policy

By default DeltaForge adapts to the new primary’s schema and continues streaming. This is safe for additive drift (B has a new column A didn’t have) but can be risky if B is missing columns that A had - row events encoded against A’s schema may decode incorrectly against B’s.

The on_schema_drift field controls this behaviour:

source:
  type: mysql
  config:
    id: my-pipeline
    dsn: ${MYSQL_DSN}
    tables: [shop.orders]
    on_schema_drift: halt   # default: adapt

| Value | Behaviour |
|---|---|
| adapt | Record drift, reload schema cache, continue streaming. Default. |
| halt | Stop the source when any schema drift is detected. Requires operator intervention. |

When halt fires, the reconciliation record is persisted before the source stops - you can inspect what changed before restarting:

schema drift detected after failover and on_schema_drift=halt.
Verify B's schema and apply any missing migrations before restarting.

Use halt when your failover environments do not guarantee DDL sync to replicas before promotion.

What DeltaForge Does Not Handle

DSN switching is external. DeltaForge detects a new server by comparing identities, not by monitoring cluster topology. The DSN must already point to B before the pipeline reconnects - this is typically handled by a load balancer VIP, DNS failover, or connection proxy. If the DSN still resolves to A, the pipeline will retry A’s dead connection rather than discovering B.

Data loss from replica lag is not recoverable. If B was a lagging replica and never received transactions that A committed before failing, those rows are gone at the database level. DeltaForge can detect the position gap but cannot reconstruct missing data. A re-snapshot from B is required in this case.

Mid-flight DDL during active streaming is handled separately by the normal schema reload mechanism, not by failover reconciliation. Failover reconciliation only runs when the server identity changes.

Infrastructure Requirements

For clean automatic failover:

  • MySQL: GTID mode must be enabled (gtid_mode=ON, enforce_gtid_consistency=ON). Without GTID, DeltaForge falls back to file/position coordinates which are meaningless across servers.
  • PostgreSQL: The replication slot must exist on B before the pipeline connects to it. Slots are not automatically transferred during failover - use a slot-aware HA tool (e.g. Patroni with permanent_slots) or pre-create the slot on standbys.
  • Both: The CDC user must exist on B with the same privileges as on A.
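Before relying on automatic failover, it is worth verifying these prerequisites on the standby. For example (the slot name below is a placeholder; it must match your pipeline's configuration):

```sql
-- MySQL: both should report ON
SELECT @@gtid_mode, @@enforce_gtid_consistency;

-- PostgreSQL: pre-create the logical replication slot so it exists
-- before the pipeline connects after promotion
SELECT pg_create_logical_replication_slot('deltaforge_slot', 'pgoutput');
```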

Deployment

Docker

docker run --rm \
  -e MYSQL_USER=cdc_user \
  -e MYSQL_PASSWORD=s3cret \
  -v $(pwd)/pipeline.yaml:/etc/deltaforge/pipeline.yaml:ro \
  ghcr.io/vnvo/deltaforge:latest \
  --config /etc/deltaforge/pipeline.yaml

Docker Compose

See the chaos testing environment for a full Docker Compose example with MySQL, Kafka, Prometheus, Grafana, and Loki.

Kubernetes (Helm)

Install

helm install deltaforge ./deploy/helm/deltaforge \
  --set secrets.create=true \
  --set secrets.data.MYSQL_USER=cdc_user \
  --set secrets.data.MYSQL_PASSWORD=s3cret

What it deploys

| Resource | Purpose |
|---|---|
| StatefulSet | DeltaForge pod with stable identity |
| ConfigMap | Pipeline YAML configuration |
| PVC | Persistent storage for checkpoints + DLQ |
| Service | ClusterIP with API (8080) and metrics (9000) ports |
| ServiceAccount | Pod identity |
| ServiceMonitor | Prometheus Operator integration (optional) |
| Secret | Credentials (optional, dev only) |

Secrets

Secrets contain only credentials (username, password, tokens). Connection details (host, port, topic) stay in the pipeline config. Pipeline config uses ${VAR_NAME} shell expansion at startup:

# Pipeline config (in ConfigMap)
dsn: "mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mysql-primary:3306/orders"
brokers: "kafka:9092"     # not secret — stays in config

For production, create K8s Secrets separately and reference them:

secrets:
  existingSecrets:
    - name: mysql-creds           # keys: MYSQL_USER, MYSQL_PASSWORD
    - name: kafka-sasl-creds      # keys: KAFKA_SASL_USER, KAFKA_SASL_PASSWORD

Health probes

The chart configures liveness and readiness probes automatically:

  • Liveness (/health): returns 200 when healthy, 503 when a pipeline has failed. Triggers pod restart on prolonged failure.
  • Readiness (/ready): returns 200 with pipeline status JSON. Controls traffic routing.

Monitoring

Prometheus annotations are enabled by default (prometheus.io/scrape: "true"). For Prometheus Operator, enable the ServiceMonitor:

serviceMonitor:
  enabled: true
  interval: 15s

Storage

Development/testing: SQLite on a PersistentVolume (default). Simple, no external dependencies.

Production: PostgreSQL is recommended. Benefits:

  • Survives pod rescheduling without PVC migration
  • Proper backup/restore via pg_dump
  • Supports multiple replicas sharing state (future operator/sharding)
  • Better concurrency under high checkpoint commit rates
storage:
  backend: postgres

persistence:
  enabled: false    # no PVC needed with Postgres

secrets:
  existingSecrets:
    - name: deltaforge-storage    # must contain key: STORAGE_DSN

The STORAGE_DSN Secret should contain a PostgreSQL connection string:

postgresql://deltaforge:password@postgres.infra:5432/deltaforge

Create the database and user beforehand:

CREATE USER deltaforge WITH PASSWORD 'password';
CREATE DATABASE deltaforge OWNER deltaforge;

DeltaForge creates its tables automatically on first connection.

Full values reference

See the Helm chart README for all configurable values.

Observability playbook

DeltaForge already ships a Prometheus exporter, structured logging, and a panic hook. The runtime now emits source ingress counters, batching/processor histograms, and sink latency/throughput metrics, so operators can build production dashboards immediately. The tables below capture what is wired today and the remaining gaps to make the platform production-ready for data and infra engineers.

What exists today

  • Prometheus endpoint served at /metrics (default 0.0.0.0:9000) with descriptors for pipeline counts, source/sink counters, and a stage latency histogram. The recorder is installed automatically when metrics are enabled.
  • Structured logging via tracing_subscriber with JSON output by default, optional targets, and support for RUST_LOG overrides.
  • Panic hook increments a deltaforge_panics_total counter and logs captured panics before delegating to the default hook.

Instrumentation gaps and recommendations

The sections below call out concrete metrics and log events to add per component. All metrics should include pipeline, tenant, and component identifiers where applicable so users can aggregate across fleets.

Sources (MySQL/Postgres)

| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_source_events_total{pipeline,source,table} counter increments when MySQL events are handed to the coordinator. | Surfaces ingress per table and pipeline. |
| ✅ Implemented | deltaforge_source_reconnects_total{pipeline,source} counter when binlog reads reconnect. | Makes retry storms visible. |
| ✅ Implemented | deltaforge_source_lag_seconds{pipeline} gauge — replication lag based on last event timestamp vs. wall clock. | Alert when sources fall behind. |
| ✅ Implemented | deltaforge_source_table_lag_seconds{pipeline,table} gauge — per-table replication lag within each batch. | Identify which tables are lagging. |
| 🚧 Gap | deltaforge_source_idle_seconds{pipeline,source} gauge updated when no events arrive within the inactivity window. | Catch stuck readers before downstream backlogs form. |

Coordinator and batching

| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_batch_events{pipeline} and deltaforge_batch_bytes{pipeline} histograms in Coordinator::process_deliver_and_maybe_commit. | Tune batching policies with data. |
| ✅ Implemented | deltaforge_bytes_total{pipeline} counter — cumulative bytes processed through the pipeline. rate() gives bytes/s throughput. | Monitor data volume and bandwidth utilization. |
| ✅ Implemented | deltaforge_source_bytes_total{pipeline,source} counter — cumulative raw bytes received from the source (WAL/binlog). | Track source-side data ingestion rate. |
| ✅ Implemented | deltaforge_stage_latency_seconds{pipeline,stage,trigger} histogram for the processor stage. | Provides batch timing per trigger (timer/limits/shutdown). |
| ✅ Implemented | deltaforge_processor_latency_seconds{pipeline,processor} histogram around every processor invocation. | Identify slow user functions. |
| 🚧 Gap | deltaforge_pipeline_channel_depth{pipeline} gauge from mpsc::Sender::capacity()/len(). | Detect backpressure between sources and coordinator. |
| ✅ Implemented | deltaforge_checkpoints_total{pipeline} counter — successful checkpoint commits. | Monitor checkpoint throughput. |

Sinks (Kafka/Redis/custom)

| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_sink_events_total{pipeline,sink} counter and deltaforge_sink_latency_seconds{pipeline,sink} histogram around each send. | Throughput and responsiveness per sink. |
| ✅ Implemented | deltaforge_sink_batch_total{pipeline,sink} counter per send. | Number of batches sent per sink. |
| ✅ Implemented | deltaforge_sink_errors_total{pipeline,sink} counter with per-sink error tracking. | Alert on sink failures. |
| ✅ Implemented | deltaforge_sink_txn_commits_total{pipeline,sink} counter — Kafka transaction commits/s. | Track exactly-once throughput. |
| ✅ Implemented | deltaforge_sink_txn_aborts_total{pipeline,sink} counter — Kafka transaction aborts/s. Should be ~0. | Detect fencing or broker issues. |
| ✅ Implemented | deltaforge_sink_checkpoint_status{pipeline,sink} gauge (1=ok, 0=behind). | Per-sink checkpoint health. |
| ✅ Implemented | deltaforge_sink_last_checkpoint_ts{pipeline,sink} epoch timestamp. | Per-sink checkpoint age. |
| 🚧 Gap | Backpressure gauge for client buffers (rdkafka queue, Redis pipeline depth). | Early signal before errors occur. |

Avro encoding (when encoding: avro is configured)

| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_avro_encode_total{path} counter — path=ddl (DDL-derived schema) or path=inferred (JSON fallback). | Track which schema source is being used. DDL is preferred. |
| ✅ Implemented | deltaforge_avro_schema_registrations_total counter — successful Schema Registry registrations. | Monitor schema registration activity. |
| ✅ Implemented | deltaforge_avro_sr_cache_fallback_total counter — events encoded with cached schema because SR was unavailable. | Alert on SR connectivity issues. |
| ✅ Implemented | deltaforge_avro_encode_failure_total{reason} counter — reason=sr_unavailable (no cache, can’t encode) or reason=schema_mismatch (DDL changed, old schema can’t encode event). | Alert on encoding failures; events with these errors are routed to DLQ. |

Pipeline lifecycle

| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_pipeline_status{pipeline} gauge reflecting the current lifecycle state of each pipeline. | Single gauge to alert on stopped or failed pipelines and drive dashboards. |
| ✅ Implemented | deltaforge_e2e_latency_seconds{pipeline} histogram measuring wall-clock time from when an event was received by DeltaForge to when it was delivered to the sink. | Measures pipeline delivery latency independently of source clock precision. |
| ✅ Implemented | deltaforge_source_lag_seconds{pipeline} gauge — replication lag based on event timestamp vs. wall clock. | Alert when the source is behind real time. |
| ✅ Implemented | deltaforge_checkpoints_total{pipeline} counter — checkpoint commits/s. | Monitor checkpoint throughput. |
| ✅ Implemented | deltaforge_last_checkpoint_ts{pipeline} epoch timestamp — pipeline-level checkpoint age. | Alert on stale checkpoints. |

deltaforge_pipeline_status value semantics

The gauge uses a numeric encoding so operators can alert on any non-running state with a single threshold:

| Value | State | Meaning |
|---|---|---|
| 1.0 | running | Pipeline is active and processing events |
| 0.5 | paused | Source connection alive; event processing suspended |
| 0.0 | stopped | Disconnected from source; checkpoint saved; resumable |
| < 0 | failed | Unrecoverable error (position lost, server changed, etc.) |

Example PromQL:

# Count pipelines in each state
count(deltaforge_pipeline_status == 1)    # running
count(deltaforge_pipeline_status == 0.5) # paused
count(deltaforge_pipeline_status == 0)   # stopped
count(deltaforge_pipeline_status < 0)    # failed

# Alert if any pipeline is not running
count(deltaforge_pipeline_status != 1) > 0

deltaforge_e2e_latency_seconds note

E2E latency is measured from the wall-clock time the event was received and parsed by DeltaForge, not from the binlog header.timestamp. MySQL binlog timestamps have one-second precision, which would introduce up to 1 s of phantom latency in the histogram. Using the internal receive time gives sub-millisecond accuracy regardless of source clock granularity.

The replication lag metric (separate from E2E latency) uses the binlog timestamp and measures how far behind the source is relative to real time — that one-second precision is acceptable for lag alerting.

Dead Letter Queue

| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_dlq_events_total{pipeline,sink,error_kind} counter. | Track rate of events routed to DLQ. |
| ✅ Implemented | deltaforge_dlq_entries{pipeline} gauge — current unacked entries. | Monitor DLQ backlog size. |
| ✅ Implemented | deltaforge_dlq_saturation_ratio{pipeline} gauge (0.0-1.0). | Alert at 80% (warning) and 95% (critical). |
| ✅ Implemented | deltaforge_dlq_evicted_total{pipeline} counter — entries lost to drop_oldest overflow. | Track data loss from overflow. |
| ✅ Implemented | deltaforge_dlq_rejected_total{pipeline} counter — entries lost to reject overflow. | Track data loss from rejection. |
| ✅ Implemented | deltaforge_dlq_write_failures_total{pipeline} counter — DLQ storage failures. | Alert on DLQ infrastructure issues. |
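The 80%/95% saturation thresholds translate directly into Prometheus alerting rules. A sketch (group and alert names are illustrative):

```yaml
groups:
  - name: deltaforge-dlq
    rules:
      - alert: DeltaForgeDlqSaturationWarning
        expr: deltaforge_dlq_saturation_ratio > 0.8
        for: 5m
        labels:
          severity: warning
      - alert: DeltaForgeDlqSaturationCritical
        expr: deltaforge_dlq_saturation_ratio > 0.95
        for: 1m
        labels:
          severity: critical
```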

Control plane and health endpoints

| Need | Suggested metric/log | Rationale |
|---|---|---|
| API request accounting | deltaforge_api_requests_total{route,method,status} counter and latency histogram using Axum middleware. | Production-grade visibility of operator actions. |
| Ready/Liveness transitions | Logs with pipeline counts and per-pipeline status when readiness changes. | Explain probe failures in log aggregation. |
| Pipeline lifecycle counters | Counters for create/patch/stop/resume actions with success/error labels. | Auditable control-plane operations. |

Grafana Dashboard

A production-ready Grafana dashboard is included in the repository, optimized for fleet operations with hundreds of pipelines:

Download: deltaforge.json

Import it via Grafana UI → Dashboards → Import → Upload JSON file.

What’s included

| Row | Panels | Purpose |
|---|---|---|
| Fleet Overview | Running/unhealthy count, total events/s, total data/s, max lag, DLQ total, reconnects, txn aborts, sink errors | One-glance health across all pipelines |
| Top Pipelines | Top 10 laggiest, top 10 throughput, top 10 DLQ backlogs | Identify outliers without drowning in 300 series |
| Throughput | Aggregate events/s, per-pipeline events/s, data throughput | Capacity planning and anomaly detection |
| Latency & Lag | E2E latency p50/p95, source lag, per-table lag (top 10) | SLA monitoring, identify slow tables |
| Checkpoints & EOS | Per-sink status, commit rate, txn commits/aborts | Exactly-once health, checkpoint freshness |
| Dead Letter Queue | Entries, events/s, saturation, overflow rate | DLQ monitoring and alerting |
| Errors & Reliability | Sink errors, reconnects, pipeline state timeline | Incident detection |
| Batching & Kafka | Batch size, batch bytes, sink latency (collapsed) | Tuning reference |
| Infrastructure | Container CPU, memory (collapsed) | Resource monitoring |

Template variables

The dashboard includes dropdown filters at the top:

  • Instance — select DeltaForge instances
  • Tenant — filter by tenant (from deltaforge_pipeline_info metric)
  • Pipeline — select specific pipelines
  • Sink — filter by sink

Prerequisites

  • Prometheus scraping DeltaForge metrics on port 9000 (/metrics)
  • Prometheus datasource configured in Grafana as “Prometheus”
  • For container metrics: cAdvisor scraping enabled

Performance Tuning

This guide covers throughput optimization for DeltaForge CDC pipelines, based on profiling and benchmarking with the chaos test suite.

Note: These results and recommendations are a starting point. Every deployment has unique requirements — hardware, network topology, database workload patterns, event sizes, and downstream consumer capacity all affect real-world throughput. Profile your own workload and iterate.

Benchmark Results

Measured on Docker containers on a single developer machine (not dedicated infrastructure), draining a 1-10M row backlog to a single-node Kafka broker.

batch.max_events=16000, batch.max_bytes=16MB, batch.max_inflight=4, linger.ms=0

| Source | Mode | Avg (events/s) | Peak (events/s) |
|---|---|---|---|
| MySQL | at-least-once | 151K | 159K |
| MySQL | exactly-once | 134K | 143K |
| Postgres | at-least-once | 57K | 64K |
| Postgres | exactly-once | 53K | 55K |

Exactly-once overhead is ~7-11% when batch sizes are properly tuned.

Why max_bytes matters

These results show the impact of a small max_bytes (3MB) with max_events=8000:

| Source | Mode | Avg (events/s) | Peak (events/s) |
|---|---|---|---|
| MySQL | at-least-once | 110K | 122K |
| MySQL | exactly-once | 48K | 57K |
A 3MB byte limit caps batches at ~6,000 events regardless of max_events, making transaction commits proportionally expensive. The default max_bytes is 16MB — sufficient for batches up to ~32K events at typical event sizes.

Your numbers will differ based on hardware, network latency, event size, and Kafka/database configuration.

Key Tuning Parameters

Batch Size (batch.max_events + batch.max_bytes)

The single most impactful setting. Larger batches amortize per-batch overhead (Kafka produce, transaction commit, checkpoint write, metrics recording) across more events.

Important: max_events and max_bytes both cap batch size — whichever triggers first wins. If you set max_bytes too low, it will silently cap your batches regardless of max_events. The default (16MB) accommodates batch sizes up to ~32K events at typical event sizes.
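The interaction is easy to sanity-check: the effective batch size is whichever limit binds first. A quick sketch, assuming an average event size of ~512 bytes (a placeholder; measure your own workload):

```python
def effective_batch_events(max_events, max_bytes, avg_event_bytes):
    """Events per batch, capped by whichever limit triggers first."""
    return min(max_events, max_bytes // avg_event_bytes)

# A 3MB byte cap silently limits an 8000-event batch to ~6K events...
assert effective_batch_events(8000, 3 * 1024 * 1024, 512) == 6144
# ...while the 16MB default leaves room for the full 16K events.
assert effective_batch_events(16000, 16 * 1024 * 1024, 512) == 16000
```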

Recommended for high-throughput drain/catch-up:

spec:
  batch:
    max_events: 16000
    max_bytes: 16777216   # 16MB — room for 16K events
    max_ms: 100
    max_inflight: 4

For steady-state pipelines with lower latency requirements, the defaults (max_events=2000, max_bytes=3MB) are fine.

Kafka Linger (linger.ms)

Controls how long rdkafka waits before sending a produce request. This is the most common throughput bottleneck when left at high values.

  • linger.ms=20: each small batch waits 20ms before sending, capping throughput
  • linger.ms=5 (sink built-in default): good balance for steady-state
  • linger.ms=0: maximum throughput for drain/catch-up/high-intensity workloads

The internal coordinator enqueues entire batches (hundreds to thousands of messages) in a tight loop, so rdkafka batches naturally without needing linger time. Higher linger values only add idle wait per produce.

Override via client_conf in the sink config:

sinks:
  - type: kafka
    config:
      client_conf:
        linger.ms: "0"

Batch Pipelining (batch.max_inflight)

Controls how many batches can be queued between the accumulation loop and the delivery task. Higher values overlap batch building with Kafka delivery.

  • max_inflight=1: sequential delivery (default)
  • max_inflight=4: recommended for high-throughput workloads (adjust per need)

The delivery task processes batches in FIFO order, so checkpoint and event delivery ordering is always preserved regardless of the inflight setting. In effect, this setting lets the source keep reading ahead instead of waiting on processing elsewhere in the pipeline.

spec:
  batch:
    max_events: 4000
    max_ms: 100
    max_inflight: 4
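The ordering guarantee can be illustrated with a bounded queue between an accumulator and a single delivery worker (a sketch, not DeltaForge's actual coordinator):

```python
import queue
import threading

def run_pipelined(batches, max_inflight):
    """One delivery worker drains a bounded queue in FIFO order, so
    delivery order matches accumulation order while the producer may
    run up to `max_inflight` batches ahead."""
    inflight = queue.Queue(maxsize=max_inflight)
    delivered = []

    def deliver():
        while (batch := inflight.get()) is not None:
            delivered.append(batch)  # stand-in for sink send + checkpoint

    worker = threading.Thread(target=deliver)
    worker.start()
    for batch in batches:
        inflight.put(batch)  # blocks once max_inflight batches are queued
    inflight.put(None)       # shutdown sentinel
    worker.join()
    return delivered

assert run_pipelined([1, 2, 3, 4, 5], max_inflight=4) == [1, 2, 3, 4, 5]
```

Raising max_inflight changes only how far the producer can run ahead, never the order in which batches (and their checkpoints) are committed.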

Schema Sensing

Disable schema sensing during drain/catch-up for maximum throughput:

spec:
  schema_sensing:
    enabled: false

Re-enable it for steady-state operation when schema tracking is needed. Keep in mind that schema sensing is a CPU-intensive task.

Proxy Bypass - Chaos/Bench Testing

When running with Toxiproxy (chaos testing), use --no-proxy to bypass the proxy for direct database and Kafka connections. The proxy adds measurable overhead to throughput.

Source-Specific Tuning

MySQL

MySQL binlog is inherently efficient because WriteRowsEvent batches multiple rows into a single event.

MySQL server settings that affect CDC throughput:

  • binlog-row-image=FULL — required for CDC but sends all columns per row. If your use case allows it, MINIMAL reduces binlog event size significantly (only changed columns are sent).
  • binlog_transaction_dependency_tracking=WRITESET — enables parallel replication metadata. While DeltaForge reads sequentially, this can reduce replication lag on replicas feeding DeltaForge.
  • max_allowed_packet — increase if you have large blob/text columns. The default (64MB) is usually sufficient.
  • binlog_expire_logs_seconds — set high enough that DeltaForge can recover from outages without losing its checkpoint position. 7 days is a safe starting point.

DeltaForge settings for MySQL:

  • tables — be specific. Subscribing to *.* forces DeltaForge to process table map events for every table, even those it discards.
  • snapshot.chunk_size — for initial snapshots of large tables, increase chunk size (default 10,000) to reduce round trips.

PostgreSQL

Postgres logical replication (pgoutput) sends one WAL message per row change, making it more per-message intensive than MySQL binlog.

PostgreSQL server settings that affect CDC throughput:

  • wal_level=logical — required. No throughput impact vs. replica.
  • max_wal_senders — ensure enough slots for DeltaForge plus any replicas. Default (10) is usually sufficient.
  • wal_sender_timeout — increase from the default (60s) if DeltaForge pauses processing for extended periods (e.g., during pipeline restarts). 300s is a safer value.
  • wal_keep_size — set large enough to cover outage windows. If DeltaForge disconnects and WAL is recycled, the replication slot becomes invalid and requires re-snapshot.
  • Replica identity — ALTER TABLE ... REPLICA IDENTITY FULL sends full row images for updates/deletes. DEFAULT (primary key only) reduces WAL message size but limits the before image in CDC events.
  • Publication scope — create publications with explicit table lists (FOR TABLE ...) rather than FOR ALL TABLES to reduce WAL decoding overhead on the server.

DeltaForge settings for PostgreSQL:

  • Batch writes in transactions — if your writer can group inserts into BEGIN; INSERT ...; INSERT ...; COMMIT, the server sends fewer BEGIN/COMMIT WAL messages, reducing per-event overhead.
  • tables — use specific patterns. Broad patterns force schema loading and filtering for unneeded tables.

The throughput gap between Postgres and MySQL is primarily due to protocol-level differences (one WAL message per row vs. batched rows), not code inefficiency.

Exactly-Once Delivery Overhead

Enabling exactly_once: true on sinks adds per-batch transaction overhead:

Kafka Transactional Producer

Each batch is wrapped in begin_transaction() / commit_transaction(). The transaction commit adds a constant ~1-3ms per batch (broker-side two-phase commit), so larger batches amortize the cost better.
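The amortization is simple arithmetic: spreading a roughly constant commit cost over a larger batch shrinks the per-event share proportionally (2 ms below is mid-range of the ~1-3 ms figure above):

```python
def commit_cost_per_event_us(commit_ms, batch_events):
    """Per-event share (microseconds) of a constant per-batch commit."""
    return commit_ms * 1000 / batch_events

assert commit_cost_per_event_us(2.0, 16000) == 0.125  # tuned batches
assert commit_cost_per_event_us(2.0, 2000) == 1.0     # default max_events
```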

Measured overhead (Docker containers, single developer machine, 1-10M row drain):

| Source | at-least-once | exactly-once | Overhead |
|---|---|---|---|
| MySQL | 151K events/s | 134K events/s | ~11% |
| Postgres | 57K events/s | 53K events/s | ~7% |

These numbers use tuned batch settings (max_events=16000, max_bytes=16MB). With a small max_bytes (e.g. 3MB), batches are capped at ~6K events regardless of max_events, making the transaction commit disproportionately expensive. The default max_bytes (16MB) is sufficient for most workloads.

Key considerations:

  • transaction.timeout.ms (set to 60s by DeltaForge): if a batch takes longer than this to deliver, the broker aborts the transaction. Increase for very large batches or high-latency networks.
  • transactional.id must be unique per pipeline-sink pair. DeltaForge sets this automatically to deltaforge-{pipeline_id}-{sink_id}.
  • Producer fencing: if two producers share the same transactional.id, the broker fences (kills) the older one. This is detected as a Fatal error and stops the pipeline. Ensure only one instance of each pipeline runs at a time.

NATS JetStream

Deduplication uses the Nats-Msg-Id header (server-side). No client-side transaction overhead, but the server maintains a dedup window. Configure duplicate_window on the stream to match your replay window (default 2 minutes).

Redis Streams

Idempotency keys are embedded in the XADD payload. No transaction overhead on the Redis side — deduplication is the consumer’s responsibility. This is a Tier 2 guarantee (at-least-once with idempotency keys for consumer-side dedup).

Per-Sink Checkpoints

Each sink maintains its own checkpoint, committed independently after successful delivery. The source replays from the minimum checkpoint across all sinks. This means:

  • Faster sinks are not held back by slower ones — they advance their own checkpoints independently.
  • Adding a new sink to an existing pipeline triggers a replay from the source’s earliest available position for that sink only.
  • Checkpoint storage overhead scales linearly with the number of sinks (one key per sink per source).
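A sketch of the replay rule (illustrative; positions here are abstract integers rather than GTIDs/LSNs):

```python
def replay_position(sink_checkpoints):
    """The source resumes from the minimum checkpoint across all sinks;
    sinks that are further ahead simply skip events they already delivered."""
    return min(sink_checkpoints.values())

assert replay_position({"kafka": 1200, "redis": 950, "http": 1100}) == 950
```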

Profiling

Use the chaos UI’s built-in CPU profiler to capture flamegraphs during drain runs:

  1. Start a drain scenario from the chaos UI
  2. Once the drain phase starts (step 5/6), click Record on the target container
  3. The generated flamegraph SVG includes pipeline config, batch settings, and connection mode in the subtitle automatically

Or from the command line:

# Start drain in terminal 1
cargo run -p chaos --release -- --scenario backlog-drain --source mysql --no-proxy

# Capture flamegraph in terminal 2 (after drain phase starts)
docker exec <container-name> perf record -F 99 -p 1 -g --call-graph dwarf -o /tmp/perf.data -- sleep 30

Requires the profiling image (deltaforge:dev-profile) which includes perf and debug symbols.

Key areas to watch in flamegraphs:

| Area | What it means |
|---|---|
| serialize_event / format_escaped_str | JSON serialization — consider smaller batches if dominant |
| recv / [unknown] kernel stacks | I/O wait for source data — protocol-bound |
| _rjem_je_* | jemalloc allocation pressure — large batches increase this |
| rd_kafka_* / LZ4_compress | Kafka produce and compression overhead |
| check_and_split | Coordinator batch accumulation |
| epoll_wait / park_timeout | Idle time — pipeline is I/O bound, not CPU bound |

Running the Drain Benchmark

The backlog drain benchmark measures catch-up throughput: how fast DeltaForge replays a pre-built backlog of 1M rows.

# MySQL — requires the soak compose profile
docker compose -f docker-compose.chaos.yml --profile soak up -d
cargo run -p chaos --release -- --scenario backlog-drain --source mysql --no-proxy \
  --drain-max-events 4000 --drain-max-ms 100 --drain-kafka-conf linger.ms=0

# Postgres — requires the pg-soak compose profile
docker compose -f docker-compose.chaos.yml --profile pg-soak up -d
cargo run -p chaos --release -- --scenario backlog-drain --source postgres --no-proxy \
  --drain-max-events 4000 --drain-max-ms 100 --drain-kafka-conf linger.ms=0

The benchmark:

  1. Stops the pipeline and saves its checkpoint
  2. Writes 1M rows to the source database using 32 concurrent writers
  3. Resumes the pipeline and measures how fast events appear in Kafka
  4. Reports avg/p50/peak events/s with full configuration in the output

Tune --drain-max-events, --drain-max-ms, and --drain-kafka-conf to experiment with different settings. The chaos UI also exposes these as form fields for interactive tuning.

Avro Encoding Performance

Avro encoding trades slightly higher producer CPU for significantly smaller payloads.

Expected impact

| Metric | JSON (baseline) | Avro | Notes |
|---|---|---|---|
| Payload size | Baseline | ~40-60% smaller | No field names, binary encoding |
| Producer CPU | Baseline | ~10-20% higher | Schema lookup + Avro binary encoding |
| Kafka broker disk/network | Baseline | ~40-60% less | Significant at scale |
| First-event latency | Instant | +5-50ms | One-time Schema Registry call per table |
| Steady-state latency | Baseline | Comparable | Schema cached after first event |

The system-level throughput (events/sec end-to-end) usually stays the same or improves because the bottleneck is typically network/disk I/O, not producer CPU. Smaller payloads reduce broker-side pressure.

Comparing JSON vs Avro

Run both soak tests side by side (different containers, same source):

docker compose -f docker-compose.chaos.yml \
  --profile base --profile mysql-infra --profile kafka-infra \
  --profile soak --profile avro-soak up -d

# JSON baseline
cargo run -p chaos -- --scenario soak-stable --duration-mins 30

# Avro comparison
cargo run -p chaos -- --scenario soak-stable-avro --duration-mins 30

Compare in Grafana: rate(deltaforge_sink_events_total[1m]) filtered by instance port 9001 (JSON) vs 9006 (Avro).

Avro-specific flamegraph areas

| Area | What it means |
|---|---|
| encode_event / encode_with_schema | Avro encoding path — schema lookup + binary encoding |
| register_schema | Schema Registry HTTP call (only on first event per table or DDL change) |
| json_to_avro / to_avro_datum | JSON → Avro value conversion + binary serialization |
| resolve (in avro module) | Schema cache lookup — should be near-instant |

Examples

Complete pipeline configurations demonstrating common DeltaForge use cases. Each example is ready to run with minimal modifications.

Available Examples

| Example | Source | Sink(s) | Key Features |
|---|---|---|---|
| MySQL to Redis | MySQL | Redis | JavaScript processor, PII redaction |
| Turso to Kafka | Turso/libSQL | Kafka | Native CDC, CloudEvents envelope |
| PostgreSQL to NATS | PostgreSQL | NATS | Logical replication, CloudEvents |
| Multi-Sink Fan-Out | MySQL | Kafka + Redis + NATS | Multiple envelopes, selective checkpointing |
| Event Filtering | MySQL | Kafka | JavaScript filtering, PII redaction |
| Schema Sensing | PostgreSQL | Kafka | JSON schema inference, drift detection |
| Production Kafka | PostgreSQL | Kafka | SASL/SSL auth, exactly-once, tuning |
| Cache Invalidation | MySQL | Redis | CDC stream for cache invalidation workers |
| Audit Trail | PostgreSQL | Kafka | Compliance logging, PII redaction |
| Analytics Preprocessing | MySQL | Kafka + Redis | Metrics enrichment, analytics stream |
| Outbox Pattern | MySQL | Kafka | Transactional outbox, raw payload, per-aggregate routing |

Quick Start

  1. Set environment variables for your database and sink connections
  2. Copy the example to a .yaml file
  3. Run DeltaForge:
    cargo run -p runner -- --config your-pipeline.yaml
    

Examples by Category

Getting Started

| Example | Description |
|---|---|
| MySQL to Redis | Simple pipeline with JavaScript transformation |
| Turso to Kafka | Edge database to Kafka with CloudEvents |
| PostgreSQL to NATS | PostgreSQL logical replication to NATS |

Production Patterns

| Example | Description |
|---|---|
| Production Kafka | Authentication, exactly-once, performance tuning |
| Multi-Sink Fan-Out | Multiple sinks with different formats |
| Cache Invalidation | CDC stream for cache invalidation |
| Outbox Pattern | Transactional outbox with raw payload delivery |

Data Processing

| Example | Description |
|---|---|
| Event Filtering | Filter, drop, and redact events |
| Schema Sensing | Automatic JSON schema discovery |
| Analytics Preprocessing | Prepare events for analytics platforms |

Compliance & Auditing

| Example | Description |
|---|---|
| Audit Trail | SOC2/HIPAA/GDPR-compliant change tracking |

Examples by Feature

Envelope Formats

JavaScript Processors

| Use Case | Example |
|---|---|
| PII Redaction | MySQL to Redis, Audit Trail |
| Event Filtering | Event Filtering |
| Enrichment | Turso to Kafka, Analytics Preprocessing |
| Cache Key Generation | Cache Invalidation |
| Audit Metadata | Audit Trail |

Multi-Sink Patterns

| Pattern | Example |
|---|---|
| Fan-out with different formats | Multi-Sink Fan-Out |
| Primary + best-effort secondary | Multi-Sink Fan-Out, Analytics Preprocessing |

Customizing Examples

Change Envelope Format

All sinks support configurable envelopes. Add to any sink config:

sinks:
  - type: kafka
    config:
      # ... other config
      envelope:
        type: cloudevents          # or: native, debezium
        type_prefix: "com.example" # required for cloudevents
      encoding: json

See Envelopes and Encodings for details.

Add Multiple Sinks

Fan out to multiple destinations with different formats:

sinks:
  - type: kafka
    config:
      id: primary-kafka
      envelope:
        type: debezium
      required: true    # Must succeed for checkpoint
  - type: redis
    config:
      id: cache-redis
      envelope:
        type: native
      required: false   # Best-effort, won't block checkpoint

See Sinks documentation for multi-sink patterns.

Enable Schema Sensing

Automatically discover JSON structure in your data:

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3
  sampling:
    warmup_events: 100
    sample_rate: 10
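
The sampling settings read as: analyze every event during warmup, then roughly 1 in sample_rate afterwards. A minimal sketch of that decision (the counter-based selection here is an illustration, not necessarily DeltaForge's exact algorithm):

```javascript
// Illustrative sampling decision: the first `warmupEvents` events are
// analyzed fully, then 1 in `sampleRate` events thereafter.
function shouldAnalyze(eventIndex, { warmupEvents = 100, sampleRate = 10 } = {}) {
  if (eventIndex < warmupEvents) return true;            // warmup: analyze everything
  return (eventIndex - warmupEvents) % sampleRate === 0; // steady state: 1 in N
}

console.log(shouldAnalyze(5));   // true (inside warmup)
console.log(shouldAnalyze(100)); // true (first sampled event after warmup)
console.log(shouldAnalyze(101)); // false
```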

See Schema Sensing for configuration options.

More Resources

MySQL to Redis

This example streams MySQL binlog events into a Redis stream with an inline JavaScript transformation for PII redaction.

Overview

| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript email redaction |
| Sink | Redis Streams |
| Envelope | Native (configurable) |

Pipeline Configuration

metadata:
  name: orders-mysql-to-redis
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  processors:
    - type: javascript
      id: redact-email
      inline: |
        function processBatch(events) {
          return events.map((event) => {
            if (event.after && event.after.email) {
              event.after.email = "[redacted]";
            }
            return event;
          });
        }
      limits:
        timeout_ms: 500

  sinks:
    - type: redis
      config:
        id: orders-redis
        uri: ${REDIS_URI}
        stream: orders
        envelope:
          type: native
        encoding: json
        required: true

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000

  commit_policy:
    mode: required

Running the Example

1. Set Environment Variables

export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export REDIS_URI="redis://localhost:6379"

2. Start DeltaForge

# Save config as mysql-redis.yaml
cargo run -p runner -- --config mysql-redis.yaml

3. Insert Test Data

INSERT INTO shop.orders (email, total, status)
VALUES ('alice@example.com', 99.99, 'pending');

4. Verify in Redis

./dev.sh redis-read orders 10

You should see the event with the email redacted:

{
  "before": null,
  "after": {
    "id": 1,
    "email": "[redacted]",
    "total": 99.99,
    "status": "pending"
  },
  "op": "c",
  "ts_ms": 1700000000000
}
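
The inline processor is plain JavaScript, so the same processBatch function can be exercised standalone (e.g. in Node) before wiring it into the pipeline:

```javascript
// Same redaction logic as the inline processor above, runnable standalone.
function processBatch(events) {
  return events.map((event) => {
    if (event.after && event.after.email) {
      event.after.email = "[redacted]";
    }
    return event;
  });
}

const out = processBatch([
  { op: "c", after: { id: 1, email: "alice@example.com", total: 99.99 } },
  { op: "d", before: { id: 2 }, after: null }, // deletes have no `after`
]);

console.log(out[0].after.email); // "[redacted]"
console.log(out[1].after);       // null — untouched
```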

Variations

With Debezium Envelope

For Kafka Connect compatibility downstream:

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: debezium

Multi-Sink Fan-Out

Add Kafka alongside Redis for durability:

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      envelope:
        type: debezium
      required: true    # Critical path

  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: native
      required: false   # Best-effort

With this configuration, checkpoints only wait for Kafka. Redis failures won’t block the pipeline.

With Schema Sensing

Automatically track schema changes:

spec:
  # ... source and sinks config ...

  schema_sensing:
    enabled: true
    deep_inspect:
      enabled: true
      max_depth: 3
    sampling:
      warmup_events: 100
      sample_rate: 10

Key Concepts Demonstrated

  • JavaScript Processors: Transform events in-flight with custom logic
  • PII Redaction: Mask sensitive data before it reaches downstream systems
  • Envelope Configuration: Choose output format based on consumer needs
  • Commit Policy: Control checkpoint behavior with required flag

Example: Turso to Kafka

This example demonstrates streaming changes from a Turso database to Kafka with schema sensing enabled.

Use Case

You have a Turso database (or local libSQL) and want to:

  • Stream table changes to a Kafka topic
  • Automatically detect schema structure from JSON payloads
  • Transform events with JavaScript before publishing

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: turso2kafka
  tenant: acme

spec:
  source:
    type: turso
    config:
      id: turso-main
      # Local libSQL for development
      url: "http://127.0.0.1:8080"
      # For Turso cloud:
      # url: "libsql://your-db.turso.io"
      # auth_token: "${TURSO_AUTH_TOKEN}"
      tables: ["users", "orders", "order_items"]
      poll_interval_ms: 1000
      cdc_mode: auto

  processors:
    - type: javascript
      id: enrich
      inline: |
        function processBatch(events) {
          return events.map(event => {
            // Add custom metadata to events
            event.source_type = "turso";
            event.processed_at = new Date().toISOString();
            return event;
          });
        }

  sinks:
    - type: kafka
      config:
        id: kafka-main
        brokers: "${KAFKA_BROKERS}"
        topic: turso.changes
        required: true
        exactly_once: false
        client_conf:
          message.timeout.ms: "5000"
          acks: "all"

  batch:
    max_events: 100
    max_bytes: 1048576
    max_ms: 500
    respect_source_tx: false
    max_inflight: 2

  commit_policy:
    mode: required

  schema_sensing:
    enabled: true
    deep_inspect:
      enabled: true
      max_depth: 3
    sampling:
      warmup_events: 50
      sample_rate: 5
      structure_cache: true

Running the Example

1. Start Infrastructure

# Start Kafka and other services
./dev.sh up

# Create the target topic
./dev.sh k-create turso.changes 6

2. Start Local libSQL (Optional)

For local development without Turso cloud:

# Using sqld (libSQL server)
sqld --http-listen-addr 127.0.0.1:8080

# Or with Docker
docker run -p 8080:8080 ghcr.io/libsql/sqld:latest

3. Create Test Tables

CREATE TABLE users (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  name TEXT NOT NULL,
  email TEXT UNIQUE,
  metadata TEXT  -- JSON column
);

CREATE TABLE orders (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  user_id INTEGER NOT NULL,
  total REAL NOT NULL,
  status TEXT DEFAULT 'pending',
  created_at TEXT DEFAULT CURRENT_TIMESTAMP
);

4. Run DeltaForge

# Save config as turso-kafka.yaml
cargo run -p runner -- --config turso-kafka.yaml

5. Insert Test Data

INSERT INTO users (name, email, metadata) 
VALUES ('Alice', 'alice@example.com', '{"role": "admin", "tags": ["vip"]}');

INSERT INTO orders (user_id, total, status) 
VALUES (1, 99.99, 'completed');

6. Verify Events in Kafka

./dev.sh k-consume turso.changes --from-beginning

You should see events like:

{
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "tenant_id": "acme",
  "table": "users",
  "op": "insert",
  "after": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com",
    "metadata": "{\"role\": \"admin\", \"tags\": [\"vip\"]}"
  },
  "source_type": "turso",
  "processed_at": "2025-01-15T10:30:00.000Z",
  "timestamp": "2025-01-15T10:30:00.123Z"
}

Monitoring

Check Pipeline Status

curl http://localhost:8080/pipelines/turso2kafka

View Inferred Schemas

# List all inferred schemas
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas

# Get details for users table
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users

# Export as JSON Schema
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users/json-schema

Check Drift Detection

curl http://localhost:8080/pipelines/turso2kafka/drift

Turso Cloud Configuration

For production with Turso cloud:

source:
  type: turso
  config:
    id: turso-prod
    url: "libsql://mydb-myorg.turso.io"
    auth_token: "${TURSO_AUTH_TOKEN}"
    tables: ["*"]
    cdc_mode: native
    poll_interval_ms: 1000
    native_cdc:
      level: data

Set the auth token via environment variable:

export TURSO_AUTH_TOKEN="your-token-here"

Notes

  • CDC Mode: auto tries native CDC first, then falls back to triggers or polling
  • Poll Interval: Lower values reduce latency but increase database load
  • Schema Sensing: Automatically discovers JSON structure in text columns
  • Exactly Once: Set to false for higher throughput; use true if your Kafka cluster supports EOS

PostgreSQL to NATS

This example streams PostgreSQL logical replication changes to NATS JetStream using the CloudEvents envelope format, which is well suited to serverless architectures.

Overview

| Component | Configuration |
|---|---|
| Source | PostgreSQL logical replication |
| Processor | None (passthrough) |
| Sink | NATS JetStream |
| Envelope | CloudEvents 1.0 |

Use Case

You have a PostgreSQL database and want to:

  • Stream changes to NATS for event-driven microservices
  • Use CloudEvents format for AWS Lambda, Azure Functions, or Knative
  • Leverage JetStream for durable, replay-capable event streams

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: users-postgres-to-nats
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: users-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_users
      publication: users_pub
      tables:
        - public.users
        - public.profiles
        - public.user_sessions
      start_position: earliest

  sinks:
    - type: nats
      config:
        id: users-nats
        url: ${NATS_URL}
        subject: users.events
        stream: USERS
        envelope:
          type: cloudevents
          type_prefix: "com.acme.users"
        encoding: json
        required: true
        send_timeout_secs: 5
        batch_timeout_secs: 30

  batch:
    max_events: 500
    max_ms: 500
    respect_source_tx: true

  commit_policy:
    mode: required

Prerequisites

PostgreSQL Setup

-- Enable logical replication (postgresql.conf)
-- wal_level = logical

-- Create publication for the tables
CREATE PUBLICATION users_pub FOR TABLE users, profiles, user_sessions;

-- Verify publication
SELECT * FROM pg_publication_tables WHERE pubname = 'users_pub';

NATS JetStream Setup

# Start NATS with JetStream enabled
./dev.sh up

# Create the stream
./dev.sh nats-stream-add USERS 'users.>'

Running the Example

1. Set Environment Variables

export POSTGRES_DSN="postgres://user:password@localhost:5432/mydb"
export NATS_URL="nats://localhost:4222"

2. Start DeltaForge

cargo run -p runner -- --config postgres-nats.yaml

3. Insert Test Data

INSERT INTO users (name, email, created_at)
VALUES ('Alice', 'alice@example.com', NOW());

UPDATE users SET email = 'alice.new@example.com' WHERE name = 'Alice';

4. Verify in NATS

./dev.sh nats-sub 'users.>'

You should see CloudEvents formatted messages:

{
  "specversion": "1.0",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "source": "deltaforge/users-postgres/public.users",
  "type": "com.acme.users.created",
  "time": "2025-01-15T10:30:00.000Z",
  "datacontenttype": "application/json",
  "subject": "public.users",
  "data": {
    "before": null,
    "after": {
      "id": 1,
      "name": "Alice",
      "email": "alice@example.com",
      "created_at": "2025-01-15T10:30:00.000Z"
    },
    "op": "c"
  }
}

Variations

With Debezium Envelope

For compatibility with existing Debezium consumers:

sinks:
  - type: nats
    config:
      id: users-nats
      url: ${NATS_URL}
      subject: users.events
      stream: USERS
      envelope:
        type: debezium

With Authentication

sinks:
  - type: nats
    config:
      id: users-nats
      url: ${NATS_URL}
      subject: users.events
      stream: USERS
      envelope:
        type: cloudevents
        type_prefix: "com.acme.users"
      credentials_file: /path/to/nats.creds
      # Or use username/password:
      # username: ${NATS_USER}
      # password: ${NATS_PASS}

Starting from Latest

Skip existing data and only capture new changes:

source:
  type: postgres
  config:
    id: users-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_users
    publication: users_pub
    tables:
      - public.users
    start_position: latest

Key Concepts Demonstrated

  • PostgreSQL Logical Replication: Production-ready CDC with slot management
  • CloudEvents Format: Standard envelope for cloud-native event routing
  • JetStream Durability: Replay-capable event streams with consumer acknowledgment
  • Transaction Preservation: respect_source_tx: true keeps related changes together

Multi-Sink Fan-Out

This example demonstrates streaming changes to multiple destinations simultaneously, each with a different envelope format tailored to its consumers.

Overview

| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript enrichment |
| Sinks | Kafka (Debezium) + Redis (Native) + NATS (CloudEvents) |
| Pattern | Fan-out with format adaptation |

Use Case

You have a MySQL database and need to:

  • Send to Kafka Connect (requires Debezium format)
  • Populate a Redis cache (native format for efficiency)
  • Trigger serverless functions via NATS (CloudEvents format)
  • Handle sink failures independently without blocking the pipeline

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-fan-out
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items

  processors:
    - type: javascript
      id: enrich
      inline: |
        function processBatch(events) {
          return events.map(event => {
            // Add routing hints via tags (tags is a valid Event field)
            event.tags = event.tags || [];
            
            if (event.table.includes('orders')) {
              event.tags.push('entity:order');
              // High-value orders get priority tag
              if (event.after && event.after.total > 1000) {
                event.tags.push('priority:high');
              }
            }
            
            event.tags.push('enriched');
            return event;
          });
        }
      limits:
        timeout_ms: 500

  sinks:
    # Primary: Kafka for data warehouse / Kafka Connect
    # Uses Debezium format for ecosystem compatibility
    - type: kafka
      config:
        id: warehouse-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders.cdc
        envelope:
          type: debezium
        encoding: json
        required: true           # Must succeed for checkpoint
        exactly_once: false
        client_conf:
          acks: "all"

    # Secondary: Redis for real-time cache
    # Uses native format for minimal overhead
    - type: redis
      config:
        id: cache-redis
        uri: ${REDIS_URI}
        stream: orders:cache
        envelope:
          type: native
        encoding: json
        required: false          # Best-effort, won't block pipeline

    # Tertiary: NATS for serverless triggers
    # Uses CloudEvents for Lambda/Functions compatibility
    - type: nats
      config:
        id: serverless-nats
        url: ${NATS_URL}
        subject: orders.events
        stream: ORDERS
        envelope:
          type: cloudevents
          type_prefix: "com.acme.orders"
        encoding: json
        required: false          # Best-effort

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 500
    respect_source_tx: true

  commit_policy:
    mode: required    # Only wait for required sinks (Kafka)

How It Works

Checkpoint Behavior

With commit_policy.mode: required:

Source Event
    │
    ▼
┌─────────────────────────────────────────────────────┐
│              Parallel Sink Delivery                 │
├─────────────────┬─────────────────┬─────────────────┤
│ Kafka           │ Redis           │ NATS            │
│ required: true  │ required: false │ required: false │
│                 │                 │                 │
│ ✓ Must succeed  │ ✓ Best-effort   │ ✓ Best-effort   │
└────────┬────────┴────────┬────────┴──────┬──────────┘
         │                 │               │
         ▼                 │               │
    Checkpoint <───────────┘               │
    advances                               │
    (even if Redis/NATS fail)              │

Output Formats

Kafka (Debezium):

{
  "payload": {
    "before": null,
    "after": {"id": 1, "total": 1500.00, "status": "pending"},
    "source": {"connector": "mysql", "db": "shop", "table": "orders"},
    "op": "c",
    "ts_ms": 1700000000000
  }
}

Redis (Native):

{
  "before": null,
  "after": {"id": 1, "total": 1500.00, "status": "pending"},
  "source": {"connector": "mysql", "db": "shop", "table": "orders"},
  "op": "c",
  "ts_ms": 1700000000000
}

NATS (CloudEvents):

{
  "specversion": "1.0",
  "id": "evt-123",
  "source": "deltaforge/orders-mysql/shop.orders",
  "type": "com.acme.orders.created",
  "time": "2025-01-15T10:30:00.000Z",
  "data": {
    "before": null,
    "after": {"id": 1, "total": 1500.00, "status": "pending"},
    "op": "c"
  }
}
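
The three payloads above wrap the same change event; a minimal sketch of the wrapping (field values are illustrative, and real envelopes carry more metadata than shown here):

```javascript
// Illustrative envelope wrapping: one native change event, three shapes.
const native = {
  before: null,
  after: { id: 1, total: 1500.0, status: "pending" },
  op: "c",
  ts_ms: 1700000000000,
};

// Debezium: the native fields nested under `payload`.
const debezium = { payload: { ...native } };

// CloudEvents: standard metadata on top, change data under `data`.
const cloudEvent = {
  specversion: "1.0",
  id: "evt-123", // illustrative event id
  source: "deltaforge/orders-mysql/shop.orders",
  type: "com.acme.orders.created",
  data: { before: native.before, after: native.after, op: native.op },
};

console.log(debezium.payload.op);  // "c"
console.log(cloudEvent.data.op);   // "c"
```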

Running the Example

1. Set Environment Variables

export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
export REDIS_URI="redis://localhost:6379"
export NATS_URL="nats://localhost:4222"

2. Start Infrastructure

./dev.sh up
./dev.sh k-create orders.cdc 6
./dev.sh nats-stream-add ORDERS 'orders.>'

3. Start DeltaForge

cargo run -p runner -- --config fan-out.yaml

4. Insert Test Data

INSERT INTO shop.orders (customer_id, total, status)
VALUES (1, 1500.00, 'pending');

5. Verify Each Sink

# Kafka
./dev.sh k-consume orders.cdc --from-beginning

# Redis
./dev.sh redis-read orders:cache 10

# NATS
./dev.sh nats-sub 'orders.>'

Variations

Quorum Mode

Require 2 of 3 sinks to succeed:

sinks:
  - type: kafka
    config:
      id: kafka-1
      required: true   # Counts toward quorum
  - type: redis
    config:
      id: redis-1
      required: true   # Counts toward quorum
  - type: nats
    config:
      id: nats-1
      required: true   # Counts toward quorum

commit_policy:
  mode: quorum
  quorum: 2            # Any 2 must succeed
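
Under quorum mode, the checkpoint advances once enough required sinks have acknowledged the batch. A minimal sketch of that decision (the function and result shapes here are illustrative, not DeltaForge internals):

```javascript
// Illustrative quorum check: the checkpoint advances when at least
// `quorum` of the required sinks delivered the batch successfully.
function canCheckpoint(sinkResults, quorum) {
  const successes = sinkResults.filter((s) => s.required && s.ok).length;
  return successes >= quorum;
}

const results = [
  { id: "kafka-1", required: true, ok: true },
  { id: "redis-1", required: true, ok: false }, // transient Redis failure
  { id: "nats-1", required: true, ok: true },
];

console.log(canCheckpoint(results, 2)); // true — 2 of 3 succeeded
console.log(canCheckpoint(results, 3)); // false — all-or-nothing would block
```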

All-or-Nothing

Require all sinks to succeed (strongest consistency):

commit_policy:
  mode: all

⚠️ Warning: mode: all means any sink failure blocks the entire pipeline. Use only when all destinations are equally critical.

Key Concepts Demonstrated

  • Multi-Sink Fan-Out: Single source to multiple destinations
  • Format Adaptation: Different envelope per consumer requirement
  • Selective Checkpointing: required flag controls which sinks gate progress
  • Failure Isolation: Non-critical sinks don’t block the pipeline
  • Tag-Based Enrichment: Use event.tags for routing metadata

Processor Constraints: JavaScript processors can only modify event.before, event.after, and event.tags. Arbitrary top-level fields would be lost during serialization.

Event Filtering with JavaScript

This example demonstrates using JavaScript processors to filter and selectively drop events before they reach sinks.

Overview

| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript filter + redaction |
| Sink | Kafka |
| Pattern | Event filtering and transformation |

Use Case

You have a MySQL database and want to:

  • Filter out events from certain tables or with specific conditions
  • Drop low-value events to reduce downstream load
  • Redact sensitive fields conditionally

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: filtered-orders
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items
        - shop.audit_log        # We'll filter most of these out

  processors:
    - type: javascript
      id: filter-and-redact
      inline: |
        function processBatch(events) {
          return events
            // 1. Filter out audit_log events except errors
            .filter(event => {
              if (event.table === 'shop.audit_log') {
                // Only keep error-level audit events
                return event.after && event.after.level === 'error';
              }
              return true;
            })
            
            // 2. Filter out soft-deleted records
            .filter(event => {
              if (event.after && event.after.deleted_at !== null) {
                return false;  // Drop soft-deleted records
              }
              return true;
            })
            
            // 3. Filter out test/staging data
            .filter(event => {
              if (event.after && event.after.email) {
                // Drop test accounts
                if (event.after.email.endsWith('@test.local')) {
                  return false;
                }
              }
              return true;
            })
            
            // 4. Transform remaining events
            .map(event => {
              // Redact PII for non-admin tables
              if (event.after) {
                if (event.after.email) {
                  event.after.email = maskEmail(event.after.email);
                }
                if (event.after.phone) {
                  event.after.phone = '[redacted]';
                }
              }
              
              // Add filter tag (tags is a valid Event field)
              event.tags = (event.tags || []).concat(['filtered']);
              
              return event;
            });
        }
        
        // helper: mask email keeping domain
        function maskEmail(email) {
          const [local, domain] = email.split('@');
          if (!domain) return '[invalid-email]';
          const masked = local.charAt(0) + '***' + local.charAt(local.length - 1);
          return masked + '@' + domain;
        }
      limits:
        timeout_ms: 1000
        mem_mb: 128

  sinks:
    - type: kafka
      config:
        id: filtered-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders.filtered
        envelope:
          type: native
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 500
    respect_source_tx: true

  commit_policy:
    mode: required

Filter Patterns

Drop Events (Return false from the Filter)

.filter(event => {
  // return false to drop the event
  if (event.table === 'internal_logs') {
    return false;
  }
  return true;
})

Conditional Field-Based Filtering

.filter(event => {
  // drop events where status is 'draft'
  if (event.after && event.after.status === 'draft') {
    return false;
  }
  return true;
})

Drop by Operation Type

.filter(event => {
  // only keep inserts and updates, drop deletes
  return event.op === 'c' || event.op === 'u';
})

Sample Events (Rate Limiting)

// keep only 10% of events (for high-volume tables)
.filter(event => {
  if (event.table === 'high_volume_table') {
    return Math.random() < 0.1;
  }
  return true;
})
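
Math.random() sampling is simple but non-deterministic: the same event may be kept on one run and dropped on a replay. If stable selection matters, hashing a stable key works instead (the key choice and hash below are illustrative):

```javascript
// Deterministic alternative: sample by hashing a stable key (e.g. the
// primary key) so a given row is always kept or always dropped.
function keepSample(key, rate /* e.g. 0.1 keeps ~10% */) {
  let h = 0;
  for (const ch of String(key)) {
    h = (h * 31 + ch.charCodeAt(0)) >>> 0; // simple 32-bit rolling hash
  }
  return (h % 1000) / 1000 < rate;
}

// The decision is stable across runs and replays:
console.log(keepSample(42, 0.1) === keepSample(42, 0.1)); // true
```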

Running the Example

1. Set Environment Variables

export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"

2. Start DeltaForge

cargo run -p runner -- --config filtered-orders.yaml

3. Insert Test Data

-- This will be captured and transformed
INSERT INTO shop.orders (customer_email, total, status)
VALUES ('alice@example.com', 99.99, 'pending');

-- This will be filtered out (test account)
INSERT INTO shop.orders (customer_email, total, status)
VALUES ('test@test.local', 50.00, 'pending');

-- This will be filtered out (soft-deleted)
INSERT INTO shop.orders (customer_email, total, status, deleted_at)
VALUES ('bob@example.com', 75.00, 'pending', NOW());

-- Audit log - will be filtered out (not error level)
INSERT INTO shop.audit_log (level, message)
VALUES ('info', 'User logged in');

-- Audit log - will be kept (error level)
INSERT INTO shop.audit_log (level, message)
VALUES ('error', 'Payment failed');

4. Verify Filtered Output

./dev.sh k-consume orders.filtered --from-beginning

You should only see:

  • Alice’s order (with masked email: a***e@example.com)
  • The error-level audit log entry
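
The maskEmail helper can be checked in isolation; this is the same code as the inline processor above:

```javascript
// Mask the local part of an email while keeping the domain visible.
function maskEmail(email) {
  const [local, domain] = email.split('@');
  if (!domain) return '[invalid-email]';
  const masked = local.charAt(0) + '***' + local.charAt(local.length - 1);
  return masked + '@' + domain;
}

console.log(maskEmail('alice@example.com')); // "a***e@example.com"
console.log(maskEmail('not-an-email'));      // "[invalid-email]"
```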

Performance Considerations

Tip: Filtering early reduces downstream load. If you’re filtering out 50% of events, your sinks process half the data.

processors:
  - type: javascript
    id: filter
    inline: |
      function processBatch(events) {
        // Filter FIRST, then transform
        return events
          .filter(e => shouldKeep(e))  // Reduces array size
          .map(e => transform(e));      // Processes fewer events
      }
    limits:
      timeout_ms: 500    # Increase if filtering logic is complex
      mem_mb: 128        # Increase for large batches

Key Concepts Demonstrated

  • Event Filtering: Drop events before they reach sinks
  • Conditional Logic: Filter based on table, operation, or field values
  • PII Redaction: Mask sensitive data in remaining events
  • Sampling: Rate-limit high-volume event streams

Processor Constraints: JavaScript processors can only modify event.before, event.after, and event.tags. Arbitrary top-level fields like event.filtered_at would be lost during serialization.

Schema Sensing and Drift Detection

This example demonstrates DeltaForge’s automatic schema inference for JSON columns and drift detection capabilities.

Overview

| Component | Configuration |
|---|---|
| Source | PostgreSQL logical replication |
| Processor | None |
| Sink | Kafka |
| Feature | Schema sensing with deep JSON inspection |

Use Case

You have a PostgreSQL database with JSON/JSONB columns and want to:

  • Automatically discover the structure of JSON payloads
  • Detect when JSON schemas change over time (drift)
  • Export inferred schemas as JSON Schema for downstream validation
  • Monitor schema evolution without manual tracking

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: products-with-sensing
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: products-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_products
      publication: products_pub
      tables:
        - public.products
        - public.product_variants
      start_position: earliest

  sinks:
    - type: kafka
      config:
        id: products-kafka
        brokers: ${KAFKA_BROKERS}
        topic: products.changes
        envelope:
          type: debezium
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

  # Schema sensing configuration
  schema_sensing:
    enabled: true
    
    deep_inspect:
      enabled: true
      max_depth: 5           # How deep to traverse nested JSON
      max_sample_size: 500   # Events to analyze for deep inspection
    
    sampling:
      warmup_events: 100     # Analyze every event during warmup
      sample_rate: 20        # After warmup, analyze 1 in 20 events
      structure_cache: true  # Cache seen structures to avoid re-analysis
      structure_cache_size: 100
    
    tracking:
      detect_drift: true     # Enable drift detection
      drift_threshold: 0.1   # Alert if >10% of events have new fields
    
    output:
      emit_schemas: true     # Include schema info in API responses
      json_schema_format: draft-07
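
The sampling knobs above combine into a simple per-event decision: during warmup every event is analyzed, and afterwards only one in sample_rate events is (structure-cache hits skip analysis entirely). A minimal sketch of that decision, assuming this hypothetical helper rather than DeltaForge internals:

```javascript
// Decide whether schema sensing should analyze a given event.
// Mirrors the warmup_events / sample_rate semantics described above;
// the function and its shape are illustrative, not DeltaForge's actual code.
function shouldAnalyze(eventIndex, { warmupEvents, sampleRate }) {
  if (eventIndex < warmupEvents) return true;             // warmup: analyze every event
  return (eventIndex - warmupEvents) % sampleRate === 0;  // then 1 in sampleRate
}

const cfg = { warmupEvents: 100, sampleRate: 20 };
console.log(shouldAnalyze(50, cfg));   // true (inside warmup window)
console.log(shouldAnalyze(100, cfg));  // true (first post-warmup sample)
console.log(shouldAnalyze(150, cfg));  // false (between samples)
```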

Table Structure

CREATE TABLE products (
  id SERIAL PRIMARY KEY,
  name TEXT NOT NULL,
  sku TEXT UNIQUE,
  price DECIMAL(10,2),
  
  -- JSON columns that schema sensing will analyze
  metadata JSONB,          -- Product attributes, tags, etc.
  specifications JSONB,    -- Technical specs
  
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE product_variants (
  id SERIAL PRIMARY KEY,
  product_id INTEGER REFERENCES products(id),
  variant_name TEXT,
  
  -- Nested JSON with variable structure
  attributes JSONB,        -- Color, size, material, etc.
  pricing JSONB,           -- Regional pricing, discounts
  
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Create publication
CREATE PUBLICATION products_pub FOR TABLE products, product_variants;

Sample Data

-- Insert products with JSON metadata
INSERT INTO products (name, sku, price, metadata, specifications) VALUES
(
  'Wireless Headphones',
  'WH-1000',
  299.99,
  '{
    "brand": "AudioTech",
    "category": "Electronics",
    "tags": ["wireless", "bluetooth", "noise-canceling"],
    "ratings": {"average": 4.5, "count": 1250}
  }',
  '{
    "battery_life_hours": 30,
    "driver_size_mm": 40,
    "frequency_response": {"min_hz": 20, "max_hz": 20000},
    "connectivity": ["bluetooth", "aux"]
  }'
);

-- Insert variant with nested attributes
INSERT INTO product_variants (product_id, variant_name, attributes, pricing) VALUES
(
  1,
  'Midnight Black',
  '{
    "color": {"name": "Midnight Black", "hex": "#1a1a2e"},
    "material": "Premium Plastic",
    "weight_grams": 250
  }',
  '{
    "base_price": 299.99,
    "regional": {
      "US": {"price": 299.99, "currency": "USD"},
      "EU": {"price": 279.99, "currency": "EUR"}
    },
    "discounts": [
      {"code": "SAVE20", "percent": 20, "expires": "2025-12-31"}
    ]
  }'
);

Running the Example

1. Set Environment Variables

export POSTGRES_DSN="postgres://user:password@localhost:5432/shop"
export KAFKA_BROKERS="localhost:9092"

2. Start DeltaForge

cargo run -p runner -- --config products-sensing.yaml

3. Insert Data and Let Sensing Analyze

-- Insert several products to build schema profile
INSERT INTO products (name, sku, price, metadata, specifications) VALUES
('Smart Watch', 'SW-200', 399.99, 
 '{"brand": "TechWear", "tags": ["fitness", "smart"]}',
 '{"battery_days": 7, "water_resistant": true}');

Using the Schema Sensing API

List Inferred Schemas

curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas

Response:

{
  "schemas": [
    {
      "table": "public.products",
      "columns": {
        "metadata": {
          "type": "object",
          "inferred_at": "2025-01-15T10:30:00Z",
          "sample_count": 150
        },
        "specifications": {
          "type": "object",
          "inferred_at": "2025-01-15T10:30:00Z",
          "sample_count": 150
        }
      }
    }
  ]
}

Get Detailed Schema for a Table

curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas/public.products

Response:

{
  "table": "public.products",
  "json_columns": {
    "metadata": {
      "inferred_schema": {
        "type": "object",
        "properties": {
          "brand": {"type": "string"},
          "category": {"type": "string"},
          "tags": {"type": "array", "items": {"type": "string"}},
          "ratings": {
            "type": "object",
            "properties": {
              "average": {"type": "number"},
              "count": {"type": "integer"}
            }
          }
        }
      },
      "sample_count": 150,
      "last_updated": "2025-01-15T10:35:00Z"
    }
  }
}

Export as JSON Schema

curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas/public.products/json-schema

Response:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "public.products.metadata",
  "type": "object",
  "properties": {
    "brand": {"type": "string"},
    "category": {"type": "string"},
    "tags": {
      "type": "array",
      "items": {"type": "string"}
    },
    "ratings": {
      "type": "object",
      "properties": {
        "average": {"type": "number"},
        "count": {"type": "integer"}
      }
    }
  }
}
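
The exported schema can drive downstream validation. The sketch below hand-rolls a tiny checker for just the object/array/string/number/integer subset appearing in this export; in practice you would use a full JSON Schema validator such as Ajv:

```javascript
// Minimal validator for the subset of draft-07 used in the export above:
// type checks for object/array/string/number/integer, recursing into
// properties and array items. Illustrative only; it ignores "required",
// formats, and most other keywords.
function validate(schema, value) {
  switch (schema.type) {
    case "object":
      if (typeof value !== "object" || value === null || Array.isArray(value)) return false;
      return Object.entries(schema.properties || {})
        .every(([k, s]) => !(k in value) || validate(s, value[k]));
    case "array":
      return Array.isArray(value) && value.every(v => validate(schema.items, v));
    case "string":  return typeof value === "string";
    case "integer": return Number.isInteger(value);
    case "number":  return typeof value === "number";
    default:        return true;
  }
}

const metadataSchema = {
  type: "object",
  properties: {
    brand: { type: "string" },
    tags: { type: "array", items: { type: "string" } },
    ratings: {
      type: "object",
      properties: { average: { type: "number" }, count: { type: "integer" } }
    }
  }
};

console.log(validate(metadataSchema,
  { brand: "AudioTech", tags: ["wireless"], ratings: { average: 4.5, count: 1250 } })); // true
console.log(validate(metadataSchema, { brand: 42 })); // false
```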

Check Drift Detection

curl http://localhost:8080/pipelines/products-with-sensing/drift

Response:

{
  "drift_detected": true,
  "tables": {
    "public.products": {
      "metadata": {
        "new_fields": ["promotion", "seasonal"],
        "removed_fields": [],
        "type_changes": [],
        "drift_percentage": 0.15,
        "first_seen": "2025-01-15T11:00:00Z"
      }
    }
  }
}
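
A monitoring job can poll this endpoint and alert when any column's drift_percentage crosses the configured drift_threshold (0.1 in this pipeline). A sketch of that check against the response shape above:

```javascript
// Collect (table, column) pairs whose drift exceeds the threshold,
// walking the /drift response shape shown above.
function driftAlerts(driftResponse, threshold) {
  const alerts = [];
  for (const [table, columns] of Object.entries(driftResponse.tables || {})) {
    for (const [column, info] of Object.entries(columns)) {
      if (info.drift_percentage > threshold) {
        alerts.push({ table, column, newFields: info.new_fields });
      }
    }
  }
  return alerts;
}

const response = {
  drift_detected: true,
  tables: {
    "public.products": {
      metadata: { new_fields: ["promotion", "seasonal"], drift_percentage: 0.15 }
    }
  }
};

console.log(driftAlerts(response, 0.1)); // one alert for public.products.metadata
console.log(driftAlerts(response, 0.2)); // no alerts at a higher threshold
```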

Get Sensing Statistics

curl http://localhost:8080/pipelines/products-with-sensing/sensing/stats

Response:

{
  "total_events_analyzed": 1500,
  "total_events_sampled": 250,
  "cache_hits": 1250,
  "cache_misses": 250,
  "tables_tracked": 2,
  "json_columns_tracked": 4
}
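
These counters let you sanity-check the sampling configuration: cache_hits / total_events_analyzed is the structure-cache hit rate, and total_events_sampled / total_events_analyzed approximates the effective sample rate. Computed for the numbers above with a small hypothetical helper:

```javascript
// Derive hit rate and effective sampling ratio from /sensing/stats.
function sensingRatios(stats) {
  return {
    cacheHitRate: stats.cache_hits / stats.total_events_analyzed,
    effectiveSampleRate: stats.total_events_sampled / stats.total_events_analyzed
  };
}

const stats = {
  total_events_analyzed: 1500,
  total_events_sampled: 250,
  cache_hits: 1250,
  cache_misses: 250
};

const r = sensingRatios(stats);
console.log(r.cacheHitRate.toFixed(3));        // 0.833
console.log(r.effectiveSampleRate.toFixed(3)); // 0.167
```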

Performance Tuning

Performance tip: Schema sensing can be CPU-intensive. Tune based on your throughput needs.

High-Throughput Configuration

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3           # Limit depth for faster processing
    max_sample_size: 200   # Fewer samples
  sampling:
    warmup_events: 50      # Shorter warmup
    sample_rate: 100       # Analyze 1 in 100 events
    structure_cache: true
    structure_cache_size: 200  # Larger cache

Development/Debugging Configuration

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 10          # Full depth
    max_sample_size: 1000  # More samples
  sampling:
    warmup_events: 500     # Longer warmup
    sample_rate: 1         # Analyze every event

Key Concepts Demonstrated

  • Automatic Schema Inference: Discover JSON structure without manual definition
  • Deep JSON Inspection: Traverse nested objects and arrays
  • Drift Detection: Alert when schemas change unexpectedly
  • JSON Schema Export: Generate standard schemas for validation
  • Sampling Strategy: Balance accuracy vs. performance

Production Kafka Configuration

This example demonstrates a production-ready Kafka sink configuration with authentication, high-availability settings, and performance tuning.

Overview

Component    Configuration
Source       PostgreSQL logical replication
Processor    None
Sink         Kafka with SASL/SSL authentication
Pattern      Production-grade reliability

Use Case

You’re deploying DeltaForge to production and need:

  • Secure authentication (SASL/SCRAM or mTLS)
  • High availability with proper acknowledgment settings
  • Optimal batching and compression for throughput
  • Exactly-once semantics for critical data

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-to-kafka-prod
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: orders-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_orders
      publication: orders_pub
      tables:
        - public.orders
        - public.order_items
        - public.payments
      start_position: earliest

  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders.cdc.events
        envelope:
          type: debezium
        encoding: json
        required: true
        
        # enable exactly-once semantics
        exactly_once: true
        
        # timeout for individual sends
        send_timeout_secs: 60
        
        # librdkafka client configuration
        client_conf:
          # security - SASL/SCRAM authentication
          security.protocol: "SASL_SSL"
          sasl.mechanism: "SCRAM-SHA-512"
          sasl.username: "${KAFKA_USERNAME}"
          sasl.password: "${KAFKA_PASSWORD}"
          
          # SSL/TLS configuration
          ssl.ca.location: "/etc/ssl/certs/kafka-ca.pem"
          ssl.endpoint.identification.algorithm: "https"
          
          # reliability - wait for all replicas
          acks: "all"
          
          # idempotence (required for exactly-once)
          enable.idempotence: "true"
          
          # retries and timeouts
          retries: "2147483647"
          retry.backoff.ms: "100"
          delivery.timeout.ms: "300000"
          request.timeout.ms: "30000"
          
          # kafka batching for throughput
          batch.size: "65536"
          linger.ms: "10"
          
          # compression
          compression.type: "lz4"
          
          # buffer management
          queue.buffering.max.messages: "100000"
          queue.buffering.max.kbytes: "1048576"

  batch:
    max_events: 1000
    max_bytes: 1048576
    max_ms: 100
    respect_source_tx: true
    max_inflight: 4

  commit_policy:
    mode: required

Security Configurations

SASL/SCRAM (Username/Password)

client_conf:
  security.protocol: "SASL_SSL"
  sasl.mechanism: "SCRAM-SHA-512"
  sasl.username: "${KAFKA_USERNAME}"
  sasl.password: "${KAFKA_PASSWORD}"
  ssl.ca.location: "/etc/ssl/certs/ca.pem"

mTLS (Mutual TLS)

client_conf:
  security.protocol: "SSL"
  ssl.ca.location: "/etc/ssl/certs/kafka-ca.pem"
  ssl.certificate.location: "/etc/ssl/certs/client.pem"
  ssl.key.location: "/etc/ssl/private/client.key"
  ssl.key.password: "${SSL_KEY_PASSWORD}"

AWS MSK with IAM

client_conf:
  security.protocol: "SASL_SSL"
  sasl.mechanism: "AWS_MSK_IAM"
  sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required;"
  sasl.client.callback.handler.class: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"

Confluent Cloud

client_conf:
  security.protocol: "SASL_SSL"
  sasl.mechanism: "PLAIN"
  sasl.username: "${CONFLUENT_API_KEY}"
  sasl.password: "${CONFLUENT_API_SECRET}"

Performance Tuning

High Throughput

Optimize for maximum events per second:

client_conf:
  acks: "1"                    # Leader-only ack (faster, less safe)
  batch.size: "131072"         # 128KB batches
  linger.ms: "50"              # Wait longer to fill batches
  compression.type: "lz4"      # Fast compression
  
batch:
  max_events: 5000             # Larger batches
  max_ms: 200                  # More time to accumulate
  max_inflight: 8              # More concurrent requests

Low Latency

Optimize for minimal delay:

client_conf:
  acks: "1"                    # Don't wait for replicas
  batch.size: "16384"          # Smaller batches
  linger.ms: "0"               # Send immediately
  compression.type: "none"     # Skip compression
  
batch:
  max_events: 100              # Smaller batches
  max_ms: 10                   # Flush quickly
  max_inflight: 2              # Limit in-flight

Maximum Durability

Optimize for zero data loss:

client_conf:
  acks: "all"                  # All replicas must ack
  enable.idempotence: "true"   # Prevent duplicates
  max.in.flight.requests.per.connection: "5"  # Idempotence requires <= 5
  retries: "2147483647"        # Infinite retries
  
exactly_once: true             # Transactional producer

batch:
  respect_source_tx: true      # Preserve source transactions
  max_inflight: 1              # Strict ordering
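
These settings are interdependent: exactly_once implies a transactional producer, which requires enable.idempotence=true and acks=all, and idempotence caps max.in.flight.requests.per.connection at 5. A hypothetical preflight check over a sink config object (field names mirror the YAML above; this is not a DeltaForge API):

```javascript
// Sanity-check a Kafka sink config for exactly-once consistency.
// Purely illustrative; DeltaForge does not expose this helper.
function checkExactlyOnce(sink) {
  const errors = [];
  const cc = sink.client_conf || {};
  if (sink.exactly_once) {
    if (cc["enable.idempotence"] !== "true") errors.push("exactly_once requires enable.idempotence=true");
    if (cc["acks"] !== "all") errors.push("exactly_once requires acks=all");
    const inflight = Number(cc["max.in.flight.requests.per.connection"] || 5);
    if (inflight > 5) errors.push("idempotence requires max.in.flight <= 5");
  }
  return errors;
}

const good = { exactly_once: true, client_conf: { "enable.idempotence": "true", acks: "all" } };
const bad  = { exactly_once: true, client_conf: { acks: "1" } };
console.log(checkExactlyOnce(good).length); // 0
console.log(checkExactlyOnce(bad).length);  // 2
```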

Running the Example

1. Set Environment Variables

export POSTGRES_DSN="postgres://user:password@localhost:5432/orders"
export KAFKA_BROKERS="kafka1:9093,kafka2:9093,kafka3:9093"
export KAFKA_USERNAME="deltaforge"
export KAFKA_PASSWORD="secret"

2. Create Kafka Topic

kafka-topics.sh --create \
  --topic orders.cdc.events \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --bootstrap-server ${KAFKA_BROKERS}

3. Start DeltaForge

cargo run -p runner --release -- --config kafka-prod.yaml

Monitoring

Key Metrics to Watch

  • deltaforge_sink_events_sent_total — Events delivered
  • deltaforge_sink_send_latency_seconds — Delivery latency
  • deltaforge_sink_errors_total — Delivery failures
  • deltaforge_checkpoint_lag_events — Events pending checkpoint

Health Check

curl http://localhost:8080/health

Pipeline Status

curl http://localhost:8080/pipelines/orders-to-kafka-prod

Key Concepts Demonstrated

  • SASL/SSL Authentication: Secure broker connections
  • Exactly-Once Semantics: Transactional producer for no duplicates
  • Acknowledgment Modes: Trade-off between durability and latency
  • Batching & Compression: Optimize throughput
  • Production Tuning: Real-world configuration patterns

Redis Cache Invalidation

This example demonstrates streaming database changes to a Redis stream, where a worker consumes them to invalidate cache entries, keeping the cache consistent without polling.

Overview

Component    Configuration
Source       MySQL binlog CDC
Processor    JavaScript cache key generator
Sink         Redis Streams
Pattern      CDC-driven cache invalidation

Use Case

You have a MySQL database with Redis caching and want to:

  • Stream change events that trigger cache invalidation
  • Avoid stale cache issues without TTL-based expiration
  • Generate cache keys matching your application’s key format

Architecture

┌─────────┐     ┌─────────────┐     ┌────────────────┐     ┌─────────┐
│  MySQL  │────>│ DeltaForge  │────>│ Redis Stream   │────>│ Worker  │
│         │     │             │     │ (invalidations)│     │         │
└─────────┘     └─────────────┘     └────────────────┘     └────┬────┘
                                                                │
                                    ┌───────────────┐           │
                                    │  Redis Cache  │<──────────┘
                                    │  (DEL keys)   │
                                    └───────────────┘

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: cache-invalidation
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: app-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - app.users
        - app.products
        - app.orders
        - app.inventory

  processors:
    - type: javascript
      id: generate-cache-keys
      inline: |
        function processBatch(events) {
          return events.map(event => {
            const keys = generateCacheKeys(event);
            const strategy = getStrategy(event);
            
            // Store cache keys in event.after metadata
            // (we can modify event.after since it's a JSON Value)
            if (event.after) {
              event.after._cache_keys = keys;
              event.after._invalidation_strategy = strategy;
            } else if (event.before) {
              // For deletes, add to before
              event.before._cache_keys = keys;
              event.before._invalidation_strategy = strategy;
            }
            
            // Add tags for routing/filtering
            event.tags = (event.tags || []).concat([
              'cache:invalidate',
              `strategy:${strategy}`,
              `keys:${keys.length}`
            ]);
            
            return event;
          });
        }
        
        function generateCacheKeys(event) {
          const table = event.table.split('.')[1];
          const keys = [];
          const record = event.after || event.before;
          
          if (!record || !record.id) return keys;
          const id = record.id;
          
          switch (table) {
            case 'users':
              keys.push(`user:${id}`);
              if (record.email) {
                keys.push(`user:email:${record.email}`);
              }
              // If email changed, invalidate old email key
              if (event.before && event.before.email && 
                  event.before.email !== record.email) {
                keys.push(`user:email:${event.before.email}`);
              }
              keys.push(`user:${id}:orders`);
              keys.push(`user:${id}:profile`);
              break;
              
            case 'products':
              keys.push(`product:${id}`);
              keys.push(`product:${id}:details`);
              if (record.category_id) {
                keys.push(`category:${record.category_id}:products`);
              }
              break;
              
            case 'orders':
              keys.push(`order:${id}`);
              if (record.user_id) {
                keys.push(`user:${record.user_id}:orders`);
              }
              break;
              
            case 'inventory':
              keys.push(`inventory:${record.product_id}`);
              keys.push(`product:${record.product_id}:stock`);
              break;
          }
          
          return keys;
        }
        
        function getStrategy(event) {
          const table = event.table.split('.')[1];
          if (table === 'inventory') return 'immediate';
          if (event.op === 'd') return 'immediate';
          return 'batched';
        }
      limits:
        timeout_ms: 500
        mem_mb: 128

  sinks:
    - type: redis
      config:
        id: invalidation-stream
        uri: ${REDIS_URI}
        stream: cache:invalidations
        envelope:
          type: native
        encoding: json
        required: true

  batch:
    max_events: 100
    max_ms: 100
    respect_source_tx: true

  commit_policy:
    mode: required

JavaScript Processor Constraints

Important: The processor stores cache keys in event.after._cache_keys (or event.before for deletes) because we can only modify existing Event fields. Arbitrary top-level fields like event.cache_keys would be lost.

Cache Worker (Consumer)

// cache-invalidation-worker.js
const Redis = require('ioredis');

const redis = new Redis(process.env.REDIS_URI);
const streamKey = 'cache:invalidations';
const consumerGroup = 'cache-workers';
const consumerId = `worker-${process.pid}`;

async function processInvalidations() {
  try {
    await redis.xgroup('CREATE', streamKey, consumerGroup, '0', 'MKSTREAM');
  } catch (e) {
    if (!e.message.includes('BUSYGROUP')) throw e;
  }

  console.log(`Starting cache invalidation worker: ${consumerId}`);

  while (true) {
    try {
      const results = await redis.xreadgroup(
        'GROUP', consumerGroup, consumerId,
        'COUNT', 100, 'BLOCK', 5000,
        'STREAMS', streamKey, '>'
      );

      if (!results) continue;

      for (const [stream, messages] of results) {
        for (const [id, fields] of messages) {
          const event = JSON.parse(fields[1]);
          await invalidateKeys(event);
          await redis.xack(streamKey, consumerGroup, id);
        }
      }
    } catch (error) {
      console.error('Worker error:', error);
      await new Promise(r => setTimeout(r, 1000));
    }
  }
}

async function invalidateKeys(event) {
  // Get cache keys from the event payload
  const record = event.after || event.before;
  const cacheKeys = record?._cache_keys || [];
  
  if (cacheKeys.length === 0) return;

  for (const key of cacheKeys) {
    if (key.includes('*')) {
      await scanAndDelete(key);
    } else {
      await redis.del(key);
    }
  }

  console.log(`Invalidated ${cacheKeys.length} keys for ${event.table}`);
}

async function scanAndDelete(pattern) {
  let cursor = '0';
  do {
    const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100);
    cursor = nextCursor;
    if (keys.length > 0) {
      await redis.del(...keys);
    }
  } while (cursor !== '0');
}

processInvalidations().catch(console.error);

Running the Example

1. Set Environment Variables

export MYSQL_DSN="mysql://user:password@localhost:3306/app"
export REDIS_URI="redis://localhost:6379"

2. Start DeltaForge

cargo run -p runner -- --config cache-invalidation.yaml

3. Start Worker(s)

node cache-invalidation-worker.js

4. Test Invalidation

UPDATE app.users SET email = 'alice.new@example.com' WHERE id = 1;

Worker output:

Invalidated 5 keys for app.users
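
The five invalidated keys come from the users branch of generateCacheKeys in the processor: the id key, the new and old email keys, plus the orders and profile keys. A trimmed standalone copy of that branch makes the count easy to verify:

```javascript
// Trimmed copy of the users branch of generateCacheKeys from the pipeline above.
function userCacheKeys(event) {
  const record = event.after || event.before;
  const keys = [`user:${record.id}`];
  if (record.email) keys.push(`user:email:${record.email}`);
  // If the email changed, also invalidate the old email key
  if (event.before && event.before.email && event.before.email !== record.email) {
    keys.push(`user:email:${event.before.email}`);
  }
  keys.push(`user:${record.id}:orders`, `user:${record.id}:profile`);
  return keys;
}

const update = {
  before: { id: 1, email: "alice@example.com" },
  after:  { id: 1, email: "alice.new@example.com" }
};
console.log(userCacheKeys(update).length); // 5, matching the worker output above
```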

Key Concepts Demonstrated

  • CDC to Stream: DeltaForge captures changes and writes to Redis Streams
  • Custom Key Generation: Processor computes cache keys for downstream worker
  • Consumer Groups: Scalable worker processing with acknowledgments
  • Before/After Diffing: Compute invalidation keys for both old and new values

Note: DeltaForge streams events to Redis; a separate worker (shown above) consumes them and performs the actual cache invalidation.

Audit Trail and Compliance Logging

This example demonstrates building an immutable audit trail for compliance requirements (SOC2, HIPAA, GDPR) by capturing all database changes with full context.

Overview

Component    Configuration
Source       PostgreSQL logical replication
Processor    JavaScript audit enrichment
Sink         Kafka (durable audit log)
Pattern      Compliance-grade change tracking

Use Case

You need to meet compliance requirements and want to:

  • Capture every change to sensitive tables with before/after values
  • Add audit metadata (classification, retention, regulations)
  • Redact sensitive fields while preserving change records
  • Store immutable audit logs for required retention periods

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: compliance-audit-trail
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: app-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_audit
      publication: audit_pub
      tables:
        - public.users
        - public.user_profiles
        - public.payment_methods
        - public.transactions
        - public.roles
        - public.permissions
        - public.user_roles
      start_position: earliest

  processors:
    - type: javascript
      id: audit-enrichment
      inline: |
        function processBatch(events) {
          return events.map(event => {
            const table = event.table.split('.')[1];
            
            // Build audit tags (event.tags is a valid Event field)
            const auditTags = [
              'audited',
              `sensitivity:${classifySensitivity(table)}`,
              `classification:${getDataClassification(table)}`,
              `retention:${getRetentionPeriod(table)}d`
            ];
            
            // Add regulation tags
            for (const reg of getApplicableRegulations(table)) {
              auditTags.push(`regulation:${reg}`);
            }
            
            // Track changed fields for updates
            if (event.before && event.after) {
              const changed = detectChangedFields(event.before, event.after);
              for (const field of changed) {
                auditTags.push(`changed:${field}`);
              }
            }
            
            event.tags = (event.tags || []).concat(auditTags);
            
            // Redact sensitive fields in before/after
            if (event.before) {
              event.before = sanitizeRecord(event.before, table);
            }
            if (event.after) {
              event.after = sanitizeRecord(event.after, table);
            }
            
            return event;
          });
        }
        
        function classifySensitivity(table) {
          const highSensitivity = ['users', 'payment_methods', 'transactions'];
          const mediumSensitivity = ['user_profiles', 'user_roles'];
          
          if (highSensitivity.includes(table)) return 'HIGH';
          if (mediumSensitivity.includes(table)) return 'MEDIUM';
          return 'LOW';
        }
        
        function sanitizeRecord(record, table) {
          if (!record) return null;
          const sanitized = { ...record };
          
          const sensitiveFields = {
            'users': ['password_hash', 'ssn', 'tax_id'],
            'payment_methods': ['card_number', 'cvv', 'account_number'],
            'user_profiles': ['date_of_birth']
          };
          
          const fieldsToMask = sensitiveFields[table] || [];
          
          for (const field of fieldsToMask) {
            if (sanitized[field] !== undefined) {
              sanitized[`_${field}_redacted`] = true;
              sanitized[field] = '[REDACTED]';
            }
          }
          
          return sanitized;
        }
        
        function detectChangedFields(before, after) {
          const changed = [];
          const allKeys = new Set([...Object.keys(before), ...Object.keys(after)]);
          for (const key of allKeys) {
            if (JSON.stringify(before[key]) !== JSON.stringify(after[key])) {
              changed.push(key);
            }
          }
          return changed;
        }
        
        function getRetentionPeriod(table) {
          if (['transactions', 'payment_methods'].includes(table)) return 2555;
          if (['users', 'user_profiles'].includes(table)) return 2190;
          return 1095;
        }
        
        function getDataClassification(table) {
          if (['payment_methods', 'transactions'].includes(table)) return 'PCI';
          if (['users', 'user_profiles'].includes(table)) return 'PII';
          return 'INTERNAL';
        }
        
        function getApplicableRegulations(table) {
          const regs = [];
          if (['users', 'user_profiles'].includes(table)) regs.push('GDPR', 'CCPA');
          if (['payment_methods', 'transactions'].includes(table)) regs.push('PCI-DSS', 'SOX');
          return regs;
        }
      limits:
        timeout_ms: 1000
        mem_mb: 256

  sinks:
    - type: kafka
      config:
        id: audit-kafka
        brokers: ${KAFKA_BROKERS}
        topic: audit.trail.events
        envelope:
          type: debezium
        encoding: json
        required: true
        exactly_once: true
        client_conf:
          acks: "all"
          enable.idempotence: "true"
          compression.type: "gzip"

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

JavaScript Processor Constraints

Important: The JavaScript processor can only modify fields that exist on DeltaForge’s Event struct. You can:

  • Modify event.before and event.after values (JSON objects)
  • Set event.tags (array of strings)
  • Filter out events (return empty array)

You cannot add arbitrary top-level fields like event.audit or event.metadata: they will be lost during serialization. This limitation will be addressed in a future release.

This example uses event.tags to store audit metadata as key:value strings that downstream systems can parse.
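
Applied to an update on public.users, the classification helpers from the inline processor produce the sensitivity, classification, and retention tags shown in the sample output below. Reproduced standalone for clarity:

```javascript
// Standalone copies of the classification helpers from the inline processor above.
function classifySensitivity(table) {
  if (['users', 'payment_methods', 'transactions'].includes(table)) return 'HIGH';
  if (['user_profiles', 'user_roles'].includes(table)) return 'MEDIUM';
  return 'LOW';
}
function getDataClassification(table) {
  if (['payment_methods', 'transactions'].includes(table)) return 'PCI';
  if (['users', 'user_profiles'].includes(table)) return 'PII';
  return 'INTERNAL';
}
function getRetentionPeriod(table) {
  if (['transactions', 'payment_methods'].includes(table)) return 2555;  // ~7 years
  if (['users', 'user_profiles'].includes(table)) return 2190;           // ~6 years
  return 1095;                                                           // ~3 years
}

const table = 'users';
console.log(`sensitivity:${classifySensitivity(table)}`);    // sensitivity:HIGH
console.log(`classification:${getDataClassification(table)}`); // classification:PII
console.log(`retention:${getRetentionPeriod(table)}d`);      // retention:2190d
```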

PostgreSQL Setup

-- Create publication for audited tables
CREATE PUBLICATION audit_pub FOR TABLE 
  public.users,
  public.user_profiles,
  public.payment_methods,
  public.transactions,
  public.roles,
  public.permissions,
  public.user_roles
WITH (publish = 'insert, update, delete');

-- Enable REPLICA IDENTITY FULL to capture before values on updates
ALTER TABLE public.users REPLICA IDENTITY FULL;
ALTER TABLE public.user_profiles REPLICA IDENTITY FULL;
ALTER TABLE public.payment_methods REPLICA IDENTITY FULL;
ALTER TABLE public.transactions REPLICA IDENTITY FULL;

Sample Audit Event Output

With Debezium envelope:

{
  "payload": {
    "before": {
      "id": 42,
      "email": "alice@old-domain.com",
      "name": "Alice Smith",
      "password_hash": "[REDACTED]",
      "_password_hash_redacted": true
    },
    "after": {
      "id": 42,
      "email": "alice@new-domain.com",
      "name": "Alice Smith",
      "password_hash": "[REDACTED]",
      "_password_hash_redacted": true
    },
    "source": {
      "version": "0.1.0",
      "connector": "postgres",
      "name": "app-postgres",
      "ts_ms": 1705312199123,
      "db": "app",
      "schema": "public",
      "table": "users"
    },
    "op": "u",
    "ts_ms": 1705312200000,
    "tags": [
      "audited",
      "sensitivity:HIGH",
      "classification:PII",
      "retention:2190d",
      "regulation:GDPR",
      "regulation:CCPA",
      "changed:email"
    ]
  }
}

Parsing Audit Tags

Downstream consumers can parse the structured tags:

// Parse audit tags into an object
function parseAuditTags(tags) {
  const audit = {};
  for (const tag of tags || []) {
    if (tag.includes(':')) {
      const [key, value] = tag.split(':', 2);
      if (audit[key]) {
        if (!Array.isArray(audit[key])) {
          audit[key] = [audit[key]];
        }
        audit[key].push(value);
      } else {
        audit[key] = value;
      }
    } else {
      audit[tag] = true;
    }
  }
  return audit;
}

// Example: parseAuditTags(event.tags)
// Returns: {
//   audited: true,
//   sensitivity: "HIGH",
//   classification: "PII",
//   retention: "2190d",
//   regulation: ["GDPR", "CCPA"],
//   changed: "email"
// }

Kafka Topic Configuration

kafka-topics.sh --create \
  --topic audit.trail.events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=189216000000 \
  --config cleanup.policy=delete \
  --config min.insync.replicas=2 \
  --config compression.type=gzip \
  --bootstrap-server ${KAFKA_BROKERS}

Running the Example

1. Set Environment Variables

export POSTGRES_DSN="postgres://user:password@localhost:5432/app"
export KAFKA_BROKERS="kafka1:9092,kafka2:9092,kafka3:9092"

2. Start DeltaForge

cargo run -p runner -- --config audit-trail.yaml

3. Verify Audit Events

./dev.sh k-consume audit.trail.events --from-beginning

Querying Audit Data

-- Find high-sensitivity changes
SELECT * FROM audit_events 
WHERE ARRAY_CONTAINS(payload.tags, 'sensitivity:HIGH');

-- Find all email changes
SELECT * FROM audit_events
WHERE ARRAY_CONTAINS(payload.tags, 'changed:email');

-- Find PCI-regulated changes
SELECT * FROM audit_events
WHERE ARRAY_CONTAINS(payload.tags, 'regulation:PCI-DSS');

Key Concepts Demonstrated

  • Full Change Capture: Before and after values with REPLICA IDENTITY FULL
  • PII Redaction: Sensitive fields masked, presence tracked via _field_redacted
  • Tag-Based Metadata: Audit info stored in event.tags as parseable strings
  • Immutable Storage: Exactly-once delivery to append-only Kafka log
  • Compliance Tagging: Retention periods, classifications, regulations as tags

Real-Time Analytics Preprocessing Pipeline

This example demonstrates preparing CDC events for real-time analytics by enriching them with dimensions, metrics, and routing tags before they reach downstream stream processors.

Overview

Component    Configuration
Source       MySQL binlog CDC
Processor    JavaScript analytics enrichment
Sinks        Kafka (stream processing) + Redis (real-time counters)
Pattern      Analytics-ready event preparation

Use Case

You have an e-commerce MySQL database and want to:

  • Stream order events to real-time dashboards
  • Feed a worker that maintains live counters in Redis
  • Prepare events for stream processing (Flink, Spark Streaming)
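
Downstream, the live counters can be maintained by a worker that folds each event's metrics (emitted under event.after._analytics by the processor in this pipeline) into running totals per event type. A Redis-free sketch of that aggregation, assuming the _analytics shape used in this example:

```javascript
// Fold _analytics metrics from enriched events into running counters,
// keyed by event type. A real worker would INCRBYFLOAT these into Redis;
// here only the aggregation itself is shown.
function aggregate(counters, event) {
  const a = event.after && event.after._analytics;
  if (!a) return counters;
  const bucket = counters[a.event_type] || (counters[a.event_type] = {});
  for (const [metric, value] of Object.entries(a.metrics)) {
    bucket[metric] = (bucket[metric] || 0) + value;
  }
  return counters;
}

const events = [
  { after: { _analytics: { event_type: "orders.created", metrics: { event_count: 1, order_total: 120.5 } } } },
  { after: { _analytics: { event_type: "orders.created", metrics: { event_count: 1, order_total: 79.5 } } } }
];
const totals = events.reduce(aggregate, {});
console.log(totals["orders.created"].order_total); // 200
console.log(totals["orders.created"].event_count); // 2
```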

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: ecommerce-analytics
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: ecommerce-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items
        - shop.payments
        - shop.cart_events

  processors:
    - type: javascript
      id: analytics-enrichment
      inline: |
        function processBatch(events) {
          return events.map(event => {
            const table = event.table.split('.')[1];
            const record = event.after || event.before;
            
            // Add analytics metadata to event.after
            // (we can modify event.after since it's a JSON Value)
            if (event.after) {
              event.after._analytics = {
                event_type: `${table}.${mapOperation(event.op)}`,
                dimensions: extractDimensions(table, record, event),
                metrics: extractMetrics(table, record)
              };
            }
            
            // Add routing tags
            event.tags = generateTags(table, record, event);
            
            return event;
          });
        }
        
        function mapOperation(op) {
          return { 'c': 'created', 'u': 'updated', 'd': 'deleted', 'r': 'snapshot' }[op] || op;
        }
        
        function extractDimensions(table, record, event) {
          if (!record) return {};
          
          const dims = {
            hour_of_day: new Date(event.ts_ms).getUTCHours(),
            day_of_week: new Date(event.ts_ms).getUTCDay()
          };
          
          switch (table) {
            case 'orders':
              dims.customer_id = record.customer_id;
              dims.status = record.status;
              dims.channel = record.channel || 'web';
              break;
            case 'order_items':
              dims.order_id = record.order_id;
              dims.product_id = record.product_id;
              break;
            case 'payments':
              dims.order_id = record.order_id;
              dims.payment_method = record.method;
              break;
          }
          
          return dims;
        }
        
        function extractMetrics(table, record) {
          if (!record) return {};
          
          const metrics = { event_count: 1 };
          
          switch (table) {
            case 'orders':
              metrics.order_total = parseFloat(record.total) || 0;
              metrics.item_count = parseInt(record.item_count) || 0;
              break;
            case 'order_items':
              metrics.quantity = parseInt(record.quantity) || 1;
              metrics.line_total = parseFloat(record.line_total) || 0;
              break;
            case 'payments':
              metrics.payment_amount = parseFloat(record.amount) || 0;
              break;
          }
          
          return metrics;
        }
        
        function generateTags(table, record, event) {
          const tags = [table, event.op];
          
          if (!record) return tags;
          
          if (table === 'orders' && record.total > 500) {
            tags.push('high_value');
          }
          if (record.status) {
            tags.push(`status:${record.status}`);
          }
          
          return tags;
        }
      limits:
        timeout_ms: 500
        mem_mb: 256

  sinks:
    - type: kafka
      config:
        id: analytics-kafka
        brokers: ${KAFKA_BROKERS}
        topic: analytics.events
        envelope:
          type: native
        encoding: json
        required: true
        client_conf:
          compression.type: "lz4"

    - type: redis
      config:
        id: realtime-redis
        uri: ${REDIS_URI}
        stream: analytics:realtime
        envelope:
          type: native
        encoding: json
        required: false

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 100
    respect_source_tx: false

  commit_policy:
    mode: required

JavaScript Processor Constraints

Important: Analytics metadata is stored in event.after._analytics because the processor can only modify existing Event fields (before, after, tags). Arbitrary top-level fields would be lost during serialization.
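The constraint can be illustrated with a small sketch; the field whitelist below is inferred from the sample output in this example, not taken from the engine's source:

```javascript
// Illustrative sketch: simulates a serializer that keeps only a fixed set of
// Event fields. The field list is an assumption based on the sample output
// shown in this example, not the engine's actual code.
const EVENT_FIELDS = ['before', 'after', 'source', 'op', 'ts_ms', 'tags'];

function serializeEvent(event) {
  const out = {};
  for (const field of EVENT_FIELDS) {
    if (field in event) out[field] = event[field];
  }
  return out;
}

const wire = serializeEvent({
  after: { id: 1, _analytics: { event_count: 1 } }, // survives: nested under `after`
  tags: ['orders'],                                 // survives: known Event field
  my_custom_field: 'lost',                          // dropped: unknown top-level field
});
// wire.after._analytics is kept; wire.my_custom_field is undefined
```

This is why the processor above nests its metadata under event.after._analytics rather than adding a new top-level key.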

Sample Event Output

{
  "before": null,
  "after": {
    "id": 98765,
    "customer_id": 12345,
    "total": 299.99,
    "status": "pending",
    "channel": "mobile",
    "_analytics": {
      "event_type": "orders.created",
      "dimensions": {
        "hour_of_day": 10,
        "day_of_week": 3,
        "customer_id": 12345,
        "status": "pending",
        "channel": "mobile"
      },
      "metrics": {
        "event_count": 1,
        "order_total": 299.99,
        "item_count": 3
      }
    }
  },
  "source": {
    "connector": "mysql",
    "db": "shop",
    "table": "orders"
  },
  "op": "c",
  "ts_ms": 1705312200000,
  "tags": ["orders", "c", "status:pending"]
}

Redis Counter Worker

const Redis = require('ioredis');
const redis = new Redis(process.env.REDIS_URI);

async function processAnalyticsEvents() {
  let lastId = '0';
  
  while (true) {
    const results = await redis.xread(
      'COUNT', 100, 'BLOCK', 1000,
      'STREAMS', 'analytics:realtime', lastId
    );
    
    if (!results) continue;
    
    for (const [stream, messages] of results) {
      for (const [id, fields] of messages) {
        const event = JSON.parse(fields[1]);
        await updateCounters(event);
        lastId = id;
      }
    }
  }
}

async function updateCounters(event) {
  const pipe = redis.pipeline();
  const now = new Date();
  const hourKey = `${now.getUTCFullYear()}:${now.getUTCMonth()+1}:${now.getUTCDate()}:${now.getUTCHours()}`;
  const dayKey = `${now.getUTCFullYear()}:${now.getUTCMonth()+1}:${now.getUTCDate()}`;
  
  // Get analytics from event.after
  const analytics = event.after?._analytics || {};
  const eventType = analytics.event_type || `${event.source?.table}.${event.op}`;
  const metrics = analytics.metrics || {};
  const dimensions = analytics.dimensions || {};
  
  // Event counts
  pipe.hincrby(`stats:events:${hourKey}`, eventType, 1);
  
  // Order-specific counters
  if (eventType === 'orders.created') {
    pipe.incr(`stats:orders:count:${hourKey}`);
    pipe.incrbyfloat(`stats:orders:revenue:${hourKey}`, metrics.order_total || 0);
    
    if (dimensions.channel) {
      pipe.hincrby(`stats:orders:channel:${dayKey}`, dimensions.channel, 1);
    }
  }
  
  // High-value alerts
  if (event.tags?.includes('high_value')) {
    pipe.lpush('alerts:high_value_orders', JSON.stringify({
      order_id: event.after?.id,
      total: metrics.order_total
    }));
    pipe.ltrim('alerts:high_value_orders', 0, 99);
  }
  
  pipe.expire(`stats:events:${hourKey}`, 90000);
  await pipe.exec();
}

processAnalyticsEvents().catch(console.error);
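One caveat with the worker above: it buckets counters by the wall clock at consumption time, so events consumed late are counted in the wrong hour. A hedged alternative is to derive the bucket from the event's own ts_ms (naming below is illustrative):

```javascript
// Sketch: derive counter bucket keys from the event's own ts_ms instead of
// the worker's wall clock, so late-arriving events land in the right bucket.
// Key naming mirrors the worker above; adapt to your scheme.
function bucketKeys(tsMs) {
  const d = new Date(tsMs);
  const dayKey = `${d.getUTCFullYear()}:${d.getUTCMonth() + 1}:${d.getUTCDate()}`;
  return { hourKey: `${dayKey}:${d.getUTCHours()}`, dayKey };
}

const ts = Date.UTC(2024, 0, 15, 10, 30); // 2024-01-15T10:30:00Z
const { hourKey, dayKey } = bucketKeys(ts);
// hourKey === '2024:1:15:10', dayKey === '2024:1:15'
```

In updateCounters, this would replace the `new Date()`-based key construction with `bucketKeys(event.ts_ms)`.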

Running the Example

1. Set Environment Variables

export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
export REDIS_URI="redis://localhost:6379"

2. Create Kafka Topic

./dev.sh k-create analytics.events 12

3. Start DeltaForge

cargo run -p runner -- --config analytics-pipeline.yaml

4. Start Counter Worker

node realtime-counter-worker.js

Key Concepts Demonstrated

  • Event Enrichment: Add dimensions/metrics in event.after._analytics
  • Tag-Based Filtering: Use event.tags for high-value order detection
  • Multi-Sink Fan-Out: Kafka for stream processing + Redis for worker consumption
  • Worker Pattern: Separate worker consumes Redis stream to update counters

Outbox Pattern

This example demonstrates the transactional outbox pattern - writing business data and a domain event in the same database transaction, then streaming the event to Kafka via DeltaForge.

Overview

Component   Configuration
Source      MySQL binlog CDC
Processor   Outbox (extract + route)
Sink        Kafka (per-aggregate topics)
Pattern     Transactional outbox with raw payload delivery

Use Case

Your application writes orders and needs to publish OrderCreated, OrderShipped, etc. events to Kafka. You want:

  • Atomicity: event published if and only if the transaction commits
  • Clean payloads: consumers receive the application’s JSON, not CDC envelopes
  • Per-aggregate routing: events land on Order.OrderCreated, Payment.PaymentReceived, etc.
  • Zero polling: DeltaForge tails the binlog, no application-side outbox relay

Database Setup

-- Business table
CREATE TABLE orders (
  id         INT AUTO_INCREMENT PRIMARY KEY,
  customer   VARCHAR(64),
  total      DECIMAL(10,2),
  status     VARCHAR(32),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Outbox table (BLACKHOLE = binlog only, no disk storage)
CREATE TABLE outbox (
  id             INT AUTO_INCREMENT PRIMARY KEY,
  aggregate_type VARCHAR(64),
  aggregate_id   VARCHAR(64),
  event_type     VARCHAR(64),
  payload        JSON
) ENGINE=BLACKHOLE;

Application Code

Write both in the same transaction:

BEGIN;

INSERT INTO orders (customer, total, status)
VALUES ('alice', 149.99, 'pending');

INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES (
  'Order',
  LAST_INSERT_ID(),
  'OrderCreated',
  JSON_OBJECT('customer', 'alice', 'total', 149.99, 'status', 'pending')
);

COMMIT;

If the transaction rolls back, neither the order nor the event exists. If it commits, both do.

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-outbox
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.outbox
      outbox:
        tables: ["shop.outbox"]

  processors:
    - type: outbox
      topic: "${aggregate_type}.${event_type}"
      default_topic: events.unrouted
      raw_payload: true

  sinks:
    - type: kafka
      config:
        id: events-kafka
        brokers: ${KAFKA_BROKERS}
        topic: "cdc.${source.db}.${source.table}"
        envelope:
          type: debezium
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 500
    respect_source_tx: true

  commit_policy:
    mode: required

What lands on Kafka

Outbox event → topic Order.OrderCreated:

{"customer": "alice", "total": 149.99, "status": "pending"}

Raw payload, no envelope. Kafka headers carry metadata: df-aggregate-type: Order, df-aggregate-id: 1, df-event-type: OrderCreated.

CDC event → topic cdc.shop.orders (from sink’s topic template):

{
  "payload": {
    "before": null,
    "after": {"id": 1, "customer": "alice", "total": 149.99, "status": "pending"},
    "source": {"connector": "mysql", "db": "shop", "table": "orders"},
    "op": "c",
    "ts_ms": 1700000000000
  }
}

Full Debezium envelope. Both flow through the same pipeline - the outbox processor only touches events tagged as outbox.
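The per-aggregate routing can be pictured as a simple template expansion over the outbox row's columns. The sketch below is illustrative only (expandTopic is a hypothetical helper, not the engine's code):

```javascript
// Illustrative sketch of how a topic template like
// "${aggregate_type}.${event_type}" expands against an outbox row, with
// default_topic as the fallback when a referenced column is missing.
function expandTopic(template, row, defaultTopic) {
  let missing = false;
  const topic = template.replace(/\$\{(\w+)\}/g, (match, col) => {
    if (row[col] == null) { missing = true; return match; }
    return String(row[col]);
  });
  return missing ? defaultTopic : topic;
}

const row = { aggregate_type: 'Order', aggregate_id: '1', event_type: 'OrderCreated' };
expandTopic('${aggregate_type}.${event_type}', row, 'events.unrouted');
// → 'Order.OrderCreated'
expandTopic('${aggregate_type}.${missing_column}', row, 'events.unrouted');
// → 'events.unrouted'
```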

Running the Example

1. Start Infrastructure

./dev.sh up
./dev.sh k-create Order.OrderCreated 3
./dev.sh k-create Order.OrderShipped 3
./dev.sh k-create cdc.shop.orders 3

2. Set Environment Variables

export MYSQL_DSN="mysql://deltaforge:dfpw@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"

3. Start DeltaForge

cargo run -p runner -- --config outbox.yaml

4. Insert Test Data

BEGIN;
INSERT INTO shop.orders (customer, total, status) VALUES ('alice', 149.99, 'pending');
INSERT INTO shop.outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', LAST_INSERT_ID(), 'OrderCreated',
        '{"customer":"alice","total":149.99,"status":"pending"}');
COMMIT;

5. Verify

# Outbox event (raw payload, per-aggregate topic)
./dev.sh k-inspect Order.OrderCreated

# CDC event (Debezium envelope, per-table topic)
./dev.sh k-inspect cdc.shop.orders

Variations

PostgreSQL with WAL Messages

PostgreSQL doesn’t need an outbox table - write directly to the WAL:

BEGIN;
INSERT INTO orders (customer, total, status) VALUES ('alice', 149.99, 'pending');
SELECT pg_logical_emit_message(
  true, 'outbox',
  '{"aggregate_type":"Order","aggregate_id":"1","event_type":"OrderCreated","payload":{"customer":"alice","total":149.99}}'
);
COMMIT;

The matching DeltaForge source configuration:

source:
  type: postgres
  config:
    id: orders-pg
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables: [public.orders]
    outbox:
      prefixes: [outbox]

No table, no index, no vacuum - just a WAL entry.
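Applications typically build the message payload programmatically rather than hand-writing JSON. A small sketch of such a helper (outboxMessage is hypothetical; field names follow this example's convention, not a fixed wire format):

```javascript
// Sketch: build the JSON string passed as the third argument of
// pg_logical_emit_message. Field names follow this example's convention.
function outboxMessage(aggregateType, aggregateId, eventType, payload) {
  return JSON.stringify({
    aggregate_type: aggregateType,
    aggregate_id: String(aggregateId),
    event_type: eventType,
    payload,
  });
}

const msg = outboxMessage('Order', 1, 'OrderCreated', { customer: 'alice', total: 149.99 });
// msg is the string to bind as the message body in:
//   SELECT pg_logical_emit_message(true, 'outbox', $1);
```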

Multiple Outbox Channels

Route order events and payment events to different topic hierarchies:

processors:
  - type: outbox
    tables: [orders_outbox]
    topic: "orders.${event_type}"
    raw_payload: true

  - type: outbox
    tables: [payments_outbox]
    topic: "payments.${event_type}"
    raw_payload: true
    columns:
      payload: data

Additional Headers for Tracing

Forward trace and correlation IDs as Kafka headers:

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    raw_payload: true
    additional_headers:
      x-trace-id: trace_id
      x-correlation-id: correlation_id

The corresponding outbox insert:

INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload, trace_id, correlation_id)
VALUES ('Order', '42', 'OrderCreated', '{"total":99.99}', 'abc-123', 'req-456');

Migrating from Debezium

If your outbox table uses Debezium’s column names (aggregatetype, aggregateid, type):

processors:
  - type: outbox
    topic: "${aggregatetype}.${type}"
    raw_payload: true
    columns:
      aggregate_type: aggregatetype
      aggregate_id: aggregateid
      event_type: type
    additional_headers:
      x-trace-id: traceid
      x-tenant: tenant

Column mappings control header extraction. The topic template uses raw column names directly.

Key Concepts Demonstrated

  • Transactional outbox: atomicity without distributed transactions
  • BLACKHOLE engine: binlog-only storage for zero-cost outbox tables on MySQL
  • Raw payload delivery: raw_payload: true bypasses envelope wrapping - consumers get exactly what the application wrote
  • Mixed pipeline: outbox events and CDC events coexist in the same pipeline with different serialization
  • Per-aggregate routing: topic template routes events by aggregate_type and event_type

Troubleshooting

Common issues and quick checks when running DeltaForge.

  • 🩺 Health-first: start with /health and /ready to pinpoint failing components.

Runner fails to start

  • Confirm the config path passed to --config exists and is readable.
  • Validate YAML syntax and that required fields like metadata.name and spec.source are present.
  • Ensure environment variables referenced in the spec are set (dsn, brokers, uri, etc.).

Pipelines remain unready

  • Check the /ready endpoint for per-pipeline status and error messages.
  • Verify upstream credentials allow replication (MySQL binlog). Other engines are experimental unless explicitly documented.
  • Inspect sink connectivity; a required sink that cannot connect will block checkpoints.

Slow throughput

  • Increase batch.max_events or batch.max_bytes to reduce flush frequency.
  • Adjust max_inflight to allow more concurrent batches if sinks can handle parallelism.
  • Reduce processor work or add guardrails (limits) to prevent slow JavaScript from stalling the pipeline.
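As a rough sanity check when tuning, the batch settings alone impose a ceiling on flush throughput. A back-of-the-envelope sketch (it ignores processor and sink latency, so treat the result strictly as an upper bound):

```javascript
// Rough ceiling from batch settings alone: at most one batch of max_events
// every max_ms milliseconds. Processor and sink latency push real throughput
// below this number.
function maxEventsPerSecond(maxEvents, maxMs) {
  return maxEvents * (1000 / maxMs);
}

maxEventsPerSecond(500, 100);  // the analytics example config: 5000 events/s ceiling
maxEventsPerSecond(4000, 100); // larger batches: 40000 events/s ceiling
```

If observed throughput sits near this ceiling, raising max_events is the first lever; if it sits far below, look at processor or sink latency instead.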

Checkpoints not advancing

  • Review the commit policy: mode: all or required sinks that are unavailable will block progress.
  • Look for sink-specific errors (for example, Kafka broker unreachability or Redis backpressure).
  • Pause and resume the pipeline to force a clean restart after addressing the underlying issue.

/health returns 503

A 503 means at least one pipeline has entered a permanently failed state — it will not recover on its own. Common causes:

  • Failover to a server with no GTID overlap. Log: "position lost after failover". Resolution: re-snapshot from the new primary.
  • RESET BINARY LOGS AND GTIDS run on the same server. Log: "checkpoint GTID set no longer reachable". Resolution: clear the checkpoint DB and re-snapshot.
  • Unrecoverable source error. Log: "run task ended with error". Resolution: check source logs, fix the root cause, and restart.

Use GET /pipelines to see which pipeline has "status": "failed" and check its logs for the specific error. After fixing the root cause, restart the DeltaForge process (or the container) to reset pipeline state.
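When scripting around this check, the GET /pipelines response can be filtered directly. The sketch below assumes the response is an array of objects carrying name and status fields, which is an inference from the example above rather than a documented schema:

```javascript
// Sketch: pick failed pipelines out of a GET /pipelines response body.
// The response shape (array of {name, status}) is an assumption based on
// the "status": "failed" field mentioned above.
function failedPipelines(pipelines) {
  return pipelines
    .filter((p) => p.status === 'failed')
    .map((p) => p.name);
}

failedPipelines([
  { name: 'orders-outbox', status: 'running' },
  { name: 'ecommerce-analytics', status: 'failed' },
]);
// → ['ecommerce-analytics']
```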

Avro encoding issues

Schema Registry connection failed

WARN Schema Registry unavailable — using cached schema

DeltaForge is encoding with a previously cached schema ID. Events are still flowing, but new schema registrations (e.g., after DDL changes) will fail until the SR recovers. Check:

  • Schema Registry connectivity: curl http://<sr-url>/subjects
  • Network/firewall between DeltaForge and the SR
  • Monitor deltaforge_avro_sr_cache_fallback_total — if increasing, the SR is unreachable

Schema registration rejected (compatibility failure)

WARN DDL change for {table} produced incompatible Avro schema — encoding with previous version

The source table DDL changed and the new Avro schema was rejected by the Schema Registry’s compatibility rules. DeltaForge attempts to encode with the previous schema version:

  • If encoding succeeds: events continue flowing (the old schema still covers the new data)
  • If encoding fails: events are routed to DLQ with schema_mismatch error

Resolution: Either relax the SR subject compatibility mode (e.g., NONE or FORWARD) or handle the DLQ entries after updating consumer schemas.

Events routed to DLQ with schema_mismatch

A DDL change produced events that can’t be encoded under the cached schema. This happens when a non-backward-compatible change is made (e.g., column type change, NOT NULL added to existing column).

Resolution:

  1. Check deltaforge_avro_encode_failure_total{reason="schema_mismatch"}
  2. Identify the DDL change from source DB logs
  3. Update the SR subject compatibility if needed
  4. Restart the pipeline to clear the schema cache and re-register

BIGINT UNSIGNED overflow warning

WARN BIGINT UNSIGNED column {col} mapped to long — values >= 2^63 will fail encoding

Only appears when unsigned_bigint_mode: long is configured. If a row contains a value ≥ 2^63, encoding will fail and the event is routed to DLQ. The default unsigned_bigint_mode: string avoids this entirely.
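The boundary is easy to demonstrate: a signed 64-bit long tops out at 2^63 - 1, while BIGINT UNSIGNED ranges up to 2^64 - 1:

```javascript
// Why unsigned_bigint_mode: long can overflow: BIGINT UNSIGNED holds values
// up to 2^64 - 1, but Avro's long is a signed 64-bit integer capped at
// 2^63 - 1, so anything at or above 2^63 has no long representation.
const LONG_MAX = 2n ** 63n - 1n;            // 9223372036854775807
const BIGINT_UNSIGNED_MAX = 2n ** 64n - 1n; // 18446744073709551615

const fitsInLong = (v) => v <= LONG_MAX;

fitsInLong(LONG_MAX);            // true: still encodable as long
fitsInLong(2n ** 63n);           // false: fails encoding, routed to DLQ in long mode
fitsInLong(BIGINT_UNSIGNED_MAX); // false: the string default handles it losslessly
```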

Using inferred schema instead of DDL

DEBUG no DDL schema available — falling back to JSON inference (Path C)

DeltaForge couldn’t look up the source table schema and is using a less precise JSON-inferred schema. This happens when:

  • The schema loader hasn’t cached the table schema yet (first events at startup)
  • The table doesn’t match the configured table patterns
  • The source type doesn’t support schema loading

Check deltaforge_avro_encode_total{path="inferred"} — if this counter is growing while path="ddl" is not, investigate why DDL lookup is failing.

DeltaForge

Development Guide

Use this guide to build, test, and extend DeltaForge. It covers local workflows, optional dependency containers, and how to work with Docker images.

All contributions are welcome and highly appreciated.

Local prerequisites

  • Rust toolchain 1.89+ (install via rustup).
  • Optional: Docker or Podman for running the dev dependency stack and the container image.

Workspace layout

  • crates/deltaforge-core : shared event model, pipeline engine, and checkpointing primitives.
  • crates/deltaforge-config : YAML config parsing, environment variable expansion, and pipeline spec types.
  • crates/sources : database CDC readers (MySQL binlog, Postgres logical replication) implemented as pluggable sources.
  • crates/processors : JavaScript-based processors and support code for transforming batches.
  • crates/sinks : sink implementations (Kafka producer, Redis streams, NATS JetStream) plus sink utilities.
  • crates/rest-api : HTTP control plane with health/readiness and pipeline lifecycle endpoints.
  • crates/runner : CLI entrypoint that wires the runtime, metrics, and control plane together.
  • crates/chaos : end-to-end chaos scenario runner, benchmarks, and interactive playground UI.

Use these crate boundaries as reference points when adding new sources, sinks, or pipeline behaviors.

Start dev dependencies

Bring up the optional backing services (MySQL, Postgres, Kafka, Redis) with Docker Compose:

docker compose -f docker-compose.dev.yml up -d

Each service is exposed on localhost for local runs (5432, 3306, 9092, 6379). The MySQL container seeds demo data from ./init-scripts and configures binlog settings required for CDC.

Prefer the convenience dev.sh wrapper to keep common tasks consistent:

./dev.sh up     # start the dependency stack
./dev.sh down   # stop and remove it
./dev.sh ps     # see container status

Build and test locally

Run the usual Rust workflow from the repo root:

cargo fmt --all
cargo clippy --workspace --all-targets --all-features
cargo test --workspace

Or use the helper script for a single command that mirrors CI expectations:

./dev.sh build         # build project (debug)
./dev.sh build-release # build project (release)
./dev.sh run           # run with examples/dev.yaml
./dev.sh fmt           # format code
./dev.sh lint          # clippy with warnings as errors
./dev.sh test          # full test suite
./dev.sh check         # fmt --check + clippy + tests (mirrors CI)
./dev.sh cov           # generate coverage report

Docker images

Use pre-built images

Multi-arch images (amd64/arm64) are published to GHCR and Docker Hub:

# Minimal (~57MB, scratch-based, no shell)
docker pull ghcr.io/vnvo/deltaforge:latest
docker pull vnvohub/deltaforge:latest

# Debug (~140MB, includes shell for troubleshooting)
docker pull ghcr.io/vnvo/deltaforge:latest-debug
docker pull vnvohub/deltaforge:latest-debug

Variant        Size     Base          Use case
latest         ~57MB    scratch       Production
latest-debug   ~140MB   debian-slim   Troubleshooting, has shell

Build locally

Two Dockerfiles are provided:

# Minimal image (~57MB)
docker build -t deltaforge:local .

# Debug image (~140MB, includes shell)
docker build -t deltaforge:local-debug -f Dockerfile.debug .

Or use the dev helper:

./dev.sh docker            # build minimal image
./dev.sh docker-debug      # build debug image
./dev.sh docker-test       # test minimal image runs
./dev.sh docker-test-debug # test debug image runs
./dev.sh docker-all        # build and test all variants
./dev.sh docker-shell      # open shell in debug container

Build multi-arch locally

To build for both amd64 and arm64:

./dev.sh docker-multi-setup  # create buildx builder (once)
./dev.sh docker-multi        # build both architectures

Note: Multi-arch builds use QEMU emulation and take ~30-35 minutes. The images are not loaded locally - use --push to push to a registry.

Run the image

Run the container by mounting your pipeline specs and exposing the API and metrics ports:

docker run --rm \
  -p 8080:8080 -p 9000:9000 \
  -v $(pwd)/examples/dev.yaml:/etc/deltaforge/pipeline.yaml:ro \
  -v deltaforge-checkpoints:/app/data \
  ghcr.io/vnvo/deltaforge:latest \
  --config /etc/deltaforge/pipeline.yaml

Notes:

  • The container listens on 0.0.0.0:8080 for the control plane API with metrics on :9000.
  • Checkpoints are written to /app/data/df_checkpoints.json; mount a volume to persist them across restarts.
  • Environment variables inside the YAML are expanded before parsing.
  • Pass any other runner flags as needed (e.g., --api-addr or --metrics-addr).

Debug a running container

Use the debug image to troubleshoot:

# Run with shell access
docker run --rm -it --entrypoint /bin/bash ghcr.io/vnvo/deltaforge:latest-debug

# Exec into a running container
docker exec -it <container_id> /bin/bash

Dev helper commands

The dev.sh script provides shortcuts for common tasks:

./dev.sh help  # show all commands

Infrastructure

./dev.sh up           # start MySQL, Kafka, Redis
./dev.sh down         # stop and remove containers
./dev.sh ps           # list running services

Kafka

./dev.sh k-list                         # list topics
./dev.sh k-create <topic>               # create topic
./dev.sh k-consume <topic> --from-beginning
./dev.sh k-produce <topic>              # interactive producer

Redis

./dev.sh redis-cli                      # open redis-cli
./dev.sh redis-read <stream>            # read from stream

Database shells

./dev.sh pg-sh       # psql into Postgres
./dev.sh mysql-sh    # mysql into MySQL

Documentation

./dev.sh docs        # serve docs locally (opens browser)
./dev.sh docs-build  # build docs

Pre-release checks

./dev.sh release-check  # run all checks + build all Docker variants

Chaos testing

End-to-end resilience tests and benchmarks run against a live Docker Compose stack with fault injection via Toxiproxy. Scenarios cover network partitions, sink outages, crash recovery, server failover, schema drift, binlog purge, long-running endurance runs, and binlog backlog drain benchmarks.

Prerequisites

Build the debug image first (includes a shell, needed for some scenarios):

docker build -t deltaforge:dev-debug -f Dockerfile.debug .

Stack profiles

The df compose profile starts 3 DeltaForge instances — one per build variant:

Instance             Port   Image                    Use case
deltaforge-release   8080   deltaforge:latest        Production behavior, regression testing
deltaforge-debug     8081   deltaforge:dev-debug     Verbose logging, assertions, chaos scenarios
deltaforge-profile   8082   deltaforge:dev-profile   Flamegraphs, CPU profiling, benchmarks

Pipeline configs are selected dynamically (from the chaos UI config dropdown or the /api/apply-config REST endpoint), while the CLI --port flag selects which instance a scenario targets. All 3 instances start with a default config (mysql-to-kafka.yaml) and can be swapped at runtime.

Start the chaos environment

docker compose -f docker-compose.chaos.yml \
  --profile base --profile mysql-infra --profile kafka-infra --profile df up -d

Add --profile pg-infra if testing PostgreSQL scenarios.

Run resilience scenarios

# Scenarios target port 8080 by default; pass --port to hit another instance
cargo run -p chaos -- --scenario all --source mysql
cargo run -p chaos -- --scenario network-partition --port 8081
cargo run -p chaos -- --scenario all --source postgres --port 8080

Exit code is 0 on full pass, 1 on any failure — suitable for CI.

Run endurance and benchmark scenarios

Before running soak/drain, apply the appropriate config via the UI or REST API:

# Apply soak config to the profile instance (for flamegraphs)
curl -X POST http://localhost:7474/api/apply-config \
  -H 'Content-Type: application/json' \
  -d '{"port": 8082, "config": "mysql-soak.yaml"}'

# Soak — long-running with random fault injection
cargo run -p chaos -- --scenario soak --port 8082 --topic chaos.soak

# Soak-stable — same workload, no faults (baseline)
cargo run -p chaos -- --scenario soak-stable --port 8082 --topic chaos.soak --duration-mins 30

# Backlog-drain — measures catch-up throughput (1M row replay)
cargo run -p chaos -- --scenario backlog-drain --port 8082 --topic chaos.soak --no-proxy

# Backlog-drain with custom tuning
cargo run -p chaos -- --scenario backlog-drain --port 8082 --topic chaos.soak --no-proxy \
  --drain-max-events 4000 --drain-max-ms 100 \
  --drain-kafka-conf linger.ms=0

# TPC-C — apply tpcc config first, then run
curl -X POST http://localhost:7474/api/apply-config \
  -H 'Content-Type: application/json' \
  -d '{"port": 8081, "config": "mysql-tpcc.yaml"}'
cargo run -p chaos -- --scenario tpcc --port 8081 --duration-mins 30

Avro encoding tests

# Unit + mock tests (no Docker needed)
cargo test -p sinks --test avro_encoding_tests

# Real Schema Registry integration tests (Docker, needs kafka-infra for SR)
cargo test -p sinks --test avro_encoding_tests -- --include-ignored --nocapture --test-threads=1

# Avro chaos scenario: apply Avro config, then run SR outage
curl -X POST http://localhost:7474/api/apply-config \
  -H 'Content-Type: application/json' \
  -d '{"port": 8081, "config": "mysql-to-kafka-avro.yaml"}'
cargo run -p chaos -- --scenario sr-outage --port 8081

# JSON vs Avro throughput comparison:
# Instance 1 (debug): JSON soak
curl -X POST http://localhost:7474/api/apply-config \
  -H 'Content-Type: application/json' \
  -d '{"port": 8081, "config": "mysql-soak.yaml"}'
cargo run -p chaos -- --scenario soak-stable --port 8081 --topic chaos.soak --duration-mins 30

# Instance 2 (profile): Avro soak
curl -X POST http://localhost:7474/api/apply-config \
  -H 'Content-Type: application/json' \
  -d '{"port": 8082, "config": "mysql-soak-avro.yaml"}'
cargo run -p chaos -- --scenario soak-stable --port 8082 --topic chaos.soak.avro --duration-mins 30

Compare in Grafana: rate(deltaforge_sink_events_total[1m]) by instance port.

See Performance Tuning for detailed throughput optimization guidance and profiling instructions.

Playground UI

The chaos binary also ships an interactive web UI for manual exploration:

cargo run -p chaos -- --scenario ui
# Open http://localhost:7474

The UI provides:

  • Live service status with health dots, port badges, and Docker image selector
  • Stale image detection — warns when a container is running an older image after a rebuild
  • Activity bar — shows current operation with per-button loading state and task history
  • Console log — unified output for all actions (infra, faults, scenarios) with smart auto-scroll
  • One-click fault injection via Toxiproxy (partitions, latency, bandwidth throttle)
  • Scenario runner with proxy bypass toggle, drain settings, and live log streaming
  • Pipeline API browser for any DeltaForge instance
  • Config Lab for A/B config comparison with presets
  • CPU profiler — captures flamegraphs from running containers with pipeline context in the subtitle

Data management

The UI includes a Data Management card for resetting persistent state between test runs:

  • Reset Checkpoints — stops all DeltaForge instances and deletes their SQLite checkpoint databases (GTID positions, replication offsets). Source databases and Kafka are untouched. Use this when switching branches or after a binlog purge leaves stale checkpoint state.
  • Reset All Volumes — runs docker compose down -v across all profiles, removing every named volume (MySQL data, Kafka state, Postgres data, checkpoints, Grafana). Full clean slate that requires re-initialization of all services.

Teardown

docker compose -f docker-compose.chaos.yml --profile app down -v
docker compose -f docker-compose.chaos.yml down -v

See crates/chaos/README.md for the full scenario catalogue, network topology, all CLI flags, and instructions for adding new scenarios.

Contributing

  1. Fork the repository
  2. Create a branch from main (e.g., feature/new-sink, fix/checkpoint-bug)
  3. Make your changes
  4. Run ./dev.sh check to ensure CI will pass
  5. Submit a PR against main

Things to Remember

Tests

There are a few #[ignore] tests; run them when making deep changes to sources, pipeline coordination, or anything else that affects core functionality.

Logging hygiene

  • Include pipeline, tenant, source_id/sink_id, and batch_id fields on all warnings/errors to make traces joinable in log aggregation tools.
  • Normalize retry/backoff logs so they include the attempt count and sleep duration; consider a structured reason field alongside error details for dashboards.
  • Add info-level summaries at a fixed interval (e.g., every N batches) reporting batches processed, average batch size, lag, and sink latency percentiles pulled from the metrics registry; these create human-friendly breadcrumbs.
  • Add metrics in a backward-compatible way: prefer new metric names over redefining existing ones to avoid breaking dashboards. Validate cardinality (bounded label sets) before merging.
  • Gate noisy logs behind levels (debug for per-event traces, info for batch summaries, warn/error for retries and failures).
  • Exercise the new metrics in integration tests by asserting counters change when sending synthetic events through pipelines.

Roadmap

Completed

  • Avro encoding with Confluent Schema Registry — DDL-derived Avro schemas, Confluent wire format, all sinks supported, type conversion policies, Schema Registry failure handling with cached fallback
  • HTTP/Webhook sink — POST/PUT to any URL, URL templates, batch mode, retry with backoff
  • Dead Letter Queue — per-event failure routing, overflow policies, REST API for inspection
  • Per-sink independent checkpoints — each sink advances independently, source replays from minimum
  • Exactly-once delivery — Kafka transactional producer with producer fencing detection
  • Helm chart — StatefulSet, ConfigMap, PVC, ServiceMonitor, PDB
  • Schema sensing — automatic schema inference from payloads, high-cardinality key detection

In Progress

  • Avro Schema Registry Phase 2 — publish sensed schemas to external Schema Registry as catalog metadata (under sensed. prefix). See RFC.

Planned

  • MongoDB source — change streams CDC
  • S3/Parquet sink — data lake integration
  • Event replay — replay DLQ entries or historical events
  • Kubernetes operator — PipelineTemplate + PipelinePool for fleet management
  • OpenAPI spec generation — auto-generated REST API documentation