DeltaForge

Introduction

DeltaForge is a modular, config-driven Change Data Capture (CDC) micro-framework built in Rust. It streams database changes into downstream systems like Kafka, Redis, and NATS while giving you full control over how each event is processed.

Pipelines are defined declaratively in YAML, making it straightforward to onboard new use cases without custom code.

Built with   Sources              Processors   Sinks
Rust         MySQL · PostgreSQL   JavaScript   Kafka · Redis · NATS

Why DeltaForge?

Core Strengths

  • Powered by Rust: Predictable performance, memory safety, and minimal resource footprint.
  • 🔌 Pluggable architecture: Sources, processors, and sinks are modular and independently extensible.
  • 🧩 Declarative pipelines: Define sources, transforms, sinks, and commit policies in version-controlled YAML with environment variable expansion for secrets.
  • 📦 Reliable checkpointing: Resume safely after restarts with at-least-once delivery guarantees.
  • 🛠️ Cloud-native ready: Single binary, Docker images, JSON logs, Prometheus metrics, and liveness/readiness probes for Kubernetes.

Schema Intelligence

  • 🔍 Schema sensing: Automatically infer and track schema from event payloads, including deep inspection of nested JSON structures.
  • 🏷️ Schema fingerprinting: SHA-256 based change detection with schema-to-checkpoint correlation for reliable replay (see the sketch after this list).
  • 🗃️ Source-owned semantics: Preserves native database types (PostgreSQL arrays, MySQL JSON, etc.) instead of normalizing to a universal type system.
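
To make the fingerprinting idea concrete, here is a rough sketch (not DeltaForge's exact canonicalization) that hashes each column's name, type, and nullability in declaration order using the sha2 crate; the struct names are illustrative:

use sha2::{Digest, Sha256};

struct Column {
    name: String,
    sql_type: String,
    nullable: bool,
}

struct TableSchema {
    database: String,
    table: String,
    columns: Vec<Column>,
}

fn fingerprint(schema: &TableSchema) -> String {
    let mut hasher = Sha256::new();
    hasher.update(schema.database.as_bytes());
    hasher.update(schema.table.as_bytes());
    // Hash columns in declaration order: any rename, type change,
    // or reorder produces a different fingerprint.
    for col in &schema.columns {
        hasher.update(col.name.as_bytes());
        hasher.update(col.sql_type.as_bytes());
        hasher.update([col.nullable as u8]);
    }
    let digest = hasher.finalize();
    let hex: String = digest.iter().map(|b| format!("{:02x}", b)).collect();
    format!("sha256:{hex}")
}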

Operational Features

  • 🔄 Automatic reconnection: Exponential backoff with jitter for resilient connections (see the sketch after this list).
  • 🎯 Flexible table selection: Wildcard patterns (db.*, schema.prefix%) for easy onboarding.
  • 📀 Transaction boundaries: Optionally keep source transactions intact across batches.
  • ⚙️ Commit policies: Control checkpoint behavior with all, required, or quorum modes across multiple sinks.
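
A minimal sketch of exponential backoff with full jitter, assuming the rand crate; it illustrates the strategy, not DeltaForge's actual reconnect loop:

use rand::Rng;
use std::time::Duration;

/// Delay before reconnect attempt `attempt` (0-based): grow the window
/// exponentially up to `max_ms`, then sleep a uniformly random slice of it
/// so many clients don't reconnect in lockstep.
fn backoff_delay(attempt: u32, base_ms: u64, max_ms: u64) -> Duration {
    let window = base_ms
        .saturating_mul(2u64.saturating_pow(attempt))
        .min(max_ms);
    let jittered = rand::thread_rng().gen_range(0..=window);
    Duration::from_millis(jittered)
}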

Use Cases

DeltaForge is designed for:

  • Real-time data synchronization: Keep caches, search indexes, and analytics systems in sync.
  • Event-driven architectures: Stream database changes to Kafka or NATS for downstream consumers.
  • Lightweight ETL: Transform and route data without heavyweight infrastructure.

DeltaForge is not a DAG-based stream processor. It is a focused CDC engine meant to replace tools like Debezium when you need a lighter, cloud-native, and more customizable runtime.

Getting Started

Guide           Description
Quickstart      Get DeltaForge running in minutes
CDC Overview    Understand Change Data Capture concepts
Configuration   Pipeline spec reference
Development     Build from source, contribute

Quick Example

metadata:
  name: orders-to-kafka
  tenant: acme

spec:
  source:
    type: mysql
    config:
      dsn: ${MYSQL_DSN}
      tables: [shop.orders]

  processors:
    - type: javascript
      inline: |
        (event) => {
          event.processed_at = Date.now();
          return [event];
        }

  sinks:
    - type: kafka
      config:
        brokers: ${KAFKA_BROKERS}
        topic: order-events

Installation

Docker (recommended):

docker pull ghcr.io/vnvo/deltaforge:latest

From source:

git clone https://github.com/vnvo/deltaforge.git
cd deltaforge
cargo build --release

See the Development Guide for detailed build instructions and available Docker image variants.

License

DeltaForge is dual-licensed under MIT and Apache 2.0.

Change Data Capture (CDC)

Change Data Capture (CDC) is the practice of streaming database mutations as ordered events so downstream systems can stay in sync without periodic full loads. Rather than asking “what does my data look like now?”, CDC answers “what changed, and when?”

DeltaForge is a CDC engine built to make log-based change streams reliable, observable, and operationally simple in modern, containerized environments. It focuses on row-level CDC from MySQL binlog and Postgres logical replication to keep consumers accurate and latency-aware while minimizing impact on source databases.

In short: DeltaForge tails committed transactions from MySQL and Postgres logs, preserves ordering and transaction boundaries, and delivers events to Kafka and Redis with checkpointed delivery, configurable batching, and Prometheus metrics - without requiring a JVM or distributed coordinator.


Why CDC matters

Traditional data integration relies on periodic batch jobs that query source systems, compare snapshots, and push differences downstream. This approach worked for decades, but modern architectures demand something better.

The batch ETL problem: A nightly sync means your analytics are always a day stale. An hourly sync still leaves gaps and hammers your production database with expensive SELECT * queries during business hours. As data volumes grow, these jobs take longer, fail more often, and compete for the same resources your customers need.

CDC flips the model. Instead of pulling data on a schedule, you subscribe to changes as they happen.

Aspect             Batch ETL               CDC
Latency            Minutes to hours        Seconds to milliseconds
Source load        High (repeated scans)   Minimal (log tailing)
Data freshness     Stale between runs      Near real-time
Failure recovery   Re-run entire job       Resume from checkpoint
Change detection   Diff comparison         Native from source

These benefits compound as systems scale and teams decentralize:

  • Deterministic replay: Ordered events let consumers reconstruct state or, combined with checkpointing and deduplication, achieve effectively exactly-once processing.
  • Polyglot delivery: The same change feed can serve caches, queues, warehouses, and search indexes simultaneously without additional source queries.

How CDC works

All CDC implementations share a common goal: detect changes and emit them as events. The approaches differ in how they detect those changes.

Log-based CDC

Databases maintain transaction logs (MySQL binlog, Postgres WAL) that record every committed change for durability and replication. Log-based CDC reads these logs directly, capturing changes without touching application tables.

┌─────────────┐    commits    ┌─────────────┐    tails     ┌─────────────┐
│ Application │──────────────▶│  Database   │─────────────▶│ CDC Engine  │
└─────────────┘               │   + WAL     │              └──────┬──────┘
                              └─────────────┘                     │
                                                                  ▼
                                                           ┌─────────────┐
                                                           │   Kafka /   │
                                                           │   Redis     │
                                                           └─────────────┘

Advantages:

  • Zero impact on source table performance
  • Captures all changes including those from triggers and stored procedures
  • Preserves transaction boundaries and ordering
  • Can capture deletes without soft-delete columns

Trade-offs:

  • Requires database configuration (replication slots, binlog retention)
  • Schema changes need careful handling
  • Log retention limits how far back you can replay

DeltaForge uses log-based CDC exclusively. This allows DeltaForge to provide stronger ordering guarantees, lower source impact, and simpler operational semantics than hybrid approaches that mix log tailing with polling or triggers.

Trigger-based CDC

Database triggers fire on INSERT, UPDATE, and DELETE operations, writing change records to a shadow table that a separate process polls.

Advantages:

  • Works on databases without accessible transaction logs
  • Can capture application-level context unavailable in logs

Trade-offs:

  • Adds write overhead to every transaction
  • Triggers can be disabled or forgotten during schema migrations
  • Shadow tables require maintenance and can grow unbounded

Polling-based CDC

A process periodically queries tables for rows modified since the last check, typically using an updated_at timestamp or incrementing ID.

Advantages:

  • Simple to implement
  • No special database configuration required

Trade-offs:

  • Cannot reliably detect deletes
  • Requires updated_at columns on every table
  • Polling frequency trades off latency against database load
  • Clock skew and transaction visibility can cause missed or duplicate events

Anatomy of a CDC event

A well-designed CDC event contains everything downstream consumers need to process the change correctly.

{
  "id": "evt_01J7K9X2M3N4P5Q6R7S8T9U0V1",
  "source": {
    "database": "shop",
    "table": "orders",
    "server_id": "mysql-prod-1"
  },
  "operation": "update",
  "timestamp": "2025-01-15T14:32:01.847Z",
  "transaction": {
    "id": "gtid:3E11FA47-71CA-11E1-9E33-C80AA9429562:42",
    "position": 15847293
  },
  "before": {
    "id": 12345,
    "status": "pending",
    "total": 99.99,
    "updated_at": "2025-01-15T14:30:00.000Z"
  },
  "after": {
    "id": 12345,
    "status": "shipped",
    "total": 99.99,
    "updated_at": "2025-01-15T14:32:01.000Z"
  },
  "schema_version": "v3"
}

Key components:

Field            Purpose
operation        INSERT, UPDATE, DELETE, or DDL
before / after   Row state before and after the change (enables diff logic)
transaction      Groups changes from the same database transaction
timestamp        When the change was committed at the source
schema_version   Helps consumers handle schema evolution

Not all fields are present for every operation: before is omitted for INSERTs and after is omitted for DELETEs. The event envelope and metadata fields, however, are consistent across MySQL and Postgres sources.


Real-world use cases

Cache invalidation and population

Problem: Your Redis cache serves product catalog data, but cache invalidation logic is scattered across dozens of services. Some forget to invalidate, others invalidate too aggressively, and debugging staleness issues takes hours.

CDC solution: Stream changes from the products table to a message queue. A dedicated consumer reads the stream and updates or deletes cache keys based on the operation type. This centralizes invalidation logic, provides replay capability for cache rebuilds, and removes cache concerns from application code entirely.

┌──────────┐     CDC      ┌─────────────┐   consumer   ┌─────────────┐
│  MySQL   │─────────────▶│   Stream    │─────────────▶│ Cache Keys  │
│ products │              │  (Kafka/    │              │ product:123 │
└──────────┘              │   Redis)    │              └─────────────┘
                          └─────────────┘

Event-driven microservices

Problem: Your order service needs to notify inventory, shipping, billing, and analytics whenever an order changes state. Direct service-to-service calls create tight coupling and cascade failures.

CDC solution: Publish order changes to a durable message queue. Each downstream service subscribes independently, processes at its own pace, and can replay events after failures or during onboarding. The order service doesn’t need to know about its consumers.

┌─────────────┐     CDC      ┌─────────────┐
│   Orders    │─────────────▶│   Message   │
│   Database  │              │   Queue     │
└─────────────┘              └──────┬──────┘
                    ┌───────────────┼───────────────┐
                    ▼               ▼               ▼
             ┌───────────┐   ┌───────────┐   ┌───────────┐
             │ Inventory │   │ Shipping  │   │ Analytics │
             └───────────┘   └───────────┘   └───────────┘

Search index synchronization

Problem: Your Elasticsearch index drifts from the source of truth. Full reindexing takes hours and blocks search updates during the process.

CDC solution: Stream changes continuously to keep the index synchronized. Use the before image to remove old documents and the after image to index new content.

Data warehouse incremental loads

Problem: Nightly full loads into your warehouse take 6 hours and block analysts until noon. Business users complain that dashboards show yesterday’s numbers.

CDC solution: Stream changes to a staging topic, then micro-batch into your warehouse every few minutes. Analysts get near real-time data without impacting source systems.

Audit logging and compliance

Problem: Regulations require you to maintain a complete history of changes to sensitive data. Application-level audit logging is inconsistent and can be bypassed.

CDC solution: The transaction log captures every committed change regardless of how it was made - application code, admin scripts, or direct SQL. Stream these events to immutable storage for compliance.

Cross-region replication

Problem: You need to replicate data to a secondary region for disaster recovery, but built-in replication doesn’t support the transformations you need.

CDC solution: Stream changes through a CDC pipeline that filters, transforms, and routes events to the target region’s databases or message queues.


Architecture patterns

The outbox pattern

When you need to update a database and publish an event atomically, the outbox pattern provides exactly-once semantics without distributed transactions.

┌─────────────────────────────────────────┐
│              Single Transaction         │
│  ┌─────────────┐    ┌─────────────────┐ │
│  │ UPDATE      │    │ INSERT INTO     │ │
│  │ orders SET  │ +  │ outbox (event)  │ │
│  │ status=...  │    │                 │ │
│  └─────────────┘    └─────────────────┘ │
└─────────────────────────────────────────┘
                      │
                      ▼ CDC
               ┌─────────────┐
               │   Kafka     │
               └─────────────┘
  1. The application writes business data and an event record in the same transaction.
  2. CDC tails the outbox table and publishes events to Kafka.
  3. A cleanup process (or CDC itself) removes processed outbox rows.

This guarantees that events are published if and only if the transaction commits.

When to skip the outbox: If your only requirement is to react to committed database state (not application intent), direct CDC from business tables is often simpler than introducing an outbox. The outbox pattern adds value when you need custom event payloads, explicit event versioning, or when the event schema differs significantly from table structure.

Event sourcing integration

CDC complements event sourcing by bridging legacy systems that weren’t built event-first. Stream changes from existing tables into event stores, then gradually migrate to native event sourcing.

CQRS (Command Query Responsibility Segregation)

CDC naturally supports CQRS by populating read-optimized projections from the write model. Changes flow through the CDC pipeline to update denormalized views, search indexes, or cache layers.


Challenges and solutions

Schema evolution

Databases change. Columns get added, renamed, or removed. Types change. CDC pipelines need to handle this gracefully.

Strategies:

  • Schema registry: Store and version schemas centrally (e.g., Confluent Schema Registry with Avro/Protobuf).
  • Forward compatibility: Add columns as nullable; avoid removing columns that consumers depend on.
  • Consumer tolerance: Design consumers to ignore unknown fields and handle missing optional fields.
  • Processor transforms: Use DeltaForge’s JavaScript processors to normalize schemas before sinks.

Ordering guarantees

Events must arrive in the correct order for consumers to reconstruct state accurately. A DELETE arriving before its corresponding INSERT would be catastrophic.

DeltaForge guarantees:

  • Source-order preservation per table partition (always enabled)
  • Transaction boundary preservation when respect_source_tx: true is configured in batch settings

The Kafka sink partitions consistently by primary key, so consumers see all events for a given key in order within a single partition.
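
The idea is that the partition is a pure function of the key, roughly as in this sketch (DeltaForge's actual partitioner may use a different hash, such as librdkafka's default):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Same key -> same partition, which preserves per-key ordering.
/// NOTE: DefaultHasher is not stable across processes; real producers use a
/// stable hash (e.g. murmur2 or crc32) so all writers agree.
fn partition_for(primary_key: &str, num_partitions: u32) -> u32 {
    let mut h = DefaultHasher::new();
    primary_key.hash(&mut h);
    (h.finish() % u64::from(num_partitions)) as u32
}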

Exactly-once delivery

Network failures, process crashes, and consumer restarts can cause duplicates or gaps. True exactly-once semantics require coordination between source, pipeline, and sink.

DeltaForge approach:

  • Checkpoints track the last committed position in the source log.
  • Configurable commit policies (all, required, quorum) control when checkpoints advance.
  • Kafka sink supports idempotent producers; transactional writes available via exactly_once: true.

Default behavior: DeltaForge provides at-least-once delivery out of the box. Exactly-once semantics require sink support and explicit configuration.
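
Because redelivery is possible under at-least-once, consumers typically deduplicate by event id. A minimal sketch of that consumer-side pattern (names are illustrative; a production version would bound or persist the seen-set):

use std::collections::HashSet;

struct IdempotentConsumer {
    seen: HashSet<String>,
}

impl IdempotentConsumer {
    /// Apply the change only on first delivery; silently skip duplicates.
    fn handle(&mut self, event_id: &str, apply: impl FnOnce()) {
        if self.seen.insert(event_id.to_string()) {
            apply();
        }
    }
}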

High availability

Production CDC pipelines need to handle failures without data loss or extended downtime.

Best practices:

  • Run multiple pipeline instances with leader election.
  • Store checkpoints in durable storage (DeltaForge persists to local files, mountable volumes in containers).
  • Monitor lag between source position and checkpoint position.
  • Set up alerts for pipeline failures and excessive lag.

Expectations: DeltaForge's checkpoints ensure no data loss on restart, but the runtime does not currently include built-in leader election. For HA deployments, use external coordination (Kubernetes leader election, etcd locks) or run active-passive with health-check-based failover.

Backpressure

When sinks can’t keep up with the change rate, pipelines need to slow down gracefully rather than dropping events or exhausting memory.

DeltaForge handles backpressure through:

  • Configurable batch sizes (max_events, max_bytes, max_ms).
  • In-flight limits (max_inflight) that bound concurrent sink writes.
  • Blocking reads from source when batches queue up.

Performance considerations

Batching trade-offs

Setting      Low value                       High value
max_events   Lower latency, more overhead    Higher throughput, more latency
max_ms       Faster flush, smaller batches   Larger batches, delayed flush
max_bytes    Memory-safe, frequent commits   Efficient for large rows

Start with DeltaForge defaults and tune based on observed latency and throughput.

Source database impact

Log-based CDC has minimal impact, but consider:

  • Replication slot retention: Paused pipelines cause WAL/binlog accumulation.
  • Connection limits: Each pipeline holds a replication connection.
  • Network bandwidth: High-volume tables generate significant log traffic.

Sink throughput

  • Kafka: Tune batch.size, linger.ms, and compression in client_conf.
  • Redis: Use pipelining and connection pooling for high-volume streams.

Monitoring and observability

CDC pipelines are long-running, stateful processes; without metrics and alerts, failures are silent by default. A healthy pipeline requires visibility into lag, throughput, and errors.

Key metrics to track

Metric                   Description                                   Alert threshold
cdc_lag_seconds          Time between event timestamp and processing   > 60s
events_processed_total   Throughput counter                            Sudden drops
checkpoint_lag_events    Events since last checkpoint                  > 10,000
sink_errors_total        Failed sink writes                            Any sustained errors
batch_size_avg           Events per batch                              Outside expected range

DeltaForge exposes Prometheus metrics on the configurable metrics endpoint (default :9000).
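
As an illustration (not DeltaForge's exporter code), a consumer-side gauge like cdc_lag_seconds can be derived from the event's source commit timestamp:

use std::time::{SystemTime, UNIX_EPOCH};

/// Seconds between the source commit time (Unix ms) and "now".
fn cdc_lag_seconds(commit_unix_ms: u64) -> f64 {
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before epoch")
        .as_millis() as u64;
    now_ms.saturating_sub(commit_unix_ms) as f64 / 1000.0
}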

Health checks

  • GET /healthz: Liveness probe - is the process running?
  • GET /readyz: Readiness probe - are pipelines connected and processing?
  • GET /pipelines: Detailed status of each pipeline including configuration.

Choosing a CDC solution

When evaluating CDC tools, consider:

Factor                 Questions to ask
Source support         Does it support your databases? MySQL binlog? Postgres logical replication?
Sink flexibility       Can it write to your target systems? Kafka, Redis, HTTP, custom?
Transformation         Can you filter, enrich, or reshape events in-flight?
Operational overhead   How much infrastructure does it require? JVM? Distributed coordinator?
Resource efficiency    What’s the memory/CPU footprint per pipeline?
Cloud-native           Does it containerize cleanly? Support health checks? Emit metrics?

Where DeltaForge fits

DeltaForge intentionally avoids the operational complexity of JVM-based CDC stacks (Kafka Connect-style deployments with Zookeeper, Connect workers, and converter configurations) while remaining compatible with Kafka-centric architectures.

DeltaForge is designed for teams that want:

  • Lightweight runtime: Single binary, minimal memory footprint, no JVM warmup.
  • Config-driven pipelines: YAML specs instead of code for common patterns.
  • Inline transformation: JavaScript processors for custom logic without recompilation.
  • Container-native operations: Built for Kubernetes with health endpoints and Prometheus metrics.

DeltaForge is not designed for:

  • Complex DAG-based stream processing with windowed aggregations
  • Stateful joins across multiple streams
  • Sources beyond MySQL and Postgres (currently)
  • Built-in schema registry integration (use external registries)

If you need those capabilities, consider dedicated stream processors or the broader Kafka ecosystem. DeltaForge excels at getting data out of databases and into your event infrastructure reliably and efficiently.


Getting started

Ready to try CDC with DeltaForge? Head to the Quickstart guide to run your first pipeline in minutes.

For production deployments, review the Development guide for container builds and operational best practices.

Quickstart

Get DeltaForge running in minutes.

1. Prepare a pipeline spec

Create a YAML file that defines your CDC pipeline. Environment variables are expanded at runtime, so secrets stay out of version control.

metadata:
  name: orders-mysql-to-kafka
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  processors:
    - type: javascript
      id: transform
      inline: |
        (event) => {
          event.tags = ["processed"];
          return [event];
        }

  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
        required: true

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000

2. Start DeltaForge

Using Docker (recommended):

docker run --rm \
  -p 8080:8080 -p 9000:9000 \
  -e MYSQL_DSN="mysql://user:pass@host:3306/db" \
  -e KAFKA_BROKERS="kafka:9092" \
  -v $(pwd)/pipeline.yaml:/etc/deltaforge/pipeline.yaml:ro \
  -v deltaforge-checkpoints:/app/data \
  ghcr.io/vnvo/deltaforge:latest \
  --config /etc/deltaforge/pipeline.yaml

From source:

cargo run -p runner -- --config ./pipeline.yaml

Runner options

Flag             Default        Description
--config         (required)     Path to pipeline spec file or directory
--api-addr       0.0.0.0:8080   REST API address
--metrics-addr   0.0.0.0:9095   Prometheus metrics address

3. Verify it’s running

Check health and pipeline status:

# Liveness probe
curl http://localhost:8080/healthz

# Readiness with pipeline status
curl http://localhost:8080/readyz

# List all pipelines
curl http://localhost:8080/pipelines

4. Manage pipelines

Control pipelines via the REST API:

# Pause a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/pause

# Resume a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/resume

# Stop a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/stop

Next steps

Continue with the Configuration reference for the full pipeline spec.

Configuration

Pipelines are defined as YAML documents that map directly to the PipelineSpec type. Environment variables are expanded before parsing using ${VAR} syntax, so secrets and connection strings can be injected at runtime.
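
A minimal sketch of that expansion step, assuming the regex crate (DeltaForge's real implementation may differ, for example in how unset variables are handled):

use regex::Regex;

/// Replace every ${VAR} with the value of the VAR environment variable
/// (empty string if unset) before the YAML is parsed.
fn expand_env(input: &str) -> String {
    let re = Regex::new(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}").unwrap();
    re.replace_all(input, |caps: &regex::Captures| {
        std::env::var(&caps[1]).unwrap_or_default()
    })
    .into_owned()
}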

Document structure

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: <pipeline-name>
  tenant: <tenant-id>
spec:
  source: { ... }
  processors: [ ... ]
  sinks: [ ... ]
  batch: { ... }
  commit_policy: { ... }
  schema_sensing: { ... }

Metadata

Field    Type     Required   Description
name     string   Yes        Unique pipeline identifier. Used in API routes and metrics.
tenant   string   Yes        Business-oriented tenant label for multi-tenancy.

Spec fields

Field               Type     Required             Description
source              object   Yes                  Database source configuration. See Sources.
processors          array    No                   Ordered list of processors. See Processors.
sinks               array    Yes (at least one)   One or more sinks that receive each batch. See Sinks.
sharding            object   No                   Optional hint for downstream distribution.
connection_policy   object   No                   How the runtime establishes upstream connections.
batch               object   No                   Commit unit thresholds. See Batching.
commit_policy       object   No                   How sink acknowledgements gate checkpoints. See Commit policy.
schema_sensing      object   No                   Automatic schema inference from event payloads. See Schema sensing.

Sources

MySQL

Captures row-level changes via binlog replication. See MySQL source documentation for prerequisites and detailed configuration.

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.order_items

Field    Type     Description
id       string   Unique identifier for checkpoints and metrics
dsn      string   MySQL connection string with replication privileges
tables   array    Table patterns to capture; omit for all tables

Table patterns support SQL LIKE syntax (a toy matcher is sketched after the list):

  • db.table - exact match
  • db.prefix% - tables matching prefix
  • db.* - all tables in database
  • %.table - table in any database
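
Roughly, matching works like the following toy sketch (illustrative only; DeltaForge's real matcher may differ in edge cases):

/// Match one pattern part: "*" or "%" match anything, "prefix%" matches a
/// prefix, anything else must match exactly.
fn part_match(pattern: &str, name: &str) -> bool {
    if pattern == "*" || pattern == "%" {
        return true;
    }
    if let Some(prefix) = pattern.strip_suffix('%') {
        return name.starts_with(prefix);
    }
    pattern == name
}

/// Match a "db.table" pattern against a concrete database and table name.
fn pattern_match(pattern: &str, db: &str, table: &str) -> bool {
    match pattern.split_once('.') {
        Some((pdb, ptab)) => part_match(pdb, db) && part_match(ptab, table),
        None => part_match(pattern, table), // bare table name
    }
}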

Turso

source:
  type: turso
  config:
    id: turso-main
    url: "libsql://your-db.turso.io"
    auth_token: ${TURSO_AUTH_TOKEN}
    tables: ["users", "orders"]
    cdc_mode: auto
    poll_interval_ms: 1000
    native_cdc:
      level: data

Field              Type     Default   Description
id                 string             Logical identifier
url                string             Database URL
auth_token         string             Turso cloud token
tables             array              Tables to track
cdc_mode           string   auto      native, triggers, polling, auto
poll_interval_ms   int      1000      Polling interval
native_cdc.level   string   data      binlog or data

PostgreSQL

Captures row-level changes via logical replication using the pgoutput plugin. See PostgreSQL source documentation for prerequisites and detailed configuration.

source:
  type: postgres
  config:
    id: orders-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables:
      - public.orders
      - public.order_items
    start_position: earliest

Field            Type     Default    Description
id               string              Unique identifier
dsn              string              PostgreSQL connection string
slot             string              Replication slot name
publication      string              Publication name
tables           array               Table patterns to capture
start_position   string   earliest   earliest, latest, or {lsn: "X/Y"}

Table patterns support SQL LIKE syntax (same as MySQL):

  • schema.table - exact match
  • schema.prefix% - tables matching prefix
  • schema.* - all tables in schema
  • %.table - table in any schema
  • table - defaults to public.table

Processors

Processors transform batches of events before delivery to sinks.

JavaScript

processors:
  - type: javascript
    id: transform
    inline: |
      function processBatch(events) {
        return events.map(event => {
          event.tags = ["processed"];
          return event;
        });
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500

Field               Type     Description
id                  string   Processor identifier
inline              string   JS source defining processBatch(events)
limits.cpu_ms       int      CPU time limit per batch
limits.mem_mb       int      Memory limit
limits.timeout_ms   int      Execution timeout

The processBatch(events) function receives an array of events and can return:

  • An array of events (modified, filtered, or expanded)
  • A single event object (wrapped in array automatically)
  • null or undefined to use the mutated input array
  • Empty array [] to drop all events

Sinks

Kafka

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      required: true
      exactly_once: false
      client_conf:
        message.timeout.ms: "5000"

Field          Type     Default   Description
id             string             Sink identifier
brokers        string             Comma-separated broker list
topic          string             Destination topic
required       bool     true      Whether this sink gates checkpoints
exactly_once   bool     false     Enable EOS semantics
client_conf    map      {}        Raw librdkafka config overrides

Redis

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      required: true

Field      Type     Default   Description
id         string             Sink identifier
uri        string             Redis connection URI
stream     string             Redis stream key
required   bool     true      Whether this sink gates checkpoints

NATS

sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      required: true
      send_timeout_secs: 5
      batch_timeout_secs: 30

Field                  Type     Default   Description
id                     string             Sink identifier
url                    string             NATS server URL
subject                string             Subject to publish to
stream                 string             JetStream stream name
required               bool     true      Gates checkpoints
send_timeout_secs      int      5         Publish timeout
batch_timeout_secs     int      30        Batch timeout
connect_timeout_secs   int      10        Connection timeout
credentials_file       string             NATS credentials file
username               string             Auth username
password               string             Auth password
token                  string             Auth token

Batching

batch:
  max_events: 500
  max_bytes: 1048576
  max_ms: 1000
  respect_source_tx: true
  max_inflight: 2

Field               Type   Default   Description
max_events          int    500       Flush after this many events
max_bytes           int    1048576   Flush after size reaches limit
max_ms              int    1000      Flush after time (ms)
respect_source_tx   bool   true      Never split source transactions
max_inflight        int    2         Max concurrent batches

Commit policy

commit_policy:
  mode: required

# For quorum mode:
commit_policy:
  mode: quorum
  quorum: 2

Mode       Description
all        Every sink must acknowledge before checkpoint
required   Only required: true sinks must acknowledge (default)
quorum     Checkpoint after quorum sinks acknowledge

Schema sensing

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3
    max_sample_size: 500
  sampling:
    warmup_events: 50
    sample_rate: 5
    structure_cache: true
    structure_cache_size: 50

Field                           Type   Default   Description
enabled                         bool   false     Enable schema sensing
deep_inspect.enabled            bool   true      Inspect nested JSON
deep_inspect.max_depth          int    10        Max nesting depth
deep_inspect.max_sample_size    int    1000      Max events for deep analysis
sampling.warmup_events          int    1000      Events to fully analyze first
sampling.sample_rate            int    10        After warmup, analyze 1 in N
sampling.structure_cache        bool   true      Cache structure fingerprints
sampling.structure_cache_size   int    100       Max cached structures
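
The sampling settings above amount to a simple gate, sketched here with illustrative names (assuming sample_rate >= 1):

/// Analyze every event during warmup, then 1 in every `sample_rate` events.
struct Sampler {
    seen: u64,
    warmup_events: u64,
    sample_rate: u64, // assumed >= 1
}

impl Sampler {
    fn should_analyze(&mut self) -> bool {
        self.seen += 1;
        self.seen <= self.warmup_events || self.seen % self.sample_rate == 0
    }
}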

Complete example

MySQL to Kafka

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-kafka
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  processors:
    - type: javascript
      id: transform
      inline: |
        function processBatch(events) {
          return events.map(event => {
            event.tags = (event.tags || []).concat(["normalized"]);
            return event;
          });
        }
      limits:
        cpu_ms: 50
        mem_mb: 128
        timeout_ms: 500

  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
        required: true
        exactly_once: false
        client_conf:
          message.timeout.ms: "5000"
    - type: redis
      config:
        id: orders-redis
        uri: ${REDIS_URI}
        stream: orders
        required: false

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
    respect_source_tx: true
    max_inflight: 2

  commit_policy:
    mode: required

  schema_sensing:
    enabled: true
    deep_inspect:
      enabled: true
      max_depth: 3
    sampling:
      warmup_events: 50
      sample_rate: 5

PostgreSQL to Kafka

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: users-postgres-to-kafka
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: users-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_users
      publication: users_pub
      tables:
        - public.users
        - public.user_sessions
      start_position: earliest

  sinks:
    - type: kafka
      config:
        id: users-kafka
        brokers: ${KAFKA_BROKERS}
        topic: user-events
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

MySQL to NATS

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-nats
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items

  sinks:
    - type: nats
      config:
        id: orders-nats
        url: ${NATS_URL}
        subject: orders.events
        stream: ORDERS
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

REST API Reference

DeltaForge exposes a REST API for health checks, pipeline management, schema inspection, and drift detection. All endpoints return JSON.

Base URL

Default: http://localhost:8080

Configure with --api-addr:

deltaforge --config pipelines.yaml --api-addr 0.0.0.0:9090

Health Endpoints

Liveness Probe

GET /healthz

Returns ok if the process is running. Use for Kubernetes liveness probes.

Response: 200 OK

ok

Readiness Probe

GET /readyz

Returns pipeline states. Use for Kubernetes readiness probes.

Response: 200 OK

{
  "status": "ready",
  "pipelines": [
    {
      "name": "orders-cdc",
      "status": "running",
      "spec": { ... }
    }
  ]
}

Pipeline Management

List Pipelines

GET /pipelines

Returns all pipelines with current status.

Response: 200 OK

[
  {
    "name": "orders-cdc",
    "status": "running",
    "spec": {
      "metadata": { "name": "orders-cdc", "tenant": "acme" },
      "spec": { ... }
    }
  }
]

Get Pipeline

GET /pipelines/{name}

Returns a single pipeline by name.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Errors:

  • 404 Not Found - Pipeline doesn’t exist

Create Pipeline

POST /pipelines
Content-Type: application/json

Creates a new pipeline from a full spec.

Request:

{
  "metadata": {
    "name": "orders-cdc",
    "tenant": "acme"
  },
  "spec": {
    "source": {
      "type": "mysql",
      "config": {
        "id": "mysql-1",
        "dsn": "mysql://user:pass@host/db",
        "tables": ["shop.orders"]
      }
    },
    "processors": [],
    "sinks": [
      {
        "type": "kafka",
        "config": {
          "id": "kafka-1",
          "brokers": "localhost:9092",
          "topic": "orders"
        }
      }
    ]
  }
}

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Errors:

  • 409 Conflict - Pipeline already exists

Update Pipeline

PATCH /pipelines/{name}
Content-Type: application/json

Applies a partial update to an existing pipeline. The pipeline is stopped, updated, and restarted.

Request:

{
  "spec": {
    "batch": {
      "max_events": 1000,
      "max_ms": 500
    }
  }
}

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Errors:

  • 404 Not Found - Pipeline doesn’t exist
  • 400 Bad Request - Name mismatch in patch

Delete Pipeline

DELETE /pipelines/{name}

Permanently deletes a pipeline. This removes the pipeline from the runtime and cannot be undone.

Response: 204 No Content

Errors:

  • 404 Not Found - Pipeline doesn’t exist

Pause Pipeline

POST /pipelines/{name}/pause

Pauses ingestion. Events in the buffer are not processed until resumed.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "paused",
  "spec": { ... }
}

Resume Pipeline

POST /pipelines/{name}/resume

Resumes a paused pipeline.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Stop Pipeline

POST /pipelines/{name}/stop

Stops a pipeline. Final checkpoint is saved.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "stopped",
  "spec": { ... }
}

Schema Management

List Database Schemas

GET /pipelines/{name}/schemas

Returns all tracked database schemas for a pipeline. These are the schemas loaded directly from the source database.

Response: 200 OK

[
  {
    "database": "shop",
    "table": "orders",
    "column_count": 5,
    "primary_key": ["id"],
    "fingerprint": "sha256:a1b2c3d4e5f6...",
    "registry_version": 2
  },
  {
    "database": "shop",
    "table": "customers",
    "column_count": 8,
    "primary_key": ["id"],
    "fingerprint": "sha256:f6e5d4c3b2a1...",
    "registry_version": 1
  }
]

Get Schema Details

GET /pipelines/{name}/schemas/{db}/{table}

Returns detailed schema information including all columns.

Response: 200 OK

{
  "database": "shop",
  "table": "orders",
  "columns": [
    {
      "name": "id",
      "type": "bigint(20) unsigned",
      "nullable": false,
      "default": null,
      "extra": "auto_increment"
    },
    {
      "name": "customer_id",
      "type": "bigint(20)",
      "nullable": false,
      "default": null
    }
  ],
  "primary_key": ["id"],
  "fingerprint": "sha256:a1b2c3d4..."
}

Schema Sensing

Schema sensing automatically infers schema structure from JSON event payloads. This is useful for sources that don’t provide schema metadata or for detecting schema evolution in JSON columns.

List Inferred Schemas

GET /pipelines/{name}/sensing/schemas

Returns all schemas inferred via sensing for a pipeline.

Response: 200 OK

[
  {
    "table": "orders",
    "fingerprint": "sha256:abc123...",
    "sequence": 3,
    "event_count": 1500,
    "stabilized": true,
    "first_seen": "2025-01-15T10:30:00Z",
    "last_seen": "2025-01-15T14:22:00Z"
  }
]

Field         Description
table         Table name (or table:column for JSON column sensing)
fingerprint   SHA-256 content hash of the current schema
sequence      Monotonic version number (increments on evolution)
event_count   Total events observed
stabilized    Whether sampling has stopped because the structure is stable
first_seen    First observation timestamp
last_seen     Most recent observation timestamp

Get Inferred Schema Details

GET /pipelines/{name}/sensing/schemas/{table}

Returns detailed inferred schema including all fields.

Response: 200 OK

{
  "table": "orders",
  "fingerprint": "sha256:abc123...",
  "sequence": 3,
  "event_count": 1500,
  "stabilized": true,
  "fields": [
    {
      "name": "id",
      "types": ["integer"],
      "nullable": false,
      "optional": false
    },
    {
      "name": "metadata",
      "types": ["object"],
      "nullable": true,
      "optional": false,
      "nested_field_count": 5
    },
    {
      "name": "tags",
      "types": ["array"],
      "nullable": false,
      "optional": true,
      "array_element_types": ["string"]
    }
  ],
  "first_seen": "2025-01-15T10:30:00Z",
  "last_seen": "2025-01-15T14:22:00Z"
}

Export JSON Schema

GET /pipelines/{name}/sensing/schemas/{table}/json-schema

Exports the inferred schema as a standard JSON Schema document.

Response: 200 OK

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "orders",
  "type": "object",
  "properties": {
    "id": { "type": "integer" },
    "metadata": { "type": ["object", "null"] },
    "tags": {
      "type": "array",
      "items": { "type": "string" }
    }
  },
  "required": ["id", "metadata"]
}

Get Sensing Cache Statistics

GET /pipelines/{name}/sensing/stats

Returns cache performance statistics for schema sensing.

Response: 200 OK

{
  "tables": [
    {
      "table": "orders",
      "cached_structures": 3,
      "max_cache_size": 100,
      "cache_hits": 1450,
      "cache_misses": 50
    }
  ],
  "total_cache_hits": 1450,
  "total_cache_misses": 50,
  "hit_rate": 0.9667
}

Drift Detection

Drift detection compares expected database schema against observed data patterns to detect mismatches, unexpected nulls, and type drift.

Get Drift Results

GET /pipelines/{name}/drift

Returns drift detection results for all tables in a pipeline.

Response: 200 OK

[
  {
    "table": "orders",
    "has_drift": true,
    "columns": [
      {
        "column": "amount",
        "expected_type": "decimal(10,2)",
        "observed_types": ["string"],
        "mismatch_count": 42,
        "examples": ["\"99.99\""]
      }
    ],
    "events_analyzed": 1500,
    "events_with_drift": 42
  }
]

Get Table Drift

GET /pipelines/{name}/drift/{table}

Returns drift detection results for a specific table.

Response: 200 OK

{
  "table": "orders",
  "has_drift": false,
  "columns": [],
  "events_analyzed": 1000,
  "events_with_drift": 0
}

Errors:

  • 404 Not Found - Table not found or no drift data available

Error Responses

All error responses follow this format:

{
  "error": "Description of the error"
}

Status Code                 Meaning
400 Bad Request             Invalid request body or parameters
404 Not Found               Resource doesn’t exist
409 Conflict                Resource already exists
500 Internal Server Error   Unexpected server error

Pipelines

Each pipeline is created from a single PipelineSpec. The runtime spawns the source, processors, and sinks defined in the spec and coordinates them with batching and checkpointing.

  • 🔄 Live control: pause, resume, or stop pipelines through the REST API without redeploying.
  • 📦 Coordinated delivery: batching and commit policy keep sinks consistent even when multiple outputs are configured.

Lifecycle controls

The REST API addresses pipelines by metadata.name and returns PipeInfo records containing the live spec and status.

  • GET /healthz - liveness probe.
  • GET /readyz - readiness with pipeline states.
  • GET /pipelines - list pipelines.
  • POST /pipelines - create from a full spec.
  • PATCH /pipelines/{name} - merge a partial spec (for example, adjust batch thresholds) and restart the pipeline.
  • POST /pipelines/{name}/pause - pause ingestion and coordination.
  • POST /pipelines/{name}/resume - resume a paused pipeline.
  • POST /pipelines/{name}/stop - stop a running pipeline.

Pausing halts both source ingestion and the coordinator. Resuming re-enables both ends so buffered events can drain cleanly.

Processors

Processors run in the declared order for each batch. The built-in processor type is JavaScript, powered by deno_core.

  • type: javascript
    • id: processor label.
    • inline: JS source. Export a processBatch(events) function that returns the transformed batch.
    • limits (optional): resource guardrails (cpu_ms, mem_mb, timeout_ms).

Batching

The coordinator builds batches using soft thresholds (a flush check is sketched after the list):

  • max_events: flush after this many events.
  • max_bytes: flush after the serialized size reaches this limit.
  • max_ms: flush after this much time has elapsed since the batch started.
  • respect_source_tx: when true, never split a single source transaction across batches.
  • max_inflight: cap the number of batches being processed concurrently.
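
In effect, the flush decision is the disjunction of the three limits, as in this minimal sketch (names are illustrative, not DeltaForge internals):

struct BatchState {
    events: usize,
    bytes: usize,
    age_ms: u64,
}

/// Flush as soon as any soft threshold is reached.
fn should_flush(b: &BatchState, max_events: usize, max_bytes: usize, max_ms: u64) -> bool {
    b.events >= max_events || b.bytes >= max_bytes || b.age_ms >= max_ms
}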

Commit policy

When multiple sinks are configured, checkpoints can wait for different acknowledgement rules (sketched after the list):

  • all: every sink must acknowledge.
  • required (default): only sinks marked required: true must acknowledge; others are best-effort.
  • quorum: checkpoint after at least quorum sinks acknowledge.
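
A compact sketch of the three gates (illustrative types, not DeltaForge's code):

enum CommitPolicy {
    All,
    Required,
    Quorum(usize),
}

struct SinkAck {
    required: bool,
    acked: bool,
}

/// Decide whether the checkpoint may advance for the current batch.
fn can_checkpoint(policy: &CommitPolicy, sinks: &[SinkAck]) -> bool {
    match policy {
        CommitPolicy::All => sinks.iter().all(|s| s.acked),
        CommitPolicy::Required => sinks.iter().filter(|s| s.required).all(|s| s.acked),
        CommitPolicy::Quorum(q) => sinks.iter().filter(|s| s.acked).count() >= *q,
    }
}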

Sources

DeltaForge captures database changes through pluggable source connectors. Each source is configured under spec.source with a type field and a config object. Environment variables are expanded before parsing using ${VAR} syntax.

Supported Sources

Source     Status          Description
mysql      ✅ Production   MySQL binlog CDC with GTID support
postgres   ✅ Production   PostgreSQL logical replication via pgoutput
turso      🔧 Beta         Turso/libSQL CDC with multiple modes

Common Behavior

All sources share these characteristics:

  • Checkpointing: Progress is automatically saved and resumed on restart
  • Schema tracking: Table schemas are loaded and fingerprinted for change detection
  • At-least-once delivery: Events may be redelivered after failures; sinks should be idempotent
  • Batching: Events are batched according to the pipeline’s batch configuration
  • Transaction boundaries: respect_source_tx: true (default) keeps source transactions intact

Source Interface

Sources implement a common trait that provides:

trait Source {
    fn checkpoint_key(&self) -> &str;

    async fn run(
        &self,
        tx: Sender<Event>,
        checkpoint_store: Arc<dyn CheckpointStore>,
    ) -> SourceHandle;
}

The returned SourceHandle supports pause/resume and graceful cancellation.
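
One plausible shape for such a handle, using tokio primitives (purely hypothetical; the real SourceHandle API may differ):

use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

struct SourceHandle {
    paused: watch::Sender<bool>,
    cancel: CancellationToken,
}

impl SourceHandle {
    fn pause(&self) {
        let _ = self.paused.send(true);
    }
    fn resume(&self) {
        let _ = self.paused.send(false);
    }
    /// Graceful cancellation: the source loop observes the token and exits
    /// after persisting its final checkpoint.
    fn stop(&self) {
        self.cancel.cancel();
    }
}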

Adding Custom Sources

The source interface is pluggable. To add a new source:

  1. Implement the Source trait
  2. Add configuration parsing in deltaforge-config
  3. Register the source type in the pipeline builder

See existing sources for implementation patterns.

MySQL

MySQL source

DeltaForge tails the MySQL binlog to capture row-level changes with automatic checkpointing and schema tracking.

Prerequisites

MySQL Server Configuration

Ensure your MySQL server has binary logging enabled with row-based format:

-- Required server settings (my.cnf or SET GLOBAL)
log_bin = ON
binlog_format = ROW
binlog_row_image = FULL  -- Recommended for complete before-images

If binlog_row_image is not FULL, DeltaForge will warn at startup and before-images on UPDATE/DELETE events may be incomplete.

User Privileges

Create a dedicated replication user with the required grants:

-- Create user with mysql_native_password (required by binlog connector)
CREATE USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'your_password';

-- Replication privileges (required)
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'deltaforge'@'%';
-- (MySQL names this privilege REPLICATION SLAVE; MariaDB 10.5+ also accepts REPLICATION REPLICA)

-- Schema introspection (required for table discovery)
GRANT SELECT, SHOW VIEW ON your_database.* TO 'deltaforge'@'%';

FLUSH PRIVILEGES;

For capturing all databases, grant SELECT on *.* instead.

Configuration

Set spec.source.type to mysql and provide a config object:

Field    Type     Required   Description
id       string   Yes        Unique identifier used for checkpoints, server_id derivation, and metrics
dsn      string   Yes        MySQL connection string with replication privileges
tables   array    No         Table patterns to capture; omit or leave empty to capture all user tables

Table Patterns

The tables field supports flexible pattern matching using SQL LIKE syntax:

tables:
  - shop.orders          # exact match: database "shop", table "orders"
  - shop.order_%         # LIKE pattern: tables starting with "order_" in "shop"
  - analytics.*          # wildcard: all tables in "analytics" database
  - %.audit_log          # cross-database: "audit_log" table in any database
  # omit entirely to capture all user tables (excludes system schemas)

System schemas (mysql, information_schema, performance_schema, sys) are always excluded.

Example

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.order_items

Resume Behavior

DeltaForge automatically checkpoints progress and resumes from the last position on restart. The resume strategy follows this priority:

  1. GTID - Preferred if the MySQL server has GTID enabled. Provides the most reliable resume across binlog rotations and failovers.
  2. File:position - Used when GTID is not available. Resumes from the exact binlog file and byte offset.
  3. Binlog tail - On first run with no checkpoint, starts from the current end of the binlog (no historical replay).

Checkpoints are stored using the id field as the key.

Server ID Handling

MySQL replication requires each replica to have a unique server_id. DeltaForge derives this automatically from the source id using a CRC32 hash:

server_id = 1 + (CRC32(id) % 4,000,000,000)

When running multiple DeltaForge instances against the same MySQL server, ensure each has a unique id to avoid server_id conflicts.
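
A sketch of that derivation using the crc32fast crate (the formula is as documented above; the crate choice is illustrative):

/// server_id = 1 + (CRC32(id) % 4,000,000,000)
fn derive_server_id(source_id: &str) -> u32 {
    1 + (crc32fast::hash(source_id.as_bytes()) % 4_000_000_000)
}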

Schema Tracking

DeltaForge maintains a built-in, per-source schema registry that tracks table schemas. For the MySQL source:

  • Schemas are preloaded at startup by querying INFORMATION_SCHEMA
  • Each schema is fingerprinted using SHA-256 for change detection
  • Events carry schema_version (fingerprint) and schema_sequence (monotonic counter)
  • Schema-to-checkpoint correlation enables reliable replay

Schema changes (DDL) trigger automatic reload of affected table schemas.

Timeouts and Heartbeats

Behavior             Value   Description
Heartbeat interval   15s     Server sends heartbeat if no events
Read timeout         90s     Maximum wait for next binlog event
Inactivity timeout   60s     Triggers reconnect if no data received
Connect timeout      30s     Maximum time to establish connection

These values are currently fixed. Reconnection uses exponential backoff with jitter.

Event Format

Each captured row change produces an event with:

  • op: insert, update, or delete
  • before: Previous row state (updates and deletes only, requires binlog_row_image = FULL)
  • after: New row state (inserts and updates only)
  • table: Fully qualified table name (database.table)
  • tx_id: GTID if available
  • checkpoint: Binlog position for resume
  • schema_version: Schema fingerprint
  • schema_sequence: Monotonic sequence for schema correlation

Troubleshooting

Connection Issues

If you see authentication errors mentioning mysql_native_password:

ALTER USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'password';

Missing Before-Images

If UPDATE/DELETE events have incomplete before data:

SET GLOBAL binlog_row_image = 'FULL';

Binlog Not Enabled

Check binary logging status:

SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW BINARY LOG STATUS;  -- or SHOW MASTER STATUS on older versions

Privilege Issues

Verify grants for your user:

SHOW GRANTS FOR 'deltaforge'@'%';

Required grants include REPLICATION SLAVE and REPLICATION CLIENT.

PostgreSQL

PostgreSQL source

DeltaForge captures row-level changes from PostgreSQL using logical replication with the pgoutput plugin.

Prerequisites

PostgreSQL Server Configuration

Enable logical replication in postgresql.conf:

# Required settings
wal_level = logical
max_replication_slots = 10    # At least 1 per DeltaForge pipeline
max_wal_senders = 10          # At least 1 per DeltaForge pipeline

Restart PostgreSQL after changing these settings.

User Privileges

Create a replication user with the required privileges:

-- Create user with replication capability
CREATE ROLE deltaforge WITH LOGIN REPLICATION PASSWORD 'your_password';

-- Grant connect access
GRANT CONNECT ON DATABASE your_database TO deltaforge;

-- Grant schema usage and table access for schema introspection
GRANT USAGE ON SCHEMA public TO deltaforge;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO deltaforge;

-- For automatic publication/slot creation (optional)
-- If you prefer manual setup, skip this and create them yourself
ALTER ROLE deltaforge SUPERUSER;  -- Or use manual setup below

pg_hba.conf

Ensure your pg_hba.conf allows replication connections:

# TYPE  DATABASE        USER            ADDRESS                 METHOD
host    replication     deltaforge      0.0.0.0/0               scram-sha-256
host    your_database   deltaforge      0.0.0.0/0               scram-sha-256

Replication Slot and Publication

DeltaForge can automatically create the replication slot and publication on first run. Alternatively, create them manually:

-- Create publication for specific tables
CREATE PUBLICATION my_pub FOR TABLE public.orders, public.order_items;

-- Or for all tables
CREATE PUBLICATION my_pub FOR ALL TABLES;

-- Create replication slot
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');

Replica Identity

For complete before-images on UPDATE and DELETE operations, set tables to REPLICA IDENTITY FULL:

ALTER TABLE public.orders REPLICA IDENTITY FULL;
ALTER TABLE public.order_items REPLICA IDENTITY FULL;

The replica identity setting determines what appears in before-images:

  • FULL: Complete row data in before-images
  • DEFAULT (primary key): Only primary key columns in before-images
  • NOTHING: No before-images at all

DeltaForge warns at startup if tables don’t have REPLICA IDENTITY FULL.

Configuration

Set spec.source.type to postgres and provide a config object:

Field            Type            Required   Default    Description
id               string          Yes                   Unique identifier for checkpoints and metrics
dsn              string          Yes                   PostgreSQL connection string
slot             string          Yes                   Replication slot name
publication      string          Yes                   Publication name
tables           array           Yes                   Table patterns to capture
start_position   string/object   No         earliest   Where to start when no checkpoint exists

DSN Formats

DeltaForge accepts both URL-style and key=value DSN formats:

# URL style
dsn: "postgres://user:pass@localhost:5432/mydb"

# Key=value style
dsn: "host=localhost port=5432 user=deltaforge password=pass dbname=mydb"

Table Patterns

The tables field supports flexible pattern matching:

tables:
  - public.orders          # exact match: schema "public", table "orders"
  - public.order_%         # LIKE pattern: tables starting with "order_"
  - myschema.*             # wildcard: all tables in "myschema"
  - %.audit_log            # cross-schema: "audit_log" table in any schema
  - orders                 # defaults to public schema: "public.orders"

System schemas (pg_catalog, information_schema, pg_toast) are always excluded.

Start Position

Controls where replication begins when no checkpoint exists:

# Start from the earliest available position (slot's restart_lsn)
start_position: earliest

# Start from current WAL position (skip existing data)
start_position: latest

# Start from a specific LSN
start_position:
  lsn: "0/16B6C50"

Example

source:
  type: postgres
  config:
    id: orders-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables:
      - public.orders
      - public.order_items
    start_position: earliest

Resume Behavior

DeltaForge checkpoints progress using PostgreSQL’s LSN (Log Sequence Number):

  1. With checkpoint: Resumes from the stored LSN
  2. Without checkpoint: Uses the slot’s confirmed_flush_lsn or restart_lsn
  3. New slot: Starts from pg_current_wal_lsn() or the configured start_position

Checkpoints are stored using the id field as the key.

Type Handling

DeltaForge preserves PostgreSQL’s native type semantics:

PostgreSQL Type                JSON Representation
boolean                        true / false
integer, bigint                JSON number
real, double precision         JSON number
numeric                        JSON string (preserves precision)
text, varchar                  JSON string
json, jsonb                    Parsed JSON object/array
bytea                          {"_base64": "..."}
uuid                           JSON string
timestamp, date, time          ISO 8601 string
Arrays (int[], text[], etc.)   JSON array
TOAST unchanged                {"_unchanged": true}
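
For instance, the bytea shape in the table above could be produced as in this sketch (assuming the base64 and serde_json crates; illustrative, not DeltaForge's encoder):

use base64::Engine;
use serde_json::{json, Value};

/// Wrap raw bytes the way the table above shows bytea values.
fn bytea_to_json(bytes: &[u8]) -> Value {
    let encoded = base64::engine::general_purpose::STANDARD.encode(bytes);
    json!({ "_base64": encoded })
}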

Event Format

Each captured row change produces an event with:

  • op: insert, update, delete, or truncate
  • before: Previous row state (updates and deletes, requires appropriate replica identity)
  • after: New row state (inserts and updates)
  • table: Fully qualified table name (schema.table)
  • tx_id: PostgreSQL transaction ID (xid)
  • checkpoint: LSN position for resume
  • schema_version: Schema fingerprint
  • schema_sequence: Monotonic sequence for schema correlation

WAL Management

Logical replication slots prevent WAL segments from being recycled until the consumer confirms receipt. To avoid disk space issues:

  1. Monitor slot lag: Check pg_replication_slots.restart_lsn vs pg_current_wal_lsn()
  2. Set retention limits: Configure max_slot_wal_keep_size (PostgreSQL 13+)
  3. Handle stale slots: Drop unused slots with pg_drop_replication_slot('slot_name')

-- Check slot status and lag
SELECT slot_name, 
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;

Troubleshooting

Connection Issues

If you see authentication errors:

-- Verify user has replication privilege
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'deltaforge';

-- Check pg_hba.conf allows replication connections
-- Ensure the line type includes "replication" database

Missing Before-Images

If UPDATE/DELETE events have incomplete before data:

-- Check current replica identity
SELECT relname, relreplident 
FROM pg_class 
WHERE relname = 'your_table';
-- d = default, n = nothing, f = full, i = index

-- Set to FULL for complete before-images
ALTER TABLE your_table REPLICA IDENTITY FULL;

Slot/Publication Not Found

-- List existing publications
SELECT * FROM pg_publication;

-- List existing slots
SELECT * FROM pg_replication_slots;

-- Create if missing
CREATE PUBLICATION my_pub FOR TABLE public.orders;
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');

WAL Disk Usage Growing

-- Check slot lag
SELECT slot_name, 
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;

-- If slot is inactive and not needed, drop it
SELECT pg_drop_replication_slot('unused_slot');

Logical Replication Not Enabled

-- Check wal_level
SHOW wal_level;  -- Should be 'logical'

-- If not, update postgresql.conf and restart PostgreSQL
-- wal_level = logical

Turso Source

⚠️ STATUS: EXPERIMENTAL / PAUSED

The Turso source is not yet ready for production use. Native CDC in Turso/libSQL is still evolving and has limitations:

  • CDC is per-connection (only changes from the enabling connection are captured)
  • File locking prevents concurrent access
  • sqld Docker image doesn’t have CDC support yet

This documentation is retained for reference. The code exists but is not officially supported.


The Turso source captures changes from Turso and libSQL databases. It supports multiple CDC modes to work with different database configurations and Turso versions.

CDC Modes

DeltaForge supports four CDC modes for Turso/libSQL:

| Mode | Description | Requirements |
|---|---|---|
| native | Uses Turso's built-in CDC via turso_cdc table | Turso v0.1.2+ with CDC enabled |
| triggers | Shadow tables populated by database triggers | Standard SQLite/libSQL |
| polling | Tracks changes via rowid comparison | Any SQLite/libSQL (inserts only) |
| auto | Automatic fallback: native → triggers → polling | Any |

Native Mode

Native mode uses Turso’s built-in CDC capabilities. This is the most efficient mode when available.

Requirements:

  • Turso database with CDC enabled
  • Turso server v0.1.2 or later

How it works:

  1. Queries the turso_cdc system table for changes
  2. Uses bin_record_json_object() to extract row data as JSON
  3. Tracks position via change ID in checkpoints

Triggers Mode

Triggers mode uses shadow tables and database triggers to capture changes. This works with standard SQLite and libSQL without requiring native CDC support.

How it works:

  1. Creates shadow tables (_df_cdc_{table}) for each tracked table
  2. Installs INSERT/UPDATE/DELETE triggers that write to shadow tables
  3. Polls shadow tables for new change records
  4. Cleans up processed records periodically
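A rough sketch of the generated artifacts for a tracked users table (the actual DDL DeltaForge emits may differ):

-- Hypothetical shadow table and insert trigger
CREATE TABLE IF NOT EXISTS _df_cdc_users (
  change_id INTEGER PRIMARY KEY AUTOINCREMENT,
  op        TEXT NOT NULL,   -- 'insert' | 'update' | 'delete'
  row_data  TEXT             -- JSON snapshot of the changed row
);

CREATE TRIGGER IF NOT EXISTS _df_cdc_users_ins
AFTER INSERT ON users
BEGIN
  INSERT INTO _df_cdc_users (op, row_data)
  VALUES ('insert', json_object('id', NEW.id, 'email', NEW.email));
END;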

Polling Mode

Polling mode uses rowid tracking to detect new rows. This is the simplest mode but only captures inserts (not updates or deletes).

How it works:

  1. Tracks the maximum rowid seen per table
  2. Queries for rows with rowid greater than last seen
  3. Emits insert events for new rows
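In SQL terms, each poll amounts to roughly (sketch; the actual query may differ):

SELECT rowid, * FROM users WHERE rowid > :last_seen_rowid ORDER BY rowid;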

Limitations:

  • Only captures INSERT operations
  • Cannot detect UPDATE or DELETE
  • Requires tables to have accessible rowid (not WITHOUT ROWID tables)

Auto Mode

Auto mode tries each CDC mode in order and uses the first one that works:

  1. Try native mode (check for turso_cdc table)
  2. Try triggers mode (check for existing CDC triggers)
  3. Fall back to polling mode

This is useful for deployments where the database capabilities may vary.

Configuration

source:
  type: turso
  config:
    id: turso-main
    url: "libsql://your-db.turso.io"
    auth_token: "${TURSO_AUTH_TOKEN}"
    tables: ["users", "orders", "order_items"]
    cdc_mode: auto
    poll_interval_ms: 1000
    native_cdc:
      level: data

Configuration Options

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| id | string | Yes | | Logical identifier for metrics and logging |
| url | string | Yes | | Database URL (libsql://, http://, or file path) |
| auth_token | string | No | | Authentication token for Turso cloud |
| tables | array | Yes | | Tables to track (supports wildcards) |
| cdc_mode | string | No | auto | CDC mode: native, triggers, polling, auto |
| poll_interval_ms | integer | No | 1000 | Polling interval in milliseconds |
| native_cdc.level | string | No | data | Native CDC level: binlog or data |

Table Patterns

The tables field supports patterns:

tables:
  - users              # Exact match
  - order%             # Prefix match (order, orders, order_items)
  - "*"                # All tables (excluding system tables)

System tables and DeltaForge infrastructure tables are automatically excluded:

  • sqlite_* — SQLite system tables
  • _df_* — DeltaForge CDC shadow tables
  • _litestream* — Litestream replication tables
  • _turso* — Turso internal tables
  • turso_cdc — Turso CDC system table

Native CDC Levels

When using native mode, you can choose the CDC level:

| Level | Description |
|---|---|
| data | Only row data changes (default, more efficient) |
| binlog | Full binlog-style events with additional metadata |

Examples

Local Development

source:
  type: turso
  config:
    id: local-dev
    url: "http://127.0.0.1:8080"
    tables: ["users", "orders"]
    cdc_mode: auto
    poll_interval_ms: 500

Turso Cloud

source:
  type: turso
  config:
    id: turso-prod
    url: "libsql://mydb-myorg.turso.io"
    auth_token: "${TURSO_AUTH_TOKEN}"
    tables: ["*"]
    cdc_mode: native
    poll_interval_ms: 1000

SQLite File (Polling Only)

source:
  type: turso
  config:
    id: sqlite-file
    url: "file:./data/myapp.db"
    tables: ["events", "audit_log"]
    cdc_mode: polling
    poll_interval_ms: 2000

Checkpoints

Turso checkpoints track position differently depending on the CDC mode. For native mode, the checkpoint stores the last change ID:

{
  "mode": "native",
  "last_change_id": 12345,
  "table_positions": {}
}

For polling mode, positions are tracked per-table:

{
  "mode": "polling",
  "last_change_id": null,
  "table_positions": {
    "users": 1000,
    "orders": 2500
  }
}

Schema Loading

The Turso source includes a schema loader that:

  • Queries PRAGMA table_info() for column metadata
  • Detects SQLite type affinities (INTEGER, TEXT, REAL, BLOB, NUMERIC)
  • Identifies primary keys and autoincrement columns
  • Handles WITHOUT ROWID tables
  • Checks for existing CDC triggers

Schema information is available via the REST API at /pipelines/{name}/schemas.
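For example, column metadata comes straight from SQLite's pragma (output shown for a hypothetical users table):

PRAGMA table_info(users);
-- cid  name   type     notnull  dflt_value  pk
-- 0    id     INTEGER  1                    1
-- 1    email  TEXT     1                    0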

Notes

  • WITHOUT ROWID tables: Polling mode cannot track WITHOUT ROWID tables. Use triggers or native mode instead.
  • Type affinity: SQLite uses type affinity rather than strict types. The schema loader maps declared types to SQLite affinities.
  • Trigger cleanup: In triggers mode, processed change records are cleaned up automatically based on checkpoint position.
  • Connection handling: The source maintains a single connection and reconnects automatically on failure.

Sinks

Sinks receive batches from the coordinator after processors run. Each sink lives under spec.sinks and can be marked as required or best-effort via the required flag. Checkpoint behavior is governed by the pipeline’s commit policy.

Current built-in sinks:

| Sink | Description |
|---|---|
| kafka | Kafka producer sink |
| nats | NATS JetStream sink |
| redis | Redis stream sink |

Multiple sinks in one pipeline

You can combine multiple sinks in one pipeline to fan out events to different destinations. However, multi-sink pipelines introduce complexity that requires careful consideration.

Why multiple sinks are challenging

Different performance characteristics: Kafka might handle 100K events/sec while a downstream HTTP webhook processes 100/sec. The slowest sink becomes the bottleneck for the entire pipeline.

Independent failure modes: Each sink can fail independently. Redis might be healthy while Kafka experiences broker failures. Without proper handling, a single sink failure could block the entire pipeline or cause data loss.

No distributed transactions: DeltaForge cannot atomically commit across heterogeneous systems. If Kafka succeeds but Redis fails mid-batch, you face a choice: retry Redis (risking duplicates in Kafka) or skip Redis (losing data there).

Checkpoint semantics: The checkpoint represents “how far we’ve processed from the source.” With multiple sinks, when is it safe to advance? After one sink succeeds? All of them? A majority?

The required flag

The required flag on each sink determines whether that sink must acknowledge successful delivery before the checkpoint advances:

sinks:
  - type: kafka
    config:
      id: primary-kafka
      required: true    # Must succeed for checkpoint to advance
      
  - type: redis
    config:
      id: cache-redis
      required: false   # Best-effort; failures don't block checkpoint

When required: true (default): The sink must acknowledge the batch before the checkpoint can advance. If this sink fails, the pipeline blocks and retries until it succeeds or the operator intervenes.

When required: false: The sink is best-effort. Failures are logged but don’t prevent the checkpoint from advancing. Use this for non-critical destinations where some data loss is acceptable.

Commit policy

The commit_policy works with the required flag to determine checkpoint behavior:

| Policy | Behavior |
|---|---|
| all | Every sink (regardless of required flag) must acknowledge |
| required | Only sinks with required: true must acknowledge (default) |
| quorum | At least N sinks must acknowledge |

commit_policy:
  mode: required   # Only wait for required sinks

sinks:
  - type: kafka
    config:
      required: true   # Checkpoint waits for this
  - type: redis  
    config:
      required: false  # Checkpoint doesn't wait for this
  - type: nats
    config:
      required: true   # Checkpoint waits for this

Practical patterns

Primary + secondary: One critical sink (Kafka for durability) marked required: true, with secondary sinks (Redis for caching) marked required: false.

Quorum for redundancy: Three sinks with commit_policy.mode: quorum and quorum: 2. Checkpoint advances when any two succeed, providing fault tolerance.

All-or-nothing: Use commit_policy.mode: all when every destination is critical and you need the strongest consistency guarantee (at the cost of availability or rate of delivery).
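For example, the quorum pattern can be expressed as (sketch, following the commit_policy form shown above):

commit_policy:
  mode: quorum
  quorum: 2

sinks:
  - type: kafka
    config: { ... }
  - type: redis
    config: { ... }
  - type: nats
    config: { ... }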

Redis

Redis sink

The Redis sink publishes events to a Redis Stream for real-time consumption with consumer groups.

When to use Redis

Redis Streams shine when you need low-latency event delivery with simple operational requirements and built-in consumer group support.

Real-world applications

| Use Case | Description |
|---|---|
| Real-time notifications | Push database changes instantly to WebSocket servers for live UI updates |
| Cache invalidation | Trigger cache eviction when source records change; keep Redis cache consistent |
| Session synchronization | Replicate user session changes across application instances in real-time |
| Rate limiting state | Stream counter updates for distributed rate limiting decisions |
| Live dashboards | Feed real-time metrics and KPIs to dashboard backends |
| Job queuing | Use CDC events to trigger background job processing with consumer groups |
| Feature flags | Propagate feature flag changes instantly across all application instances |

Pros and cons

| Pros | Cons |
|---|---|
| Ultra-low latency - Sub-millisecond publish; ideal for real-time apps | Memory-bound - All data in RAM; expensive for high-volume retention |
| Simple operations - Single binary, minimal configuration | Limited retention - Not designed for long-term event storage |
| Consumer groups - Built-in competing consumers with acknowledgements | Durability trade-offs - AOF/RDB persistence has limitations |
| Familiar tooling - redis-cli, widespread client library support | Single-threaded - CPU-bound for very high throughput |
| Versatile - Combine with caching, pub/sub, and data structures | No native replay - XRANGE exists but no offset management |
| Atomic operations - MULTI/EXEC for transactional guarantees | Cluster complexity - Sharding requires careful key design |

Configuration

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders.events
      required: true

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | | Sink identifier |
| uri | string | | Redis connection URI |
| stream | string | | Redis stream key |
| required | bool | true | Gates checkpoints |

Consuming events

# Create consumer group
redis-cli XGROUP CREATE orders.events mygroup $ MKSTREAM

# Read as consumer (blocking)
redis-cli XREADGROUP GROUP mygroup consumer1 BLOCK 5000 COUNT 10 STREAMS orders.events '>'

# Acknowledge processing
redis-cli XACK orders.events mygroup 1234567890123-0

# Check pending (unacknowledged) messages
redis-cli XPENDING orders.events mygroup

Simple subscription

# Read from the beginning of the stream
redis-cli XREAD COUNT 10 STREAMS orders.events 0-0

# Block for new entries
redis-cli XREAD BLOCK 5000 STREAMS orders.events $

Go consumer example

import "github.com/redis/go-redis/v9"

rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

// Create consumer group (once)
rdb.XGroupCreateMkStream(ctx, "orders.events", "mygroup", "0")

for {
    streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    "mygroup",
        Consumer: "worker1",
        Streams:  []string{"orders.events", ">"},
        Count:    10,
        Block:    5 * time.Second,
    }).Result()
    
    if err != nil {
        continue
    }
    
    for _, stream := range streams {
        for _, msg := range stream.Messages {
            var event Event
            json.Unmarshal([]byte(msg.Values["event"].(string)), &event)
            process(event)
            rdb.XAck(ctx, "orders.events", "mygroup", msg.ID)
        }
    }
}

Rust consumer example

use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::{AsyncCommands, Value};

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://localhost:6379")?;
    let mut con = client.get_async_connection().await?;

    // Create consumer group (ignore the error if it already exists)
    let _: Result<(), redis::RedisError> = redis::cmd("XGROUP")
        .arg("CREATE").arg("orders.events").arg("mygroup").arg("0").arg("MKSTREAM")
        .query_async(&mut con).await;

    loop {
        let opts = StreamReadOptions::default()
            .group("mygroup", "worker1")
            .count(10)
            .block(5000);
        let reply: StreamReadReply =
            con.xread_options(&["orders.events"], &[">"], &opts).await?;

        for stream in reply.keys {
            for msg in stream.ids {
                // The sink stores the serialized event under the "event" field;
                // Event and process() are application-defined
                if let Some(Value::Data(raw)) = msg.map.get("event") {
                    if let Ok(event) = serde_json::from_slice::<Event>(raw) {
                        process(event);
                    }
                }
                let _: () = con.xack("orders.events", "mygroup", &[&msg.id]).await?;
            }
        }
    }
}

Python consumer example

import redis
import json

r = redis.Redis.from_url("redis://localhost:6379")

# Create consumer group (once)
try:
    r.xgroup_create("orders.events", "mygroup", id="0", mkstream=True)
except redis.ResponseError:
    pass  # Group already exists

# Consume events
while True:
    events = r.xreadgroup("mygroup", "worker1", {"orders.events": ">"}, count=10, block=5000)
    for stream, messages in events:
        for msg_id, data in messages:
            event = json.loads(data[b"event"])
            process(event)
            r.xack("orders.events", "mygroup", msg_id)

Failure modes

| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore Redis; check network |
| Authentication failure | NOAUTH / WRONGPASS | Fails fast, no retry | Fix auth details in URI |
| OOM (Out of Memory) | OOM command not allowed | Fails batch; retries | Increase maxmemory; enable eviction or trim streams |
| Stream doesn't exist | Auto-created by XADD | No failure | N/A (XADD creates stream) |
| Connection timeout | Command hangs | Timeout after configured duration | Check network; increase timeout |
| Cluster MOVED/ASK | Redirect errors | Automatic redirect (if cluster mode) | Ensure cluster client configured |
| Replication lag | Writes to replica fail | Fails with READONLY | Write to master only |
| Max stream length | If MAXLEN enforced | Oldest entries trimmed | Expected behavior; not a failure |
| Network partition | Intermittent timeouts | Retries; may have gaps | Restore network |

Failure scenarios and data guarantees

Redis OOM during batch delivery

  1. DeltaForge sends batch of 100 events via pipeline
  2. 50 events written, Redis hits maxmemory
  3. Pipeline fails atomically (all or nothing per pipeline)
  4. DeltaForge retries entire batch
  5. If OOM persists: batch blocked until memory available
  6. Checkpoint only saved after ALL events acknowledged

DeltaForge crash after XADD, before checkpoint

  1. Batch written to Redis stream successfully
  2. DeltaForge crashes before saving checkpoint
  3. On restart: replays from last checkpoint
  4. Result: Duplicate events in stream (at-least-once)
  5. Consumer must handle idempotently (check event.id)

Redis failover (Sentinel/Cluster)

  1. Master fails, Sentinel promotes replica
  2. In-flight XADD may fail with connection error
  3. DeltaForge reconnects to new master
  4. Retries failed batch
  5. Possible duplicates if original write succeeded

Handling duplicates in consumers

# Idempotent consumer using event ID
processed_ids = set()  # Or use Redis SET for distributed dedup

for msg_id, data in messages:
    event = json.loads(data[b"event"])
    event_id = event["id"]
    
    if event_id in processed_ids:
        r.xack("orders.events", "mygroup", msg_id)
        continue  # Skip duplicate
    
    process(event)
    processed_ids.add(event_id)
    r.xack("orders.events", "mygroup", msg_id)
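The in-memory set only works for a single consumer; a distributed variant can use a Redis set instead (sketch; the key name is illustrative):

# Distributed dedup across workers: SADD returns 0 if the ID was already seen
if r.sadd("orders:processed_ids", event_id) == 0:
    r.xack("orders.events", "mygroup", msg_id)  # duplicate: ack and skip
else:
    process(event)
    r.xack("orders.events", "mygroup", msg_id)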

Monitoring

DeltaForge exposes these metrics for Redis sink monitoring:

# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink}      # Events delivered
deltaforge_sink_batch_total{pipeline,sink}       # Batches delivered  
deltaforge_sink_latency_seconds{pipeline,sink}   # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"}  # Stage timing

For Redis server visibility, use Redis’s built-in monitoring:

# Monitor commands in real-time
redis-cli MONITOR

# Get server stats
redis-cli INFO stats

# Check memory usage
redis-cli INFO memory

# Stream-specific info
redis-cli XINFO STREAM orders.events
redis-cli XINFO GROUPS orders.events

Stream management

# Check stream length
redis-cli XLEN orders.events

# Trim to last 10000 entries (approximate)
redis-cli XTRIM orders.events MAXLEN ~ 10000

# Trim to exact length
redis-cli XTRIM orders.events MAXLEN 10000

# View consumer group info
redis-cli XINFO GROUPS orders.events

# Check pending messages
redis-cli XPENDING orders.events mygroup

# Claim stuck messages (after 60 seconds)
redis-cli XCLAIM orders.events mygroup worker2 60000 <message-id>

# Delete processed messages (careful!)
redis-cli XDEL orders.events <message-id>

Notes

  • Redis Streams provide at-least-once delivery with consumer group acknowledgements
  • Use MAXLEN ~ trimming to prevent unbounded memory growth (approximate is faster)
  • Consider Redis Cluster for horizontal scaling with multiple streams
  • Combine with Redis pub/sub for fan-out to ephemeral subscribers
  • For durability, enable AOF persistence with appendfsync everysec or always
  • Monitor memory usage closely; Redis will reject writes when maxmemory is reached

Kafka

Kafka sink

The Kafka sink publishes batches to a Kafka topic using rdkafka.

When to use Kafka

Kafka excels as the backbone for event-driven architectures where durability, ordering, and replay capabilities are critical.

Real-world applications

| Use Case | Description |
|---|---|
| Event sourcing | Store all state changes as an immutable log; rebuild application state by replaying events |
| Microservices integration | Decouple services with async messaging; each service consumes relevant topics |
| Real-time analytics pipelines | Feed CDC events to Spark, Flink, or ksqlDB for streaming transformations |
| Data lake ingestion | Stream database changes to S3/HDFS via Kafka Connect for analytics and ML |
| Audit logging | Capture every database mutation for compliance, debugging, and forensics |
| Cross-datacenter replication | Use MirrorMaker 2 to replicate topics across regions for DR |

Pros and cons

| Pros | Cons |
|---|---|
| Durability - Configurable replication ensures no data loss | Operational complexity - Requires ZooKeeper/KRaft, careful tuning |
| Ordering guarantees - Per-partition ordering with consumer groups | Latency - Batching and replication add milliseconds of delay |
| Replay capability - Configurable retention allows reprocessing | Resource intensive - High disk I/O and memory requirements |
| Ecosystem - Connect, Streams, Schema Registry, ksqlDB | Learning curve - Partitioning, offsets, consumer groups to master |
| Throughput - Handles millions of messages per second | Cold start - Cluster setup and topic configuration overhead |
| Exactly-once semantics - Transactions for critical workloads | Cost - Managed services can be expensive at scale |

Configuration

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      required: true
      exactly_once: false
      client_conf:
        message.timeout.ms: "5000"
        acks: "all"

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | | Sink identifier |
| brokers | string | | Comma-separated broker list |
| topic | string | | Destination topic |
| required | bool | true | Gates checkpoints |
| exactly_once | bool | false | Enable EOS semantics |
| client_conf | map | {} | librdkafka overrides |

Recommended client_conf settings:

| Setting | Recommended | Description |
|---|---|---|
| acks | all | Wait for all replicas for durability |
| message.timeout.ms | 30000 | Total time to deliver a message |
| retries | 2147483647 | Retry indefinitely (with backoff) |
| enable.idempotence | true | Prevent duplicates on retry |
| compression.type | lz4 | Balance between CPU and bandwidth |
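Applying the recommended settings to a sink's client_conf (sketch; values copied from the table above):

client_conf:
  acks: "all"
  message.timeout.ms: "30000"
  retries: "2147483647"
  enable.idempotence: "true"
  compression.type: "lz4"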

Consuming events

Kafka CLI

# Consume from beginning
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning

# Consume with consumer group
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --group deltaforge-consumers

Go consumer example

import (
    "encoding/json"
    "log"

    "github.com/IBM/sarama"
)

config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest

group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
if err != nil {
    log.Fatal(err)
}
defer group.Close()

for {
    if err := group.Consume(ctx, []string{"orders"}, &handler{}); err != nil {
        log.Printf("Consumer error: %v", err)
    }
}

type handler struct{}

// Setup and Cleanup are required by sarama.ConsumerGroupHandler
func (h *handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        // Event and process() are application-defined
        var event Event
        json.Unmarshal(msg.Value, &event)
        process(event)
        session.MarkMessage(msg, "")
    }
    return nil
}

Rust consumer example

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::Message;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "my-group")
        .set("auto.offset.reset", "earliest")
        .create()?;

    consumer.subscribe(&["orders"])?;

    loop {
        match consumer.recv().await {
            Ok(msg) => {
                // Event and process() are application-defined
                if let Some(Ok(payload)) = msg.payload_view::<str>() {
                    let event: Event = serde_json::from_str(payload)?;
                    process(event);
                }
                consumer.commit_message(&msg, CommitMode::Async)?;
            }
            Err(e) => eprintln!("Kafka error: {}", e),
        }
    }
}

Python consumer example

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # commit manually after processing
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    event = message.value
    process(event)
    consumer.commit()

Failure modes

| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Broker unavailable | Connection refused, timeout | Retries with backoff; blocks checkpoint | Restore broker; check network |
| Topic not found | UnknownTopicOrPartition | Fails batch; retries | Create topic or enable auto-create |
| Authentication failure | SaslAuthenticationFailed | Fails fast, no retry | Fix credentials in config |
| Authorization failure | TopicAuthorizationFailed | Fails fast, no retry | Grant ACLs for producer |
| Message too large | MessageSizeTooLarge | Fails message permanently | Increase message.max.bytes or filter large events |
| Leader election | NotLeaderForPartition | Automatic retry after metadata refresh | Wait for election; usually transient |
| Disk full | KafkaStorageException | Retries indefinitely | Add disk space; purge old segments |
| Network partition | Timeouts, partial failures | Retries; may produce duplicates | Restore network; idempotence prevents dups |
| SSL/TLS errors | Handshake failures | Fails fast | Fix certificates, verify truststore |

Failure scenarios and data guarantees

Broker failure during batch delivery

  1. DeltaForge sends batch of 100 events
  2. 50 events delivered, broker crashes
  3. rdkafka detects failure, retries remaining 50
  4. If idempotence enabled: no duplicates
  5. If not: possible duplicates of events near failure point
  6. Checkpoint only saved after ALL events acknowledged

DeltaForge crash after Kafka ack, before checkpoint

  1. Batch delivered to Kafka successfully
  2. DeltaForge crashes before saving checkpoint
  3. On restart: replays from last checkpoint
  4. Result: Duplicate events in Kafka (at-least-once)
  5. Consumer must handle idempotently

Monitoring recommendations

DeltaForge exposes these metrics for Kafka sink monitoring:

# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink}      # Events delivered
deltaforge_sink_batch_total{pipeline,sink}       # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink}   # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"}  # Stage timing

For deeper Kafka broker visibility, monitor your Kafka cluster directly:

  • Broker metrics via JMX or Kafka’s built-in metrics
  • Consumer lag via kafka-consumer-groups.sh
  • Topic throughput via broker dashboards

Note: Internal rdkafka producer statistics (message queues, broker RTT, etc.) are not currently exposed by DeltaForge. This is a potential future enhancement.

Notes

  • Combine Kafka with other sinks to fan out data; use commit policy to control checkpoint behavior
  • For exactly-once semantics, ensure your Kafka cluster supports transactions (2.5+)
  • Adjust client_conf for durability (acks=all) or performance based on your requirements
  • Consider partitioning strategy for ordering guarantees within partitions
  • Enable enable.idempotence=true to prevent duplicates during retries

NATS

NATS sink

The NATS sink publishes events to a NATS JetStream stream for durable, at-least-once delivery.

When to use NATS

NATS JetStream is ideal when you need a lightweight, high-performance messaging system with persistence, without the operational overhead of Kafka.

Real-world applications

| Use Case | Description |
|---|---|
| Edge computing | Lightweight footprint perfect for IoT gateways and edge nodes syncing to cloud |
| Microservices mesh | Request-reply and pub/sub patterns with automatic load balancing |
| Multi-cloud sync | Leaf nodes and superclusters for seamless cross-cloud data replication |
| Kubernetes-native events | NATS Operator for cloud-native deployment; sidecar-friendly architecture |
| Real-time gaming | Low-latency state synchronization for multiplayer game servers |
| Financial data feeds | Stream market data with subject-based routing and wildcards |
| Command and control | Distribute configuration changes and commands to distributed systems |

Pros and cons

| Pros | Cons |
|---|---|
| Lightweight - Single binary ~20MB; minimal resource footprint | Smaller ecosystem - Fewer connectors and integrations than Kafka |
| Simple operations - Zero external dependencies; easy clustering | Younger persistence - JetStream newer than Kafka's battle-tested log |
| Low latency - Sub-millisecond message delivery | Community size - Smaller community than Kafka or Redis |
| Flexible patterns - Pub/sub, queues, request-reply, streams | Tooling maturity - Fewer monitoring and management tools |
| Subject hierarchy - Powerful wildcard routing (orders.>, *.events) | Learning curve - JetStream concepts differ from traditional queues |
| Multi-tenancy - Built-in accounts and security isolation | Less enterprise adoption - Fewer case studies at massive scale |
| Cloud-native - Designed for Kubernetes and distributed systems | |

Configuration

sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      required: true
      send_timeout_secs: 5
      batch_timeout_secs: 30

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | | Sink identifier |
| url | string | | NATS server URL |
| subject | string | | Subject to publish to |
| stream | string | | JetStream stream name |
| required | bool | true | Gates checkpoints |
| send_timeout_secs | int | 5 | Publish timeout |
| batch_timeout_secs | int | 30 | Batch timeout |
| connect_timeout_secs | int | 10 | Connection timeout |

Authentication options

# Credentials file
credentials_file: /etc/nats/creds/user.creds

# Username/password
username: ${NATS_USER}
password: ${NATS_PASSWORD}

# Token
token: ${NATS_TOKEN}

JetStream setup

Before using the NATS sink with JetStream, create a stream that captures your subject:

# Using NATS CLI
nats stream add ORDERS \
  --subjects "orders.>" \
  --retention limits \
  --storage file \
  --replicas 3 \
  --max-age 7d

# Verify stream
nats stream info ORDERS

Consuming events

NATS CLI

# Subscribe to subject (ephemeral)
nats sub "orders.>"

# Create durable consumer
nats consumer add ORDERS orders-processor \
  --pull \
  --ack explicit \
  --deliver all \
  --max-deliver 3 \
  --filter "orders.events"

# Consume messages
nats consumer next ORDERS orders-processor --count 10

Go consumer example

import (
    "encoding/json"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
    log.Fatal(err)
}
defer nc.Drain()

js, err := nc.JetStream()
if err != nil {
    log.Fatal(err)
}

// Bind to (or create) a durable pull consumer
sub, err := js.PullSubscribe("orders.events", "orders-processor")
if err != nil {
    log.Fatal(err)
}

for {
    msgs, err := sub.Fetch(10, nats.MaxWait(5*time.Second))
    if err != nil {
        continue // nats.ErrTimeout when no messages arrive; retry
    }
    for _, msg := range msgs {
        // Event and process() are application-defined
        var event Event
        json.Unmarshal(msg.Data, &event)
        process(event)
        msg.Ack()
    }
}

Rust consumer example

use async_nats::jetstream::{self, consumer::PullConsumer};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = async_nats::connect("nats://localhost:4222").await?;
    let js = jetstream::new(client);

    let stream = js.get_stream("ORDERS").await?;
    let consumer: PullConsumer = stream.get_consumer("orders-processor").await?;

    let mut messages = consumer.messages().await?;
    while let Some(msg) = messages.next().await {
        let msg = msg?;
        // Event and process() are application-defined
        let event: Event = serde_json::from_slice(&msg.payload)?;
        process(event);
        msg.ack().await?;
    }
    Ok(())
}

Monitoring

DeltaForge exposes these metrics for NATS sink monitoring:

# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink}      # Events delivered
deltaforge_sink_batch_total{pipeline,sink}       # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink}   # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"}  # Stage timing

For NATS server visibility, use the NATS CLI or monitoring endpoint:

# Server info
nats server info

# JetStream account info
nats account info

# Stream statistics
nats stream info ORDERS

# Consumer statistics  
nats consumer info ORDERS orders-processor

# Real-time event monitoring
nats events

NATS also exposes a monitoring endpoint (default :8222) with JSON stats:

  • http://localhost:8222/varz - General server stats
  • http://localhost:8222/jsz - JetStream stats
  • http://localhost:8222/connz - Connection stats

Subject design patterns

| Pattern | Example | Use Case |
|---|---|---|
| Hierarchical | orders.us.created | Regional routing |
| Wildcard single | orders.*.created | Any region, specific event |
| Wildcard multi | orders.> | All order events |
| Versioned | v1.orders.events | API versioning |

Failure modes

| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore NATS; check network |
| Stream not found | stream not found error | Fails batch; no retry | Create stream or remove stream config |
| Authentication failure | authorization violation | Fails fast, no retry | Fix credentials |
| Subject mismatch | no responders (core NATS) | Fails if no subscribers | Add subscribers or use JetStream |
| JetStream disabled | jetstream not enabled | Fails fast | Enable JetStream on server |
| Storage full | insufficient resources | Retries; eventually fails | Add storage; adjust retention |
| Message too large | message size exceeds maximum | Fails message permanently | Increase max_payload or filter large events |
| Cluster partition | Intermittent failures | Retries with backoff | Restore network; wait for quorum |
| Slow consumer | Publish backpressure | Slows down; may timeout | Scale consumers; increase buffer |
| TLS errors | Handshake failures | Fails fast | Fix certificates |

Failure scenarios and data guarantees

NATS server restart during batch delivery

  1. DeltaForge sends batch of 100 events
  2. 50 events published, server restarts
  3. async_nats detects disconnect, starts reconnecting
  4. After reconnect, DeltaForge retries remaining 50
  5. JetStream deduplication prevents duplicates (if enabled)
  6. Checkpoint only saved after ALL events acknowledged

DeltaForge crash after JetStream ack, before checkpoint

  1. Batch published to JetStream successfully
  2. DeltaForge crashes before saving checkpoint
  3. On restart: replays from last checkpoint
  4. Result: Duplicate events in stream (at-least-once)
  5. Consumer must handle idempotently (check event.id)

Stream storage exhausted

  1. JetStream stream hits max_bytes or max_msgs limit
  2. With discard: old → oldest messages removed, publish succeeds
  3. With discard: new → publish rejected
  4. DeltaForge retries on rejection
  5. Resolution: Increase limits or enable discard: old

JetStream acknowledgement levels

# Stream configuration affects durability:
#   --replicas 3        R=3 for production
#   --retention limits  or 'workqueue' for single consumer
#   --discard old       remove oldest messages when full
#   --max-age 7d        auto-expire after 7 days
#   --storage file      persistent (vs memory)
nats stream add ORDERS \
  --replicas 3 \
  --retention limits \
  --discard old \
  --max-age 7d \
  --storage file

| Replicas | Guarantee | Use Case |
|---|---|---|
| R=1 | Single node; lost if node fails | Development, non-critical |
| R=3 | Survives 1 node failure | Production default |
| R=5 | Survives 2 node failures | Critical data |

Handling duplicates in consumers

// Use event ID for idempotency
processedIDs := make(map[string]bool)  // Or use Redis/DB

for _, msg := range msgs {
    var event Event
    json.Unmarshal(msg.Data, &event)
    
    if processedIDs[event.ID] {
        msg.Ack()  // Already processed
        continue
    }
    
    if err := process(event); err == nil {
        processedIDs[event.ID] = true
    }
    msg.Ack()
}

Notes

  • When stream is specified, the sink verifies the stream exists at connection time
  • Without stream, events are published to core NATS (no persistence guarantees)
  • Connection pooling ensures efficient reuse across batches
  • Use replicated streams (--replicas 3) for production durability
  • Combine with other sinks to fan out data; use commit policy to control checkpoint behavior
  • JetStream provides exactly-once semantics when combined with message deduplication

Architecture

This document describes DeltaForge’s internal architecture, design decisions, and how the major components interact.

Design Principles

Source-Owned Semantics

DeltaForge avoids imposing a universal data model on all sources. Instead, each database source defines and owns its schema semantics:

  • MySQL captures MySQL-specific types, collations, and engine information
  • Future sources (PostgreSQL, MongoDB, ClickHouse, Turso) will capture their native semantics

This approach means downstream consumers receive schemas that accurately reflect the source database rather than a lowest-common-denominator normalization.

Delivery Guarantees First

The checkpoint system is designed around a single invariant:

Checkpoints are only saved after events have been successfully delivered.

This ordering guarantees at-least-once delivery. A crash between checkpoint and delivery would lose events; DeltaForge prevents this by always checkpointing after sink acknowledgment.

Configuration Over Code

Pipelines are defined declaratively in YAML. This enables:

  • Version-controlled pipeline definitions
  • Environment-specific configuration via variable expansion
  • Rapid iteration without recompilation

Component Overview

┌─────────────────────────────────────────────────────────────────┐
│                        DeltaForge Runtime                       │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│   Sources   │   Schema    │ Coordinator │    Sinks    │ Control │
│             │  Registry   │  + Batch    │             │  Plane  │
├─────────────┼─────────────┼─────────────┼─────────────┼─────────┤
│ MySQL       │ InMemory    │ Batching    │ Kafka       │ REST API│
│ PostgreSQL  │ Registry    │ Commit      │ Redis       │ Metrics │
│ (future)    │             │ Policy      │ (future)    │ Health  │
└─────────────┴─────────────┴─────────────┴─────────────┴─────────┘
                               │
                    ┌──────────┴──────────┐
                    │  Checkpoint Store   │
                    │  (File/SQLite/Mem)  │
                    └─────────────────────┘

Data Flow

Event Lifecycle

1. Source reads from database log (binlog/WAL)
        │
        ▼
2. Schema loader maps table_id to schema
        │
        ▼
3. Event constructed with before/after images
        │
        ▼
4. Event sent to coordinator via channel
        │
        ▼
5. Coordinator batches events
        │
        ▼
6. Processors transform batch (JavaScript)
        │
        ▼
7. Sinks deliver batch concurrently
        │
        ▼
8. Commit policy evaluated
        │
        ▼
9. Checkpoint saved (if policy satisfied)

Event Structure

Every CDC event shares a common structure:

pub struct Event {
    pub source_id: String,          // Source identifier
    pub database: String,           // Database name
    pub table: String,              // Table name
    pub op: Op,                     // Insert, Update, Delete, Ddl
    pub tx_id: Option<u64>,         // Source transaction ID
    pub before: Option<Value>,      // Previous row state
    pub after: Option<Value>,       // New row state
    pub schema_version: Option<String>,  // Schema fingerprint
    pub schema_sequence: Option<u64>,    // For replay lookups
    pub ddl: Option<Value>,         // DDL payload if op == Ddl
    pub timestamp: DateTime<Utc>,   // Event timestamp
    pub checkpoint: Option<CheckpointMeta>,  // Position info
    pub size_bytes: usize,          // For batching
}

Schema Registry

Role

The schema registry serves three purposes:

  1. Map table IDs to schemas: Binlog events reference tables by ID; the registry resolves these to full schema metadata
  2. Detect schema changes: Fingerprint comparison identifies when DDL has modified a table
  3. Enable replay: Sequence numbers correlate events with the schema active when they were produced

Schema Registration Flow

1. Schema loader fetches from INFORMATION_SCHEMA
        │
        ▼
2. Compute fingerprint (SHA-256 of structure)
        │
        ▼
3. Check registry for existing schema with same fingerprint
        │
        ├── Found: Return existing version (idempotent)
        │
        └── Not found: Allocate new version number
                │
                ▼
4. Store with: version, fingerprint, JSON, timestamp, sequence, checkpoint

Sequence Numbers

The registry maintains a global monotonic counter. Each schema version receives a sequence number at registration. Events carry this sequence, enabling accurate schema lookup during replay:

Timeline:
─────────────────────────────────────────────────────────────►
     │              │                    │
Schema v1      Schema v2           Schema v3
(seq=1)        (seq=15)            (seq=42)
     │              │                    │
     └──events 1-14─┘──events 15-41─────┘──events 42+──►

Replay at seq=20: Use schema v2 (registered at seq=15, before seq=42)

Checkpoint Store

Timing Guarantee

The checkpoint is saved only after sinks acknowledge delivery:

┌────────┐   events   ┌────────┐   ack    ┌────────────┐
│ Source │ ─────────▶ │  Sink  │ ───────▶ │ Checkpoint │
└────────┘            └────────┘          │   Store    │
                                          └────────────┘

If the process crashes after sending to sink but before checkpoint, events will be replayed. This is the “at-least-once” guarantee - duplicates are possible, but loss is not.

Storage Backends

| Backend | Versioning | Persistence | Use Case |
|---|---|---|---|
| FileCheckpointStore | No | Yes | Production (simple) |
| SqliteCheckpointStore | Yes | Yes | Development, debugging |
| MemCheckpointStore | No | No | Testing |

Checkpoint-Schema Correlation

When registering schemas, the current checkpoint can be attached:

registry.register_with_checkpoint(
    tenant, db, table,
    &fingerprint,
    &schema_json,
    Some(&checkpoint_bytes),  // Current binlog position
).await?;

This creates a link between schema versions and source positions, enabling coordinated rollback and point-in-time schema queries.

Coordinator

The coordinator orchestrates event flow between source and sinks:

Batching

Events are accumulated until a threshold triggers flush:

  • max_events: Event count limit
  • max_bytes: Total serialized size limit
  • max_ms: Time since batch started
  • respect_source_tx: Never split source transactions

Commit Policy

When multiple sinks are configured, the commit policy determines when the checkpoint advances:

// Simplified: how each policy evaluates sink acknowledgments
match policy {
    All => acked_sinks == total_sinks,
    Required => acked_required_sinks == sinks.iter().filter(|s| s.required).count(),
    Quorum(n) => acked_sinks >= n,
}

Processor Pipeline

Processors run in declared order, transforming batches:

events ──▶ Processor 1 ──▶ Processor 2 ──▶ ... ──▶ transformed events

Each processor can filter, transform, or enrich events. The JavaScript processor uses deno_core for sandboxed execution.
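For example, a JavaScript processor can act as a pure filter by returning an empty array (sketch; the event fields follow the event structure above):

(event) => {
  // Drop delete events; pass everything else through unchanged
  if (event.op === "delete") return [];
  return [event];
}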

Hot Paths

Critical performance paths have been optimized:

  1. Event construction - Minimal allocations, reuse buffers
  2. Checkpoint serialization - Opaque bytes avoid repeated JSON encoding
  3. Sink delivery - Batch operations reduce round trips
  4. Schema lookup - In-memory cache with stable fingerprints

Benchmarking

Performance is tracked via:

  • Micro-benchmarks for specific operations
  • End-to-end benchmarks using the Coordinator component
  • Regression detection in CI

Future Architecture

Planned enhancements:

  • Persistent schema registry: SQLite backend initially, mirroring the checkpoint storage pattern
  • Production storage backends: PostgreSQL, S3/GCS for cloud-native and HA deployments
  • Event store: Time-based replay and schema evolution
  • Distributed coordination: Leader election for HA deployments
  • Additional sources: Turso/SQLite, ClickHouse, MongoDB

Checkpoints

Checkpoints record pipeline progress so ingestion can resume from the last successfully delivered position. DeltaForge’s checkpoint system is designed to guarantee at-least-once delivery by carefully coordinating when checkpoints are saved relative to event delivery.

Core Guarantee: At-Least-Once Delivery

The fundamental rule of DeltaForge checkpointing:

Checkpoints are only saved after events have been successfully delivered to sinks.

This ordering is critical. If a checkpoint were saved before events were delivered, a crash between checkpoint save and delivery would cause those events to be lost - the pipeline would resume from a position past events that were never delivered.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Source    │────▶│  Processor  │────▶│    Sink     │────▶│ Checkpoint  │
│   (read)    │     │ (transform) │     │  (deliver)  │     │   (save)    │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
                                               │
                                               ▼
                                        Sink acknowledges
                                        successful delivery
                                               │
                                               ▼
                                        Then checkpoint
                                        is saved

What This Means in Practice

  • On clean shutdown: All buffered events are flushed and checkpointed
  • On crash: Events since the last checkpoint are replayed (hence “at-least-once”)
  • Duplicate handling: Consumers should be idempotent or use deduplication

Checkpoint Storage

Storage Backends

DeltaForge supports pluggable checkpoint storage:

| Backend | Description | Use Case |
|---|---|---|
| FileCheckpointStore | JSON file on disk | Development, simple deployments |
| MemCheckpointStore | In-memory (ephemeral) | Testing |
| SqliteCheckpointStore | SQLite with versioning | Single-instance production |

The default stores checkpoints to ./data/df_checkpoints.json.

For HA deployments requiring shared state across instances, additional backends (PostgreSQL, S3/GCS) are planned but not yet implemented.

Storage Interface

All backends implement the CheckpointStore trait:

#[async_trait]
pub trait CheckpointStore: Send + Sync {
    /// Get raw checkpoint bytes
    async fn get_raw(&self, source_id: &str) -> CheckpointResult<Option<Vec<u8>>>;
    
    /// Store raw checkpoint bytes
    async fn put_raw(&self, source_id: &str, bytes: &[u8]) -> CheckpointResult<()>;
    
    /// Delete checkpoint
    async fn delete(&self, source_id: &str) -> CheckpointResult<bool>;
    
    /// List all checkpoint keys
    async fn list(&self) -> CheckpointResult<Vec<String>>;
    
    /// Whether this backend supports versioning
    fn supports_versioning(&self) -> bool;
}

Typed Access

The CheckpointStoreExt trait provides convenient typed access:

// Store typed checkpoint (automatically serialized to JSON)
store.put("pipeline-1", MySqlCheckpoint { 
    file: "binlog.000042".into(),
    pos: 12345,
    gtid_set: None,
}).await?;

// Retrieve typed checkpoint
let cp: Option<MySqlCheckpoint> = store.get("pipeline-1").await?;

Checkpoint Contents

MySQL Checkpoints

MySQL checkpoints track binlog position:

pub struct MySqlCheckpoint {
    pub file: String,        // e.g., "binlog.000042"
    pub pos: u64,            // Byte position in binlog file
    pub gtid_set: Option<String>,  // GTID set if enabled
}

The checkpoint is taken from the last event in a successfully delivered batch, ensuring resumption starts exactly where delivery left off.

Checkpoint in Events

Events carry checkpoint metadata for end-to-end tracking:

pub struct Event {
    // ... other fields ...
    
    /// Checkpoint info from source
    pub checkpoint: Option<CheckpointMeta>,
}

pub enum CheckpointMeta {
    Opaque(Arc<[u8]>),  // Serialized source-specific checkpoint
}

Using Arc<[u8]> allows zero-copy sharing of checkpoint data across the pipeline without repeated allocations.

Commit Policy

When multiple sinks are configured, the commit policy determines when checkpoints advance:

| Policy | Behavior |
|---|---|
| all | Every sink must acknowledge |
| required | Only required: true sinks must acknowledge |
| quorum | At least N sinks must acknowledge |

Configuration

spec:
  batch:
    commit_policy: required  # or: all, quorum
    quorum: 2                # for quorum policy

  sinks:
    - type: kafka
      required: true  # Must succeed for checkpoint
      config: { ... }
    
    - type: redis
      required: false  # Best-effort, doesn't block checkpoint
      config: { ... }

Commit Logic

The coordinator tracks acknowledgments from each sink and only advances the checkpoint when the policy is satisfied:

// Simplified commit logic
let required_acks = sinks.iter().filter(|s| s.required).count();
let actual_acks = batch.acknowledgments.iter().filter(|a| a.success).count();

if actual_acks >= required_acks {
    checkpoint_store.put(&key, batch.last_checkpoint).await?;
} else {
    warn!("commit policy not satisfied; checkpoint NOT advanced");
}

Batching and Checkpoints

Checkpoints are saved at batch boundaries, not per-event. This provides:

  • Efficiency: Fewer checkpoint writes
  • Atomicity: Batch success or failure is all-or-nothing
  • Transaction preservation: respect_source_tx: true keeps source transactions in single batches

Batch Configuration

spec:
  batch:
    max_events: 1000      # Flush after N events
    max_bytes: 8388608    # Flush after 8MB
    max_ms: 200           # Flush after 200ms
    respect_source_tx: true  # Never split source transactions
    max_inflight: 1       # Concurrent batches in flight

Checkpoint Timing in Batches

Within a batch:

  1. Events are collected until a threshold is reached
  2. Processors transform the batch
  3. Sinks receive and deliver events
  4. Sinks acknowledge success/failure
  5. Commit policy is evaluated
  6. If satisfied, checkpoint advances to the last event’s position

Versioned Checkpoints

The SQLite backend supports checkpoint versioning for:

  • Rollback: Return to a previous checkpoint position
  • Audit: Track checkpoint progression over time
  • Debugging: Understand checkpoint history during incident analysis

Version Operations

// Store with versioning
let version = store.put_raw_versioned("pipeline-1", bytes).await?;

// Get specific version
let old_bytes = store.get_version_raw("pipeline-1", version - 1).await?;

// List all versions
let versions = store.list_versions("pipeline-1").await?;

// Rollback to previous version
store.rollback("pipeline-1", target_version).await?;

Version Metadata

pub struct VersionInfo {
    pub version: u64,
    pub created_at: DateTime<Utc>,
    pub size_bytes: usize,
}

Schema-Checkpoint Correlation

For replay scenarios, DeltaForge correlates schemas with checkpoints. When a schema is registered, it can optionally include the current checkpoint position:

registry.register_with_checkpoint(
    tenant, db, table,
    &fingerprint,
    &schema_json,
    Some(checkpoint_bytes),  // Binlog position when schema was observed
).await?;

This enables:

  • Accurate replay: Events are interpreted with the schema active at their checkpoint position
  • Schema time-travel: Find what schema was active at any checkpoint
  • Coordinated rollback: Roll back both checkpoint and schema state together

Operational Considerations

Clean Shutdown

Before maintenance, cleanly stop pipelines to flush checkpoints:

# Pause ingestion
curl -X POST http://localhost:8080/pipelines/{name}/pause

# Wait for in-flight batches to complete
sleep 5

# Stop pipeline
curl -X POST http://localhost:8080/pipelines/{name}/stop

Checkpoint Inspection

View current checkpoint state:

# List all checkpoints
curl http://localhost:8080/checkpoints

# Get specific pipeline checkpoint
curl http://localhost:8080/checkpoints/{pipeline-name}

Monitoring

Key metrics to monitor:

  • deltaforge_checkpoint_lag_seconds: Time since last checkpoint
  • deltaforge_checkpoint_bytes: Size of last checkpoint
  • deltaforge_batch_commit_total: Successful batch commits
  • deltaforge_batch_commit_failed_total: Failed commits (policy not satisfied)
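A simple stuck-pipeline alert can be built on the lag metric (PromQL sketch; the five-minute threshold is illustrative):

deltaforge_checkpoint_lag_seconds > 300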

Recovery Scenarios

| Scenario | Behavior |
|---|---|
| Process crash | Resume from last checkpoint, replay events |
| Network partition (sink unreachable) | Retry delivery, checkpoint doesn't advance |
| Corrupt checkpoint file | Manual intervention required |
| Source unavailable at checkpoint | Retry connection with backoff |

Best Practices

  1. Use durable storage for production checkpoint backends (not in-memory)
  2. Monitor checkpoint lag to detect stuck pipelines
  3. Configure appropriate batch sizes — smaller batches mean more frequent checkpoints but more overhead
  4. Set required: true only on sinks that must succeed for correctness
  5. Test recovery by killing pipelines and verifying no events are lost
  6. Back up checkpoint files if using file-based storage

Future Enhancements

Planned checkpoint improvements:

  • PostgreSQL backend for HA deployments with shared state
  • S3/GCS backends for cloud-native deployments
  • Distributed coordination for multi-instance leader election
  • Checkpoint compression for large state
  • Point-in-time recovery with event store integration

Schema Registry

DeltaForge’s schema registry tracks table schemas across time, enabling accurate event interpretation during replay and providing change detection for DDL operations.

Design Philosophy: Source-Owned Schemas

DeltaForge takes a fundamentally different approach to schema handling than many CDC tools. Rather than normalizing all database schemas into a universal type system, each source defines and owns its schema semantics.

This means:

  • MySQL schemas capture MySQL semantics - column types like bigint(20) unsigned, varchar(255), and json are preserved exactly as MySQL defines them
  • PostgreSQL schemas capture PostgreSQL semantics - arrays, custom types, and pg-specific attributes remain intact
  • No lossy normalization - you don’t lose precision or database-specific information by forcing everything into a common format

This design avoids the maintenance burden of keeping a universal type system synchronized across all databases, and it ensures that downstream consumers receive schemas that accurately reflect the source database’s capabilities and constraints.

The SourceSchema Trait

Every CDC source implements the SourceSchema trait, which provides a common interface for fingerprinting and column access while allowing source-specific schema representations:

pub trait SourceSchema: Serialize + DeserializeOwned + Send + Sync {
    /// Source type identifier (e.g., "mysql", "postgres", "mongodb").
    fn source_kind(&self) -> &'static str;

    /// Content-addressable fingerprint for change detection.
    /// Two schemas with the same fingerprint are identical.
    fn fingerprint(&self) -> String;

    /// Column/field names in ordinal order.
    fn column_names(&self) -> Vec<&str>;

    /// Primary key column names.
    fn primary_key(&self) -> Vec<&str>;

    /// Human-readable description.
    fn describe(&self) -> String;
}

Fingerprinting

Schema fingerprints use SHA-256 hashing over JSON-serialized content to provide:

  • Stability - the same schema always produces the same fingerprint
  • Change detection - any structural change produces a different fingerprint
  • Content-addressability - fingerprints can be used as cache keys or deduplication identifiers

pub fn compute_fingerprint<T: Serialize>(value: &T) -> String {
    let json = serde_json::to_vec(value).unwrap_or_default();
    let hash = Sha256::digest(&json);
    format!("sha256:{}", hex::encode(hash))
}

The fingerprint only includes structurally significant fields. For MySQL, this means columns and primary key are included, but engine and charset are excluded since they don’t affect how CDC events should be interpreted.

MySQL Schema Implementation

The MySqlTableSchema struct captures comprehensive MySQL table metadata:

pub struct MySqlTableSchema {
    /// Columns in ordinal order
    pub columns: Vec<MySqlColumn>,
    
    /// Primary key column names
    pub primary_key: Vec<String>,
    
    /// Storage engine (InnoDB, MyISAM, etc.)
    pub engine: Option<String>,
    
    /// Default charset
    pub charset: Option<String>,
    
    /// Default collation
    pub collation: Option<String>,
}

pub struct MySqlColumn {
    pub name: String,
    pub column_type: String,      // e.g., "bigint(20) unsigned"
    pub data_type: String,        // e.g., "bigint"
    pub nullable: bool,
    pub ordinal_position: u32,
    pub default_value: Option<String>,
    pub extra: Option<String>,    // e.g., "auto_increment"
    pub comment: Option<String>,
    pub char_max_length: Option<i64>,
    pub numeric_precision: Option<i64>,
    pub numeric_scale: Option<i64>,
}

Schema information is fetched from INFORMATION_SCHEMA at startup and cached for the pipeline’s lifetime.

Schema Registry Architecture

The schema registry serves three core functions:

  1. Version tracking - maintains a history of schema versions per table
  2. Change detection - compares fingerprints to detect DDL changes
  3. Replay correlation - associates schemas with checkpoint positions for accurate replay

Schema Versions

Each registered schema version includes:

| Field | Description |
|---|---|
| version | Per-table version number (starts at 1) |
| hash | Content fingerprint for deduplication |
| schema_json | Full schema as JSON |
| registered_at | Registration timestamp |
| sequence | Global monotonic sequence number |
| checkpoint | Source checkpoint at registration time |

Sequence Numbers for Replay

The registry maintains a global monotonic sequence counter. When a schema is registered, it receives the next sequence number. Events carry this sequence number, enabling the replay engine to look up the correct schema version:

// During replay: find schema active at event's sequence
let schema = registry.get_at_sequence(tenant, db, table, event.schema_sequence);

This ensures events are always interpreted with the schema that was active when they were produced, even if the table structure has since changed.

Checkpoint Correlation

Schemas can be registered with an associated checkpoint, creating a correlation between schema versions and source positions:

registry.register_with_checkpoint(
    tenant, db, table, 
    &fingerprint, 
    &schema_json,
    Some(checkpoint_bytes),  // Optional: binlog position when schema was observed
).await?;

This correlation supports scenarios like:

  • Replaying events from a specific checkpoint with the correct schema
  • Determining which schema was active at a particular binlog position
  • Rolling back schema state along with checkpoint rollback

Schema Loader

The MySqlSchemaLoader handles schema discovery and caching:

Pattern Expansion

Tables are specified using patterns that support wildcards:

Pattern | Description
db.table | Exact match
db.* | All tables in database
db.prefix% | Tables starting with prefix
%.table | Table in any database
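
A minimal matcher for these patterns could look like the sketch below; the loader's real implementation (which expands patterns against INFORMATION_SCHEMA) may differ.

// A hedged sketch of pattern matching as described above.
// `*` matches any name; a trailing `%` matches any suffix.
fn pattern_matches(pattern: &str, db: &str, table: &str) -> bool {
    let (pdb, ptable) = match pattern.split_once('.') {
        Some(parts) => parts,
        None => return false,
    };
    let part_matches = |pat: &str, value: &str| -> bool {
        if pat == "*" || pat == "%" {
            true
        } else if let Some(prefix) = pat.strip_suffix('%') {
            value.starts_with(prefix)
        } else {
            pat == value
        }
    };
    part_matches(pdb, db) && part_matches(ptable, table)
}

// pattern_matches("shop.order_%", "shop", "order_items") == true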

Preloading

At startup, the loader expands patterns and preloads all matching schemas:

let schema_loader = MySqlSchemaLoader::new(dsn, registry, tenant);
let tracked_tables = schema_loader.preload(&["shop.orders", "shop.order_%"]).await?;

This ensures schemas are available before the first CDC event arrives.

Caching

Loaded schemas are cached to avoid repeated INFORMATION_SCHEMA queries:

// Fast path: return cached schema
if let Some(cached) = cache.get(&(db, table)) {
    return Ok(cached.clone());
}

// Slow path: fetch from database, register, cache
let schema = fetch_schema(db, table).await?;
let version = registry.register(...).await?;
cache.insert((db, table), loaded_schema);

DDL Handling

When the binlog contains DDL events, the schema loader responds:

DDL Type | Action
CREATE TABLE | Schema loaded on first row event
ALTER TABLE | Cache invalidated, reloaded on next row
DROP TABLE | Cache entry removed
TRUNCATE | No schema change
RENAME TABLE | Old removed, new loaded on first row

DDL detection uses the QueryEvent type in the binlog. On DDL, the entire database’s schema cache is invalidated since MySQL doesn’t always specify the exact table in DDL events.
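
A minimal sketch of that database-wide invalidation; the type and method names here are illustrative, not the loader's internals.

use std::collections::HashMap;

struct CachedSchema; // stand-in for the loaded schema type

struct SchemaCache {
    // keyed by (db, table)
    entries: HashMap<(String, String), CachedSchema>,
}

impl SchemaCache {
    /// MySQL DDL events do not always name the affected table, so drop
    /// every cached schema for the database; each table is then lazily
    /// reloaded on its next row event.
    fn invalidate_database(&mut self, db: &str) {
        self.entries.retain(|(cached_db, _), _| cached_db != db);
    }
}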

API Endpoints

Reload Schemas

Force reload schemas from the database:

curl -X POST http://localhost:8080/pipelines/{name}/schemas/reload

This clears the cache and re-fetches schemas for all tracked tables.

List Cached Schemas

View currently cached schemas:

curl http://localhost:8080/pipelines/{name}/schemas

Limitations

  • In-memory registry - Schema versions are lost on restart. Persistent backends (SQLite, then PostgreSQL for HA) are planned.
  • No cross-pipeline sharing - Each pipeline maintains its own registry instance
  • Pattern expansion at startup - New tables matching patterns require pipeline restart or reload

Best Practices

  1. Use explicit table patterns in production to avoid accidentally capturing unwanted tables
  2. Monitor schema reload times - slow reloads may indicate overly broad patterns
  3. Trigger schema reload after DDL if your deployment process modifies schemas
  4. Include schema version in downstream events for consumers that need schema evolution awareness

Troubleshooting

Unknown table_id Errors

WARN write_rows for unknown table_id, table_id=42

The binlog contains row events for a table not in the table_map. This happens when:

  • A table was created after the CDC stream started
  • Table patterns don’t match the table

Solution: Trigger a schema reload via the REST API.

Schema Fetch Returned 0 Columns

WARN schema fetch returned 0 columns, db=shop, table=orders

Usually indicates:

  • Table doesn’t exist
  • MySQL user lacks SELECT privilege on INFORMATION_SCHEMA
  • Table was dropped between detection and schema load

Slow Schema Loading

WARN slow schema fetch, db=shop, table=orders, ms=350

Consider:

  • Narrowing table patterns to reduce the number of tables
  • Using exact table names instead of wildcards
  • Verifying network latency to the MySQL server

Schema Sensing

Schema sensing automatically infers and tracks schema structure from JSON event payloads. This complements the schema registry by discovering schema from data rather than database metadata.

When to Use Schema Sensing

Schema sensing is useful when:

  • Source doesn’t provide schema: Some sources emit JSON without metadata
  • JSON columns: Database JSON/JSONB columns have dynamic structure
  • Schema evolution tracking: Detect when payload structure changes over time
  • Downstream integration: Generate JSON Schema for consumers

How It Works

┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
│    Event     │────▶│  Schema Sensor  │────▶│  Inferred Schema │
│   Payload    │     │   (sampling)    │     │   + Fingerprint  │
└──────────────┘     └─────────────────┘     └──────────────────┘
                              │
                              ▼
                     ┌─────────────────┐
                     │ Structure Cache │
                     └─────────────────┘
  1. Observation: Events flow through the sensor during batch processing
  2. Sampling: Not every event is fully analyzed (configurable rate)
  3. Deep inspection: Nested JSON structures are recursively analyzed
  4. Fingerprinting: Schema changes are detected via SHA-256 fingerprints
  5. Caching: Repeated structures skip full analysis for performance
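
Steps 3 through 5 can be sketched in a few lines: reduce a payload to its type shape (using the type names from the Inferred Types table below), then hash the shape so repeated structures can be served from the cache. This is an illustration of the idea, not the crate's actual code.

use serde_json::Value;
use sha2::{Digest, Sha256};

// Reduce a JSON value to its type shape. Simplified: arrays are
// described by their first element's shape only.
fn type_shape(value: &Value) -> String {
    match value {
        Value::Null => "null".into(),
        Value::Bool(_) => "boolean".into(),
        Value::Number(n) if n.is_i64() || n.is_u64() => "integer".into(),
        Value::Number(_) => "number".into(),
        Value::String(_) => "string".into(),
        Value::Array(items) => {
            let inner = items.first().map(type_shape).unwrap_or_default();
            format!("array<{inner}>")
        }
        Value::Object(map) => {
            let fields: Vec<String> = map
                .iter()
                .map(|(k, v)| format!("{k}:{}", type_shape(v)))
                .collect();
            format!("{{{}}}", fields.join(","))
        }
    }
}

// Payloads with identical shapes hash identically and skip full analysis.
fn structure_hash(payload: &Value) -> String {
    format!("sha256:{}", hex::encode(Sha256::digest(type_shape(payload))))
}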

Configuration

Enable schema sensing in your pipeline spec:

spec:
  schema_sensing:
    enabled: true
    
    # Deep inspection for nested JSON
    deep_inspect:
      enabled: true
      max_depth: 3
      max_sample_size: 500
    
    # Sampling configuration
    sampling:
      warmup_events: 50
      sample_rate: 5
      structure_cache: true
      structure_cache_size: 50
    
    # Output configuration
    output:
      include_stats: true

Configuration Options

Top Level

Field | Type | Default | Description
enabled | bool | false | Enable schema sensing

Deep Inspection (deep_inspect)

Field | Type | Default | Description
enabled | bool | false | Enable deep inspection of nested objects
max_depth | integer | 3 | Maximum nesting depth to analyze
max_sample_size | integer | 500 | Max events to sample for deep analysis

Sampling (sampling)

Field | Type | Default | Description
warmup_events | integer | 50 | Events to analyze before sampling kicks in
sample_rate | integer | 5 | Analyze 1 in N events after warmup
structure_cache | bool | true | Cache structure hashes for performance
structure_cache_size | integer | 50 | Max cached structures per table
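
One way to picture how warmup_events and sample_rate interact is the small decision function below; it is a sketch of the documented behavior, not the sensor's actual bookkeeping.

// A sketch only: warmup analyzes everything, then 1 in N events.
fn should_analyze(events_seen: u64, warmup_events: u64, sample_rate: u64) -> bool {
    if events_seen < warmup_events {
        true // warmup phase: analyze every event
    } else {
        // after warmup: analyze 1 in N events
        (events_seen - warmup_events) % sample_rate.max(1) == 0
    }
}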

Inferred Types

Schema sensing infers these JSON types:

Type | Description
null | JSON null value
boolean | true/false
integer | Whole numbers
number | Floating point numbers
string | Text values
array | JSON arrays (element types tracked)
object | Nested objects (fields recursively analyzed)

For fields with varying types across events, all observed types are recorded.

Schema Evolution

When schema structure changes, the sensor:

  1. Detects change: Fingerprint differs from previous version
  2. Increments sequence: Monotonic version number increases
  3. Logs evolution: Emits structured log with old/new fingerprints
  4. Updates cache: New structure becomes current

Evolution events are available via the REST API and can trigger alerts.

Stabilization

After observing enough events, a schema “stabilizes”:

  • Warmup phase completes
  • Structure stops changing
  • Sampling rate takes effect
  • Cache hit rate increases

Stabilized schemas have stabilized: true in API responses.

API Access

List Inferred Schemas

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas

Get Schema Details

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders

Export as JSON Schema

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/json-schema

Cache Statistics

curl http://localhost:8080/pipelines/my-pipeline/sensing/stats

Drift Detection

Schema sensing integrates with drift detection to compare:

  • Expected schema: From database metadata (schema registry)
  • Observed schema: From event payloads (schema sensing)

When mismatches occur, drift events are recorded:

Drift Type | Description
unexpected_null | Non-nullable column has null values
type_mismatch | Observed type differs from declared type
undeclared_column | Field in data not in schema
missing_column | Schema field never seen in data
json_structure_change | JSON column structure changed
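
Expressed as a type, the drift classes above might look like this (illustrative only, not the crate's definition):

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriftType {
    UnexpectedNull,      // non-nullable column observed with nulls
    TypeMismatch,        // observed type differs from declared type
    UndeclaredColumn,    // field in data, not in schema
    MissingColumn,       // schema field never seen in data
    JsonStructureChange, // JSON column structure changed
}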

Access drift data via:

curl http://localhost:8080/pipelines/my-pipeline/drift

Performance Considerations

Sampling Tradeoffs

Setting | Effect
Higher warmup_events | Better initial accuracy, slower stabilization
Higher sample_rate | Lower CPU usage, slower evolution detection
Larger structure_cache_size | More memory, better hit rate

High-throughput pipelines (>10k events/sec):

sampling:
  warmup_events: 100
  sample_rate: 10
  structure_cache: true
  structure_cache_size: 100

Schema evolution monitoring:

sampling:
  warmup_events: 25
  sample_rate: 2
  structure_cache: true

Development/debugging:

sampling:
  warmup_events: 10
  sample_rate: 1  # Analyze every event

Example: JSON Column Sensing

For tables with JSON columns, sensing reveals the internal structure:

# Database schema shows: metadata JSON
# Sensing reveals:
fields:
  - name: "metadata.user_agent"
    types: ["string"]
    nullable: false
  - name: "metadata.ip_address"
    types: ["string"]
    nullable: true
  - name: "metadata.tags"
    types: ["array"]
    array_element_types: ["string"]

This enables downstream consumers to understand JSON column structure without manual documentation.

Metrics

Schema sensing emits these metrics:

Metric | Type | Description
deltaforge_schema_evolutions_total{pipeline} | Counter | Schema evolution events detected
deltaforge_schema_drift_detected{pipeline} | Counter | Incremented when drift is detected in a batch
deltaforge_stage_latency_seconds{pipeline,stage="schema_sensing"} | Histogram | Time spent in schema sensing per batch

Cache statistics (hits, misses, hit rate) are available via the REST API at /pipelines/{name}/sensing/stats but are not currently exposed as Prometheus metrics.

Observability playbook

DeltaForge already ships a Prometheus exporter, structured logging, and a panic hook. The runtime now emits source ingress counters, batching/processor histograms, and sink latency/throughput metrics, so operators can build production dashboards immediately. The tables below capture what is wired today and the remaining gaps to make the platform production-ready for data and infra engineers.

What exists today

  • Prometheus endpoint served at /metrics (default 0.0.0.0:9000) with descriptors for pipeline counts, source/sink counters, and a stage latency histogram. The recorder is installed automatically when metrics are enabled.
  • Structured logging via tracing_subscriber with JSON output by default, optional targets, and support for RUST_LOG overrides.
  • Panic hook increments a deltaforge_panics_total counter and logs captured panics before delegating to the default hook.

Instrumentation gaps and recommendations

The sections below call out concrete metrics and log events to add per component. All metrics should include pipeline, tenant, and component identifiers where applicable so users can aggregate across fleets.

Sources (MySQL/Postgres)

Status | Metric/log | Rationale
✅ Implemented | deltaforge_source_events_total{pipeline,source,table} counter increments when MySQL events are handed to the coordinator. | Surfaces ingress per table and pipeline.
✅ Implemented | deltaforge_source_reconnects_total{pipeline,source} counter when binlog reads reconnect. | Makes retry storms visible.
🚧 Gap | deltaforge_source_lag_seconds{pipeline,source} gauge based on binlog/WAL position vs. server time. | Alert when sources fall behind.
🚧 Gap | deltaforge_source_idle_seconds{pipeline,source} gauge updated when no events arrive within the inactivity window. | Catch stuck readers before downstream backlogs form.

Coordinator and batching

Status | Metric/log | Rationale
✅ Implemented | deltaforge_batch_events{pipeline} and deltaforge_batch_bytes{pipeline} histograms in Coordinator::process_deliver_and_maybe_commit. | Tune batching policies with data.
✅ Implemented | deltaforge_stage_latency_seconds{pipeline,stage,trigger} histogram for the processor stage. | Provides batch timing per trigger (timer/limits/shutdown).
✅ Implemented | deltaforge_processor_latency_seconds{pipeline,processor} histogram around every processor invocation. | Identify slow user functions.
🚧 Gap | deltaforge_pipeline_channel_depth{pipeline} gauge from mpsc::Sender::capacity()/len(). | Detect backpressure between sources and coordinator.
🚧 Gap | Checkpoint outcome counters/logs (deltaforge_checkpoint_success_total / _failure_total); see the sketch after this table. | Alert on persistence regressions and correlate to data loss risk.
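
For the checkpoint-outcome gap flagged above, a hedged sketch using the metrics facade (assumed from the Prometheus recorder mentioned earlier; exact macro syntax depends on the metrics crate version in use):

use metrics::counter;

// Illustrative helper: record one checkpoint attempt's outcome.
fn record_checkpoint_outcome(pipeline: &str, ok: bool) {
    let name = if ok {
        "deltaforge_checkpoint_success_total"
    } else {
        "deltaforge_checkpoint_failure_total"
    };
    counter!(name, "pipeline" => pipeline.to_string()).increment(1);
}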

Sinks (Kafka/Redis/custom)

Status | Metric/log | Rationale
✅ Implemented | deltaforge_sink_events_total{pipeline,sink} counter and deltaforge_sink_latency_seconds{pipeline,sink} histogram around each send. | Throughput and responsiveness per sink.
✅ Implemented | deltaforge_sink_batch_total{pipeline,sink} counter incremented per send. | Number of batches sent per sink.
🚧 Gap | Error taxonomy in deltaforge_sink_failures_total (add kind/details). | Easier alerting on specific failure classes (auth, timeout, schema).
🚧 Gap | Backpressure gauge for client buffers (rdkafka queue, Redis pipeline depth). | Early signal before errors occur.
🚧 Gap | Drop/skip counters from processors/sinks. | Auditing and reconciliation.

Control plane and health endpoints

Need | Suggested metric/log | Rationale
API request accounting | deltaforge_api_requests_total{route,method,status} counter and latency histogram using Axum middleware. | Production-grade visibility of operator actions.
Ready/Liveness transitions | Logs with pipeline counts and per-pipeline status when readiness changes. | Explain probe failures in log aggregation.
Pipeline lifecycle | Counters for create/patch/stop actions with success/error labels; include tenant and caller metadata in logs. | Auditable control-plane operations.

Examples

MySQL to Redis

This example streams MySQL binlog events into a Redis stream with an inline JavaScript transformation.

metadata:
  name: orders-mysql-to-redis
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
  processors:
    - type: javascript
      id: redact-email
      inline: |
        function processBatch(events) {
          return events.map((event) => {
            if (event.after && event.after.email) {
              event.after.email = "[redacted]";
            }
            return event;
          });
        }
      limits:
        timeout_ms: 500
  sinks:
    - type: redis
      config:
        id: orders-redis
        uri: ${REDIS_URI}
        stream: orders
        required: true
  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
  commit_policy:
    mode: required

Feel free to add a Kafka sink alongside Redis. Mark only the critical sink as required if you want checkpoints to proceed when optional sinks are unavailable.

Example: Turso to Kafka

This example demonstrates streaming changes from a Turso database to Kafka with schema sensing enabled.

Use Case

You have a Turso database (or local libSQL) and want to:

  • Stream table changes to a Kafka topic
  • Automatically detect schema structure from JSON payloads
  • Transform events with JavaScript before publishing

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: turso2kafka
  tenant: acme

spec:
  source:
    type: turso
    config:
      id: turso-main
      # Local libSQL for development
      url: "http://127.0.0.1:8080"
      # For Turso cloud:
      # url: "libsql://your-db.turso.io"
      # auth_token: "${TURSO_AUTH_TOKEN}"
      tables: ["users", "orders", "order_items"]
      poll_interval_ms: 1000
      cdc_mode: auto

  processors:
    - type: javascript
      id: enrich
      inline: |
        function processBatch(events) {
          return events.map(event => {
            // Add custom metadata to events
            event.source_type = "turso";
            event.processed_at = new Date().toISOString();
            return event;
          });
        }

  sinks:
    - type: kafka
      config:
        id: kafka-main
        brokers: "${KAFKA_BROKERS}"
        topic: turso.changes
        required: true
        exactly_once: false
        client_conf:
          message.timeout.ms: "5000"
          acks: "all"

  batch:
    max_events: 100
    max_bytes: 1048576
    max_ms: 500
    respect_source_tx: false
    max_inflight: 2

  commit_policy:
    mode: required

  schema_sensing:
    enabled: true
    deep_inspect:
      enabled: true
      max_depth: 3
    sampling:
      warmup_events: 50
      sample_rate: 5
      structure_cache: true

Running the Example

1. Start Infrastructure

# Start Kafka and other services
./dev.sh up

# Create the target topic
./dev.sh k-create turso.changes 6

2. Start Local libSQL (Optional)

For local development without Turso cloud:

# Using sqld (libSQL server)
sqld --http-listen-addr 127.0.0.1:8080

# Or with Docker
docker run -p 8080:8080 ghcr.io/libsql/sqld:latest

3. Create Test Tables

CREATE TABLE users (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  name TEXT NOT NULL,
  email TEXT UNIQUE,
  metadata TEXT  -- JSON column
);

CREATE TABLE orders (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  user_id INTEGER NOT NULL,
  total REAL NOT NULL,
  status TEXT DEFAULT 'pending',
  created_at TEXT DEFAULT CURRENT_TIMESTAMP
);

4. Run DeltaForge

# Save config as turso-kafka.yaml
cargo run -p runner -- --config turso-kafka.yaml

5. Insert Test Data

INSERT INTO users (name, email, metadata) 
VALUES ('Alice', 'alice@example.com', '{"role": "admin", "tags": ["vip"]}');

INSERT INTO orders (user_id, total, status) 
VALUES (1, 99.99, 'completed');

6. Verify Events in Kafka

./dev.sh k-consume turso.changes --from-beginning

You should see events like:

{
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "tenant_id": "acme",
  "table": "users",
  "op": "insert",
  "after": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com",
    "metadata": "{\"role\": \"admin\", \"tags\": [\"vip\"]}"
  },
  "source_type": "turso",
  "processed_at": "2025-01-15T10:30:00.000Z",
  "timestamp": "2025-01-15T10:30:00.123Z"
}

Monitoring

Check Pipeline Status

curl http://localhost:8080/pipelines/turso2kafka

View Inferred Schemas

# List all inferred schemas
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas

# Get details for users table
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users

# Export as JSON Schema
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users/json-schema

Check Drift Detection

curl http://localhost:8080/pipelines/turso2kafka/drift

Turso Cloud Configuration

For production with Turso cloud:

source:
  type: turso
  config:
    id: turso-prod
    url: "libsql://mydb-myorg.turso.io"
    auth_token: "${TURSO_AUTH_TOKEN}"
    tables: ["*"]
    cdc_mode: native
    poll_interval_ms: 1000
    native_cdc:
      level: data

Set the auth token via environment variable:

export TURSO_AUTH_TOKEN="your-token-here"

Notes

  • CDC Mode: auto tries native CDC first, then falls back to triggers or polling
  • Poll Interval: Lower values reduce latency but increase database load
  • Schema Sensing: Automatically discovers JSON structure in text columns
  • Exactly Once: Set to false for higher throughput; use true if Kafka cluster supports EOS

Troubleshooting

Common issues and quick checks when running DeltaForge.

  • 🩺 Health-first: start with /healthz and /readyz to pinpoint failing components.

Runner fails to start

  • Confirm the config path passed to --config exists and is readable.
  • Validate YAML syntax and that required fields like metadata.name and spec.source are present.
  • Ensure environment variables referenced in the spec are set (dsn, brokers, uri, etc.).

Pipelines remain unready

  • Check the /readyz endpoint for per-pipeline status and error messages.
  • Verify upstream credentials allow replication (MySQL binlog); other source engines are experimental unless explicitly documented.
  • Inspect sink connectivity; a required sink that cannot connect will block checkpoints.

Slow throughput

  • Increase batch.max_events or batch.max_bytes to reduce flush frequency.
  • Adjust max_inflight to allow more concurrent batches if sinks can handle parallelism.
  • Reduce processor work or add guardrails (limits) to prevent slow JavaScript from stalling the pipeline.

Checkpoints not advancing

  • Review the commit policy: mode: all or required sinks that are unavailable will block progress.
  • Look for sink-specific errors (for example, Kafka broker unreachability or Redis backpressure).
  • Pause and resume the pipeline to force a clean restart after addressing the underlying issue.


Development Guide

Use this guide to build, test, and extend DeltaForge. It covers local workflows, optional dependency containers, and how to work with Docker images.

All contributions are welcome and highly appreciated.

Local prerequisites

  • Rust toolchain 1.89+ (install via rustup).
  • Optional: Docker or Podman for running the dev dependency stack and the container image.

Workspace layout

  • crates/deltaforge-core : shared event model, pipeline engine, and checkpointing primitives.
  • crates/sources : database CDC readers (MySQL binlog, Postgres logical replication) implemented as pluggable sources.
  • crates/processors : JavaScript-based processors and support code for transforming batches.
  • crates/sinks : sink implementations (Kafka producer, Redis streams) plus sink utilities.
  • crates/rest-api : HTTP control plane with health/readiness and pipeline lifecycle endpoints.
  • crates/runner : CLI entrypoint that wires the runtime, metrics, and control plane together.

Use these crate boundaries as reference points when adding new sources, sinks, or pipeline behaviors.

Start dev dependencies

Bring up the optional backing services (Postgres, MySQL, Kafka, Redis) with Docker Compose:

docker compose -f docker-compose.dev.yml up -d

Each service is exposed on localhost for local runs (5432, 3306, 9092, 6379). The MySQL container seeds demo data from ./init-scripts and configures binlog settings required for CDC.

Prefer the convenience dev.sh wrapper to keep common tasks consistent:

./dev.sh up     # start the dependency stack
./dev.sh down   # stop and remove it
./dev.sh ps     # see container status

Build and test locally

Run the usual Rust workflow from the repo root:

cargo fmt --all
cargo clippy --workspace --all-targets --all-features
cargo test --workspace

Or use the helper script for a single command that mirrors CI expectations:

./dev.sh build         # build project (debug)
./dev.sh build-release # build project (release)
./dev.sh run           # run with examples/dev.yaml
./dev.sh fmt           # format code
./dev.sh lint          # clippy with warnings as errors
./dev.sh test          # full test suite
./dev.sh check         # fmt --check + clippy + tests (mirrors CI)
./dev.sh cov           # generate coverage report

Docker images

Use pre-built images

Multi-arch images (amd64/arm64) are published to GHCR and Docker Hub:

# Minimal (~57MB, scratch-based, no shell)
docker pull ghcr.io/vnvo/deltaforge:latest
docker pull vnvohub/deltaforge:latest

# Debug (~140MB, includes shell for troubleshooting)
docker pull ghcr.io/vnvo/deltaforge:latest-debug
docker pull vnvohub/deltaforge:latest-debug

Variant | Size | Base | Use case
latest | ~57MB | scratch | Production
latest-debug | ~140MB | debian-slim | Troubleshooting, has shell

Build locally

Two Dockerfiles are provided:

# Minimal image (~57MB)
docker build -t deltaforge:local .

# Debug image (~140MB, includes shell)
docker build -t deltaforge:local-debug -f Dockerfile.debug .

Or use the dev helper:

./dev.sh docker            # build minimal image
./dev.sh docker-debug      # build debug image
./dev.sh docker-test       # test minimal image runs
./dev.sh docker-test-debug # test debug image runs
./dev.sh docker-all        # build and test all variants
./dev.sh docker-shell      # open shell in debug container

Build multi-arch locally

To build for both amd64 and arm64:

./dev.sh docker-multi-setup  # create buildx builder (once)
./dev.sh docker-multi        # build both architectures

Note: Multi-arch builds use QEMU emulation and take ~30-35 minutes. The images are not loaded locally; pass --push to publish them to a registry.

Run the image

Run the container by mounting your pipeline specs and exposing the API and metrics ports:

docker run --rm \
  -p 8080:8080 -p 9000:9000 \
  -v $(pwd)/examples/dev.yaml:/etc/deltaforge/pipeline.yaml:ro \
  -v deltaforge-checkpoints:/app/data \
  ghcr.io/vnvo/deltaforge:latest \
  --config /etc/deltaforge/pipeline.yaml

Notes:

  • The container listens on 0.0.0.0:8080 for the control plane API with metrics on :9000.
  • Checkpoints are written to /app/data/df_checkpoints.json; mount a volume to persist them across restarts.
  • Environment variables inside the YAML are expanded before parsing.
  • Pass any other runner flags as needed (e.g., --api-addr or --metrics-addr).

Debug a running container

Use the debug image to troubleshoot:

# Run with shell access
docker run --rm -it --entrypoint /bin/bash ghcr.io/vnvo/deltaforge:latest-debug

# Exec into a running container
docker exec -it <container_id> /bin/bash

Dev helper commands

The dev.sh script provides shortcuts for common tasks:

./dev.sh help  # show all commands

Infrastructure

./dev.sh up           # start Postgres, MySQL, Kafka, Redis
./dev.sh down         # stop and remove containers
./dev.sh ps           # list running services

Kafka

./dev.sh k-list                         # list topics
./dev.sh k-create <topic>               # create topic
./dev.sh k-consume <topic> --from-beginning
./dev.sh k-produce <topic>              # interactive producer

Redis

./dev.sh redis-cli                      # open redis-cli
./dev.sh redis-read <stream>            # read from stream

Database shells

./dev.sh pg-sh       # psql into Postgres
./dev.sh mysql-sh    # mysql into MySQL

Documentation

./dev.sh docs        # serve docs locally (opens browser)
./dev.sh docs-build  # build docs

Pre-release checks

./dev.sh release-check  # run all checks + build all Docker variants

Contributing

  1. Fork the repository
  2. Create a branch from main (e.g., feature/new-sink, fix/checkpoint-bug)
  3. Make your changes
  4. Run ./dev.sh check to ensure CI will pass
  5. Submit a PR against main

Things to Remember

Tests

There are a few #[ignore] tests; run them when making deep changes to the sources, pipeline coordination, or anything that affects core functionality.

Logging hygiene

  • Include pipeline, tenant, source_id/sink_id, and batch_id fields on all warnings/errors so traces are joinable in log aggregation tools (see the sketch after this list).
  • Normalize retry/backoff logs so they include the attempt count and sleep duration; consider a structured reason field alongside error details for dashboards.
  • Emit periodic info-level summaries (e.g., every N batches) reporting batches processed, average batch size, lag, and sink latency percentiles pulled from the metrics registry; these serve as human-friendly breadcrumbs.
  • Add metrics in a backward-compatible way: prefer new metric names over redefining existing ones to avoid breaking dashboards. Validate cardinality (bounded label sets) before merging.
  • Gate noisy logs behind levels (debug for per-event traces, info for batch summaries, warn/error for retries and failures).
  • Exercise the new metrics in integration tests by asserting counters change when sending synthetic events through pipelines.
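
A hedged sketch of this field discipline using tracing; the field names mirror the list above, while the function and its context are illustrative.

use std::time::Duration;

// Illustrative helper: a retry warning carrying the joinable fields
// plus structured attempt/backoff/reason data for dashboards.
fn log_sink_retry(pipeline: &str, tenant: &str, sink_id: &str,
                  batch_id: &str, attempt: u32, backoff: Duration) {
    tracing::warn!(
        pipeline,
        tenant,
        sink_id,
        batch_id,
        attempt,
        backoff_ms = backoff.as_millis() as u64,
        reason = "sink_send_failed",
        "sink send failed, backing off before retry"
    );
}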

Roadmap