Introduction
DeltaForge is a versatile, high-performance Change Data Capture (CDC) engine built in Rust. It streams database changes into downstream systems like Kafka, Redis, and NATS - giving you full control over how events are routed, transformed, and delivered. Built-in schema discovery automatically infers and tracks the shape of your data as it flows through, including deep inspection of nested JSON structures.
Pipelines are defined declaratively in YAML, making it straightforward to onboard new use cases without custom code.
| Built with | Sources | Processors | Sinks | Output Formats |
|---|---|---|---|---|
| Rust | MySQL · PostgreSQL | JavaScript · Outbox | Kafka · Redis · NATS | Native · Debezium · CloudEvents |
Why DeltaForge?
Core Capabilities
- ⚡ Powered by Rust: Predictable performance, memory safety, and minimal resource footprint.
- 🔌 Pluggable architecture: Sources, processors, and sinks are modular and independently extensible.
- 🧩 Declarative pipelines: Define sources, transforms, sinks, and commit policies in version-controlled YAML with environment variable expansion for secrets.
- 📦 Reliable checkpointing: Resume safely after restarts with at-least-once delivery guarantees.
- 🔀 Dynamic routing: Route events to per-table topics, streams, or subjects using templates or JavaScript logic.
- 📤 Transactional outbox: Publish domain events atomically with database writes. Per-aggregate routing, raw payload delivery, zero polling.
- 🛠️ Cloud-native ready: Single binary, Docker images, JSON logs, Prometheus metrics, and liveness/readiness probes for Kubernetes.
Schema Intelligence
- 🔍 Schema sensing: Automatically infer and track schema from event payloads, including deep inspection of nested JSON structures.
- 🗺️ High-cardinality handling: Detect and normalize dynamic map keys (session IDs, trace IDs) to prevent false schema evolution events.
- 🏷️ Schema fingerprinting: SHA-256-based change detection with schema-to-checkpoint correlation for reliable replay.
- 🗃️ Source-owned semantics: Preserves native database types (PostgreSQL arrays, MySQL JSON, etc.) instead of normalizing to a universal type system.
Operational Features
- 🔄 Graceful failover: Handles source failover with automatic schema revalidation - no manual intervention needed.
- 🧬 Zero-downtime schema evolution: Detects DDL changes and reloads schemas automatically - no pipeline restart needed.
- 🎯 Flexible table selection: Wildcard patterns (`db.*`, `schema.prefix%`) for easy onboarding.
- 📀 Transaction boundaries: Optionally keep source transactions intact across batches.
- ⚙️ Commit policies: Control checkpoint behavior with `all`, `required`, or `quorum` modes across multiple sinks.
- 🔧 Live pipeline management: Pause, resume, patch, and inspect running pipelines via the REST API.
- 🗄️ Safe initial snapshot : Consistent parallel backfill of existing tables before streaming begins, with binlog/WAL retention validation, background guards, and crash-resume at table granularity.
Use Cases
DeltaForge is designed for:
- Real-time data synchronization: Keep caches, search indexes, and analytics systems in sync with your primary database.
- Event-driven architectures: Stream database changes to Kafka or NATS for downstream microservices.
- Transactional messaging: Use the outbox pattern to publish domain events atomically with database writes - no distributed transactions needed.
- Audit trails and compliance: Capture every mutation with full before/after images for SOC 2, HIPAA, or GDPR requirements.
- Lightweight ETL: Transform, filter, and route data in-flight with JavaScript processors - no Spark or Flink cluster needed.
DeltaForge is not a DAG-based stream processor. It is a focused CDC engine meant to replace tools like Debezium when you need a lighter, cloud-native, and more customizable runtime.
Getting Started
| Guide | Description |
|---|---|
| Quickstart | Get DeltaForge running in minutes |
| CDC Overview | Understand Change Data Capture concepts |
| Configuration | Pipeline spec reference |
| Development | Build from source, contribute |
Quick Example
```yaml
metadata:
  name: orders-to-kafka
  tenant: acme
spec:
  source:
    type: mysql
    config:
      dsn: ${MYSQL_DSN}
      tables: [shop.orders]
  processors:
    - type: javascript
      inline: |
        (event) => {
          event.processed_at = Date.now();
          return [event];
        }
  sinks:
    - type: kafka
      config:
        brokers: ${KAFKA_BROKERS}
        topic: order-events
```
Installation
Docker (recommended):
```shell
docker pull ghcr.io/vnvo/deltaforge:latest
```
From source:
```shell
git clone https://github.com/vnvo/deltaforge.git
cd deltaforge
cargo build --release
```
See the Development Guide for detailed build instructions and available Docker image variants.
License
DeltaForge is dual-licensed under MIT and Apache 2.0.
Change Data Capture (CDC)
Change Data Capture (CDC) is the practice of streaming database mutations as ordered events so downstream systems can stay in sync without periodic full loads. Rather than asking “what does my data look like now?”, CDC answers “what changed, and when?”
DeltaForge is a CDC engine built to make log-based change streams reliable, observable, and operationally simple in modern, containerized environments. It focuses on row-level CDC from MySQL binlog and Postgres logical replication to keep consumers accurate and latency-aware while minimizing impact on source databases.
In short: DeltaForge tails committed transactions from MySQL and Postgres logs, preserves ordering and transaction boundaries, and delivers events to Kafka and Redis with checkpointed delivery, configurable batching, and Prometheus metrics - without requiring a JVM or distributed coordinator.
Why CDC matters
Traditional data integration relies on periodic batch jobs that query source systems, compare snapshots, and push differences downstream. This approach worked for decades, but modern architectures demand something better.
The batch ETL problem: A nightly sync means your analytics are always a day stale. An hourly sync still leaves gaps and hammers your production database with expensive `SELECT *` queries during business hours. As data volumes grow, these jobs take longer, fail more often, and compete for the same resources your customers need.
CDC flips the model. Instead of pulling data on a schedule, you subscribe to changes as they happen.
| Aspect | Batch ETL | CDC |
|---|---|---|
| Latency | Minutes to hours | Seconds to milliseconds |
| Source load | High (repeated scans) | Minimal (log tailing) |
| Data freshness | Stale between runs | Near real-time |
| Failure recovery | Re-run entire job | Resume from checkpoint |
| Change detection | Diff comparison | Native from source |
These benefits compound as systems scale and teams decentralize:
- Deterministic replay: Ordered events allow consumers to reconstruct state or power exactly-once delivery with checkpointing.
- Polyglot delivery: The same change feed can serve caches, queues, warehouses, and search indexes simultaneously without additional source queries.
How CDC works
All CDC implementations share a common goal: detect changes and emit them as events. The approaches differ in how they detect those changes.
Log-based CDC
Databases maintain transaction logs (MySQL binlog, Postgres WAL) that record every committed change for durability and replication. Log-based CDC reads these logs directly, capturing changes without touching application tables.
```
┌─────────────┐    commits    ┌─────────────┐    tails     ┌─────────────┐
│ Application │──────────────▶│  Database   │─────────────▶│ CDC Engine  │
└─────────────┘               │   + WAL     │              └──────┬──────┘
                              └─────────────┘                     │
                                                                  ▼
                                                           ┌─────────────┐
                                                           │   Kafka /   │
                                                           │   Redis     │
                                                           └─────────────┘
```
Advantages:
- Zero impact on source table performance
- Captures all changes including those from triggers and stored procedures
- Preserves transaction boundaries and ordering
- Can capture deletes without soft-delete columns
Trade-offs:
- Requires database configuration (replication slots, binlog retention)
- Schema changes need careful handling
- Log retention limits how far back you can replay
DeltaForge uses log-based CDC exclusively. This allows DeltaForge to provide stronger ordering guarantees, lower source impact, and simpler operational semantics than hybrid approaches that mix log tailing with polling or triggers.
Trigger-based CDC
Database triggers fire on INSERT, UPDATE, and DELETE operations, writing change records to a shadow table that a separate process polls.
Advantages:
- Works on databases without accessible transaction logs
- Can capture application-level context unavailable in logs
Trade-offs:
- Adds write overhead to every transaction
- Triggers can be disabled or forgotten during schema migrations
- Shadow tables require maintenance and can grow unbounded
Polling-based CDC
A process periodically queries tables for rows modified since the last check, typically using an `updated_at` timestamp or incrementing ID.
Advantages:
- Simple to implement
- No special database configuration required
Trade-offs:
- Cannot reliably detect deletes
- Requires `updated_at` columns on every table
- Polling frequency trades off latency against database load
- Clock skew and transaction visibility can cause missed or duplicate events
Anatomy of a CDC event
A well-designed CDC event contains everything downstream consumers need to process the change correctly.
```json
{
  "id": "evt_01J7K9X2M3N4P5Q6R7S8T9U0V1",
  "source": {
    "database": "shop",
    "table": "orders",
    "server_id": "mysql-prod-1"
  },
  "operation": "update",
  "timestamp": "2025-01-15T14:32:01.847Z",
  "transaction": {
    "id": "gtid:3E11FA47-71CA-11E1-9E33-C80AA9429562:42",
    "position": 15847293
  },
  "before": {
    "id": 12345,
    "status": "pending",
    "total": 99.99,
    "updated_at": "2025-01-15T14:30:00.000Z"
  },
  "after": {
    "id": 12345,
    "status": "shipped",
    "total": 99.99,
    "updated_at": "2025-01-15T14:32:01.000Z"
  },
  "schema_version": "v3"
}
```
Key components:
| Field | Purpose |
|---|---|
| `operation` | INSERT, UPDATE, DELETE, or DDL |
| `before` / `after` | Row state before and after the change (enables diff logic) |
| `transaction` | Groups changes from the same database transaction |
| `timestamp` | When the change was committed at the source |
| `schema_version` | Helps consumers handle schema evolution |
Not all fields are present for every operation: `before` is omitted for INSERTs and `after` is omitted for DELETEs - but the event envelope and metadata fields are consistent across MySQL and Postgres sources.
Real-world use cases
Cache invalidation and population
Problem: Your Redis cache serves product catalog data, but cache invalidation logic is scattered across dozens of services. Some forget to invalidate, others invalidate too aggressively, and debugging staleness issues takes hours.
CDC solution: Stream changes from the products table to a message queue. A dedicated consumer reads the stream and updates or deletes cache keys based on the operation type. This centralizes invalidation logic, provides replay capability for cache rebuilds, and removes cache concerns from application code entirely.
```
┌──────────┐     CDC      ┌─────────────┐   consumer   ┌─────────────┐
│  MySQL   │─────────────▶│   Stream    │─────────────▶│ Cache Keys  │
│ products │              │  (Kafka/    │              │ product:123 │
└──────────┘              │   Redis)    │              └─────────────┘
                          └─────────────┘
```
Event-driven microservices
Problem: Your order service needs to notify inventory, shipping, billing, and analytics whenever an order changes state. Direct service-to-service calls create tight coupling and cascade failures.
CDC solution: Publish order changes to a durable message queue. Each downstream service subscribes independently, processes at its own pace, and can replay events after failures or during onboarding. The order service doesn’t need to know about its consumers.
```
┌─────────────┐     CDC      ┌─────────────┐
│   Orders    │─────────────▶│   Message   │
│  Database   │              │    Queue    │
└─────────────┘              └──────┬──────┘
                 ┌───────────────┼───────────────┐
                 ▼               ▼               ▼
           ┌───────────┐  ┌───────────┐  ┌───────────┐
           │ Inventory │  │ Shipping  │  │ Analytics │
           └───────────┘  └───────────┘  └───────────┘
```
Search index synchronization
Problem: Your Elasticsearch index drifts from the source of truth. Full reindexing takes hours and blocks search updates during the process.
CDC solution: Stream changes continuously to keep the index synchronized. Use the before image to remove old documents and the after image to index new content.
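A sketch of that before/after logic, shaped as Elasticsearch-style bulk operations. The index name and operation shapes are illustrative of a downstream consumer, not a DeltaForge feature:

```javascript
// Translate a CDC event into search-index bulk operations (sketch).
// Updates remove the stale document (before image) and index the new
// one (after image); the `_index`/`_id` shapes are assumptions.
function toIndexOps(event, index = "products") {
  const ops = [];
  if (event.operation === "delete" || event.operation === "update") {
    // The before image identifies the document to remove.
    ops.push({ delete: { _index: index, _id: String(event.before.id) } });
  }
  if (event.operation === "insert" || event.operation === "update") {
    // The after image is the new document body.
    ops.push({ index: { _index: index, _id: String(event.after.id) } });
    ops.push(event.after);
  }
  return ops;
}
```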
Data warehouse incremental loads
Problem: Nightly full loads into your warehouse take 6 hours and block analysts until noon. Business users complain that dashboards show yesterday’s numbers.
CDC solution: Stream changes to a staging topic, then micro-batch into your warehouse every few minutes. Analysts get near real-time data without impacting source systems.
Audit logging and compliance
Problem: Regulations require you to maintain a complete history of changes to sensitive data. Application-level audit logging is inconsistent and can be bypassed.
CDC solution: The transaction log captures every committed change regardless of how it was made - application code, admin scripts, or direct SQL. Stream these events to immutable storage for compliance.
Cross-region replication
Problem: You need to replicate data to a secondary region for disaster recovery, but built-in replication doesn’t support the transformations you need.
CDC solution: Stream changes through a CDC pipeline that filters, transforms, and routes events to the target region’s databases or message queues.
Architecture patterns
The outbox pattern
When you need to update a database and publish an event atomically, the outbox pattern provides exactly-once semantics without distributed transactions.
```
┌─────────────────────────────────────────┐
│          Single Transaction             │
│  ┌─────────────┐    ┌─────────────────┐ │
│  │ UPDATE      │    │ INSERT INTO     │ │
│  │ orders SET  │ +  │ outbox (event)  │ │
│  │ status=...  │    │                 │ │
│  └─────────────┘    └─────────────────┘ │
└─────────────────────────────────────────┘
                    │
                    ▼ CDC
             ┌─────────────┐
             │    Kafka    │
             └─────────────┘
```
- The application writes business data and an event record in the same transaction.
- CDC tails the outbox table and publishes events to Kafka.
- A cleanup process (or CDC itself) removes processed outbox rows.
This guarantees that events are published if and only if the transaction commits.
When to skip the outbox: If your only requirement is to react to committed database state (not application intent), direct CDC from business tables is often simpler than introducing an outbox. The outbox pattern adds value when you need custom event payloads, explicit event versioning, or when the event schema differs significantly from table structure.
Event sourcing integration
CDC complements event sourcing by bridging legacy systems that weren’t built event-first. Stream changes from existing tables into event stores, then gradually migrate to native event sourcing.
CQRS (Command Query Responsibility Segregation)
CDC naturally supports CQRS by populating read-optimized projections from the write model. Changes flow through the CDC pipeline to update denormalized views, search indexes, or cache layers.
Challenges and solutions
Schema evolution
Databases change. Columns get added, renamed, or removed. Types change. CDC pipelines need to handle this gracefully.
Strategies:
- Schema registry: Store and version schemas centrally (e.g., Confluent Schema Registry with Avro/Protobuf).
- Forward compatibility: Add columns as nullable; avoid removing columns that consumers depend on.
- Consumer tolerance: Design consumers to ignore unknown fields and handle missing optional fields.
- Processor transforms: Use DeltaForge’s JavaScript processors to normalize schemas before sinks.
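For example, a processor transform that hides a column rename from consumers could look like this. The `(event) => [event]` shape follows the Quick Example; the rename from `total_amount` to `total` is a hypothetical scenario:

```javascript
// DeltaForge-style inline processor (sketch): old events carry
// `total_amount`, new events carry `total`. Normalize so consumers
// only ever see `total`. Field names are illustrative.
const normalize = (event) => {
  if (event.after && event.after.total_amount !== undefined) {
    event.after.total = event.after.total_amount;
    delete event.after.total_amount;
  }
  return [event]; // processors return a list of events
};
```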
Ordering guarantees
Events must arrive in the correct order for consumers to reconstruct state accurately. A DELETE arriving before its corresponding INSERT would be catastrophic.
DeltaForge guarantees:
- Source-order preservation per table partition (always enabled)
- Transaction boundary preservation when `respect_source_tx: true` is configured in batch settings

The Kafka sink uses consistent partitioning by primary key to maintain ordering within a partition at the consumer.
Exactly-once delivery
Network failures, process crashes, and consumer restarts can cause duplicates or gaps. True exactly-once semantics require coordination between source, pipeline, and sink.
DeltaForge approach:
- Checkpoints track the last committed position in the source log.
- Configurable commit policies (`all`, `required`, `quorum`) control when checkpoints advance.
- Kafka sink supports idempotent producers; transactional writes are available via `exactly_once: true`.
Default behavior: DeltaForge provides at-least-once delivery out of the box. Exactly-once semantics require sink support and explicit configuration.
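Under at-least-once delivery, consumers can get effectively-once processing by deduplicating on the event `id` from the envelope. A minimal consumer-side sketch (not part of DeltaForge; a production version would persist the seen-set or use a bounded window):

```javascript
// Wrap a handler so that events with an already-seen id are skipped.
// Returns true if the event was processed, false if it was a duplicate.
function makeDeduper(seen = new Set()) {
  return function process(event, handler) {
    if (seen.has(event.id)) return false; // duplicate: skip
    handler(event);
    seen.add(event.id); // record only after successful handling
    return true;
  };
}
```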
High availability
Production CDC pipelines need to handle failures without data loss or extended downtime.
Best practices:
- Run multiple pipeline instances with leader election.
- Store checkpoints in durable storage (DeltaForge persists to local files, mountable volumes in containers).
- Monitor lag between source position and checkpoint position.
- Set up alerts for pipeline failures and excessive lag.
Expectations: DeltaForge checkpoints ensure no data loss on restart, but does not currently include built-in leader election. For HA deployments, use external coordination (Kubernetes leader election, etcd locks) or run active-passive with health-check-based failover.
Backpressure
When sinks can’t keep up with the change rate, pipelines need to slow down gracefully rather than dropping events or exhausting memory.
DeltaForge handles backpressure through:
- Configurable batch sizes (`max_events`, `max_bytes`, `max_ms`).
- In-flight limits (`max_inflight`) that bound concurrent sink writes.
- Blocking reads from the source when batches queue up.
Performance considerations
Batching trade-offs
| Setting | Low value | High value |
|---|---|---|
| `max_events` | Lower latency, more overhead | Higher throughput, more latency |
| `max_ms` | Faster flush, smaller batches | Larger batches, delayed flush |
| `max_bytes` | Memory-safe, frequent commits | Efficient for large rows |
Start with DeltaForge defaults and tune based on observed latency and throughput.
Source database impact
Log-based CDC has minimal impact, but consider:
- Replication slot retention: Paused pipelines cause WAL/binlog accumulation.
- Connection limits: Each pipeline holds a replication connection.
- Network bandwidth: High-volume tables generate significant log traffic.
Sink throughput
- Kafka: Tune `batch.size`, `linger.ms`, and compression in `client_conf`.
- Redis: Use pipelining and connection pooling for high-volume streams.
Monitoring and observability
CDC pipelines are long-running, stateful processes; without metrics and alerts, failures are silent by default. A healthy pipeline requires visibility into lag, throughput, and errors.
Key metrics to track
| Metric | Description | Alert threshold |
|---|---|---|
| `cdc_lag_seconds` | Time between event timestamp and processing | > 60s |
| `events_processed_total` | Throughput counter | Sudden drops |
| `checkpoint_lag_events` | Events since last checkpoint | > 10,000 |
| `sink_errors_total` | Failed sink writes | Any sustained errors |
| `batch_size_avg` | Events per batch | Outside expected range |
DeltaForge exposes Prometheus metrics on the configurable metrics endpoint (default :9000).
Health checks
- `GET /healthz`: Liveness probe - is the process running?
- `GET /readyz`: Readiness probe - are pipelines connected and processing?
- `GET /pipelines`: Detailed status of each pipeline including configuration.
Choosing a CDC solution
When evaluating CDC tools, consider:
| Factor | Questions to ask |
|---|---|
| Source support | Does it support your databases? MySQL binlog? Postgres logical replication? |
| Sink flexibility | Can it write to your target systems? Kafka, Redis, HTTP, custom? |
| Transformation | Can you filter, enrich, or reshape events in-flight? |
| Operational overhead | How much infrastructure does it require? JVM? Distributed coordinator? |
| Resource efficiency | What’s the memory/CPU footprint per pipeline? |
| Cloud-native | Does it containerize cleanly? Support health checks? Emit metrics? |
Where DeltaForge fits
DeltaForge intentionally avoids the operational complexity of JVM-based CDC stacks (Kafka Connect-style deployments with Zookeeper, Connect workers, and converter configurations) while remaining compatible with Kafka-centric architectures.
DeltaForge is designed for teams that want:
- Lightweight runtime: Single binary, minimal memory footprint, no JVM warmup.
- Config-driven pipelines: YAML specs instead of code for common patterns.
- Inline transformation: JavaScript processors for custom logic without recompilation.
- Container-native operations: Built for Kubernetes with health endpoints and Prometheus metrics.
DeltaForge is not designed for:
- Complex DAG-based stream processing with windowed aggregations
- Stateful joins across multiple streams
- Sources beyond MySQL and Postgres (currently)
- Built-in schema registry integration (use external registries)
If you need those capabilities, consider dedicated stream processors or the broader Kafka ecosystem. DeltaForge excels at getting data out of databases and into your event infrastructure reliably and efficiently.
Getting started
Ready to try CDC with DeltaForge? Head to the Quickstart guide to run your first pipeline in minutes.
For production deployments, review the Development guide for container builds and operational best practices.
Quickstart
Get DeltaForge running in minutes.
1. Prepare a pipeline spec
Create a YAML file that defines your CDC pipeline. Environment variables are expanded at runtime, so secrets stay out of version control.
```yaml
metadata:
  name: orders-mysql-to-kafka
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
  processors:
    - type: javascript
      id: transform
      inline: |
        (event) => {
          event.tags = ["processed"];
          return [event];
        }
  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
      required: true
  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
```
2. Start DeltaForge
Using Docker (recommended):
```shell
docker run --rm \
  -p 8080:8080 -p 9000:9000 \
  -e MYSQL_DSN="mysql://user:pass@host:3306/db" \
  -e KAFKA_BROKERS="kafka:9092" \
  -v $(pwd)/pipeline.yaml:/etc/deltaforge/pipeline.yaml:ro \
  -v deltaforge-checkpoints:/app/data \
  ghcr.io/vnvo/deltaforge:latest \
  --config /etc/deltaforge/pipeline.yaml
```
From source:
```shell
cargo run -p runner -- --config ./pipeline.yaml
```
Runner options
| Flag | Default | Description |
|---|---|---|
| `--config` | (required) | Path to pipeline spec file or directory |
| `--api-addr` | `0.0.0.0:8080` | REST API address |
| `--metrics-addr` | `0.0.0.0:9095` | Prometheus metrics address |
3. Verify it’s running
Check health and pipeline status:
```shell
# Liveness probe
curl http://localhost:8080/healthz

# Readiness with pipeline status
curl http://localhost:8080/readyz

# List all pipelines
curl http://localhost:8080/pipelines
```
4. Manage pipelines
Control pipelines via the REST API:
```shell
# Pause a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/pause

# Resume a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/resume

# Stop a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/stop
```
Next steps
- CDC Overview : Understand how Change Data Capture works
- Configuration : Full pipeline spec reference
- Sources : MySQL and Postgres setup
- Sinks : Kafka, Redis, and NATS configuration
- Dynamic Routing : Route events to per-table destinations
- Envelopes : Native, Debezium, and CloudEvents output formats
- Development : Build from source, run locally
Configuration
Pipelines are defined as YAML documents that map directly to the PipelineSpec type. Environment variables are expanded before parsing using ${VAR} syntax, so secrets and connection strings can be injected at runtime. Unknown variables (e.g. ${source.table}) pass through for use as routing templates.
Document structure
```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: <pipeline-name>
  tenant: <tenant-id>
spec:
  source: { ... }
  processors: [ ... ]
  sinks: [ ... ]
  batch: { ... }
  commit_policy: { ... }
  schema_sensing: { ... }
```
Metadata
| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Unique pipeline identifier. Used in API routes and metrics. |
| `tenant` | string | Yes | Business-oriented tenant label for multi-tenancy. |
Spec fields
| Field | Type | Required | Description |
|---|---|---|---|
| `source` | object | Yes | Database source configuration. See Sources. |
| `processors` | array | No | Ordered list of processors. See Processors. |
| `sinks` | array | Yes (at least one) | One or more sinks that receive each batch. See Sinks. |
| `sharding` | object | No | Optional hint for downstream distribution. |
| `connection_policy` | object | No | How the runtime establishes upstream connections. |
| `batch` | object | No | Commit unit thresholds. See Batching. |
| `commit_policy` | object | No | How sink acknowledgements gate checkpoints. See Commit policy. |
| `schema_sensing` | object | No | Automatic schema inference from event payloads. See Schema sensing. |
Sources
MySQL
Captures row-level changes via binlog replication. See MySQL source documentation for prerequisites and detailed configuration.
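A minimal source block, assembled from the fields used in the Quickstart example (see the MySQL source documentation for the full option set):

```yaml
source:
  type: mysql
  config:
    id: orders-mysql       # source identifier
    dsn: ${MYSQL_DSN}      # injected from the environment at runtime
    tables:
      - shop.orders        # exact match
      - shop.order_%       # LIKE-style prefix pattern
```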
Table patterns support SQL LIKE syntax:
- `db.table` - exact match
- `db.prefix%` - tables matching prefix
- `db.%` - all tables in database
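As an illustration of the LIKE-style semantics, where `%` matches any run of characters (this is not DeltaForge's internal matcher):

```javascript
// Convert a LIKE-style table pattern to a regex and test a table name.
// Literal characters are escaped; each `%` becomes `.*`.
function matchesPattern(pattern, table) {
  const escaped = pattern
    .split("%")
    .map((s) => s.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"));
  return new RegExp("^" + escaped.join(".*") + "$").test(table);
}
```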
PostgreSQL
Captures row-level changes via logical replication. See PostgreSQL source documentation for prerequisites and detailed configuration.
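A minimal source block, assembled from the fields used in the PostgreSQL complete example later on this page:

```yaml
source:
  type: postgres
  config:
    id: users-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_users     # logical replication slot
    publication: users_pub     # publication covering the tables below
    tables:
      - public.users
    start_position: earliest
```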
Note: The source-level `outbox` field only tags matching events with the `__outbox` sentinel. Routing and transformation are handled by the `outbox` processor.
Processors
Processors transform events between source and sinks. They run in order and can filter, enrich, or modify events.
JavaScript
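A minimal inline processor in the shape used by the Quick Example; the optional `limits` block is taken from the complete examples below:

```yaml
processors:
  - type: javascript
    id: transform
    inline: |
      (event) => {
        event.processed_at = Date.now();
        return [event];      // processors return a list of events
      }
    limits:                  # optional sandbox limits
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500
```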
Flatten
```yaml
processors:
  - type: flatten
    id: flat
    separator: "__"
    max_depth: 3
    on_collision: last
    empty_object: preserve
    lists: preserve
    empty_list: drop
```
| Field | Type | Default | Description |
|---|---|---|---|
| `id` | string | `"flatten"` | Processor identifier |
| `separator` | string | `"__"` | Separator between path segments |
| `max_depth` | int | unlimited | Stop recursing at this depth; objects at the boundary are kept as-is |
| `on_collision` | string | `last` | Key collision policy: `last`, `first`, or `error` |
| `empty_object` | string | `preserve` | Empty object policy: `preserve`, `drop`, or `null` |
| `lists` | string | `preserve` | Array policy: `preserve` or `index` |
| `empty_list` | string | `preserve` | Empty array policy: `preserve`, `drop`, or `null` |
Outbox
Transforms raw outbox events into routed, sink-ready events. Requires the source to have outbox configured so events are tagged before reaching this processor. See Outbox pattern documentation for full details.
```yaml
processors:
  - type: outbox
    id: outbox
    topic: "${aggregate_type}.${event_type}"
    default_topic: "events.unrouted"
    raw_payload: true
    columns:
      payload: data
    additional_headers:
      x-trace-id: trace_id
```
| Field | Type | Default | Description |
|---|---|---|---|
| `id` | string | `"outbox"` | Processor identifier |
| `tables` | array | `[]` | Filter: only process outbox events matching these patterns. Empty = all outbox events. |
| `topic` | string | — | Topic template resolved against the raw payload using `${field}` placeholders |
| `default_topic` | string | — | Fallback topic when template resolution fails and no topic column exists |
| `key` | string | — | Key template resolved against the raw payload. Default: `aggregate_id` value. |
| `raw_payload` | bool | `false` | Deliver the extracted payload as-is to sinks, bypassing envelope wrapping |
| `strict` | bool | `false` | Fail the batch if required fields are missing rather than silently falling back |
| `columns.payload` | string | `payload` | Column containing the event payload |
| `columns.aggregate_type` | string | `aggregate_type` | Column for aggregate type |
| `columns.aggregate_id` | string | `aggregate_id` | Column for aggregate ID |
| `columns.event_type` | string | `event_type` | Column for event type |
| `columns.topic` | string | `topic` | Column for pre-computed topic override |
| `columns.event_id` | string | `id` | Column extracted as `df-event-id` header |
| `additional_headers` | map | `{}` | Forward extra payload fields as routing headers: `header-name: column-name` |
Sinks
Sinks deliver events to downstream systems. Each sink supports configurable envelope formats and wire encodings to match consumer expectations. See the Sinks documentation for detailed information on multi-sink patterns, commit policies, and failure handling.
Envelope and encoding
All sinks support these serialization options:
| Field | Type | Default | Description |
|---|---|---|---|
| `envelope` | object | `native` | Output structure format. See Envelopes. |
| `encoding` | string | `json` | Wire encoding format |
Envelope types:
- `native` - Direct Debezium payload structure (default, most efficient)
- `debezium` - Full `{"payload": ...}` wrapper
- `cloudevents` - CloudEvents 1.0 specification (requires `type_prefix`)
```yaml
# Native (default)
envelope:
  type: native

# Debezium wrapper
envelope:
  type: debezium

# CloudEvents
envelope:
  type: cloudevents
  type_prefix: "com.example.cdc"
```
Kafka
See Kafka sink documentation for detailed configuration options and best practices.
CloudEvents example:
```yaml
sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: ${KAFKA_BROKERS}
      topic: events
    envelope:
      type: cloudevents
      type_prefix: "com.acme.cdc"
    encoding: json
```
Redis
See Redis sink documentation for detailed configuration options and best practices.
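A minimal Redis sink, following the multi-sink complete example below:

```yaml
sinks:
  - type: redis
    config:
      id: cache-redis
      uri: ${REDIS_URI}        # e.g. a redis:// connection string
      stream: orders-cache     # target Redis stream
    envelope:
      type: native
    required: false
```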
NATS
See NATS sink documentation for detailed configuration options and best practices.
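A minimal NATS sink, following the MySQL-to-NATS complete example below:

```yaml
sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events   # NATS subject to publish to
      stream: ORDERS           # JetStream stream name
    envelope:
      type: native
    encoding: json
    required: true
```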
Batching
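The batch fields used throughout the complete examples below; the comments are informal summaries of their roles:

```yaml
batch:
  max_events: 500            # flush after this many events...
  max_bytes: 1048576         # ...or this many bytes (1 MiB)...
  max_ms: 1000               # ...or this many milliseconds, whichever first
  respect_source_tx: true    # keep source transactions intact across batches
  max_inflight: 2            # bound concurrent in-flight batches at the sinks
```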
Commit policy
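As used in the complete examples below, with the three modes listed under Core Capabilities. The per-mode glosses in the comment are informal readings of the mode names, not authoritative definitions:

```yaml
commit_policy:
  mode: required   # one of: all, required, quorum
                   # (e.g. `required` gates checkpoints on sinks marked required: true)
```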
Schema sensing
Schema sensing automatically infers and tracks schema from event payloads. See the Schema Sensing documentation for detailed information on how it works, drift detection, and API endpoints.
Performance tip: Schema sensing can be CPU-intensive, especially with deep JSON inspection. Consider your throughput requirements when configuring:
- Set `enabled: false` if you don't need runtime schema inference
- Limit `deep_inspect.max_depth` to avoid traversing deeply nested structures
- Increase `sampling.sample_rate` to analyze fewer events (e.g., 1 in 100 instead of 1 in 10)
- Reduce `sampling.warmup_events` if you're confident in schema stability
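A sketch using the knobs named in the tip above; the values and exact nesting are illustrative, so consult the Schema Sensing documentation for defaults:

```yaml
schema_sensing:
  enabled: true
  deep_inspect:
    max_depth: 4          # stop inspecting below this nesting depth (illustrative)
  sampling:
    sample_rate: 100      # analyze roughly 1 in 100 events (illustrative)
    warmup_events: 1000   # events analyzed densely at startup (illustrative)
```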
Complete examples
MySQL to Kafka with Debezium envelope
```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-kafka
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
  processors:
    - type: javascript
      id: transform
      inline: |
        function processBatch(events) {
          return events.map(event => {
            event.tags = (event.tags || []).concat(["normalized"]);
            return event;
          });
        }
      limits:
        cpu_ms: 50
        mem_mb: 128
        timeout_ms: 500
  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
      envelope:
        type: debezium
      encoding: json
      required: true
      exactly_once: false
      client_conf:
        message.timeout.ms: "5000"
  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
    respect_source_tx: true
    max_inflight: 2
  commit_policy:
    mode: required
```
PostgreSQL to Kafka with CloudEvents
```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: users-postgres-to-kafka
  tenant: acme
spec:
  source:
    type: postgres
    config:
      id: users-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_users
      publication: users_pub
      tables:
        - public.users
        - public.user_sessions
      start_position: earliest
  sinks:
    - type: kafka
      config:
        id: users-kafka
        brokers: ${KAFKA_BROKERS}
        topic: user-events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.users"
      encoding: json
      required: true
  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true
  commit_policy:
    mode: required
```
Multi-sink with different formats
```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-multi-sink
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
  sinks:
    # Kafka Connect expects Debezium format
    - type: kafka
      config:
        id: connect-sink
        brokers: ${KAFKA_BROKERS}
        topic: connect-events
      envelope:
        type: debezium
      required: true
    # Lambda expects CloudEvents
    - type: kafka
      config:
        id: lambda-sink
        brokers: ${KAFKA_BROKERS}
        topic: lambda-events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.cdc"
      required: false
    # Redis cache uses native format
    - type: redis
      config:
        id: cache-redis
        uri: ${REDIS_URI}
        stream: orders-cache
      envelope:
        type: native
      required: false
  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true
  commit_policy:
    mode: required
```
MySQL to NATS
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-mysql-to-nats
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
sinks:
- type: nats
config:
id: orders-nats
url: ${NATS_URL}
subject: orders.events
stream: ORDERS
envelope:
type: native
encoding: json
required: true
batch:
max_events: 500
max_ms: 1000
respect_source_tx: true
commit_policy:
mode: required
REST API Reference
DeltaForge exposes a REST API for health checks, pipeline management, schema inspection, and drift detection. All endpoints return JSON.
Base URL
Default: http://localhost:8080
Configure with --api-addr:
deltaforge --config pipelines.yaml --api-addr 0.0.0.0:9090
Health Endpoints
Liveness Probe
GET /healthz
Returns ok if the process is running. Use for Kubernetes liveness probes.
Response: 200 OK
ok
Readiness Probe
GET /readyz
Returns pipeline states. Use for Kubernetes readiness probes.
Response: 200 OK
{
"status": "ready",
"pipelines": [
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
]
}
Pipeline Management
List Pipelines
GET /pipelines
Returns all pipelines with current status.
Response: 200 OK
[
{
"name": "orders-cdc",
"status": "running",
"spec": {
"metadata": { "name": "orders-cdc", "tenant": "acme" },
"spec": { ... }
}
}
]
Get Pipeline
GET /pipelines/{name}
Returns a single pipeline by name.
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
Errors:
- 404 Not Found: Pipeline doesn’t exist
Create Pipeline
POST /pipelines
Content-Type: application/json
Creates a new pipeline from a full spec.
Request:
{
"metadata": {
"name": "orders-cdc",
"tenant": "acme"
},
"spec": {
"source": {
"type": "mysql",
"config": {
"id": "mysql-1",
"dsn": "mysql://user:pass@host/db",
"tables": ["shop.orders"]
}
},
"processors": [],
"sinks": [
{
"type": "kafka",
"config": {
"id": "kafka-1",
"brokers": "localhost:9092",
"topic": "orders"
}
}
]
}
}
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
Errors:
- 409 Conflict: Pipeline already exists
Update Pipeline
PATCH /pipelines/{name}
Content-Type: application/json
Applies a partial update to an existing pipeline. The pipeline is stopped, updated, and restarted.
Request:
{
"spec": {
"batch": {
"max_events": 1000,
"max_ms": 500
}
}
}
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
Errors:
- 404 Not Found: Pipeline doesn’t exist
- 400 Bad Request: Name mismatch in patch
Delete Pipeline
DELETE /pipelines/{name}
Permanently deletes a pipeline. This removes the pipeline from the runtime and cannot be undone.
Response: 204 No Content
Errors:
- 404 Not Found: Pipeline doesn’t exist
Pause Pipeline
POST /pipelines/{name}/pause
Pauses ingestion. Events in the buffer are not processed until resumed.
Response: 200 OK
{
"name": "orders-cdc",
"status": "paused",
"spec": { ... }
}
Resume Pipeline
POST /pipelines/{name}/resume
Resumes a paused pipeline.
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
Stop Pipeline
POST /pipelines/{name}/stop
Stops a pipeline. Final checkpoint is saved.
Response: 200 OK
{
"name": "orders-cdc",
"status": "stopped",
"spec": { ... }
}
Schema Management
List Database Schemas
GET /pipelines/{name}/schemas
Returns all tracked database schemas for a pipeline. These are the schemas loaded directly from the source database.
Response: 200 OK
[
{
"database": "shop",
"table": "orders",
"column_count": 5,
"primary_key": ["id"],
"fingerprint": "sha256:a1b2c3d4e5f6...",
"registry_version": 2
},
{
"database": "shop",
"table": "customers",
"column_count": 8,
"primary_key": ["id"],
"fingerprint": "sha256:f6e5d4c3b2a1...",
"registry_version": 1
}
]
Get Schema Details
GET /pipelines/{name}/schemas/{db}/{table}
Returns detailed schema information including all columns.
Response: 200 OK
{
"database": "shop",
"table": "orders",
"columns": [
{
"name": "id",
"type": "bigint(20) unsigned",
"nullable": false,
"default": null,
"extra": "auto_increment"
},
{
"name": "customer_id",
"type": "bigint(20)",
"nullable": false,
"default": null
}
],
"primary_key": ["id"],
"fingerprint": "sha256:a1b2c3d4..."
}
Schema Sensing
Schema sensing automatically infers schema structure from JSON event payloads. This is useful for sources that don’t provide schema metadata or for detecting schema evolution in JSON columns.
List Inferred Schemas
GET /pipelines/{name}/sensing/schemas
Returns all schemas inferred via sensing for a pipeline.
Response: 200 OK
[
{
"table": "orders",
"fingerprint": "sha256:abc123...",
"sequence": 3,
"event_count": 1500,
"stabilized": true,
"first_seen": "2025-01-15T10:30:00Z",
"last_seen": "2025-01-15T14:22:00Z"
}
]
| Field | Description |
|---|---|
table | Table name (or table:column for JSON column sensing) |
fingerprint | SHA-256 content hash of current schema |
sequence | Monotonic version number (increments on evolution) |
event_count | Total events observed |
stabilized | Whether schema has stopped sampling (structure stable) |
first_seen | First observation timestamp |
last_seen | Most recent observation timestamp |
Get Inferred Schema Details
GET /pipelines/{name}/sensing/schemas/{table}
Returns detailed inferred schema including all fields.
Response: 200 OK
{
"table": "orders",
"fingerprint": "sha256:abc123...",
"sequence": 3,
"event_count": 1500,
"stabilized": true,
"fields": [
{
"name": "id",
"types": ["integer"],
"nullable": false,
"optional": false
},
{
"name": "metadata",
"types": ["object"],
"nullable": true,
"optional": false,
"nested_field_count": 5
},
{
"name": "tags",
"types": ["array"],
"nullable": false,
"optional": true,
"array_element_types": ["string"]
}
],
"first_seen": "2025-01-15T10:30:00Z",
"last_seen": "2025-01-15T14:22:00Z"
}
Export JSON Schema
GET /pipelines/{name}/sensing/schemas/{table}/json-schema
Exports the inferred schema as a standard JSON Schema document.
Response: 200 OK
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "orders",
"type": "object",
"properties": {
"id": { "type": "integer" },
"metadata": { "type": ["object", "null"] },
"tags": {
"type": "array",
"items": { "type": "string" }
}
},
"required": ["id", "metadata"]
}
Get Sensing Cache Statistics
GET /pipelines/{name}/sensing/stats
Returns cache performance statistics for schema sensing.
Response: 200 OK
{
"tables": [
{
"table": "orders",
"cached_structures": 3,
"max_cache_size": 100,
"cache_hits": 1450,
"cache_misses": 50
}
],
"total_cache_hits": 1450,
"total_cache_misses": 50,
"hit_rate": 0.9667
}
Drift Detection
Drift detection compares expected database schema against observed data patterns to detect mismatches, unexpected nulls, and type drift.
Get Drift Results
GET /pipelines/{name}/drift
Returns drift detection results for all tables in a pipeline.
Response: 200 OK
[
{
"table": "orders",
"has_drift": true,
"columns": [
{
"column": "amount",
"expected_type": "decimal(10,2)",
"observed_types": ["string"],
"mismatch_count": 42,
"examples": ["\"99.99\""]
}
],
"events_analyzed": 1500,
"events_with_drift": 42
}
]
Get Table Drift
GET /pipelines/{name}/drift/{table}
Returns drift detection results for a specific table.
Response: 200 OK
{
"table": "orders",
"has_drift": false,
"columns": [],
"events_analyzed": 1000,
"events_with_drift": 0
}
Errors:
- 404 Not Found: Table not found or no drift data available
Error Responses
All error responses follow this format:
{
"error": "Description of the error"
}
| Status Code | Meaning |
|---|---|
400 Bad Request | Invalid request body or parameters |
404 Not Found | Resource doesn’t exist |
409 Conflict | Resource already exists |
500 Internal Server Error | Unexpected server error |
Pipelines
Each pipeline is created from a single PipelineSpec. The runtime spawns the source, processors, and sinks defined in the spec and coordinates them with batching and checkpointing.
- 🔄 Live control: pause, resume, or stop pipelines through the REST API without redeploying.
- 📦 Coordinated delivery: batching and commit policy keep sinks consistent even when multiple outputs are configured.
Lifecycle controls
The REST API addresses pipelines by metadata.name and returns PipeInfo records containing the live spec and status.
- GET /healthz: liveness probe.
- GET /readyz: readiness with pipeline states.
- GET /pipelines: list pipelines.
- POST /pipelines: create from a full spec.
- PATCH /pipelines/{name}: merge a partial spec (for example, adjust batch thresholds) and restart the pipeline.
- POST /pipelines/{name}/pause: pause ingestion and coordination.
- POST /pipelines/{name}/resume: resume a paused pipeline.
- POST /pipelines/{name}/stop: stop a running pipeline.
Pausing halts both source ingestion and the coordinator. Resuming re-enables both ends so buffered events can drain cleanly.
Processors
Processors run in the declared order for each batch. The built-in processor type is JavaScript, powered by deno_core.
- type: javascript
- id: processor label.
- inline: JS source. Export a processBatch(events) function that returns the transformed batch.
- limits (optional): resource guardrails (cpu_ms, mem_mb, timeout_ms).
Batching
The coordinator builds batches using soft thresholds:
- max_events: flush after this many events.
- max_bytes: flush after the serialized size reaches this limit.
- max_ms: flush after this much time has elapsed since the batch started.
- respect_source_tx: when true, never split a single source transaction across batches.
- max_inflight: cap the number of batches being processed concurrently.
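The flush decision can be sketched as a simple OR over these soft thresholds. The struct and field names below mirror the YAML batch keys for readability; they are assumptions for illustration, not DeltaForge’s actual internal types:

```rust
// Illustrative sketch of the coordinator's soft-threshold flush check.
// Field names mirror the YAML batch config (max_events, max_bytes, max_ms).
struct BatchLimits {
    max_events: usize,
    max_bytes: usize,
    max_ms: u64,
}

struct BatchState {
    events: usize,
    bytes: usize,
    elapsed_ms: u64,
}

fn should_flush(limits: &BatchLimits, state: &BatchState) -> bool {
    // Reaching any one threshold triggers a flush.
    state.events >= limits.max_events
        || state.bytes >= limits.max_bytes
        || state.elapsed_ms >= limits.max_ms
}
```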
Commit policy
When multiple sinks are configured, checkpoints can wait for different acknowledgement rules:
- all: every sink must acknowledge.
- required (default): only sinks marked required: true must acknowledge; others are best-effort.
- quorum: checkpoint after at least quorum sinks acknowledge.
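A minimal sketch of how the three modes could gate a checkpoint; the type and function names here are hypothetical, chosen to mirror the YAML values:

```rust
// Hedged sketch of commit-policy evaluation; names mirror the YAML
// (`all`, `required`, `quorum`) but the real coordinator logic may differ.
enum CommitPolicy {
    All,
    Required,
    Quorum(usize),
}

struct SinkAck {
    required: bool, // sink was configured with required: true
    acked: bool,    // sink acknowledged the batch
}

fn can_checkpoint(policy: &CommitPolicy, sinks: &[SinkAck]) -> bool {
    match policy {
        CommitPolicy::All => sinks.iter().all(|s| s.acked),
        CommitPolicy::Required => sinks.iter().filter(|s| s.required).all(|s| s.acked),
        CommitPolicy::Quorum(n) => sinks.iter().filter(|s| s.acked).count() >= *n,
    }
}
```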
Sources
DeltaForge captures database changes through pluggable source connectors. Each source is configured under spec.source with a type field and a config object. Environment variables are expanded before parsing using ${VAR} syntax.
Supported Sources
| Source | Status | Description |
|---|---|---|
mysql | ✅ Production | MySQL binlog CDC with GTID support |
postgres | ✅ Production | PostgreSQL logical replication via pgoutput |
turso | 🔧 Beta | Turso/libSQL CDC with multiple modes |
Common Behavior
All sources share these characteristics:
- Checkpointing: Progress is automatically saved and resumed on restart
- Schema tracking: Table schemas are loaded and fingerprinted for change detection
- At-least-once delivery: Events may be redelivered after failures; sinks should be idempotent
- Batching: Events are batched according to the pipeline’s batch configuration
- Transaction boundaries: respect_source_tx: true (default) keeps source transactions intact
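Because delivery is at-least-once, a sink that applies each event as a keyed upsert stays correct when the same event arrives twice. A minimal sketch, with hypothetical types standing in for a real sink:

```rust
use std::collections::HashMap;

// Redelivered events overwrite with identical data, so the result is
// the same as processing the event once. Types are illustrative.
#[derive(Clone, PartialEq, Debug)]
struct Row {
    id: u64,
    status: String,
}

fn apply(store: &mut HashMap<u64, Row>, event: &Row) {
    // Upsert keyed by primary key: a duplicate delivery is a no-op.
    store.insert(event.id, event.clone());
}
```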
Source Interface
Sources implement a common trait that provides:
trait Source {
    fn checkpoint_key(&self) -> &str;
    async fn run(&self, tx: Sender<Event>, checkpoint_store: Arc<dyn CheckpointStore>) -> SourceHandle;
}
The returned SourceHandle supports pause/resume and graceful cancellation.
Adding Custom Sources
The source interface is pluggable. To add a new source:
- Implement the Source trait
- Add configuration parsing in deltaforge-config
- Register the source type in the pipeline builder
See existing sources for implementation patterns.
MySQL source
DeltaForge tails the MySQL binlog to capture row-level changes with automatic checkpointing and schema tracking.
Prerequisites
MySQL Server Configuration
Ensure your MySQL server has binary logging enabled with row-based format:
-- Required server settings (my.cnf or SET GLOBAL)
log_bin = ON
binlog_format = ROW
binlog_row_image = FULL -- Recommended for complete before-images
If binlog_row_image is not FULL, DeltaForge will warn at startup and before-images on UPDATE/DELETE events may be incomplete.
User Privileges
Create a dedicated replication user with the required grants:
-- Create user with mysql_native_password (required by binlog connector)
CREATE USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'your_password';
-- Replication privileges (required)
GRANT REPLICATION REPLICA, REPLICATION CLIENT ON *.* TO 'deltaforge'@'%';
-- Schema introspection (required for table discovery)
GRANT SELECT, SHOW VIEW ON your_database.* TO 'deltaforge'@'%';
FLUSH PRIVILEGES;
For capturing all databases, grant SELECT on *.* instead.
Configuration
Set spec.source.type to mysql and provide a config object:
| Field | Type | Required | Description |
|---|---|---|---|
id | string | Yes | Unique identifier used for checkpoints, server_id derivation, and metrics |
dsn | string | Yes | MySQL connection string with replication privileges |
tables | array | No | Table patterns to capture; omit or leave empty to capture all user tables |
Table Patterns
The tables field supports flexible pattern matching using SQL LIKE syntax:
tables:
- shop.orders # exact match: database "shop", table "orders"
- shop.order_% # LIKE pattern: tables starting with "order_" in "shop"
- analytics.* # wildcard: all tables in "analytics" database
- %.audit_log # cross-database: "audit_log" table in any database
# omit entirely to capture all user tables (excludes system schemas)
System schemas (mysql, information_schema, performance_schema, sys) are always excluded.
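The `%` and `*` wildcards above can be handled with a small recursive matcher. This is a simplified sketch, not DeltaForge’s implementation: it treats `_` literally, whereas full SQL LIKE semantics would also match `_` against any single character.

```rust
// Simplified table-pattern matcher: `%` and `*` match any run of
// characters, everything else is literal. Real LIKE semantics differ
// slightly (`_` matches one character); this sketch ignores that.
fn pattern_matches(pattern: &str, name: &str) -> bool {
    let p: Vec<char> = pattern.chars().collect();
    let n: Vec<char> = name.chars().collect();
    fn go(p: &[char], n: &[char]) -> bool {
        match p.first().copied() {
            None => n.is_empty(),
            // Wildcard: try consuming 0..=n.len() characters of the name.
            Some('%') | Some('*') => (0..=n.len()).any(|i| go(&p[1..], &n[i..])),
            Some(c) => n.first().copied() == Some(c) && go(&p[1..], &n[1..]),
        }
    }
    go(&p, &n)
}
```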
Example
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
Resume Behavior
DeltaForge automatically checkpoints progress and resumes from the last position on restart. The resume strategy follows this priority:
- GTID: Preferred if the MySQL server has GTID enabled. Provides the most reliable resume across binlog rotations and failovers.
- File:position: Used when GTID is not available. Resumes from the exact binlog file and byte offset.
- Binlog tail: On first run with no checkpoint, starts from the current end of the binlog (no historical replay).
Checkpoints are stored using the id field as the key.
Snapshot (Initial Load)
DeltaForge can perform a consistent initial snapshot of existing table data before starting binlog streaming. This is the recommended approach for migrating existing tables without manual backfills.
How it works
DeltaForge opens all worker connections simultaneously, each with
START TRANSACTION WITH CONSISTENT SNAPSHOT. The binlog position is captured
after all workers have started - InnoDB guarantees every visible row was committed
at or before that position, so CDC streaming from there has no gaps.
No FLUSH TABLES WITH READ LOCK or RELOAD privilege is required.
Additional privileges
-- Required for snapshot (SELECT is already needed for introspection)
GRANT SELECT ON your_database.* TO 'deltaforge'@'%';
Configuration
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
snapshot:
mode: initial # initial | always | never (default: never)
max_parallel_tables: 8 # tables snapshotted concurrently
chunk_size: 10000 # rows per chunk for integer-PK tables
| Field | Default | Description |
|---|---|---|
mode | never | initial: run once if no checkpoint exists; always: re-snapshot on every restart; never: skip |
max_parallel_tables | 8 | Tables snapshotted concurrently |
chunk_size | 10000 | Rows per range chunk (integer single-column PK tables only; others do a full scan) |
Snapshot events
Snapshot rows are emitted as Op::Read events (Debezium op: "r"), distinguishable
from live CDC Op::Create events. The binlog position captured at snapshot time becomes
the CDC resume point, so no rows are missed or duplicated.
Resume after interruption
If the snapshot is interrupted, DeltaForge resumes at table granularity on the next restart - already-completed tables are skipped.
Binlog retention safety
DeltaForge validates binlog retention before starting a snapshot and monitors it throughout. This prevents the silent failure mode where a long snapshot completes successfully but CDC startup then fails because the captured binlog position was purged.
Preflight checks (before any rows are read):
- Fails hard if log_bin=0 or binlog_format != ROW
- Estimates snapshot duration from table sizes and max_parallel_tables
- Warns at ≥50% of binlog_expire_logs_seconds usage; HIGH RISK at ≥80%
During snapshot:
- Background task polls SHOW BINARY LOGS every 30s
- Cancels the snapshot immediately if the captured file is purged
After all tables complete:
- Synchronous final check before writing finished=true
- finished=true means the position is confirmed valid for CDC resume, not just that rows were emitted
If you see retention risk warnings, the recommended actions are:
- Increase binlog_expire_logs_seconds to cover the estimated snapshot duration
- Use a read replica as the snapshot source to avoid load on the primary
Server ID Handling
MySQL replication requires each replica to have a unique server_id. DeltaForge derives this automatically from the source id using a CRC32 hash:
server_id = 1 + (CRC32(id) % 4,000,000,000)
When running multiple DeltaForge instances against the same MySQL server, ensure each has a unique id to avoid server_id conflicts.
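The derivation formula can be written out directly. The sketch below implements a plain bitwise CRC-32 (IEEE polynomial, reflected) rather than a table-driven library, but the resulting server_id is the same for any standard CRC32:

```rust
// Bitwise CRC-32 (IEEE, reflected, poly 0xEDB88320) - illustrative;
// a real implementation would likely use a table-driven crate.
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &b in data {
        crc ^= b as u32;
        for _ in 0..8 {
            crc = if crc & 1 != 0 { (crc >> 1) ^ 0xEDB8_8320 } else { crc >> 1 };
        }
    }
    !crc
}

// server_id = 1 + (CRC32(id) % 4,000,000,000), per the formula above.
fn derive_server_id(source_id: &str) -> u32 {
    1 + (crc32(source_id.as_bytes()) % 4_000_000_000)
}
```

The result is deterministic per source id and always falls in 1..=4,000,000,000, which fits MySQL’s 32-bit server_id space.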
Schema Tracking
DeltaForge has a built-in schema registry to track table schemas, per source. For MySQL source:
- Schemas are preloaded at startup by querying INFORMATION_SCHEMA
- Each schema is fingerprinted using SHA-256 for change detection
- Events carry schema_version (fingerprint) and schema_sequence (monotonic counter)
- Schema-to-checkpoint correlation enables reliable replay
Schema changes (DDL) trigger automatic reload of affected table schemas.
Timeouts and Heartbeats
| Behavior | Value | Description |
|---|---|---|
| Heartbeat interval | 15s | Server sends heartbeat if no events |
| Read timeout | 90s | Maximum wait for next binlog event |
| Inactivity timeout | 60s | Triggers reconnect if no data received |
| Connect timeout | 30s | Maximum time to establish connection |
These values are currently fixed. Reconnection uses exponential backoff with jitter.
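Exponential backoff with jitter can be sketched as below. The base, cap, and jitter handling here are illustrative assumptions, not DeltaForge’s actual constants; the caller would supply a random jitter fraction in [0, 1):

```rust
// Hedged sketch: delay doubles per attempt, is capped, and is reduced
// by a caller-supplied jitter fraction (normally drawn at random).
fn backoff_ms(attempt: u32, base_ms: u64, cap_ms: u64, jitter: f64) -> u64 {
    // Clamp the shift so the multiplier cannot overflow u64.
    let exp = base_ms.saturating_mul(1u64 << attempt.min(16));
    let capped = exp.min(cap_ms);
    (capped as f64 * (1.0 - jitter)) as u64
}
```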
Event Format
Each captured row change produces an event with:
- op: insert, update, or delete
- before: Previous row state (updates and deletes only, requires binlog_row_image = FULL)
- after: New row state (inserts and updates only)
- table: Fully qualified table name (database.table)
- tx_id: GTID if available
- checkpoint: Binlog position for resume
- schema_version: Schema fingerprint
- schema_sequence: Monotonic sequence for schema correlation
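Putting these fields together, a hypothetical update event might look like the following. Field values are illustrative, and the exact shapes of tx_id and checkpoint are assumptions for this sketch:

```json
{
  "op": "update",
  "table": "shop.orders",
  "before": { "id": 42, "status": "pending" },
  "after": { "id": 42, "status": "shipped" },
  "tx_id": "3e11fa47-71ca-11e1-9e33-c80aa9429562:23",
  "checkpoint": { "file": "binlog.000042", "position": 1337 },
  "schema_version": "sha256:a1b2c3d4...",
  "schema_sequence": 7
}
```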
Troubleshooting
Connection Issues
If you see authentication errors mentioning mysql_native_password:
ALTER USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'password';
Missing Before-Images
If UPDATE/DELETE events have incomplete before data:
SET GLOBAL binlog_row_image = 'FULL';
Binlog Not Enabled
Check binary logging status:
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW BINARY LOG STATUS; -- or SHOW MASTER STATUS on older versions
Privilege Issues
Verify grants for your user:
SHOW GRANTS FOR 'deltaforge'@'%';
Required grants include REPLICATION REPLICA and REPLICATION CLIENT.
PostgreSQL source
DeltaForge captures row-level changes from PostgreSQL using logical replication with the pgoutput plugin.
Prerequisites
PostgreSQL Server Configuration
Enable logical replication in postgresql.conf:
# Required settings
wal_level = logical
max_replication_slots = 10 # At least 1 per DeltaForge pipeline
max_wal_senders = 10 # At least 1 per DeltaForge pipeline
Restart PostgreSQL after changing these settings.
User Privileges
Create a replication user with the required privileges:
-- Create user with replication capability
CREATE ROLE deltaforge WITH LOGIN REPLICATION PASSWORD 'your_password';
-- Grant connect access
GRANT CONNECT ON DATABASE your_database TO deltaforge;
-- Grant schema usage and table access for schema introspection
GRANT USAGE ON SCHEMA public TO deltaforge;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO deltaforge;
-- For automatic publication/slot creation (optional)
-- If you prefer manual setup, skip this and create them yourself
ALTER ROLE deltaforge SUPERUSER; -- Or use manual setup below
pg_hba.conf
Ensure your pg_hba.conf allows replication connections:
# TYPE DATABASE USER ADDRESS METHOD
host replication deltaforge 0.0.0.0/0 scram-sha-256
host your_database deltaforge 0.0.0.0/0 scram-sha-256
Replication Slot and Publication
DeltaForge can automatically create the replication slot and publication on first run. Alternatively, create them manually:
-- Create publication for specific tables
CREATE PUBLICATION my_pub FOR TABLE public.orders, public.order_items;
-- Or for all tables
CREATE PUBLICATION my_pub FOR ALL TABLES;
-- Create replication slot
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');
Replica Identity
For complete before-images on UPDATE and DELETE operations, set tables to REPLICA IDENTITY FULL:
ALTER TABLE public.orders REPLICA IDENTITY FULL;
ALTER TABLE public.order_items REPLICA IDENTITY FULL;
The replica identity mode determines what appears in before-images:
- FULL: Complete row data in before-images
- DEFAULT (primary key): Only primary key columns in before-images
- NOTHING: No before-images at all
DeltaForge warns at startup if tables don’t have REPLICA IDENTITY FULL.
Configuration
Set spec.source.type to postgres and provide a config object:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
id | string | Yes | — | Unique identifier for checkpoints and metrics |
dsn | string | Yes | — | PostgreSQL connection string |
slot | string | Yes | — | Replication slot name |
publication | string | Yes | — | Publication name |
tables | array | Yes | — | Table patterns to capture |
start_position | string/object | No | earliest | Where to start when no checkpoint exists |
DSN Formats
DeltaForge accepts both URL-style and key=value DSN formats:
# URL style
dsn: "postgres://user:pass@localhost:5432/mydb"
# Key=value style
dsn: "host=localhost port=5432 user=deltaforge password=pass dbname=mydb"
Table Patterns
The tables field supports flexible pattern matching:
tables:
- public.orders # exact match: schema "public", table "orders"
- public.order_% # LIKE pattern: tables starting with "order_"
- myschema.* # wildcard: all tables in "myschema"
- %.audit_log # cross-schema: "audit_log" table in any schema
- orders # defaults to public schema: "public.orders"
System schemas (pg_catalog, information_schema, pg_toast) are always excluded.
Start Position
Controls where replication begins when no checkpoint exists:
# Start from the earliest available position (slot's restart_lsn)
start_position: earliest
# Start from current WAL position (skip existing data)
start_position: latest
# Start from a specific LSN
start_position:
lsn: "0/16B6C50"
Example
source:
type: postgres
config:
id: orders-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_orders
publication: orders_pub
tables:
- public.orders
- public.order_items
start_position: earliest
Resume Behavior
DeltaForge checkpoints progress using PostgreSQL’s LSN (Log Sequence Number):
- With checkpoint: Resumes from the stored LSN
- Without checkpoint: Uses the slot’s confirmed_flush_lsn or restart_lsn
- New slot: Starts from pg_current_wal_lsn() or the configured start_position
Checkpoints are stored using the id field as the key.
Snapshot (Initial Load)
DeltaForge performs a consistent initial snapshot using PostgreSQL’s exported snapshot mechanism before starting logical replication.
How it works
A coordinator connection exports a snapshot and captures the current WAL LSN
in a single round trip. Worker connections each import the shared snapshot into
their own REPEATABLE READ transaction - all workers see the same consistent
DB state with no locks held on the source.
Tables with a single integer primary key use PK-range chunking. All others fall back to ctid page-range chunking.
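PK-range chunking can be sketched as splitting the observed key range into inclusive chunks. This is an illustration of the idea, not the actual planner, which would also have to cope with sparse key spaces:

```rust
// Split [min_pk, max_pk] into inclusive ranges of `chunk_size` keys.
// Each range becomes one snapshot query: WHERE pk BETWEEN lo AND hi.
fn plan_chunks(min_pk: i64, max_pk: i64, chunk_size: i64) -> Vec<(i64, i64)> {
    let mut chunks = Vec::new();
    let mut lo = min_pk;
    while lo <= max_pk {
        let hi = (lo + chunk_size - 1).min(max_pk);
        chunks.push((lo, hi));
        lo = hi + 1;
    }
    chunks
}
```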
Configuration
source:
type: postgres
config:
id: orders-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_orders
publication: orders_pub
tables:
- public.orders
snapshot:
mode: initial # initial | always | never (default: never)
max_parallel_tables: 8 # tables snapshotted concurrently
chunk_size: 10000 # rows per chunk for integer-PK tables
| Field | Default | Description |
|---|---|---|
mode | never | initial: run once if no checkpoint exists; always: re-snapshot on every restart; never: skip |
max_parallel_tables | 8 | Tables snapshotted concurrently |
chunk_size | 10000 | Rows per range chunk (integer PK tables only; others use ctid chunking) |
Snapshot events
Snapshot rows are emitted as Op::Read events (Debezium op: "r"),
distinguishable from live CDC Op::Create events. The WAL LSN captured at
snapshot time becomes the CDC resume point - no rows are missed or duplicated.
Resume after interruption
If the snapshot is interrupted, DeltaForge resumes at table granularity on the next restart - already-completed tables are skipped.
WAL slot retention safety
DeltaForge validates replication slot health before starting a snapshot and monitors it throughout. This prevents the slot from being invalidated during a long snapshot, which would make the captured LSN unreachable for CDC resume.
Preflight checks (before any rows are read):
- Fails hard if the slot does not exist or is already invalidated
- Fails hard if wal_status=lost
- Warns if wal_status=unreserved (WAL retention no longer guaranteed)
- Estimates WAL generated during snapshot (~2× data size) against max_slot_wal_keep_size; warns at ≥50%, HIGH RISK at ≥80%
During snapshot:
- Background task polls pg_replication_slots every 30s
- Cancels immediately on slot invalidation or disappearance
- Warns but continues on wal_status=unreserved
After all tables complete:
- Synchronous final check before writing finished=true
- finished=true means the position is confirmed valid for CDC resume, not just that rows were emitted
If you see WAL retention risk warnings:
ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
SELECT pg_reload_conf();
Type Handling
DeltaForge preserves PostgreSQL’s native type semantics:
| PostgreSQL Type | JSON Representation |
|---|---|
boolean | true / false |
integer, bigint | JSON number |
real, double precision | JSON number |
numeric | JSON string (preserves precision) |
text, varchar | JSON string |
json, jsonb | Parsed JSON object/array |
bytea | {"_base64": "..."} |
uuid | JSON string |
timestamp, date, time | ISO 8601 string |
Arrays (int[], text[], etc.) | JSON array |
| TOAST unchanged | {"_unchanged": true} |
Event Format
Each captured row change produces an event with:
- op: insert, update, delete, or truncate
- before: Previous row state (updates and deletes, requires appropriate replica identity)
- after: New row state (inserts and updates)
- table: Fully qualified table name (schema.table)
- tx_id: PostgreSQL transaction ID (xid)
- checkpoint: LSN position for resume
- schema_version: Schema fingerprint
- schema_sequence: Monotonic sequence for schema correlation
WAL Management
Logical replication slots prevent WAL segments from being recycled until the consumer confirms receipt. To avoid disk space issues:
- Monitor slot lag: Check pg_replication_slots.restart_lsn vs pg_current_wal_lsn()
- Set retention limits: Configure max_slot_wal_keep_size (PostgreSQL 13+)
- Handle stale slots: Drop unused slots with pg_drop_replication_slot('slot_name')
-- Check slot status and lag
SELECT slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;
Troubleshooting
Connection Issues
If you see authentication errors:
-- Verify user has replication privilege
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'deltaforge';
-- Check pg_hba.conf allows replication connections
-- Ensure the line type includes "replication" database
Missing Before-Images
If UPDATE/DELETE events have incomplete before data:
-- Check current replica identity
SELECT relname, relreplident
FROM pg_class
WHERE relname = 'your_table';
-- d = default, n = nothing, f = full, i = index
-- Set to FULL for complete before-images
ALTER TABLE your_table REPLICA IDENTITY FULL;
Slot/Publication Not Found
-- List existing publications
SELECT * FROM pg_publication;
-- List existing slots
SELECT * FROM pg_replication_slots;
-- Create if missing
CREATE PUBLICATION my_pub FOR TABLE public.orders;
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');
WAL Disk Usage Growing
-- Check slot lag
SELECT slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;
-- If slot is inactive and not needed, drop it
SELECT pg_drop_replication_slot('unused_slot');
Logical Replication Not Enabled
-- Check wal_level
SHOW wal_level; -- Should be 'logical'
-- If not, update postgresql.conf and restart PostgreSQL
-- wal_level = logical
Turso Source
⚠️ STATUS: EXPERIMENTAL / PAUSED
The Turso source is not yet ready for production use. Native CDC in Turso/libSQL is still evolving and has limitations:
- CDC is per-connection (only changes from the enabling connection are captured)
- File locking prevents concurrent access
- sqld Docker image doesn’t have CDC support yet
This documentation is retained for reference. The code exists but is not officially supported.
The Turso source captures changes from Turso and libSQL databases. It supports multiple CDC modes to work with different database configurations and Turso versions.
CDC Modes
DeltaForge supports four CDC modes for Turso/libSQL:
| Mode | Description | Requirements |
|---|---|---|
native | Uses Turso’s built-in CDC via turso_cdc table | Turso v0.1.2+ with CDC enabled |
triggers | Shadow tables populated by database triggers | Standard SQLite/libSQL |
polling | Tracks changes via rowid comparison | Any SQLite/libSQL (inserts only) |
auto | Automatic fallback: native → triggers → polling | Any |
Native Mode
Native mode uses Turso’s built-in CDC capabilities. This is the most efficient mode when available.
Requirements:
- Turso database with CDC enabled
- Turso server v0.1.2 or later
How it works:
- Queries the turso_cdc system table for changes
- Uses bin_record_json_object() to extract row data as JSON
- Tracks position via change ID in checkpoints
Triggers Mode
Triggers mode uses shadow tables and database triggers to capture changes. This works with standard SQLite and libSQL without requiring native CDC support.
How it works:
- Creates shadow tables (_df_cdc_{table}) for each tracked table
- Installs INSERT/UPDATE/DELETE triggers that write to shadow tables
- Polls shadow tables for new change records
- Cleans up processed records periodically
Polling Mode
Polling mode uses rowid tracking to detect new rows. This is the simplest mode but only captures inserts (not updates or deletes).
How it works:
- Tracks the maximum rowid seen per table
- Queries for rows with rowid greater than last seen
- Emits insert events for new rows
Limitations:
- Only captures INSERT operations
- Cannot detect UPDATE or DELETE
- Requires tables to have accessible rowid (not WITHOUT ROWID tables)
Auto Mode
Auto mode tries each CDC mode in order and uses the first one that works:
- Try native mode (check for the turso_cdc table)
- Try triggers mode (check for existing CDC triggers)
- Fall back to polling mode
This is useful for deployments where the database capabilities may vary.
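The fallback order reduces to "first capability that probes successfully wins". A minimal sketch, with boolean stand-ins for the real probes (querying for turso_cdc, checking for installed triggers):

```rust
// Illustrative auto-mode selection; the probe results are stand-ins
// for the runtime capability checks described above.
#[derive(Debug, PartialEq)]
enum CdcMode {
    Native,
    Triggers,
    Polling,
}

fn select_mode(has_native: bool, has_triggers: bool) -> CdcMode {
    if has_native {
        CdcMode::Native
    } else if has_triggers {
        CdcMode::Triggers
    } else {
        CdcMode::Polling // always available, inserts only
    }
}
```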
Configuration
source:
type: turso
config:
id: turso-main
url: "libsql://your-db.turso.io"
auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["users", "orders", "order_items"]
cdc_mode: auto
poll_interval_ms: 1000
native_cdc:
level: data
Configuration Options
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| id | string | Yes | — | Logical identifier for metrics and logging |
| url | string | Yes | — | Database URL (libsql://, http://, or file path) |
| auth_token | string | No | — | Authentication token for Turso cloud |
| tables | array | Yes | — | Tables to track (supports wildcards) |
| cdc_mode | string | No | auto | CDC mode: native, triggers, polling, auto |
| poll_interval_ms | integer | No | 1000 | Polling interval in milliseconds |
| native_cdc.level | string | No | data | Native CDC level: binlog or data |
Table Patterns
The tables field supports patterns:
tables:
- users # Exact match
- order% # Prefix match (order, orders, order_items)
- "*" # All tables (excluding system tables)
System tables and DeltaForge infrastructure tables are automatically excluded:
- `sqlite_*` — SQLite system tables
- `_df_*` — DeltaForge CDC shadow tables
- `_litestream*` — Litestream replication tables
- `_turso*` — Turso internal tables
- `turso_cdc` — Turso CDC system table
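The combination of pattern matching and infrastructure-table exclusion can be sketched as follows. This is a simplified illustration; the actual matching logic lives inside the Turso source:

```python
EXCLUDED_PREFIXES = ("sqlite_", "_df_", "_litestream", "_turso")

def table_matches(table, patterns):
    """True when `table` matches any pattern and is not an infrastructure table."""
    if table.startswith(EXCLUDED_PREFIXES) or table == "turso_cdc":
        return False  # system tables are excluded even under "*"
    for pattern in patterns:
        if pattern == "*":
            return True
        if pattern.endswith("%") and table.startswith(pattern[:-1]):
            return True  # prefix match, e.g. "order%" → orders, order_items
        if table == pattern:
            return True  # exact match
    return False
```

For example, `table_matches("order_items", ["order%"])` is true, while `table_matches("_df_cdc_users", ["*"])` is false.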
Native CDC Levels
When using native mode, you can choose the CDC level:
| Level | Description |
|---|---|
| data | Only row data changes (default, more efficient) |
| binlog | Full binlog-style events with additional metadata |
Examples
Local Development
source:
type: turso
config:
id: local-dev
url: "http://127.0.0.1:8080"
tables: ["users", "orders"]
cdc_mode: auto
poll_interval_ms: 500
Turso Cloud
source:
type: turso
config:
id: turso-prod
url: "libsql://mydb-myorg.turso.io"
auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["*"]
cdc_mode: native
poll_interval_ms: 1000
SQLite File (Polling Only)
source:
type: turso
config:
id: sqlite-file
url: "file:./data/myapp.db"
tables: ["events", "audit_log"]
cdc_mode: polling
poll_interval_ms: 2000
Checkpoints
Turso checkpoints track position differently depending on the CDC mode:
{
"mode": "native",
"last_change_id": 12345,
"table_positions": {}
}
For polling mode, positions are tracked per-table:
{
"mode": "polling",
"last_change_id": null,
"table_positions": {
"users": 1000,
"orders": 2500
}
}
Schema Loading
The Turso source includes a schema loader that:
- Queries `PRAGMA table_info()` for column metadata
- Detects SQLite type affinities (INTEGER, TEXT, REAL, BLOB, NUMERIC)
- Identifies primary keys and autoincrement columns
- Handles WITHOUT ROWID tables
- Checks for existing CDC triggers
Schema information is available via the REST API at /pipelines/{name}/schemas.
Notes
- WITHOUT ROWID tables: Polling mode cannot track WITHOUT ROWID tables. Use triggers or native mode instead.
- Type affinity: SQLite uses type affinity rather than strict types. The schema loader maps declared types to SQLite affinities.
- Trigger cleanup: In triggers mode, processed change records are cleaned up automatically based on checkpoint position.
- Connection handling: The source maintains a single connection and reconnects automatically on failure.
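The affinity mapping follows SQLite's standard declared-type rules, which can be sketched as below. This is a simplified illustration of the rules from the SQLite documentation; the schema loader's actual implementation may differ:

```python
def type_affinity(declared: str) -> str:
    """Map a declared column type to a SQLite affinity (per sqlite.org/datatype3.html)."""
    t = declared.upper()
    if "INT" in t:
        return "INTEGER"
    if any(s in t for s in ("CHAR", "CLOB", "TEXT")):
        return "TEXT"
    if "BLOB" in t or t == "":
        return "BLOB"
    if any(s in t for s in ("REAL", "FLOA", "DOUB")):
        return "REAL"
    return "NUMERIC"
```

So `VARCHAR(50)` maps to TEXT, `BIGINT` to INTEGER, and `DECIMAL(10,2)` falls through to NUMERIC.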
Processors
Processors act on each event batch between the source and sinks. They run in the order listed in spec.processors, and each receives the output of the previous one. A processor can modify events, filter them out, or add routing metadata.
Available Processors
| Processor | Description |
|---|---|
javascript | Custom transformations using V8-powered JavaScript |
outbox | Transactional outbox pattern - extracts payload, resolves topic, sets routing headers |
flatten | Flatten nested JSON objects into combined keys |
JavaScript
Run arbitrary JavaScript against each event batch. Uses the V8 engine via deno_core for near-native speed with configurable resource limits.
processors:
- type: javascript
id: enrich
inline: |
function processBatch(events) {
return events.map(e => {
e.tags = ["processed"];
return e;
});
}
limits:
cpu_ms: 50
mem_mb: 128
timeout_ms: 500
| Field | Type | Default | Description |
|---|---|---|---|
| id | string | (required) | Processor identifier |
| inline | string | (required) | JavaScript source code |
| limits.cpu_ms | int | 50 | CPU time limit per batch |
| limits.mem_mb | int | 128 | V8 heap memory limit |
| limits.timeout_ms | int | 500 | Wall-clock timeout per batch |
Performance
JavaScript processing adds a fixed overhead per batch plus linear scaling per event. Benchmarks show ~70K+ events/sec throughput with typical transforms — roughly 61× slower than native Rust processors, but sufficient for most workloads. If you need higher throughput, keep transform logic simple or move heavy processing downstream.
Tips
- Return an empty array to drop all events in a batch.
- Return multiple copies of an event to fan out.
- JavaScript numbers are `f64` — integer columns wider than 53 bits may lose precision. Use string representation for such values.
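Python floats are also IEEE-754 `f64`, so the 53-bit limit is easy to demonstrate:

```python
big = 2 ** 53 + 1                       # 9007199254740993, one past the exact-integer range
assert float(big) == float(2 ** 53)     # silently rounds to 9007199254740992.0
assert str(big) == "9007199254740993"   # a string representation keeps the exact value
```

This is why wide integer columns (e.g. snowflake IDs) should be passed through JavaScript transforms as strings.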
Outbox
The outbox processor transforms events captured by the outbox pattern into routed, sink-ready events. It extracts business fields from the raw outbox payload, resolves the destination topic, and passes through all non-outbox events unchanged.
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
default_topic: "events.unrouted"
| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "outbox" | Processor identifier |
| tables | array | [] | Only process outbox events matching these patterns. Empty = all. |
| topic | string | — | Topic template with ${field} placeholders (resolved against raw payload columns) |
| default_topic | string | — | Fallback when template resolution fails |
| columns | object | (defaults below) | Field name mappings |
| additional_headers | map | {} | Forward extra payload fields as routing headers. Key = header name, value = column name. |
| raw_payload | bool | false | Deliver payload as-is, bypassing envelope wrapping |
Column Defaults
| Key | Default | Description |
|---|---|---|
| payload | "payload" | Field containing the event body |
| aggregate_type | "aggregate_type" | Domain aggregate type |
| aggregate_id | "aggregate_id" | Aggregate identifier |
| event_type | "event_type" | Domain event type |
| topic | "topic" | Optional explicit topic override in payload |
See the Outbox Pattern guide for source configuration, complete examples, and multi-outbox routing.
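Template resolution against raw payload columns can be sketched as follows. `resolve_topic` is a hypothetical helper for illustration, not DeltaForge's implementation:

```python
import re

def resolve_topic(template, row, default=None):
    """Fill ${field} placeholders from raw payload columns; fall back on failure."""
    def replace(match):
        value = row.get(match.group(1))
        if value is None:
            raise KeyError(match.group(1))  # missing column → template resolution fails
        return str(value)
    try:
        return re.sub(r"\$\{(\w+)\}", replace, template)
    except KeyError:
        return default  # behaves like default_topic

row = {"aggregate_type": "order", "event_type": "created"}
topic = resolve_topic("${aggregate_type}.${event_type}", row, "events.unrouted")
# topic == "order.created"
```

If the payload lacks a referenced column, the event is routed to the fallback instead (here, `events.unrouted`).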
Flatten
Flattens nested JSON objects in event payloads into top-level keys joined by a configurable separator. Works on every object-valued field present on the event without assuming any particular envelope structure - CDC before/after, outbox business payloads, or any custom fields introduced by upstream processors are all handled uniformly.
processors:
- type: flatten
id: flat
separator: "__"
max_depth: 3
on_collision: last
empty_object: preserve
lists: preserve
empty_list: drop
Input:
{
"after": {
"order_id": "abc",
"customer": {
"id": 1,
"address": { "city": "Berlin", "zip": "10115" }
},
"tags": ["vip"],
"meta": {}
}
}
Output (with defaults — separator: "__", empty_object: preserve, lists: preserve):
{
"after": {
"order_id": "abc",
"customer__id": 1,
"customer__address__city": "Berlin",
"customer__address__zip": "10115",
"tags": ["vip"],
"meta": {}
}
}
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| id | string | "flatten" | Processor identifier |
| separator | string | "__" | Separator inserted between path segments |
| max_depth | int | unlimited | Stop recursing at this depth; objects at the boundary are kept as opaque leaves |
| on_collision | string | last | What to do when two paths produce the same key: last, first, or error |
| empty_object | string | preserve | How to handle {} values: preserve, drop, or null |
| lists | string | preserve | How to handle array values: preserve (keep as-is) or index (expand to field__0, field__1, …) |
| empty_list | string | preserve | How to handle [] values: preserve, drop, or null |
max_depth in practice
max_depth counts nesting levels from the top of the payload object. Objects at the boundary are treated as opaque leaves rather than expanded further, still subject to empty_object policy.
# max_depth: 2
depth 0: customer -> object, recurse
depth 1: customer__address -> object, recurse
depth 2: customer__address__geo -> STOP, kept as leaf {"lat": 52.5, "lng": 13.4}
Without max_depth, a deeply nested or recursive payload could produce a large number of keys. Setting a limit is recommended for payloads with variable or unknown nesting depth.
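The core recursion can be sketched as below — a simplified model of the behavior described above that covers the separator and max_depth semantics, but not the collision or list policies:

```python
def flatten(obj, sep="__", max_depth=None, _prefix="", _depth=0):
    """Flatten nested dicts into combined keys; lists and empty {} are preserved."""
    out = {}
    for key, val in obj.items():
        path = f"{_prefix}{sep}{key}" if _prefix else key
        expand = isinstance(val, dict) and val and (max_depth is None or _depth < max_depth)
        if expand:
            out.update(flatten(val, sep, max_depth, path, _depth + 1))
        else:
            # scalar, list, empty {} (preserve policy), or opaque leaf at max_depth
            out[path] = val
    return out

payload = {
    "order_id": "abc",
    "customer": {"id": 1, "address": {"city": "Berlin", "zip": "10115"}},
    "tags": ["vip"],
    "meta": {},
}
flat = flatten(payload)
# flat["customer__address__city"] == "Berlin"; tags and meta pass through unchanged
```

With `max_depth=2`, an object encountered at depth 2 (e.g. a `geo` object under `customer.address`) would stay a single leaf value instead of being expanded.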
Collision policy
A collision occurs when two input paths produce the same flattened key - typically when a payload already contains a pre-flattened key (e.g. "a__b": 1) alongside a nested object ("a": {"b": 2}).
- `last` — the last path to write a key wins (default, never fails)
- `first` — the first path to write a key wins; subsequent writes are ignored
- `error` — the batch fails immediately; useful in strict pipelines where collisions indicate a schema problem
Working with outbox payloads
After the outbox processor runs, event.after holds the extracted business payload - there is no before. The flatten processor handles this naturally since it operates on whatever fields are present:
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
- type: flatten
id: flat
separator: "."
empty_list: drop
Envelope interaction
The flatten processor runs on the raw Event struct before sink delivery. Envelope wrapping happens inside the sink, after all processors have run. This means the envelope always wraps already-flattened data, no special configuration needed.
Source → [flatten processor] → Sink (envelope → bytes)
All envelope formats work as expected:
// Native
{ "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" }, "op": "c" }
// Debezium
{ "payload": { "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" }, "op": "c" } }
// CloudEvents
{ "specversion": "1.0", ..., "data": { "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" } } }
Outbox + raw_payload: true
When the outbox processor is configured with raw_payload: true, the sink delivers event.after directly, bypassing the envelope entirely. If the flatten processor runs after outbox, the raw payload delivered to the sink is the flattened object — which is the intended behavior for analytics sinks that can’t handle nested JSON.
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
raw_payload: true # sink delivers event.after directly, no envelope
- type: flatten
id: flat
separator: "__"
empty_list: drop
The flatten processor runs second, so by the time the sink delivers the raw payload it is already flat.
Analytics sink example
When sending to column-oriented sinks (ClickHouse, BigQuery, S3 Parquet) that don’t handle nested JSON:
processors:
- type: flatten
id: flat
separator: "__"
lists: index # expand arrays to indexed columns
empty_object: drop # remove sparse marker objects
empty_list: drop # remove empty arrays
Processor Chain
Processors execute in order. Events flow through each processor sequentially:
Source → [Processor 1] → [Processor 2] → ... → Sinks
Each processor receives a Vec<Event> and returns a Vec<Event>. This means processors can:
- Transform: Modify event fields in place
- Filter: Return a subset of events (drop unwanted ones)
- Fan-out: Return more events than received
- Route: Set `event.routing.topic` to override sink defaults
- Enrich: Add headers via `event.routing.headers`
Non-outbox events always pass through the outbox processor untouched, so it is safe to combine it with JavaScript processors in any order.
Adding Custom Processors
The processor interface is a simple trait:
#[async_trait]
pub trait Processor: Send + Sync {
    fn id(&self) -> &str;
    async fn process(&self, events: Vec<Event>) -> Result<Vec<Event>>;
}
To add a new processor:
- Implement the `Processor` trait in `crates/processors`
- Add a config variant to `ProcessorCfg` in `deltaforge-config`
- Register the build step in `build_processors()`
Sinks
Sinks receive batches from the coordinator after processors run. Each sink lives under spec.sinks and can be marked as required or best-effort via the required flag. Checkpoint behavior is governed by the pipeline’s commit policy.
Envelope and Encoding
All sinks support configurable envelope formats and wire encodings. See the Envelopes and Encodings page for detailed documentation.
| Option | Values | Default | Description |
|---|---|---|---|
| envelope | native, debezium, cloudevents | native | Output JSON structure |
| encoding | json | json | Wire format |
Quick example:
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
envelope:
type: cloudevents
type_prefix: "com.example.cdc"
encoding: json
Available Sinks
Multiple sinks in one pipeline
You can combine multiple sinks in one pipeline to fan out events to different destinations. However, multi-sink pipelines introduce complexity that requires careful consideration.
Why multiple sinks are challenging
Different performance characteristics: Kafka might handle 100K events/sec while a downstream HTTP webhook processes 100/sec. The slowest sink becomes the bottleneck for the entire pipeline.
Independent failure modes: Each sink can fail independently. Redis might be healthy while Kafka experiences broker failures. Without proper handling, a single sink failure could block the entire pipeline or cause data loss.
No distributed transactions: DeltaForge cannot atomically commit across heterogeneous systems. If Kafka succeeds but Redis fails mid-batch, you face a choice: retry Redis (risking duplicates in Kafka) or skip Redis (losing data there).
Checkpoint semantics: The checkpoint represents “how far we’ve processed from the source.” With multiple sinks, when is it safe to advance? After one sink succeeds? All of them? A majority?
Read the required and commit_policy sections below for options to manage these challenges.
The required flag
The required flag on each sink determines whether that sink must acknowledge successful delivery before the checkpoint advances:
sinks:
- type: kafka
config:
id: primary-kafka
required: true # Must succeed for checkpoint to advance
- type: redis
config:
id: cache-redis
required: false # Best-effort; failures don't block checkpoint
When required: true (default): The sink must acknowledge the batch before the checkpoint can advance. If this sink fails, the pipeline blocks and retries until it succeeds or the operator intervenes.
When required: false: The sink is best-effort. Failures are logged but don’t prevent the checkpoint from advancing. Use this for non-critical destinations where some data loss is acceptable.
Commit policy
The commit_policy works with the required flag to determine checkpoint behavior:
| Policy | Behavior |
|---|---|
| all | Every sink (regardless of required flag) must acknowledge |
| required | Only sinks with required: true must acknowledge (default) |
| quorum | At least N sinks must acknowledge |
commit_policy:
mode: required # Only wait for required sinks
sinks:
- type: kafka
config:
required: true # Checkpoint waits for this
- type: redis
config:
required: false # Checkpoint doesn't wait for this
- type: nats
config:
required: true # Checkpoint waits for this
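The checkpoint decision for one batch can be sketched as follows — an illustrative model of the policy semantics, not DeltaForge's implementation:

```python
def checkpoint_may_advance(acks, mode, quorum=None):
    """Decide whether the checkpoint can advance for one batch.

    acks: list of (required, delivered_ok) pairs, one per sink.
    """
    if mode == "all":
        return all(ok for _, ok in acks)                  # every sink, required or not
    if mode == "required":
        return all(ok for required, ok in acks if required)  # required sinks only
    if mode == "quorum":
        return sum(1 for _, ok in acks if ok) >= quorum   # any N successes suffice
    raise ValueError(f"unknown commit mode: {mode}")

# kafka (required) ok, redis (best-effort) failed, nats (required) ok
acks = [(True, True), (False, False), (True, True)]
checkpoint_may_advance(acks, "required")  # True: the redis failure doesn't block
checkpoint_may_advance(acks, "all")       # False: every sink must acknowledge
```

Under `quorum` with `quorum: 2`, this batch would also commit, since two of the three sinks succeeded.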
Practical patterns
Primary + secondary: One critical sink (Kafka for durability) marked required: true, with secondary sinks (Redis for caching, testing or experimentation) marked required: false.
Quorum for redundancy: Three sinks with commit_policy.mode: quorum and quorum: 2. Checkpoint advances when any two succeed, providing fault tolerance.
All-or-nothing: Use commit_policy.mode: all when every destination is critical and you need the strongest consistency guarantee (at the cost of delivery throughput).
Multi-format fan-out
For sending the same events to different consumers that expect different formats:
sinks:
# Kafka Connect expects Debezium format
- type: kafka
config:
id: connect-sink
brokers: ${KAFKA_BROKERS}
topic: connect-events
envelope:
type: debezium
required: true
# Lambda expects CloudEvents
- type: kafka
config:
id: lambda-sink
brokers: ${KAFKA_BROKERS}
topic: lambda-events
envelope:
type: cloudevents
type_prefix: "com.acme.cdc"
required: false
# Analytics wants raw events
- type: redis
config:
id: analytics-redis
uri: ${REDIS_URI}
stream: analytics
envelope:
type: native
required: false
This allows each consumer to receive events in their preferred format without post-processing.
Redis sink
The Redis sink publishes events to a Redis Stream for real-time consumption with consumer groups.
When to use Redis
Redis Streams shine when you need low-latency event delivery with simple operational requirements and built-in consumer group support.
Real-world applications
| Use Case | Description |
|---|---|
| Real-time notifications | Push database changes instantly to WebSocket servers for live UI updates |
| Cache invalidation | Trigger cache eviction when source records change; keep Redis cache consistent |
| Session synchronization | Replicate user session changes across application instances in real-time |
| Rate limiting state | Stream counter updates for distributed rate limiting decisions |
| Live dashboards | Feed real-time metrics and KPIs to dashboard backends |
| Job queuing | Use CDC events to trigger background job processing with consumer groups |
| Feature flags | Propagate feature flag changes instantly across all application instances |
Pros and cons
| Pros | Cons |
|---|---|
| ✅ Ultra-low latency - Sub-millisecond publish; ideal for real-time apps | ❌ Memory-bound - All data in RAM; expensive for high-volume retention |
| ✅ Simple operations - Single binary, minimal configuration | ❌ Limited retention - Not designed for long-term event storage |
| ✅ Consumer groups - Built-in competing consumers with acknowledgements | ❌ Durability trade-offs - AOF/RDB persistence has limitations |
| ✅ Familiar tooling - redis-cli, widespread client library support | ❌ Single-threaded - CPU-bound for very high throughput |
| ✅ Versatile - Combine with caching, pub/sub, and data structures | ❌ No native replay - XRANGE exists but no offset management |
| ✅ Atomic operations - MULTI/EXEC for transactional guarantees | ❌ Cluster complexity - Sharding requires careful key design |
Configuration
Consuming events
Consumer groups (recommended)
# Create consumer group
redis-cli XGROUP CREATE orders.events mygroup $ MKSTREAM
# Read as consumer (blocking)
redis-cli XREADGROUP GROUP mygroup consumer1 BLOCK 5000 COUNT 10 STREAMS orders.events '>'
# Acknowledge processing
redis-cli XACK orders.events mygroup 1234567890123-0
# Check pending (unacknowledged) messages
redis-cli XPENDING orders.events mygroup
Simple subscription
# Read latest entries
redis-cli XREAD COUNT 10 STREAMS orders.events 0-0
# Block for new entries
redis-cli XREAD BLOCK 5000 STREAMS orders.events $
Go consumer example
import (
    "context"
    "encoding/json"
    "time"

    "github.com/redis/go-redis/v9"
)

ctx := context.Background()
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
// Create consumer group (once)
rdb.XGroupCreateMkStream(ctx, "orders.events", "mygroup", "0")
for {
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "mygroup",
Consumer: "worker1",
Streams: []string{"orders.events", ">"},
Count: 10,
Block: 5 * time.Second,
}).Result()
if err != nil {
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
var event Event
json.Unmarshal([]byte(msg.Values["event"].(string)), &event)
process(event)
rdb.XAck(ctx, "orders.events", "mygroup", msg.ID)
}
}
}
Rust consumer example
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::AsyncCommands;

let client = redis::Client::open("redis://localhost:6379")?;
let mut con = client.get_async_connection().await?;

// Create consumer group (ignore the error if it already exists)
let _: Result<(), redis::RedisError> = redis::cmd("XGROUP")
    .arg("CREATE").arg("orders.events").arg("mygroup").arg("0").arg("MKSTREAM")
    .query_async(&mut con).await;

loop {
    let reply: StreamReadReply = con.xread_options(
        &["orders.events"],
        &[">"],
        &StreamReadOptions::default()
            .group("mygroup", "worker1")
            .count(10)
            .block(5000),
    ).await?;

    for stream in reply.keys {
        for msg in stream.ids {
            if let Some(raw) = msg.get::<String>("event") {
                let event: Event = serde_json::from_str(&raw)?;
                process(event);
            }
            let _: () = con.xack("orders.events", "mygroup", &[&msg.id]).await?;
        }
    }
}
Python consumer example
import redis
import json
r = redis.Redis.from_url("redis://localhost:6379")
# Create consumer group (once)
try:
r.xgroup_create("orders.events", "mygroup", id="0", mkstream=True)
except redis.ResponseError:
pass # Group already exists
# Consume events
while True:
events = r.xreadgroup("mygroup", "worker1", {"orders.events": ">"}, count=10, block=5000)
for stream, messages in events:
for msg_id, data in messages:
event = json.loads(data[b"event"])
process(event)
r.xack("orders.events", "mygroup", msg_id)
Failure modes
| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore Redis; check network |
| Authentication failure | NOAUTH / WRONGPASS | Fails fast, no retry | Fix auth details in URI |
| OOM (Out of Memory) | OOM command not allowed | Fails batch; retries | Increase maxmemory; enable eviction or trim streams |
| Stream doesn’t exist | Auto-created by XADD | No failure | N/A (XADD creates stream) |
| Connection timeout | Command hangs | Timeout after configured duration | Check network; increase timeout |
| Cluster MOVED/ASK | Redirect errors | Automatic redirect (if cluster mode) | Ensure cluster client configured |
| Replication lag | Writes to replica fail | Fails with READONLY | Write to master only |
| Max stream length | If MAXLEN enforced | Oldest entries trimmed | Expected behavior; not a failure |
| Network partition | Intermittent timeouts | Retries; may have gaps | Restore network |
Failure scenarios and data guarantees
Redis OOM during batch delivery
- DeltaForge sends batch of 100 events via pipeline
- 50 events written, Redis hits maxmemory
- Pipeline fails atomically (all or nothing per pipeline)
- DeltaForge retries entire batch
- If OOM persists: batch blocked until memory available
- Checkpoint only saved after ALL events acknowledged
DeltaForge crash after XADD, before checkpoint
- Batch written to Redis stream successfully
- DeltaForge crashes before saving checkpoint
- On restart: replays from last checkpoint
- Result: Duplicate events in stream (at-least-once)
- Consumer must handle idempotently (check event.id)
Redis failover (Sentinel/Cluster)
- Master fails, Sentinel promotes replica
- In-flight XADD may fail with connection error
- DeltaForge reconnects to new master
- Retries failed batch
- Possible duplicates if original write succeeded
Handling duplicates in consumers
# Idempotent consumer using event ID
processed_ids = set() # Or use Redis SET for distributed dedup
for msg_id, data in messages:
event = json.loads(data[b"event"])
event_id = event["id"]
if event_id in processed_ids:
r.xack("orders.events", "mygroup", msg_id)
continue # Skip duplicate
process(event)
processed_ids.add(event_id)
r.xack("orders.events", "mygroup", msg_id)
Monitoring
DeltaForge exposes these metrics for Redis sink monitoring:
# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink} # Events delivered
deltaforge_sink_batch_total{pipeline,sink} # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink} # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"} # Stage timing
For Redis server visibility, use Redis’s built-in monitoring:
# Monitor commands in real-time
redis-cli MONITOR
# Get server stats
redis-cli INFO stats
# Check memory usage
redis-cli INFO memory
# Stream-specific info
redis-cli XINFO STREAM orders.events
redis-cli XINFO GROUPS orders.events
Stream management
# Check stream length
redis-cli XLEN orders.events
# Trim to last 10000 entries (approximate; quote the ~ so the shell doesn't expand it)
redis-cli XTRIM orders.events MAXLEN '~' 10000
# Trim to exact length
redis-cli XTRIM orders.events MAXLEN 10000
# View consumer group info
redis-cli XINFO GROUPS orders.events
# Check pending messages
redis-cli XPENDING orders.events mygroup
# Claim stuck messages (after 60 seconds)
redis-cli XCLAIM orders.events mygroup worker2 60000 <message-id>
# Delete processed messages (careful!)
redis-cli XDEL orders.events <message-id>
Notes
- Redis Streams provide at-least-once delivery with consumer group acknowledgements
- Use `MAXLEN ~` trimming to prevent unbounded memory growth (approximate is faster)
- Consider Redis Cluster for horizontal scaling with multiple streams
- Combine with Redis pub/sub for fan-out to ephemeral subscribers
- For durability, enable AOF persistence with `appendfsync everysec` or `always`
- Monitor memory usage closely; Redis will reject writes when `maxmemory` is reached
Kafka sink
The Kafka sink publishes batches to a Kafka topic using rdkafka.
When to use Kafka
Kafka excels as the backbone for event-driven architectures where durability, ordering, and replay capabilities are critical.
Real-world applications
| Use Case | Description |
|---|---|
| Event sourcing | Store all state changes as an immutable log; rebuild application state by replaying events |
| Microservices integration | Decouple services with async messaging; each service consumes relevant topics |
| Real-time analytics pipelines | Feed CDC events to Spark, Flink, or ksqlDB for streaming transformations |
| Data lake ingestion | Stream database changes to S3/HDFS via Kafka Connect for analytics and ML |
| Audit logging | Capture every database mutation for compliance, debugging, and forensics |
| Cross-datacenter replication | Use MirrorMaker 2 to replicate topics across regions for DR |
Pros and cons
| Pros | Cons |
|---|---|
| ✅ Durability - Configurable replication ensures no data loss | ❌ Operational complexity - Requires ZooKeeper/KRaft, careful tuning |
| ✅ Ordering guarantees - Per-partition ordering with consumer groups | ❌ Latency - Batching and replication add milliseconds of delay |
| ✅ Replay capability - Configurable retention allows reprocessing | ❌ Resource intensive - High disk I/O and memory requirements |
| ✅ Ecosystem - Connect, Streams, Schema Registry, ksqlDB | ❌ Learning curve - Partitioning, offsets, consumer groups to master |
| ✅ Throughput - Handles millions of messages per second | ❌ Cold start - Cluster setup and topic configuration overhead |
| ✅ Exactly-once semantics - Transactions for critical workloads | ❌ Cost - Managed services can be expensive at scale |
Configuration
Recommended client_conf settings
| Setting | Recommended | Description |
|---|---|---|
| acks | all | Wait for all replicas for durability |
| message.timeout.ms | 30000 | Total time to deliver a message |
| retries | 2147483647 | Retry indefinitely (with backoff) |
| enable.idempotence | true | Prevent duplicates on retry |
| compression.type | lz4 | Balance between CPU and bandwidth |
Consuming events
Kafka CLI
# Consume from beginning
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning
# Consume with consumer group
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--group deltaforge-consumers
Go consumer example
import (
    "context"
    "encoding/json"
    "log"

    "github.com/IBM/sarama"
)

ctx := context.Background()
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
group, _ := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
for {
err := group.Consume(ctx, []string{"orders"}, &handler{})
if err != nil {
log.Printf("Consumer error: %v", err)
}
}
type handler struct{}
func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
var event Event
json.Unmarshal(msg.Value, &event)
process(event)
session.MarkMessage(msg, "")
}
return nil
}
Rust consumer example
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::Message;

let consumer: StreamConsumer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("group.id", "my-group")
    .set("auto.offset.reset", "earliest")
    .create()?;
consumer.subscribe(&["orders"])?;

loop {
    match consumer.recv().await {
        Ok(msg) => {
            let payload = msg.payload_view::<str>().unwrap()?;
            let event: Event = serde_json::from_str(payload)?;
            process(event);
            consumer.commit_message(&msg, CommitMode::Async)?;
        }
        Err(e) => eprintln!("Kafka error: {}", e),
    }
}
Python consumer example
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['localhost:9092'],
group_id='my-group',
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
event = message.value
process(event)
consumer.commit()
Failure modes
| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Broker unavailable | Connection refused, timeout | Retries with backoff; blocks checkpoint | Restore broker; check network |
| Topic not found | UnknownTopicOrPartition | Fails batch; retries | Create topic or enable auto-create |
| Authentication failure | SaslAuthenticationFailed | Fails fast, no retry | Fix credentials in config |
| Authorization failure | TopicAuthorizationFailed | Fails fast, no retry | Grant ACLs for producer |
| Message too large | MessageSizeTooLarge | Fails message permanently | Increase message.max.bytes or filter large events |
| Leader election | NotLeaderForPartition | Automatic retry after metadata refresh | Wait for election; usually transient |
| Disk full | KafkaStorageException | Retries indefinitely | Add disk space; purge old segments |
| Network partition | Timeouts, partial failures | Retries; may produce duplicates | Restore network; idempotence prevents dups |
| SSL/TLS errors | Handshake failures | Fails fast | Fix certificates, verify truststore |
Failure scenarios and data guarantees
Broker failure during batch delivery
- DeltaForge sends batch of 100 events
- 50 events delivered, broker crashes
- rdkafka detects failure, retries remaining 50
- If idempotence enabled: no duplicates
- If not: possible duplicates of events near failure point
- Checkpoint only saved after ALL events acknowledged
DeltaForge crash after Kafka ack, before checkpoint
- Batch delivered to Kafka successfully
- DeltaForge crashes before saving checkpoint
- On restart: replays from last checkpoint
- Result: Duplicate events in Kafka (at-least-once)
- Consumer must handle idempotently
Monitoring recommendations
DeltaForge exposes these metrics for Kafka sink monitoring:
# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink} # Events delivered
deltaforge_sink_batch_total{pipeline,sink} # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink} # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"} # Stage timing
For deeper Kafka broker visibility, monitor your Kafka cluster directly:
- Broker metrics via JMX or Kafka’s built-in metrics
- Consumer lag via `kafka-consumer-groups.sh`
- Topic throughput via broker dashboards
Note: Internal `rdkafka` producer statistics (message queues, broker RTT, etc.) are not currently exposed by DeltaForge. This is a potential future enhancement.
Notes
- Combine Kafka with other sinks to fan out data; use commit policy to control checkpoint behavior
- For exactly-once semantics, ensure your Kafka cluster supports transactions (2.5+)
- Adjust `client_conf` for durability (`acks=all`) or performance based on your requirements
- Consider partitioning strategy for ordering guarantees within partitions
- Enable `enable.idempotence=true` to prevent duplicates during retries
NATS sink
The NATS sink publishes events to a NATS JetStream stream for durable, at-least-once delivery.
When to use NATS
NATS JetStream is ideal when you need a lightweight, high-performance messaging system with persistence, without the operational overhead of Kafka.
Real-world applications
| Use Case | Description |
|---|---|
| Edge computing | Lightweight footprint perfect for IoT gateways and edge nodes syncing to cloud |
| Microservices mesh | Request-reply and pub/sub patterns with automatic load balancing |
| Multi-cloud sync | Leaf nodes and superclusters for seamless cross-cloud data replication |
| Kubernetes-native events | NATS Operator for cloud-native deployment; sidecar-friendly architecture |
| Real-time gaming | Low-latency state synchronization for multiplayer game servers |
| Financial data feeds | Stream market data with subject-based routing and wildcards |
| Command and control | Distribute configuration changes and commands to distributed systems |
Pros and cons
| Pros | Cons |
|---|---|
| ✅ Lightweight - Single binary ~20MB; minimal resource footprint | ❌ Smaller ecosystem - Fewer connectors and integrations than Kafka |
| ✅ Simple operations - Zero external dependencies; easy clustering | ❌ Younger persistence - JetStream newer than Kafka’s battle-tested log |
| ✅ Low latency - Sub-millisecond message delivery | ❌ Community size - Smaller community than Kafka or Redis |
| ✅ Flexible patterns - Pub/sub, queues, request-reply, streams | ❌ Tooling maturity - Fewer monitoring and management tools |
| ✅ Subject hierarchy - Powerful wildcard routing (orders.>, *.events) | ❌ Learning curve - JetStream concepts differ from traditional queues |
| ✅ Multi-tenancy - Built-in accounts and security isolation | ❌ Less enterprise adoption - Fewer case studies at massive scale |
| ✅ Cloud-native - Designed for Kubernetes and distributed systems | |
Configuration
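As a sketch, a minimal NATS sink configuration assembled from the fields used in the examples later on this page (the id and the envelope/encoding values are illustrative; see the Configuration Reference for the full field list):

```yaml
sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}          # NATS server URL
      subject: orders.events    # subject to publish to
      stream: ORDERS            # optional: JetStream stream, verified at connection time
      envelope:
        type: native
      encoding: json
```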
Authentication options
# Credentials file
credentials_file: /etc/nats/creds/user.creds
# Username/password
username: ${NATS_USER}
password: ${NATS_PASSWORD}
# Token
token: ${NATS_TOKEN}
JetStream setup
Before using the NATS sink with JetStream, create a stream that captures your subject:
# Using NATS CLI
nats stream add ORDERS \
--subjects "orders.>" \
--retention limits \
--storage file \
--replicas 3 \
--max-age 7d
# Verify stream
nats stream info ORDERS
Consuming events
NATS CLI
# Subscribe to subject (ephemeral)
nats sub "orders.>"
# Create durable consumer
nats consumer add ORDERS orders-processor \
--pull \
--ack explicit \
--deliver all \
--max-deliver 3 \
--filter "orders.events"
# Consume messages
nats consumer next ORDERS orders-processor --count 10
Go consumer example
nc, _ := nats.Connect("nats://localhost:4222")
js, _ := nc.JetStream()
// Bind to the durable pull consumer. PullSubscribe's durable argument
// already sets the durable name; passing nats.Durable as well would
// fail at runtime with "option Durable set more than once".
sub, _ := js.PullSubscribe("orders.events", "orders-processor",
	nats.AckExplicit(),
)
for {
msgs, _ := sub.Fetch(10, nats.MaxWait(5*time.Second))
for _, msg := range msgs {
var event Event
json.Unmarshal(msg.Data, &event)
process(event)
msg.Ack()
}
}
Rust consumer example
use async_nats::jetstream;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let client = async_nats::connect("nats://localhost:4222").await?;
    let js = jetstream::new(client);
    let stream = js.get_stream("ORDERS").await?;
    let consumer: jetstream::consumer::PullConsumer =
        stream.get_consumer("orders-processor").await?;
    let mut messages = consumer.messages().await?;
    while let Some(msg) = messages.next().await {
        let msg = msg?;
        // Event and process() as in the Go example above
        let event: Event = serde_json::from_slice(&msg.payload)?;
        process(event);
        msg.ack().await?;
    }
    Ok(())
}
Monitoring
DeltaForge exposes these metrics for NATS sink monitoring:
# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink} # Events delivered
deltaforge_sink_batch_total{pipeline,sink} # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink} # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"} # Stage timing
For NATS server visibility, use the NATS CLI or monitoring endpoint:
# Server info
nats server info
# JetStream account info
nats account info
# Stream statistics
nats stream info ORDERS
# Consumer statistics
nats consumer info ORDERS orders-processor
# Real-time event monitoring
nats events
NATS also exposes a monitoring endpoint (default :8222) with JSON stats:
- http://localhost:8222/varz - General server stats
- http://localhost:8222/jsz - JetStream stats
- http://localhost:8222/connz - Connection stats
Subject design patterns
| Pattern | Example | Use Case |
|---|---|---|
| Hierarchical | orders.us.created | Regional routing |
| Wildcard single | orders.*.created | Any region, specific event |
| Wildcard multi | orders.> | All order events |
| Versioned | v1.orders.events | API versioning |
Failure modes
| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore NATS; check network |
| Stream not found | stream not found error | Fails batch; no retry | Create stream or remove stream config |
| Authentication failure | authorization violation | Fails fast, no retry | Fix credentials |
| Subject mismatch | no responders (core NATS) | Fails if no subscribers | Add subscribers or use JetStream |
| JetStream disabled | jetstream not enabled | Fails fast | Enable JetStream on server |
| Storage full | insufficient resources | Retries; eventually fails | Add storage; adjust retention |
| Message too large | message size exceeds maximum | Fails message permanently | Increase max_payload or filter large events |
| Cluster partition | Intermittent failures | Retries with backoff | Restore network; wait for quorum |
| Slow consumer | Publish backpressure | Slows down; may timeout | Scale consumers; increase buffer |
| TLS errors | Handshake failures | Fails fast | Fix certificates |
Failure scenarios and data guarantees
NATS server restart during batch delivery
- DeltaForge sends batch of 100 events
- 50 events published, server restarts
- async_nats detects disconnect, starts reconnecting
- After reconnect, DeltaForge retries remaining 50
- JetStream deduplication prevents duplicates (if enabled)
- Checkpoint only saved after ALL events acknowledged
DeltaForge crash after JetStream ack, before checkpoint
- Batch published to JetStream successfully
- DeltaForge crashes before saving checkpoint
- On restart: replays from last checkpoint
- Result: Duplicate events in stream (at-least-once)
- Consumer must handle idempotently (check event.id)
Stream storage exhausted
- JetStream stream hits max_bytes or max_msgs limit
- With discard: old → oldest messages removed, publish succeeds
- With discard: new → publish rejected
- DeltaForge retries on rejection
- Resolution: Increase limits or enable discard: old
JetStream acknowledgement levels
# Stream configuration affects durability:
#   --replicas 3        R=3 for production
#   --retention limits  or 'workqueue' for single consumer
#   --discard old       remove oldest when full
#   --max-age 7d        auto-expire after 7 days
#   --storage file      persistent (vs memory)
nats stream add ORDERS \
  --replicas 3 \
  --retention limits \
  --discard old \
  --max-age 7d \
  --storage file
| Replicas | Guarantee | Use Case |
|---|---|---|
| R=1 | Single node; lost if node fails | Development, non-critical |
| R=3 | Survives 1 node failure | Production default |
| R=5 | Survives 2 node failures | Critical data |
Handling duplicates in consumers
// Use event ID for idempotency
processedIDs := make(map[string]bool) // Or use Redis/DB
for _, msg := range msgs {
var event Event
json.Unmarshal(msg.Data, &event)
if processedIDs[event.ID] {
msg.Ack() // Already processed
continue
}
if err := process(event); err == nil {
processedIDs[event.ID] = true
}
msg.Ack()
}
Notes
- When stream is specified, the sink verifies the stream exists at connection time
- Without stream, events are published to core NATS (no persistence guarantees)
- Connection pooling ensures efficient reuse across batches
- Use replicated streams (--replicas 3) for production durability
- Combine with other sinks to fan out data; use commit policy to control checkpoint behavior
- JetStream provides exactly-once semantics when combined with message deduplication
Envelopes and Encodings
DeltaForge supports configurable envelope formats and wire encodings for sink output. This allows you to match the output format expected by your downstream consumers without forcing code changes on them.
Overview
Every CDC event flows through two stages before being written to a sink:
Event -> Envelope (structure) -> Encoding (bytes) -> Sink
- Envelope: Controls the JSON structure of the output (what fields exist, how they’re nested)
- Encoding: Controls the wire format (JSON bytes, future: Avro, Protobuf)
Envelope Formats
Native (default)
The native envelope serializes events directly with minimal overhead. This is DeltaForge’s own format, optimized for efficiency and practical use cases.
Note
The native envelope format may evolve over time as we adapt to user needs and optimize for the lowest possible overhead. If you need a stable, standardized format, consider using the debezium or cloudevents envelopes, which follow their respective established specifications.
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
envelope:
type: native
Output:
{
"before": null,
"after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
"source": {
"version": "0.1.0",
"connector": "mysql",
"name": "orders-db",
"ts_ms": 1700000000000,
"db": "shop",
"table": "customers",
"server_id": 1,
"file": "mysql-bin.000003",
"pos": 12345
},
"op": "c",
"ts_ms": 1700000000000
}
When to use:
- Maximum performance with lowest overhead
- Custom consumers that parse the payload directly
- When format stability is less important than efficiency
- Internal systems where you control both producer and consumer
Debezium
The Debezium envelope wraps the event in a {"schema": null, "payload": ...} structure,
following the Debezium event format specification.
This uses schemaless mode (schema: null), which is equivalent to Debezium’s
JsonConverter with schemas.enable=false. This is the recommended configuration
for most production deployments as it avoids the overhead of inline schemas.
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
envelope:
type: debezium
Output:
{
"schema": null,
"payload": {
"before": null,
"after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
"source": {
"version": "0.1.0",
"connector": "mysql",
"name": "orders-db",
"ts_ms": 1700000000000,
"db": "shop",
"table": "customers"
},
"op": "c",
"ts_ms": 1700000000000
}
}
When to use:
- Kafka Connect consumers expecting full Debezium format
- Existing Debezium-based pipelines you’re migrating from
- Tools that specifically parse the payload wrapper
- When you need a stable, well-documented format with broad ecosystem support
Note: For Schema Registry integration with Avro encoding (planned), schema handling will move to the encoding layer where schema IDs are embedded in the wire format.
CloudEvents
The CloudEvents envelope restructures events to the CloudEvents 1.0 specification, a CNCF project that defines a vendor-neutral format for event data. This format strictly follows the CloudEvents spec and is guaranteed to remain compliant.
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
envelope:
type: cloudevents
type_prefix: "com.example.cdc"
Output:
{
"specversion": "1.0",
"id": "550e8400-e29b-41d4-a716-446655440000",
"source": "deltaforge/orders-db/shop.customers",
"type": "com.example.cdc.created",
"time": "2024-01-15T10:30:00.000Z",
"datacontenttype": "application/json",
"subject": "shop.customers",
"data": {
"before": null,
"after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
"op": "c"
}
}
The type field is constructed from your type_prefix plus the operation:
- com.example.cdc.created (INSERT)
- com.example.cdc.updated (UPDATE)
- com.example.cdc.deleted (DELETE)
- com.example.cdc.snapshot (READ/snapshot)
- com.example.cdc.truncated (TRUNCATE)
When to use:
- AWS EventBridge, Azure Event Grid, or other CloudEvents-native platforms
- Serverless architectures (Lambda, Cloud Functions)
- Event-driven microservices using CloudEvents SDKs
- Standardized event routing based on the type field
- When you need a vendor-neutral, CNCF-backed standard format
Wire Encodings
JSON (default)
Standard UTF-8 JSON encoding. Human-readable and widely supported.
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
encoding: json
Content-Type: application/json
When to use:
- Development and debugging
- Consumers that expect JSON
- When human readability matters
- Most use cases (good default)
Future: Avro
Coming soon: Avro encoding with Schema Registry integration for compact binary serialization and schema evolution support.
# Future configuration (not yet implemented)
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
encoding:
type: avro
schema_registry: http://schema-registry:8081
Configuration Examples
Kafka with CloudEvents
sinks:
- type: kafka
config:
id: orders-kafka
brokers: ${KAFKA_BROKERS}
topic: order-events
envelope:
type: cloudevents
type_prefix: "com.acme.orders"
encoding: json
required: true
Redis with Debezium envelope
sinks:
- type: redis
config:
id: orders-redis
uri: ${REDIS_URI}
stream: orders
envelope:
type: debezium
encoding: json
NATS with native envelope
sinks:
- type: nats
config:
id: orders-nats
url: ${NATS_URL}
subject: orders.events
stream: ORDERS
envelope:
type: native
encoding: json
Multi-sink with different formats
Different consumers may expect different formats. Configure each sink independently:
sinks:
# Kafka Connect expects Debezium format
- type: kafka
config:
id: connect-sink
brokers: ${KAFKA_BROKERS}
topic: connect-events
envelope:
type: debezium
required: true
# Lambda expects CloudEvents
- type: kafka
config:
id: lambda-sink
brokers: ${KAFKA_BROKERS}
topic: lambda-events
envelope:
type: cloudevents
type_prefix: "com.acme.cdc"
required: false
# Analytics wants raw events
- type: redis
config:
id: analytics-redis
uri: ${REDIS_URI}
stream: analytics
envelope:
type: native
Operation Mapping
DeltaForge uses Debezium-compatible operation codes:
| Operation | Code | Description |
|---|---|---|
| Create/Insert | c | New row inserted |
| Update | u | Existing row modified |
| Delete | d | Row deleted |
| Read | r | Snapshot read (initial load) |
| Truncate | t | Table truncated |
These codes appear in the op field regardless of envelope format.
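These codes are also what JavaScript processors branch on. A minimal sketch (assuming, per the processBatch contract used in the routing examples, that returning a filtered array drops the omitted events) that forwards only inserts, updates, and snapshot reads:

```javascript
// Sketch only: drop deletes ('d') and truncates ('t'), forward everything else.
function processBatch(events) {
  return events.filter(ev => ev.op !== 'd' && ev.op !== 't');
}
```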
Performance Considerations
| Envelope | Overhead | Format Stability | Use Case |
|---|---|---|---|
| Native | Baseline (minimal) | May evolve | High-throughput, internal systems |
| Debezium | ~14 bytes | Stable (follows Debezium spec) | Kafka Connect, Debezium ecosystem |
| CloudEvents | ~150-200 bytes | Stable (follows CNCF spec) | Serverless, event-driven architectures |
The native envelope is recommended for maximum throughput when you control both ends of the pipeline. For interoperability with external systems or when format stability is critical, use debezium or cloudevents.
Defaults
If not specified, sinks use:
- Envelope: native
- Encoding: json
The native envelope provides the lowest overhead for high-throughput scenarios. If you need format stability guarantees, use debezium or cloudevents which adhere to their respective established specifications.
Dynamic Routing
Dynamic routing controls where each CDC event is delivered - which Kafka topic, Redis stream, or NATS subject receives it. By default, all events go to the single destination configured in the sink (static routing). With dynamic routing, events can be split across destinations based on their content, source metadata, or other attributes of the event and pipeline.
Overview
There are two routing mechanisms, and they compose naturally:
- Template strings in sink config - resolve per-event from event fields
- JavaScript ev.route() - programmatic per-event routing in processors
When both are used, ev.route() overrides take highest priority, then template resolution, then the static config value.
Template Routing
Replace static topic/stream/subject strings with templates containing ${...} variables. Templates are compiled once at startup and resolved per-event with zero regex overhead.
Kafka
sinks:
- type: kafka
config:
id: kafka-routed
brokers: ${KAFKA_BROKERS}
topic: "cdc.${source.db}.${source.table}"
key: "${after.customer_id}"
envelope:
type: debezium
Events from shop.orders -> topic cdc.shop.orders, partitioned by customer_id.
Redis
sinks:
- type: redis
config:
id: redis-routed
uri: ${REDIS_URI}
stream: "events:${source.table}"
key: "${after.id}"
Events from orders -> stream events:orders. The key value appears as the df-key field in each stream entry.
NATS
sinks:
- type: nats
config:
id: nats-routed
url: ${NATS_URL}
subject: "cdc.${source.db}.${source.table}"
key: "${after.id}"
stream: CDC
Events from shop.orders -> subject cdc.shop.orders. The key value appears as the df-key NATS header.
Available Variables
| Variable | Description | Example value |
|---|---|---|
${source.table} | Table name | orders |
${source.db} | Database name | shop |
${source.schema} | Schema name (PostgreSQL) | public |
${source.connector} | Source type | mysql |
${op} | Operation code | c, u, d, r, t |
${after.<field>} | Field from after image | 42, cust-abc |
${before.<field>} | Field from before image | old-value |
${tenant_id} | Pipeline tenant ID | acme |
Missing fields resolve to an empty string. A warning is logged once per unique template, not per event.
Static strings (no ${...}) are detected at parse time and have zero overhead on the hot path - no allocation, no resolution.
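These variables compose freely within one template. A sketch (the topic layout is illustrative) that fans events out per tenant, table, and operation:

```yaml
sinks:
  - type: kafka
    config:
      id: kafka-op-routed
      brokers: ${KAFKA_BROKERS}
      # e.g. an UPDATE on shop.orders in tenant acme -> topic acme.shop.orders.u
      topic: "${tenant_id}.${source.db}.${source.table}.${op}"
      key: "${after.id}"
```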
Env Vars vs Templates
Both use ${...} syntax. The config loader expands environment variables first. Unknown variables pass through as templates for runtime resolution:
brokers: ${KAFKA_BROKERS} # env var - expanded at load time
topic: "cdc.${source.table}" # template - passed through to runtime
key: "${after.customer_id}" # template - resolved per-event
JavaScript Routing
For routing logic that goes beyond field substitution, use ev.route() in a JavaScript processor. This lets you make conditional routing decisions based on event content.
processors:
- type: javascript
id: smart-router
inline: |
function processBatch(events) {
for (const ev of events) {
if (!ev.after) continue;
if (ev.after.total_amount > 10000) {
ev.route({
topic: "orders.priority",
key: String(ev.after.customer_id),
headers: {
"x-tier": "high-value",
"x-amount": String(ev.after.total_amount)
}
});
} else {
ev.route({
topic: "orders.standard",
key: String(ev.after.customer_id)
});
}
}
return events;
}
ev.route() fields
| Field | Type | Description |
|---|---|---|
topic | string | Override destination (topic, stream, or subject) |
key | string | Override message/partition key |
headers | object | Key-value pairs added to the message |
All fields are optional. Only set fields override; omitted fields fall through to config templates or static values.
Calling ev.route() replaces any previous routing on that event - it does not merge.
How headers are delivered
| Sink | Key delivery | Header delivery |
|---|---|---|
| Kafka | Kafka message key | Kafka message headers |
| Redis | df-key field in stream entry | df-headers field (JSON) |
| NATS | df-key NATS header | Individual NATS headers |
Resolution Order
For each event, the destination is resolved in priority order:
ev.route() override → config template → static config value
Specifically:
- If the event has routing.topic set (via ev.route() or programmatically), use it
- If the sink config contains a template (has ${...}), resolve it from event fields
- Otherwise, use the static config string
The same order applies independently to key and headers.
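To illustrate the independent fall-through, here is a hypothetical processor that overrides only headers; topic and key still resolve from the sink's templates or static config:

```javascript
// Sketch only: set a per-event header, leaving topic/key untouched so they
// fall through to config templates or static values.
function processBatch(events) {
  for (const ev of events) {
    ev.route({ headers: { "x-op": ev.op } });
  }
  return events;
}
```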
Examples
See the complete example configurations:
- Dynamic Routing - template-based routing across Kafka, Redis, and NATS
- JavaScript Routing - conditional routing with ev.route() based on business logic
Related
- Configuration Reference — full sink config fields
- JavaScript Processors — processor API reference
- Sinks Overview — multi-sink patterns and commit policies
Outbox Pattern
The transactional outbox pattern guarantees that domain events are published whenever the corresponding database change is committed - no two-phase commit required. DeltaForge supports this natively for both MySQL and PostgreSQL with zero application-side polling.
How It Works
┌─────────────────┐ ┌──────────────┐ ┌──────────────────┐ ┌──────┐
│ Application │────>│ Database │────>│ DeltaForge │────>│ Sink │
│ (writes data │ │ (outbox │ │ (captures + │ │ │
│ + outbox msg) │ │ table/WAL) │ │ transforms) │ │ │
└─────────────────┘ └──────────────┘ └──────────────────┘ └──────┘
- Your application writes business data and an outbox message in the same transaction.
- DeltaForge captures the outbox event through the database’s native replication stream.
- The OutboxProcessor extracts the payload, resolves the destination topic, and sets routing headers.
- The transformed event flows to sinks like any other CDC event.
Because the outbox write is part of the application transaction, the event is guaranteed to be published if and only if the transaction commits.
Source Configuration
Each database uses its native mechanism — there is nothing to install or poll.
PostgreSQL — WAL Messages
PostgreSQL uses pg_logical_emit_message() to write messages directly into the WAL. No table is needed.
-- In your application transaction:
BEGIN;
INSERT INTO orders (id, total) VALUES (42, 99.99);
SELECT pg_logical_emit_message(
true, -- transactional: tied to the enclosing TX
'outbox', -- prefix: matched by DeltaForge
'{"aggregate_type":"Order","aggregate_id":"42","event_type":"OrderCreated","payload":{"total":99.99}}'
);
COMMIT;
Source config:
source:
type: postgres
config:
id: orders-pg
dsn: ${POSTGRES_DSN}
slot: deltaforge_orders
publication: orders_pub
tables: [public.orders]
outbox:
prefixes: [outbox]
| Field | Type | Description |
|---|---|---|
outbox.prefixes | array | WAL message prefixes to capture. Supports glob patterns: outbox (exact), outbox_% (prefix match), * (all). |
The message prefix becomes source.table on the event, so processors can filter by prefix when multiple outbox channels share a pipeline.
MySQL — Table Inserts
MySQL uses a regular table. For production, use the BLACKHOLE storage engine so rows are written to the binlog but never stored on disk.
-- Create outbox table (BLACKHOLE = no disk storage, binlog only)
CREATE TABLE outbox (
id INT AUTO_INCREMENT PRIMARY KEY,
aggregate_type VARCHAR(64),
aggregate_id VARCHAR(64),
event_type VARCHAR(64),
payload JSON
) ENGINE=BLACKHOLE;
-- In your application transaction:
BEGIN;
INSERT INTO orders (id, total) VALUES (42, 99.99);
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', '42', 'OrderCreated', '{"total": 99.99}');
COMMIT;
Source config:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.outbox
outbox:
tables: ["shop.outbox"]
| Field | Type | Description |
|---|---|---|
outbox.tables | array | Table patterns to tag as outbox events. Supports globs: shop.outbox (exact), *.outbox (any database), shop.outbox_% (prefix). |
Note: The outbox table must be included in the source's tables list so DeltaForge subscribes to its binlog events. Only INSERTs are captured - UPDATE and DELETE on the outbox table are ignored.
Outbox Processor
The OutboxProcessor transforms raw outbox events into routed, sink-ready events. Add it to your processors list:
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
default_topic: "events.unrouted"
| Field | Type | Default | Description |
|---|---|---|---|
id | string | "outbox" | Processor identifier |
tables | array | [] | Filter: only process outbox events matching these patterns. Empty = all outbox events. |
topic | string | - | Topic template resolved against the raw payload using ${field} placeholders |
default_topic | string | - | Fallback topic when template resolution fails and no topic column exists |
columns | object | (see below) | Column name mappings for extracting outbox fields |
additional_headers | map | {} | Forward extra payload fields as routing headers. Key = header name, value = column name. |
raw_payload | bool | false | When true, deliver the extracted payload as-is to sinks, bypassing envelope wrapping (native/debezium/cloudevents). Metadata is still available via routing headers. |
key | string | - | Key template resolved against raw payload. Sets routing.key for sink partitioning. Default: aggregate_id value. |
strict | bool | false | When true, fail the batch if required fields are missing (topic, payload, aggregate_type, aggregate_id, event_type). When false, missing fields are silently skipped. |
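Putting several of these fields together, a sketch (the id, table, and topic names are illustrative) of a processor that routes from payload fields, delivers the raw payload, and fails loudly on schema drift:

```yaml
processors:
  - type: outbox
    id: orders-outbox
    tables: [orders_outbox]            # only handle this outbox channel
    topic: "${aggregate_type}.${event_type}"
    default_topic: "events.unrouted"   # fallback when the template cannot resolve
    key: "${aggregate_id}"             # explicit routing key (same as the default)
    raw_payload: true                  # deliver the extracted payload as-is
    strict: true                       # fail the batch on missing required fields
```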
Column Mappings
Column mappings control header extraction and payload rewriting - they tell the processor which fields correspond to aggregate_type, aggregate_id, etc. for setting df-* headers. The topic template resolves directly against the raw payload, so you reference your actual column names there.
| Column | Default | Header | Description |
|---|---|---|---|
payload | "payload" | - | Event body. Extracted and promoted to event.after. |
aggregate_type | "aggregate_type" | df-aggregate-type | Aggregate root type (e.g. Order). |
aggregate_id | "aggregate_id" | df-aggregate-id | Aggregate root ID. Also used as default routing key. |
event_type | "event_type" | df-event-type | Domain event type (e.g. OrderCreated). |
topic | "topic" | - | Per-row topic override (used when template is absent). |
event_id | "id" | df-event-id | Event identity for idempotency/dedup. |
If your outbox payload uses non-default field names, override them:
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
columns:
payload: data # default: "payload"
aggregate_type: type # default: "aggregate_type"
aggregate_id: key # default: "aggregate_id"
event_type: action # default: "event_type"
topic: destination # default: "topic"
event_id: uuid # default: "id"
Additional Headers
Forward arbitrary payload fields as routing headers. This is useful when migrating from Debezium’s table.fields.additional.placement or when downstream consumers need extra metadata:
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
additional_headers:
x-trace-id: trace_id
x-correlation-id: correlation_id
x-source-region: region
Each key becomes a header name, each value is the column name in the outbox payload. Missing columns are silently skipped - no error if a row doesn’t contain the field.
Typed Extraction
Header values are extracted as strings regardless of the source JSON type. Numeric IDs, booleans, and string values are all stringified automatically:
| JSON value | Header value |
|---|---|
"abc-123" | abc-123 |
42 | 42 |
true | true |
null / missing | (skipped) |
{} / [] | (skipped) |
What the Processor Does
- Identifies outbox events by the __outbox sentinel on source.schema (set by the source).
- Extracts aggregate_type, aggregate_id, event_type, and payload from event.after.
- Resolves the topic using a three-step cascade:
  - Template resolved against the raw payload (e.g. ${domain}.${action} - use your actual column names)
  - Column value (a topic field in the payload, configurable via columns.topic)
  - default_topic fallback
- Rewrites event.after to just the payload content.
- Sets routing headers: df-event-id, df-aggregate-type, df-aggregate-id, df-event-type, plus any additional_headers mappings.
- Sets routing key using the key template (or falls back to aggregate_id).
- Marks raw delivery if raw_payload: true - sinks serialize event.after directly, skipping envelope wrapping.
- Clears the __outbox sentinel so the event looks like a normal CDC event to sinks.
- Drops non-INSERT outbox events (UPDATE/DELETE on the outbox table are meaningless).
- Validates in strict mode (strict: true): fails the batch with an error if any required field is missing (topic, payload, aggregate_type, aggregate_id, event_type). The error names the missing fields so operators can fix the schema. In lenient mode (default), missing fields are silently skipped.
- Passes through all non-outbox events unchanged.
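To make the transformation concrete, here is an illustrative sketch (all values are hypothetical) of an outbox event before and after the processor, assuming default column names and a topic template of ${aggregate_type}.${event_type}:

```yaml
# Input: the raw CDC INSERT captured from the outbox channel
input:
  op: c
  after:
    id: evt-1
    aggregate_type: Order
    aggregate_id: "42"
    event_type: OrderCreated
    payload: { total: 99.99 }

# Output: payload promoted to event.after, routing resolved
output:
  op: c
  after: { total: 99.99 }
  routing:
    topic: Order.OrderCreated      # from the template
    key: "42"                      # defaults to aggregate_id
    headers:
      df-event-id: evt-1
      df-aggregate-type: Order
      df-aggregate-id: "42"
      df-event-type: OrderCreated
```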
Multi-Outbox Routing
When a source captures multiple outbox channels, use the tables filter to scope each processor:
processors:
- type: outbox
tables: [orders_outbox]
topic: "orders.${event_type}"
- type: outbox
tables: [payments_outbox]
topic: "payments.${event_type}"
columns:
payload: data
Complete Examples
PostgreSQL → Kafka
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: order-events
tenant: acme
spec:
source:
type: postgres
config:
id: pg1
dsn: ${POSTGRES_DSN}
publication: orders_pub
slot: orders_slot
tables: [public.orders]
outbox:
prefixes: [outbox]
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
default_topic: "events.unrouted"
raw_payload: true
sinks:
- type: kafka
config:
id: k1
brokers: ${KAFKA_BROKERS}
topic: "events.fallback"
With raw_payload: true, outbox events hit the wire as the extracted payload JSON. Regular CDC events (from public.orders) still use the sink's configured envelope.
MySQL → Kafka (Multi-Outbox)
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: shop-events
tenant: acme
spec:
source:
type: mysql
config:
id: m1
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.orders_outbox
- shop.payments_outbox
outbox:
tables: ["shop.*_outbox"]
processors:
- type: outbox
tables: [orders_outbox]
topic: "orders.${event_type}"
raw_payload: true
- type: outbox
tables: [payments_outbox]
topic: "payments.${event_type}"
raw_payload: true
sinks:
- type: kafka
config:
id: k1
brokers: ${KAFKA_BROKERS}
topic: "events.default"
Migrating from Debezium
If you’re using Debezium’s outbox event router with a custom schema, DeltaForge’s column mappings and additional_headers map directly. For example, given this Debezium-style outbox table:
CREATE TABLE outbox_events (
id UUID PRIMARY KEY,
aggregatetype VARCHAR(64),
aggregateid VARCHAR(64),
type VARCHAR(64),
payload JSONB,
traceid VARCHAR(64),
tenant VARCHAR(32)
);
Debezium config:
transforms.outbox.table.field.event.id=id
transforms.outbox.table.field.event.key=aggregateid
transforms.outbox.table.field.event.type=type
transforms.outbox.table.field.event.payload=payload
transforms.outbox.table.fields.additional.placement=traceid:header,tenant:header
DeltaForge equivalent:
processors:
- type: outbox
topic: "${aggregatetype}.${type}"
key: "${aggregateid}"
raw_payload: true
columns:
event_id: id
aggregate_type: aggregatetype
aggregate_id: aggregateid
event_type: type
payload: payload
additional_headers:
x-trace-id: traceid
x-tenant: tenant
Key differences from Debezium:
- Topic template references raw column names directly: ${aggregatetype}, not ${aggregate_type}
- raw_payload: true delivers the extracted payload as-is, same behavior as Debezium's outbox event router
- Column mappings only affect header extraction (df-* headers) and payload rewriting
- additional_headers replaces table.fields.additional.placement
- No SMT chain: everything is in one processor config
Observability
The outbox processor emits Prometheus-compatible metrics:
| Metric | Labels | Description |
|---|---|---|
deltaforge_outbox_transformed_total | - | Events successfully transformed |
deltaforge_outbox_dropped_total | reason | Events dropped or rejected |
Drop reasons:
| Reason | Meaning |
|---|---|
non_insert | UPDATE/DELETE on outbox table (expected, harmless) |
non_object | event.after is not a JSON object |
null_payload | event.after is null |
strict_missing_fields | Strict mode: required field missing (batch fails) |
In strict mode, strict_missing_fields increments the counter and returns an error that halts the batch - the pipeline will not silently lose events.
Tips
- PostgreSQL WAL messages are lightweight. No table, no index, no vacuum - just a single WAL entry per message. Prefer this over table-based outbox when using PostgreSQL.
- MySQL BLACKHOLE engine avoids storing outbox rows on disk. The row is written to the binlog and immediately discarded. Use it in production to avoid unbounded table growth.
- Outbox events coexist with normal CDC. The processor passes through all non-outbox events untouched, so you can mix regular table capture and outbox in the same pipeline.
- Topic templates use ${field} syntax, same as dynamic routing. The template resolves directly against the raw outbox payload columns - use your actual column names like ${domain}.${action}, no remapping needed.
- At-least-once delivery applies to outbox events just like regular CDC events. Downstream consumers should be idempotent - use the df-event-id header for idempotency, or aggregate_id + event_type as a composite dedup key.
- Malformed events are logged and dropped by default. Non-INSERT operations, null payloads, and non-object payloads produce a WARN-level log with the table name and reason. They do not propagate to sinks. Enable strict: true to fail the batch instead of dropping - this ensures operators are alerted to schema issues before events are lost.
Architecture
This document describes DeltaForge’s internal architecture, design decisions, and how the major components interact.
Design Principles
Source-Owned Semantics
DeltaForge avoids imposing a universal data model on all sources. Instead, each database source defines and owns its schema semantics:
- MySQL captures MySQL-specific types, collations, and engine information
- PostgreSQL captures PostgreSQL-specific types, OIDs, and replica identity
- Future sources (MongoDB, ClickHouse, TiDB) will capture their native semantics
This approach means downstream consumers receive schemas that accurately reflect the source database rather than a lowest-common-denominator normalization.
Delivery Guarantees First
The checkpoint system is designed around a single invariant:
Checkpoints are only saved after events have been successfully delivered.
This ordering guarantees at-least-once delivery. A crash between checkpoint and delivery would lose events; DeltaForge prevents this by always checkpointing after sink acknowledgment.
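The invariant can be sketched as a tiny ordering rule, with closures standing in for the sink and checkpoint store calls (illustrative names, not DeltaForge's actual API):

```rust
// A minimal sketch of the invariant: the checkpoint save runs only if
// delivery succeeded. The closures stand in for sink and store calls.
fn deliver_then_checkpoint<E>(
    deliver: impl FnOnce() -> Result<(), E>,
    save_checkpoint: impl FnOnce() -> Result<(), E>,
) -> Result<(), E> {
    deliver()?;       // 1. wait for sink acknowledgment
    save_checkpoint() // 2. only then advance the checkpoint
}
```

If delivery fails, the checkpoint is never written, so a restart replays from the last delivered position.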
Configuration Over Code
Pipelines are defined declaratively in YAML. This enables:
- Version-controlled pipeline definitions
- Environment-specific configuration via variable expansion
- Rapid iteration without recompilation
Component Overview
┌─────────────────────────────────────────────────────────────────┐
│ DeltaForge Runtime │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│ Sources │ Schema │ Coordinator │ Sinks │ Control │
│ │ Registry │ + Batch │ │ Plane │
├─────────────┼─────────────┼─────────────┼─────────────┼─────────┤
│ MySQL │ Durable │ Batching │ Kafka │ REST API│
│ PostgreSQL │ Schema │ Commit │ Redis │ Metrics │
│ │ Registry │ Policy │ NATS │ Health │
└─────────────┴──────┬──────┴─────────────┴─────────────┴─────────┘
│
┌──────────┴──────────┐
│ Storage Backend │
│ (SQLite / PG / │
│ Memory) │
├─────────────────────┤
│ KV · Log · Slot │
│ Queue │
└─────────────────────┘
Data Flow
Event Lifecycle
1. Source reads from database log (binlog/WAL)
│
▼
2. Schema loader maps table_id to schema
│
▼
3. Event constructed with before/after images
│
▼
4. Event sent to coordinator via channel
│
▼
5. Coordinator batches events
│
▼
6. Processors transform batch (JavaScript)
│
▼
7. Sinks deliver batch concurrently
│
▼
8. Commit policy evaluated
│
▼
9. Checkpoint saved (if policy satisfied)
Event Structure
Every CDC event shares a common structure:
pub struct Event {
    pub source_id: String,                  // Source identifier
    pub database: String,                   // Database name
    pub table: String,                      // Table name
    pub op: Op,                             // Insert, Update, Delete, Ddl
    pub tx_id: Option<u64>,                 // Source transaction ID
    pub before: Option<Value>,              // Previous row state
    pub after: Option<Value>,               // New row state
    pub schema_version: Option<String>,     // Schema fingerprint
    pub schema_sequence: Option<u64>,       // For replay lookups
    pub ddl: Option<Value>,                 // DDL payload if op == Ddl
    pub timestamp: DateTime<Utc>,           // Event timestamp
    pub checkpoint: Option<CheckpointMeta>, // Position info
    pub size_bytes: usize,                  // For batching
}
Schema Registry
Role
The schema registry serves three purposes:
- Map table IDs to schemas: Binlog events reference tables by ID; the registry resolves these to full schema metadata
- Detect schema changes: Fingerprint comparison identifies when DDL has modified a table
- Enable replay: Sequence numbers correlate events with the schema active when they were produced
Schema Sensing
At startup, the schema loader auto-discovers tables via pattern expansion and loads their schemas from the live database catalog before any CDC events arrive.
Pattern expansion supports wildcards:
| Pattern | Matches |
|---|---|
| `db.table` | Exact table |
| `db.*` | All tables in database |
| `db.prefix%` | Tables matching prefix |
| `%.table` | Table in any database |
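The matching rules above can be sketched as follows. Assumptions: `*` matches a whole segment and a trailing `%` matches any prefix match within a segment; the real loader's edge-case behavior may differ.

```rust
// Sketch of the pattern semantics in the table above (illustrative).
fn segment_matches(pattern: &str, name: &str) -> bool {
    if pattern == "*" || pattern == "%" {
        return true;
    }
    // "prefix%" matches any name starting with "prefix"
    if let Some(prefix) = pattern.strip_suffix('%') {
        return name.starts_with(prefix);
    }
    pattern == name
}

fn pattern_matches(pattern: &str, db: &str, table: &str) -> bool {
    match pattern.split_once('.') {
        Some((pdb, ptab)) => segment_matches(pdb, db) && segment_matches(ptab, table),
        None => false, // patterns are always db-qualified
    }
}
```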
DDL detection works through cache invalidation. When the binlog delivers a QueryEvent (DDL), the affected database’s cache is cleared. On the next row event for that table, the schema is re-fetched from INFORMATION_SCHEMA, fingerprinted, and registered as a new version. No separate DDL history log is maintained — the live catalog is always the source of truth.
Failover reconciliation (verifying schemas against a new primary after server identity change) is planned but not yet implemented.
Schema versions are persisted via DurableSchemaRegistry, which uses the StorageBackend Log primitive. On startup the log is replayed to populate an in-memory cache, so hot-path reads have the same performance as the previous in-memory implementation. Cold-start reconstruction is always possible from the log alone.
Schema Registration Flow
1. Schema loader fetches from INFORMATION_SCHEMA
│
▼
2. Compute fingerprint (SHA-256 of structure)
│
▼
3. Check registry for existing schema with same fingerprint
│
├── Found: Return existing version (idempotent)
│
└── Not found: Allocate new version number
│
▼
4. Store with: version, fingerprint, JSON, timestamp, sequence, checkpoint
Sequence Numbers
The registry maintains a global monotonic counter. Each schema version receives a sequence number at registration. Events carry this sequence, enabling accurate schema lookup during replay:
Timeline:
─────────────────────────────────────────────────────────────►
│ │ │
Schema v1 Schema v2 Schema v3
(seq=1) (seq=15) (seq=42)
│ │ │
└──events 1-14─┘──events 15-41─────┘──events 42+──►
Replay at seq=20: Use schema v2 (registered at seq=15, before seq=42)
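The replay lookup above reduces to a sorted search: given `(sequence, version)` pairs ordered by registration sequence, pick the version registered at or before the event's sequence. The names below are illustrative, not DeltaForge's actual API.

```rust
// Find the schema version active at `event_seq`, given versions sorted
// by their registration sequence number.
fn schema_version_at(versions: &[(u64, u32)], event_seq: u64) -> Option<u32> {
    // partition_point finds the first entry with seq > event_seq...
    let idx = versions.partition_point(|&(seq, _)| seq <= event_seq);
    // ...so the entry just before it was active at event_seq.
    idx.checked_sub(1).map(|i| versions[i].1)
}
```

With the timeline above, a lookup at seq=20 resolves to schema v2.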
Checkpoint Store
Timing Guarantee
The checkpoint is saved only after sinks acknowledge delivery:
┌────────┐ events ┌────────┐ ack ┌────────────┐
│ Source │ ─────────▶ │ Sink │ ───────▶ │ Checkpoint │
└────────┘ └────────┘ │ Store │
└────────────┘
If the process crashes after sending to sink but before checkpoint, events will be replayed. This is the “at-least-once” guarantee — duplicates are possible, but loss is not.
Storage Backends
Checkpoints are stored via BackendCheckpointStore, a thin adapter over the StorageBackend KV primitive. See Storage for backend configuration and the full namespace map.
| Backend | Persistence | Use Case |
|---|---|---|
| `SqliteStorageBackend` | SQLite file | Single-instance production |
| `MemoryStorageBackend` | None | Testing, ephemeral deployments |
| `PostgresStorageBackend` | External DB | HA, multi-instance |
Checkpoint-Schema Correlation
When registering schemas, the current checkpoint can be attached:
registry.register_with_checkpoint(
    tenant, db, table,
    &fingerprint,
    &schema_json,
    Some(&checkpoint_bytes), // Current binlog position
).await?;
This creates a link between schema versions and source positions, enabling accurate schema lookup during replay.
Coordinator
The coordinator orchestrates event flow between source and sinks:
Batching
Events are accumulated until a threshold triggers flush:
- `max_events`: Event count limit
- `max_bytes`: Total serialized size limit
- `max_ms`: Time since batch started
- `respect_source_tx`: Never split source transactions
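A sketch of how these thresholds combine, assuming any single limit triggers a flush unless an open source transaction must be respected. Field names mirror the config keys; the coordinator's real types differ.

```rust
// Illustrative batching thresholds (names mirror the config keys).
struct BatchLimits {
    max_events: usize,
    max_bytes: usize,
    max_ms: u128,
}

fn should_flush(
    l: &BatchLimits,
    events: usize,
    bytes: usize,
    elapsed_ms: u128,
    tx_open: bool,
    respect_source_tx: bool,
) -> bool {
    // Never split a source transaction mid-flight.
    if respect_source_tx && tx_open {
        return false;
    }
    events >= l.max_events || bytes >= l.max_bytes || elapsed_ms >= l.max_ms
}
```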
Commit Policy
When multiple sinks are configured, the commit policy determines when the checkpoint advances:
match policy {
    All => required_acks == total_sinks,
    Required => required_acks == sinks.filter(|s| s.required).count(),
    Quorum(n) => required_acks >= n,
}
Processor Pipeline
Processors run in declared order, transforming batches:
events ──▶ Processor 1 ──▶ Processor 2 ──▶ ... ──▶ transformed events
Each processor can filter, transform, or enrich events. The JavaScript processor uses deno_core for sandboxed execution.
Hot Paths
Critical performance paths have been optimized:
- Event construction - Minimal allocations, reuse buffers
- Checkpoint serialization - Opaque bytes avoid repeated JSON encoding
- Sink delivery - Batch operations reduce round trips
- Schema lookup - In-memory cache with stable fingerprints
Benchmarking
Performance is tracked via:
- Micro-benchmarks for specific operations
- End-to-end benchmarks using the Coordinator component
- Regression detection in CI
Future Architecture
Planned enhancements:
- Initial snapshot/backfill using the Slot primitive for cursor tracking
- Event store: time-based replay and schema evolution using the Log primitive
- Distributed coordination: leader election via the Slot primitive with TTL-based leases
- Additional sources: MongoDB, SQL Server, TiDB
- PostgreSQL storage validation: chaos/recovery testing to bring it to production parity with SQLite
Checkpoints
Checkpoints record pipeline progress so ingestion can resume from the last successfully delivered position. DeltaForge guarantees at-least-once delivery by saving checkpoints only after events have been acknowledged by sinks.
Core Guarantee: At-Least-Once Delivery
Checkpoints are only saved after events have been successfully delivered to sinks.
If a checkpoint were saved before delivery, a crash between those two points would silently lose events. DeltaForge prevents this by always checkpointing after sink acknowledgment.
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Source │────▶│ Processor │────▶│ Sink │────▶│ Checkpoint │
│ (read) │ │ (transform) │ │ (deliver) │ │ (save) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
On crash: events since the last checkpoint are replayed. Consumers should be idempotent or use deduplication.
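Consumer-side deduplication can be as simple as tracking seen event IDs. This sketch keeps them in memory; a real consumer would persist seen IDs durably, keyed on something like the `df-event-id` header mentioned in the outbox tips.

```rust
use std::collections::HashSet;

// Sketch of an idempotent consumer for at-least-once delivery.
struct Deduper {
    seen: HashSet<String>,
}

impl Deduper {
    fn new() -> Self {
        Deduper { seen: HashSet::new() }
    }

    /// Returns true if this event has not been processed before.
    fn accept(&mut self, event_id: &str) -> bool {
        self.seen.insert(event_id.to_string())
    }
}
```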
Storage
Checkpoints are stored via the unified StorageBackend - see Storage for backend configuration. All pipelines share the same storage backend; each pipeline’s checkpoint is keyed by its source ID.
| Backend | Persistence | Use Case |
|---|---|---|
| `SqliteStorageBackend` | SQLite file on disk | Single-instance production |
| `MemoryStorageBackend` | None (lost on restart) | Testing |
| `PostgresStorageBackend` | External database | HA, multi-instance |
Checkpoint Contents
MySQL
Tracks binlog position:
file: binlog.000042
pos: 12345
gtid_set: (optional, if GTID replication is enabled)
PostgreSQL
Tracks replication stream LSN:
lsn: 0/1A2B3C4D
tx_id: (optional)
Checkpoint-Schema Correlation
When a schema change is detected, the current checkpoint position is recorded alongside the new schema version. This ensures that during replay, each event is interpreted with the schema that was active when it was produced - even if the table has since been altered.
Commit Policy
When multiple sinks are configured, the commit policy controls when the checkpoint advances:
| Policy | Behaviour |
|---|---|
| `all` | All sinks must acknowledge |
| `required` | Only sinks marked `required: true` must acknowledge |
| `quorum(n)` | At least `n` sinks must acknowledge |
Set required: true only on sinks where delivery is mandatory for correctness. Optional sinks can fail without blocking the checkpoint.
Operations
Inspecting Checkpoints
sqlite3 ./data/deltaforge.db \
"SELECT key, length(value) FROM kv WHERE namespace = 'checkpoints';"
Resetting a Pipeline
To force a pipeline to re-read from the beginning, delete its checkpoint:
# Via API
curl -X DELETE http://localhost:8080/pipelines/{name}/checkpoint
# Directly in SQLite
sqlite3 ./data/deltaforge.db \
"DELETE FROM kv WHERE namespace = 'checkpoints' AND key = '{source-id}';"
Best Practices
- Back up `deltaforge.db` regularly - it contains both checkpoints and schema history
- Monitor checkpoint lag to detect stuck pipelines
- Use smaller batch sizes for more frequent checkpoints at the cost of throughput
- Test recovery by killing the process and verifying no events are lost or duplicated
Storage
DeltaForge uses a unified storage layer that backs all runtime state: checkpoints, schema registry, FSM state, leases, and quarantine queues. A single StorageBackend instance is shared across subsystems, so there is one operational concern instead of many.
Backends
| Backend | Persistence | Use Case |
|---|---|---|
| `MemoryStorageBackend` | None (lost on restart) | Testing, ephemeral deployments |
| `SqliteStorageBackend` | SQLite file on disk | Single-instance production |
| `PostgresStorageBackend` | External database | HA, multi-instance deployments |
Configuration
The backend is selected via CLI flags:
# SQLite (default for production)
deltaforge --config pipeline.yaml --storage-backend sqlite --storage-path ./data/deltaforge.db
# In-memory (testing only — all state lost on restart)
deltaforge --config pipeline.yaml --storage-backend memory
Or via the config file:
storage:
backend: sqlite
path: ./data/deltaforge.db
# or:
storage:
backend: postgres
dsn: "host=localhost dbname=deltaforge_storage user=df password=secret"
The SQLite backend creates the database file and parent directories automatically on first start. The PostgreSQL backend runs schema migrations on first connect - no manual table creation is needed.
The PostgreSQL backend is implemented and available under the `postgres` feature flag, but has not yet received the same chaos/recovery validation as the SQLite backend. Treat it as beta for production use.
Primitives
The StorageBackend trait exposes four primitives. All operations are namespaced - different subsystems share the same backend without key collisions.
KV (Key-Value with optional TTL)
General-purpose persistent key-value store. Used for checkpoints, FSM state, and leases.
backend.kv_put("checkpoints", "mysql-pipe1", &bytes).await?;
backend.kv_put_with_ttl("leases", "pipe1", b"alive", ttl_secs).await?;
backend.kv_get("checkpoints", "mysql-pipe1").await?;
backend.kv_delete("checkpoints", "mysql-pipe1").await?;
backend.kv_list("checkpoints", Some("mysql-")).await?; // prefix filter
Log (Append-only, globally sequenced)
Immutable append-only log with a global monotonic sequence counter. Used for the schema registry. The sequence is global across all keys in the namespace - interleaved appends to different keys produce strictly increasing sequence numbers.
let seq = backend.log_append("schemas", "acme/shop/orders", &entry).await?;
let all = backend.log_list("schemas", "acme/shop/orders").await?;
let tail = backend.log_since("schemas", "acme/shop/orders", since_seq).await?;
Slot (Versioned, compare-and-swap)
Single versioned value with CAS semantics. Used for snapshot cursors and leader election. Concurrent CAS operations with the same expected version are serialized - exactly one wins.
let ver = backend.slot_upsert("snapshots", "pipe/orders", &state).await?;
let won = backend.slot_cas("snapshots", "pipe/orders", ver, &new_state).await?;
let cur = backend.slot_get("snapshots", "pipe/orders").await?; // (version, bytes)
Queue (Ordered, ack-based)
Ordered queue with explicit acknowledgement. Used for quarantine and DLQ. Items are not removed until acked; oldest items can be dropped when the queue is full.
let id = backend.queue_push("quarantine", "pipe/orders", &event_bytes).await?;
let items = backend.queue_peek("quarantine", "pipe/orders", 10).await?;
backend.queue_ack("quarantine", "pipe/orders", id).await?;
backend.queue_drop_oldest("quarantine", "pipe/orders", n).await?;
let len = backend.queue_len("quarantine", "pipe/orders").await?;
Namespace Map
Each subsystem owns a fixed namespace. Keys within a namespace never collide across subsystems.
| Namespace | Primitive | Used by | Key format |
|---|---|---|---|
| `checkpoints` | KV | BackendCheckpointStore | `{source-id}` |
| `schemas` | Log | DurableSchemaRegistry | `{tenant}/{db}/{table}` |
| `schemas` | KV | DurableSchemaRegistry | `{tenant}/{db}/{table}` (index) |
| `snapshots` | Slot | Snapshot/backfill (planned) | `{pipeline}/{table}` |
| `fsm` | KV | FSM state (planned) | `{pipeline}` |
| `leases` | KV + TTL | Leader election (planned) | `{pipeline}` |
| `quarantine` | Queue | Quarantine (planned) | `{pipeline}/{table}` |
| `dlq` | Queue | Dead-letter queue (planned) | `{pipeline}` |
Adapters
Existing pipeline code uses higher-level interfaces - it never touches the StorageBackend directly.
BackendCheckpointStore
Implements the CheckpointStore trait on top of KV. Drop-in replacement for the old FileCheckpointStore and SqliteCheckpointStore.
let store = BackendCheckpointStore::new(Arc::clone(&backend));
// CheckpointStore trait methods work unchanged
store.put_raw("mysql-pipe1", &bytes).await?;
store.get_raw("mysql-pipe1").await?;
DurableSchemaRegistry
Implements the schema registry on top of the Log primitive. On startup it replays the log to populate an in-memory cache, so hot-path reads are identical in performance to the old InMemoryRegistry. Writes go through the log, enabling cold-start reconstruction.
// Production: async construction with log replay
let registry = DurableSchemaRegistry::new(Arc::clone(&backend)).await?;

// Tests: sync construction, no replay (MemoryStorageBackend, empty log)
let registry = DurableSchemaRegistry::for_testing();
PipelineManager Wiring
PipelineManager::with_backend() wires both subsystems from a single backend:
let backend: ArcStorageBackend = SqliteStorageBackend::open("./data/deltaforge.db")?;
let manager = PipelineManager::with_backend(backend).await?;
Internally this creates a BackendCheckpointStore and a DurableSchemaRegistry from the same backend instance. All pipelines share the same storage file.
Schema Registry
DeltaForge’s schema registry tracks table schemas across time, enabling accurate event interpretation during replay and providing change detection for DDL operations.
Design Philosophy: Source-Owned Schemas
DeltaForge takes a fundamentally different approach to schema handling than many CDC tools. Rather than normalizing all database schemas into a universal type system, each source defines and owns its schema semantics.
This means:
- MySQL schemas capture MySQL semantics - column types like `bigint(20) unsigned`, `varchar(255)`, and `json` are preserved exactly as MySQL defines them
- PostgreSQL schemas capture PostgreSQL semantics - arrays, custom types, and pg-specific attributes remain intact
- No lossy normalization - you don’t lose precision or database-specific information by forcing everything into a common format
This design avoids the maintenance burden of keeping a universal type system synchronized across all databases, and it ensures that downstream consumers receive schemas that accurately reflect the source database’s capabilities and constraints.
The SourceSchema Trait
Every CDC source implements the SourceSchema trait, which provides a common interface for fingerprinting and column access while allowing source-specific schema representations:
pub trait SourceSchema: Serialize + DeserializeOwned + Send + Sync {
    /// Source type identifier (e.g., "mysql", "postgres", "mongodb").
    fn source_kind(&self) -> &'static str;

    /// Content-addressable fingerprint for change detection.
    /// Two schemas with the same fingerprint are identical.
    fn fingerprint(&self) -> String;

    /// Column/field names in ordinal order.
    fn column_names(&self) -> Vec<&str>;

    /// Primary key column names.
    fn primary_key(&self) -> Vec<&str>;

    /// Human-readable description.
    fn describe(&self) -> String;
}
Fingerprinting
Schema fingerprints use SHA-256 hashing over JSON-serialized content to provide:
- Stability - the same schema always produces the same fingerprint
- Change detection - any structural change produces a different fingerprint
- Content-addressability - fingerprints can be used as cache keys or deduplication identifiers
pub fn compute_fingerprint<T: Serialize>(value: &T) -> String {
    let json = serde_json::to_vec(value).unwrap_or_default();
    let hash = Sha256::digest(&json);
    format!("sha256:{}", hex::encode(hash))
}
The fingerprint only includes structurally significant fields. For MySQL, this means columns and primary key are included, but engine and charset are excluded since they don’t affect how CDC events should be interpreted.
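The include/exclude distinction can be illustrated with a toy fingerprint. The real implementation hashes canonical JSON with SHA-256; std's `DefaultHasher` stands in here purely to show which fields participate.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative table shape: columns and primary key are structural,
// engine is operational metadata.
struct TableShape {
    columns: Vec<(String, String)>, // (name, type)
    primary_key: Vec<String>,
    engine: Option<String>,
}

fn structural_fingerprint(shape: &TableShape) -> u64 {
    let mut h = DefaultHasher::new();
    shape.columns.hash(&mut h);     // included: column names + types
    shape.primary_key.hash(&mut h); // included: primary key
    // shape.engine is deliberately NOT hashed: changing the storage
    // engine doesn't change how CDC events are interpreted.
    h.finish()
}
```

Two tables differing only in engine therefore share a fingerprint, while any column change produces a new one.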
MySQL Schema Implementation
The MySqlTableSchema struct captures comprehensive MySQL table metadata:
pub struct MySqlTableSchema {
    /// Columns in ordinal order
    pub columns: Vec<MySqlColumn>,
    /// Primary key column names
    pub primary_key: Vec<String>,
    /// Storage engine (InnoDB, MyISAM, etc.)
    pub engine: Option<String>,
    /// Default charset
    pub charset: Option<String>,
    /// Default collation
    pub collation: Option<String>,
}

pub struct MySqlColumn {
    pub name: String,
    pub column_type: String,        // e.g., "bigint(20) unsigned"
    pub data_type: String,          // e.g., "bigint"
    pub nullable: bool,
    pub ordinal_position: u32,
    pub default_value: Option<String>,
    pub extra: Option<String>,      // e.g., "auto_increment"
    pub comment: Option<String>,
    pub char_max_length: Option<i64>,
    pub numeric_precision: Option<i64>,
    pub numeric_scale: Option<i64>,
}
Schema information is fetched from INFORMATION_SCHEMA at startup and cached for the pipeline’s lifetime.
Schema Registry Architecture
The schema registry serves three core functions:
- Version tracking - maintains a history of schema versions per table
- Change detection - compares fingerprints to detect DDL changes
- Replay correlation - associates schemas with checkpoint positions for accurate replay
Schema Versions
Each registered schema version includes:
| Field | Description |
|---|---|
| `version` | Per-table version number (starts at 1) |
| `hash` | Content fingerprint for deduplication |
| `schema_json` | Full schema as JSON |
| `registered_at` | Registration timestamp |
| `sequence` | Global monotonic sequence number |
| `checkpoint` | Source checkpoint at registration time |
Sequence Numbers for Replay
The registry maintains a global monotonic sequence counter. When a schema is registered, it receives the next sequence number. Events carry this sequence number, enabling the replay engine to look up the correct schema version:
// During replay: find schema active at event's sequence
let schema = registry.get_at_sequence(tenant, db, table, event.schema_sequence);
This ensures events are always interpreted with the schema that was active when they were produced, even if the table structure has since changed.
Checkpoint Correlation
Schemas can be registered with an associated checkpoint, creating a correlation between schema versions and source positions:
registry.register_with_checkpoint(
    tenant, db, table,
    &fingerprint,
    &schema_json,
    Some(checkpoint_bytes), // Optional: binlog position when schema was observed
).await?;
This correlation supports scenarios like:
- Replaying events from a specific checkpoint with the correct schema
- Determining which schema was active at a particular binlog position
- Rolling back schema state along with checkpoint rollback
Schema Loader
The MySqlSchemaLoader handles schema discovery and caching:
Pattern Expansion
Tables are specified using patterns that support wildcards:
| Pattern | Description |
|---|---|
| `db.table` | Exact match |
| `db.*` | All tables in database |
| `db.prefix%` | Tables starting with prefix |
| `%.table` | Table in any database |
Preloading
At startup, the loader expands patterns and preloads all matching schemas:
let schema_loader = MySqlSchemaLoader::new(dsn, registry, tenant);
let tracked_tables = schema_loader.preload(&["shop.orders", "shop.order_%"]).await?;
This ensures schemas are available before the first CDC event arrives.
Caching
Loaded schemas are cached to avoid repeated INFORMATION_SCHEMA queries:
// Fast path: return cached schema
if let Some(cached) = cache.get(&(db, table)) {
    return Ok(cached.clone());
}

// Slow path: fetch from database, register, cache
let schema = fetch_schema(db, table).await?;
let version = registry.register(...).await?;
cache.insert((db, table), loaded_schema);
DDL Handling
When the binlog contains DDL events, the schema loader responds:
| DDL Type | Action |
|---|---|
| `CREATE TABLE` | Schema loaded on first row event |
| `ALTER TABLE` | Cache invalidated, reloaded on next row |
| `DROP TABLE` | Cache entry removed |
| `TRUNCATE` | No schema change |
| `RENAME TABLE` | Old removed, new loaded on first row |
DDL detection uses the QueryEvent type in the binlog. On DDL, the entire database’s schema cache is invalidated since MySQL doesn’t always specify the exact table in DDL events.
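The per-database invalidation can be sketched in a few lines; a DDL `QueryEvent` clears every cached schema for its database, and the next row event re-fetches from `INFORMATION_SCHEMA`. The cache type here is illustrative.

```rust
use std::collections::HashMap;

// Illustrative schema cache keyed by (db, table).
struct SchemaCache {
    entries: HashMap<(String, String), String>, // value: schema JSON
}

impl SchemaCache {
    /// Called when a DDL QueryEvent arrives for `db`: MySQL doesn't always
    /// name the exact table, so the whole database's entries are dropped.
    fn invalidate_db(&mut self, db: &str) {
        self.entries.retain(|(d, _), _| d != db);
    }
}
```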
API Endpoints
Reload Schemas
Force reload schemas from the database:
curl -X POST http://localhost:8080/pipelines/{name}/schemas/reload
This clears the cache and re-fetches schemas for all tracked tables.
List Cached Schemas
View currently cached schemas:
curl http://localhost:8080/pipelines/{name}/schemas
Limitations
- Registry durability depends on the storage backend - with `MemoryStorageBackend`, schema versions are lost on restart; the SQLite and PostgreSQL backends persist them via `DurableSchemaRegistry`
- No cross-pipeline sharing - Each pipeline maintains its own registry instance
- Pattern expansion at startup - New tables matching patterns require pipeline restart or reload
Best Practices
- Use explicit table patterns in production to avoid accidentally capturing unwanted tables
- Monitor schema reload times - slow reloads may indicate overly broad patterns
- Trigger schema reload after DDL if your deployment process modifies schemas
- Include schema version in downstream events for consumers that need schema evolution awareness
Troubleshooting
Unknown table_id Errors
WARN write_rows for unknown table_id, table_id=42
The binlog contains row events for a table not in the table_map. This happens when:
- A table was created after the CDC stream started
- Table patterns don’t match the table
Solution: Trigger a schema reload via the REST API.
Schema Fetch Returned 0 Columns
WARN schema fetch returned 0 columns, db=shop, table=orders
Usually indicates:
- Table doesn’t exist
- MySQL user lacks `SELECT` privilege on `INFORMATION_SCHEMA`
- Table was dropped between detection and schema load
Slow Schema Loading
WARN slow schema fetch, db=shop, table=orders, ms=350
Consider:
- Narrowing table patterns to reduce the number of tables
- Using exact table names instead of wildcards
- Verifying network latency to the MySQL server
Schema Sensing
Schema sensing automatically infers and tracks schema structure from JSON event payloads. This complements the schema registry by discovering schema from data rather than database metadata.
When to Use Schema Sensing
Schema sensing is useful when:
- Source doesn’t provide schema: Some sources emit JSON without metadata
- JSON columns: Database JSON/JSONB columns have dynamic structure
- Schema evolution tracking: Detect when payload structure changes over time
- Downstream integration: Generate JSON Schema for consumers
- Dynamic map keys: Session IDs, trace IDs, or other high-cardinality keys in JSON
How It Works
┌──────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ Event │────▶│ Schema Sensor │────▶│ Inferred Schema │
│ Payload │ │ (sampling) │ │ + Fingerprint │
└──────────────┘ └─────────────────┘ └──────────────────┘
│
▼
┌─────────────────┐
│ Structure Cache │
│ + HC Classifier │
└─────────────────┘
- Observation: Events flow through the sensor during batch processing
- Sampling: Not every event is fully analyzed (configurable rate)
- Deep inspection: Nested JSON structures are recursively analyzed
- High-cardinality detection: Dynamic map keys are classified and normalized
- Fingerprinting: Schema changes are detected via SHA-256 fingerprints
- Caching: Repeated structures skip full analysis for performance
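The sampling step above can be sketched as a simple rule: analyze every event during warmup, then only every Nth event. The exact rule DeltaForge applies is an assumption here; the parameter names mirror the config keys used later on this page.

```rust
// Illustrative sampling decision for schema sensing.
fn should_analyze(events_seen: u64, warmup_events: u64, sample_rate: u64) -> bool {
    // Warmup: analyze everything to build an accurate initial schema.
    // Afterwards: analyze every `sample_rate`-th event.
    events_seen < warmup_events || events_seen % sample_rate.max(1) == 0
}
```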
High-Cardinality Key Handling
JSON payloads often contain dynamic keys like session IDs, trace IDs, or user-generated identifiers:
{
"id": 1,
"sessions": {
"sess_abc123": {"user_id": 42, "started_at": 1700000000},
"sess_xyz789": {"user_id": 43, "started_at": 1700000001}
}
}
Without special handling, each unique key (sess_abc123, sess_xyz789) triggers a “schema evolution” event, causing:
- 0% cache hit rate
- Constant false evolution alerts
- Unbounded schema growth
How It Works
DeltaForge uses probabilistic data structures (HyperLogLog, SpaceSaving) to classify fields:
| Classification | Description | Example |
|---|---|---|
| Stable fields | Appear in most events | id, type, timestamp |
| Dynamic fields | Unique per event, high cardinality | sess_*, trace_*, uuid_* |
When dynamic fields are detected, the schema sensor:
- Normalizes keys: replaces `sess_abc123` with a `<dynamic>` placeholder
- Uses adaptive hashing: the structure cache ignores dynamic key names
- Produces stable fingerprints: Same schema despite different keys
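The normalization step can be sketched as follows: once a path is classified as a dynamic map, every key at that path that isn't on the stable list collapses to `<dynamic>`, so the structure fingerprint no longer varies per session or trace ID. This is illustrative, not the sensor's actual code.

```rust
// Collapse dynamic map keys to a placeholder so structurally identical
// payloads hash to the same fingerprint.
fn normalize_key(key: &str, stable_fields: &[&str], path_is_dynamic: bool) -> String {
    if path_is_dynamic && !stable_fields.contains(&key) {
        "<dynamic>".to_string()
    } else {
        key.to_string()
    }
}
```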
Results
| Scenario | Without HC | With HC |
|---|---|---|
| Nested dynamic keys | 100% evolution rate | <1% evolution rate |
| Top-level dynamic keys | 0% cache hits | >99% cache hits |
| Stable structs | Baseline | ~20% overhead during warmup, then ~0% |
Configuration
Example
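A minimal sensing configuration might look like the following. The keys match the Recommended Settings later on this page, but the exact nesting under a `sensing` section is an assumption:

```yaml
sensing:
  sampling:
    warmup_events: 50        # analyze the first 50 events per table
    sample_rate: 5           # then analyze every 5th event
    structure_cache: true
    structure_cache_size: 100
  high_cardinality:
    enabled: true
    min_events: 100          # observe 100 events before classifying a path
```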
Options

These are the keys used throughout the examples on this page:

| Option | Description |
|---|---|
| `warmup_events` | Events analyzed per table before stabilization |
| `sample_rate` | After warmup, analyze every Nth event |
| `structure_cache` | Enable the structure cache |
| `structure_cache_size` | Maximum number of cached structures |
| `high_cardinality.enabled` | Enable dynamic map key detection |
| `high_cardinality.min_events` | Events to observe before classifying a path |
| `high_cardinality.min_dynamic_fields` | Minimum distinct keys before a path is considered dynamic |
| `high_cardinality.stable_threshold` | Presence ratio above which a field counts as stable |
Inferred Types
Schema sensing infers these JSON types:
| Type | Description |
|---|---|
| `null` | JSON null value |
| `boolean` | true/false |
| `integer` | Whole numbers |
| `number` | Floating point numbers |
| `string` | Text values |
| `array` | JSON arrays (element types tracked) |
| `object` | Nested objects (fields recursively analyzed) |
For fields with varying types across events, all observed types are recorded.
Schema Evolution
When schema structure changes, the sensor:
- Detects change: Fingerprint differs from previous version
- Increments sequence: Monotonic version number increases
- Logs evolution: Emits structured log with old/new fingerprints
- Updates cache: New structure becomes current
Evolution events are available via the REST API and can trigger alerts.
Stabilization
After observing enough events, a schema “stabilizes”:
- Warmup phase completes
- Structure stops changing
- Sampling rate takes effect
- Cache hit rate increases
Stabilized schemas have `stabilized: true` in API responses.
API Access
List Inferred Schemas
curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas
Get Schema Details
curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders
Export as JSON Schema
curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/json-schema
Cache Statistics
curl http://localhost:8080/pipelines/my-pipeline/sensing/stats
Dynamic Map Classifications
curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/classifications
Response:
{
"table": "orders",
"paths": {
"": {"stable_fields": ["id", "type", "timestamp"], "has_dynamic_fields": false},
"sessions": {"stable_fields": [], "has_dynamic_fields": true, "unique_keys": 1523},
"metadata": {"stable_fields": ["version"], "has_dynamic_fields": true, "unique_keys": 42}
}
}
Drift Detection
Schema sensing integrates with drift detection to compare:
- Expected schema: From database metadata (schema registry)
- Observed schema: From event payloads (schema sensing)
When mismatches occur, drift events are recorded:
| Drift Type | Description |
|---|---|
| `unexpected_null` | Non-nullable column has null values |
| `type_mismatch` | Observed type differs from declared type |
| `undeclared_column` | Field in data not in schema |
| `missing_column` | Schema field never seen in data |
| `json_structure_change` | JSON column structure changed |
Access drift data via:
curl http://localhost:8080/pipelines/my-pipeline/drift
Performance Considerations
Sampling Tradeoffs
| Setting | Effect |
|---|---|
| Higher `warmup_events` | Better initial accuracy, slower stabilization |
| Higher `sample_rate` | Lower CPU usage, slower evolution detection |
| Larger `structure_cache_size` | More memory, better hit rate |
Recommended Settings
High-throughput pipelines (>10k events/sec):
sampling:
warmup_events: 100
sample_rate: 10
structure_cache: true
structure_cache_size: 100
high_cardinality:
enabled: true
min_events: 200
Schema evolution monitoring:
sampling:
warmup_events: 25
sample_rate: 2
structure_cache: true
high_cardinality:
enabled: true
min_events: 50
Payloads with dynamic keys (session stores, feature flags):
sampling:
structure_cache: true
structure_cache_size: 50
high_cardinality:
enabled: true
min_events: 100
min_dynamic_fields: 3
stable_threshold: 0.5
Development/debugging:
sampling:
warmup_events: 10
sample_rate: 1 # Analyze every event
high_cardinality:
enabled: true
min_events: 20 # Faster classification
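The warmup and sampling settings above interact in a simple way; here is an illustrative sketch of the decision logic (function and parameter names are assumptions for explanation, not DeltaForge internals):

```javascript
// Illustrative sampling decision: analyze every event during warmup, then
// roughly 1 in `sampleRate` events afterwards. Names are assumptions, not
// actual DeltaForge internals.
function shouldAnalyze(eventCount, warmupEvents, sampleRate) {
  if (eventCount <= warmupEvents) {
    return true; // warmup: full analysis for initial accuracy
  }
  // steady state: sample to trade CPU for slower evolution detection
  return (eventCount - warmupEvents) % sampleRate === 0;
}
```

With `warmup_events: 10` and `sample_rate: 1` as in the debugging profile, every event is analyzed, maximizing visibility at the cost of CPU.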
Example: JSON Column Sensing
For tables with JSON columns, sensing reveals the internal structure:
# Database schema shows: metadata JSON
# Sensing reveals:
fields:
- name: "metadata.user_agent"
types: ["string"]
nullable: false
- name: "metadata.ip_address"
types: ["string"]
nullable: true
- name: "metadata.tags"
types: ["array"]
array_element_types: ["string"]
This enables downstream consumers to understand JSON column structure without manual documentation.
Metrics
Schema sensing emits these Prometheus metrics:
| Metric | Type | Labels | Description |
|---|---|---|---|
| deltaforge_schema_events_total | Counter | table | Total events observed |
| deltaforge_schema_cache_hits_total | Counter | table | Structure cache hits |
| deltaforge_schema_cache_misses_total | Counter | table | Structure cache misses |
| deltaforge_schema_evolutions_total | Counter | table | Schema evolutions detected |
| deltaforge_schema_tables_total | Gauge | - | Tables with detected schemas |
| deltaforge_schema_dynamic_maps_total | Gauge | - | Paths classified as dynamic maps |
| deltaforge_schema_sensing_seconds | Histogram | table | Per-event sensing latency |
Example Queries
# Cache hit rate per table
sum(rate(deltaforge_schema_cache_hits_total[5m])) by (table)
/
sum(rate(deltaforge_schema_events_total[5m])) by (table)
# Schema evolution rate (should be near zero after warmup)
sum(rate(deltaforge_schema_evolutions_total[5m])) by (table)
# P99 sensing latency
histogram_quantile(0.99, rate(deltaforge_schema_sensing_seconds_bucket[5m]))
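The evolution-rate query can also back an alert; a sketch of a Prometheus alerting rule (the threshold, duration, and label values are illustrative and should be tuned per deployment):

```yaml
groups:
  - name: deltaforge-schema
    rules:
      - alert: SchemaEvolutionChurn
        # After warmup, evolutions should be near zero; sustained churn often
        # means unclassified dynamic map keys or genuine upstream schema drift.
        expr: sum(rate(deltaforge_schema_evolutions_total[5m])) by (table) > 0.1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Schema for {{ $labels.table }} is evolving repeatedly"
```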
Observability playbook
DeltaForge already ships a Prometheus exporter, structured logging, and a panic hook. The runtime now emits source ingress counters, batching/processor histograms, and sink latency/throughput, so operators can build production dashboards immediately. The tables below capture what is wired today and the remaining gaps to make the platform production-ready for data and infra engineers.
What exists today
- Prometheus endpoint served at `/metrics` (default `0.0.0.0:9000`) with descriptors for pipeline counts, source/sink counters, and a stage latency histogram. The recorder is installed automatically when metrics are enabled.
- Structured logging via `tracing_subscriber` with JSON output by default, optional targets, and support for `RUST_LOG` overrides.
- Panic hook increments a `deltaforge_panics_total` counter and logs captured panics before delegating to the default hook.
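To wire the endpoint into Prometheus, a minimal scrape config might look like this (the target assumes the default metrics listener on port 9000 noted above; adjust for your deployment):

```yaml
scrape_configs:
  - job_name: deltaforge
    # Matches the default metrics listener; adjust host/port as needed.
    static_configs:
      - targets: ["localhost:9000"]
```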
Instrumentation gaps and recommendations
The sections below call out concrete metrics and log events to add per component. All metrics should include pipeline, tenant, and component identifiers where applicable so users can aggregate across fleets.
Sources (MySQL/Postgres)
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_source_events_total{pipeline,source,table} counter increments when MySQL events are handed to the coordinator. | Surfaces ingress per table and pipeline. |
| ✅ Implemented | deltaforge_source_reconnects_total{pipeline,source} counter when binlog reads reconnect. | Makes retry storms visible. |
| 🚧 Gap | deltaforge_source_lag_seconds{pipeline,source} gauge based on binlog/WAL position vs. server time. | Alert when sources fall behind. |
| 🚧 Gap | deltaforge_source_idle_seconds{pipeline,source} gauge updated when no events arrive within the inactivity window. | Catch stuck readers before downstream backlogs form. |
Coordinator and batching
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_batch_events{pipeline} and deltaforge_batch_bytes{pipeline} histograms in Coordinator::process_deliver_and_maybe_commit. | Tune batching policies with data. |
| ✅ Implemented | deltaforge_stage_latency_seconds{pipeline,stage,trigger} histogram for processor stage. | Provides batch timing per trigger (timer/limits/shutdown). |
| ✅ Implemented | deltaforge_processor_latency_seconds{pipeline,processor} histogram around every processor invocation. | Identify slow user functions. |
| 🚧 Gap | deltaforge_pipeline_channel_depth{pipeline} gauge from mpsc::Sender::capacity()/len(). | Detect backpressure between sources and coordinator. |
| 🚧 Gap | Checkpoint outcome counters/logs (deltaforge_checkpoint_success_total / _failure_total). | Alert on persistence regressions and correlate to data loss risk. |
Sinks (Kafka/Redis/custom)
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_sink_events_total{pipeline,sink} counter and deltaforge_sink_latency_seconds{pipeline,sink} histogram around each send. | Throughput and responsiveness per sink. |
| ✅ Implemented | deltaforge_sink_batch_total{pipeline,sink} counter incremented per send. | Number of batches sent per sink. |
| 🚧 Gap | Error taxonomy in deltaforge_sink_failures_total (add kind/details). | Easier alerting on specific failure classes (auth, timeout, schema). |
| 🚧 Gap | Backpressure gauge for client buffers (rdkafka queue, Redis pipeline depth). | Early signal before errors occur. |
| 🚧 Gap | Drop/skip counters from processors/sinks. | Auditing and reconciliation. |
Control plane and health endpoints
| Need | Suggested metric/log | Rationale |
|---|---|---|
| API request accounting | deltaforge_api_requests_total{route,method,status} counter and latency histogram using Axum middleware. | Production-grade visibility of operator actions. |
| Ready/Liveness transitions | Logs with pipeline counts and per-pipeline status when readiness changes. | Explain probe failures in log aggregation. |
| Pipeline lifecycle | Counters for create/patch/stop actions with success/error labels; include tenant and caller metadata in logs. | Auditable control-plane operations. |
Examples
Complete pipeline configurations demonstrating common DeltaForge use cases. Each example is ready to run with minimal modifications.
Available Examples
| Example | Source | Sink(s) | Key Features |
|---|---|---|---|
| MySQL to Redis | MySQL | Redis | JavaScript processor, PII redaction |
| Turso to Kafka | Turso/libSQL | Kafka | Native CDC, CloudEvents envelope |
| PostgreSQL to NATS | PostgreSQL | NATS | Logical replication, CloudEvents |
| Multi-Sink Fan-Out | MySQL | Kafka + Redis + NATS | Multiple envelopes, selective checkpointing |
| Event Filtering | MySQL | Kafka | JavaScript filtering, PII redaction |
| Schema Sensing | PostgreSQL | Kafka | JSON schema inference, drift detection |
| Production Kafka | PostgreSQL | Kafka | SASL/SSL auth, exactly-once, tuning |
| Cache Invalidation | MySQL | Redis | CDC stream for cache invalidation workers |
| Audit Trail | PostgreSQL | Kafka | Compliance logging, PII redaction |
| Analytics Preprocessing | MySQL | Kafka + Redis | Metrics enrichment, analytics stream |
| Outbox Pattern | MySQL | Kafka | Transactional outbox, raw payload, per-aggregate routing |
Quick Start
- Set environment variables for your database and sink connections
- Copy the example to a `.yaml` file
- Run DeltaForge:
cargo run -p runner -- --config your-pipeline.yaml
Examples by Category
Getting Started
| Example | Description |
|---|---|
| MySQL to Redis | Simple pipeline with JavaScript transformation |
| Turso to Kafka | Edge database to Kafka with CloudEvents |
| PostgreSQL to NATS | PostgreSQL logical replication to NATS |
Production Patterns
| Example | Description |
|---|---|
| Production Kafka | Authentication, exactly-once, performance tuning |
| Multi-Sink Fan-Out | Multiple sinks with different formats |
| Cache Invalidation | CDC stream for cache invalidation |
| Outbox Pattern | Transactional outbox with raw payload delivery |
Data Processing
| Example | Description |
|---|---|
| Event Filtering | Filter, drop, and redact events |
| Schema Sensing | Automatic JSON schema discovery |
| Analytics Preprocessing | Prepare events for analytics platforms |
Compliance & Auditing
| Example | Description |
|---|---|
| Audit Trail | SOC2/HIPAA/GDPR-compliant change tracking |
Examples by Feature
Envelope Formats
| Format | Example |
|---|---|
| Native | MySQL to Redis, Event Filtering, Cache Invalidation |
| Debezium | Schema Sensing, Multi-Sink Fan-Out, Production Kafka |
| CloudEvents | Turso to Kafka, PostgreSQL to NATS |
JavaScript Processors
| Use Case | Example |
|---|---|
| PII Redaction | MySQL to Redis, Audit Trail |
| Event Filtering | Event Filtering |
| Enrichment | Turso to Kafka, Analytics Preprocessing |
| Cache Key Generation | Cache Invalidation |
| Audit Metadata | Audit Trail |
Multi-Sink Patterns
| Pattern | Example |
|---|---|
| Fan-out with different formats | Multi-Sink Fan-Out |
| Primary + best-effort secondary | Multi-Sink Fan-Out, Analytics Preprocessing |
Customizing Examples
Change Envelope Format
All sinks support configurable envelopes. Add to any sink config:
sinks:
- type: kafka
config:
# ... other config
envelope:
type: cloudevents # or: native, debezium
type_prefix: "com.example" # required for cloudevents
encoding: json
See Envelopes and Encodings for details.
Add Multiple Sinks
Fan out to multiple destinations with different formats:
sinks:
- type: kafka
config:
id: primary-kafka
envelope:
type: debezium
required: true # Must succeed for checkpoint
- type: redis
config:
id: cache-redis
envelope:
type: native
required: false # Best-effort, won't block checkpoint
See Sinks documentation for multi-sink patterns.
Enable Schema Sensing
Automatically discover JSON structure in your data:
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 3
sampling:
warmup_events: 100
sample_rate: 10
See Schema Sensing for configuration options.
More Resources
- Configuration Reference - Full spec documentation
- Quickstart Guide - Get running in minutes
- Troubleshooting - Common issues and solutions
MySQL to Redis
This example streams MySQL binlog events into a Redis stream with an inline JavaScript transformation for PII redaction.
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript email redaction |
| Sink | Redis Streams |
| Envelope | Native (configurable) |
Pipeline Configuration
metadata:
name: orders-mysql-to-redis
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
processors:
- type: javascript
id: redact-email
inline: |
function processBatch(events) {
return events.map((event) => {
if (event.after && event.after.email) {
event.after.email = "[redacted]";
}
return event;
});
}
limits:
timeout_ms: 500
sinks:
- type: redis
config:
id: orders-redis
uri: ${REDIS_URI}
stream: orders
envelope:
type: native
encoding: json
required: true
batch:
max_events: 500
max_bytes: 1048576
max_ms: 1000
commit_policy:
mode: required
Running the Example
1. Set Environment Variables
export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export REDIS_URI="redis://localhost:6379"
2. Start DeltaForge
# Save config as mysql-redis.yaml
cargo run -p runner -- --config mysql-redis.yaml
3. Insert Test Data
INSERT INTO shop.orders (email, total, status)
VALUES ('alice@example.com', 99.99, 'pending');
4. Verify in Redis
./dev.sh redis-read orders 10
You should see the event with the email redacted:
{
"before": null,
"after": {
"id": 1,
"email": "[redacted]",
"total": 99.99,
"status": "pending"
},
"op": "c",
"ts_ms": 1700000000000
}
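Inline processors can be sanity-checked outside the pipeline before deploying; the following is a quick local check of the redaction logic above, runnable with plain `node` (the sample event shape mirrors the Redis output shown):

```javascript
// Same processBatch body as in the pipeline config above.
function processBatch(events) {
  return events.map((event) => {
    if (event.after && event.after.email) {
      event.after.email = "[redacted]";
    }
    return event;
  });
}

// Feed it a sample event and inspect the result.
const sample = [
  { op: "c", before: null, after: { id: 1, email: "alice@example.com", total: 99.99 } },
];
console.log(JSON.stringify(processBatch(sample), null, 2));
```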
Variations
With Debezium Envelope
For Kafka Connect compatibility downstream:
sinks:
- type: redis
config:
id: orders-redis
uri: ${REDIS_URI}
stream: orders
envelope:
type: debezium
Multi-Sink Fan-Out
Add Kafka alongside Redis for durability:
sinks:
- type: kafka
config:
id: orders-kafka
brokers: ${KAFKA_BROKERS}
topic: orders
envelope:
type: debezium
required: true # Critical path
- type: redis
config:
id: orders-redis
uri: ${REDIS_URI}
stream: orders
envelope:
type: native
required: false # Best-effort
With this configuration, checkpoints only wait for Kafka. Redis failures won’t block the pipeline.
With Schema Sensing
Automatically track schema changes:
spec:
# ... source and sinks config ...
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 3
sampling:
warmup_events: 100
sample_rate: 10
Key Concepts Demonstrated
- JavaScript Processors: Transform events in-flight with custom logic
- PII Redaction: Mask sensitive data before it reaches downstream systems
- Envelope Configuration: Choose output format based on consumer needs
- Commit Policy: Control checkpoint behavior with the `required` flag
Related Documentation
- MySQL Source - Prerequisites and configuration
- Redis Sink - Connection options and batching
- Envelopes - Output format options
- Configuration Reference - Full spec documentation
Example: Turso to Kafka
This example demonstrates streaming changes from a Turso database to Kafka with schema sensing enabled.
Use Case
You have a Turso database (or local libSQL) and want to:
- Stream table changes to a Kafka topic
- Automatically detect schema structure from JSON payloads
- Transform events with JavaScript before publishing
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: turso2kafka
tenant: acme
spec:
source:
type: turso
config:
id: turso-main
# Local libSQL for development
url: "http://127.0.0.1:8080"
# For Turso cloud:
# url: "libsql://your-db.turso.io"
# auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["users", "orders", "order_items"]
poll_interval_ms: 1000
cdc_mode: auto
processors:
- type: javascript
id: enrich
inline: |
function processBatch(events) {
return events.map(event => {
// Add custom metadata to events
event.source_type = "turso";
event.processed_at = new Date().toISOString();
return event;
});
}
sinks:
- type: kafka
config:
id: kafka-main
brokers: "${KAFKA_BROKERS}"
topic: turso.changes
required: true
exactly_once: false
client_conf:
message.timeout.ms: "5000"
acks: "all"
batch:
max_events: 100
max_bytes: 1048576
max_ms: 500
respect_source_tx: false
max_inflight: 2
commit_policy:
mode: required
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 3
sampling:
warmup_events: 50
sample_rate: 5
structure_cache: true
Running the Example
1. Start Infrastructure
# Start Kafka and other services
./dev.sh up
# Create the target topic
./dev.sh k-create turso.changes 6
2. Start Local libSQL (Optional)
For local development without Turso cloud:
# Using sqld (libSQL server)
sqld --http-listen-addr 127.0.0.1:8080
# Or with Docker
docker run -p 8080:8080 ghcr.io/libsql/sqld:latest
3. Create Test Tables
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE,
metadata TEXT -- JSON column
);
CREATE TABLE orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
total REAL NOT NULL,
status TEXT DEFAULT 'pending',
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
4. Run DeltaForge
# Save config as turso-kafka.yaml
cargo run -p runner -- --config turso-kafka.yaml
5. Insert Test Data
INSERT INTO users (name, email, metadata)
VALUES ('Alice', 'alice@example.com', '{"role": "admin", "tags": ["vip"]}');
INSERT INTO orders (user_id, total, status)
VALUES (1, 99.99, 'completed');
6. Verify Events in Kafka
./dev.sh k-consume turso.changes --from-beginning
You should see events like:
{
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"tenant_id": "acme",
"table": "users",
"op": "insert",
"after": {
"id": 1,
"name": "Alice",
"email": "alice@example.com",
"metadata": "{\"role\": \"admin\", \"tags\": [\"vip\"]}"
},
"source_type": "turso",
"processed_at": "2025-01-15T10:30:00.000Z",
"timestamp": "2025-01-15T10:30:00.123Z"
}
Monitoring
Check Pipeline Status
curl http://localhost:8080/pipelines/turso2kafka
View Inferred Schemas
# List all inferred schemas
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas
# Get details for users table
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users
# Export as JSON Schema
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users/json-schema
Check Drift Detection
curl http://localhost:8080/pipelines/turso2kafka/drift
Turso Cloud Configuration
For production with Turso cloud:
source:
type: turso
config:
id: turso-prod
url: "libsql://mydb-myorg.turso.io"
auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["*"]
cdc_mode: native
poll_interval_ms: 1000
native_cdc:
level: data
Set the auth token via environment variable:
export TURSO_AUTH_TOKEN="your-token-here"
Notes
- CDC Mode: `auto` tries native CDC first, then falls back to triggers or polling
- Poll Interval: Lower values reduce latency but increase database load
- Schema Sensing: Automatically discovers JSON structure in text columns
- Exactly Once: Set to `false` for higher throughput; use `true` if the Kafka cluster supports EOS
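Note from the Kafka output above that the `metadata` TEXT column arrives as a JSON string. If downstream consumers prefer structured objects, a processor variation (not part of the base pipeline, shown here as a sketch) could parse it, leaving non-JSON text untouched:

```javascript
// Parse stringified JSON text columns into objects before publishing.
// Assumes a `metadata` TEXT column that may hold JSON, as in the example above.
function processBatch(events) {
  return events.map(event => {
    if (event.after && typeof event.after.metadata === 'string') {
      try {
        // Turn the stringified JSON column into a real object.
        event.after.metadata = JSON.parse(event.after.metadata);
      } catch (_e) {
        // Not valid JSON: keep the original text value.
      }
    }
    return event;
  });
}
```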
PostgreSQL to NATS
This example streams PostgreSQL logical replication changes to NATS JetStream using the CloudEvents envelope format, a natural fit for serverless architectures.
Overview
| Component | Configuration |
|---|---|
| Source | PostgreSQL logical replication |
| Processor | None (passthrough) |
| Sink | NATS JetStream |
| Envelope | CloudEvents 1.0 |
Use Case
You have a PostgreSQL database and want to:
- Stream changes to NATS for event-driven microservices
- Use CloudEvents format for AWS Lambda, Azure Functions, or Knative
- Leverage JetStream for durable, replay-capable event streams
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: users-postgres-to-nats
tenant: acme
spec:
source:
type: postgres
config:
id: users-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_users
publication: users_pub
tables:
- public.users
- public.profiles
- public.user_sessions
start_position: earliest
sinks:
- type: nats
config:
id: users-nats
url: ${NATS_URL}
subject: users.events
stream: USERS
envelope:
type: cloudevents
type_prefix: "com.acme.users"
encoding: json
required: true
send_timeout_secs: 5
batch_timeout_secs: 30
batch:
max_events: 500
max_ms: 500
respect_source_tx: true
commit_policy:
mode: required
Prerequisites
PostgreSQL Setup
-- Enable logical replication (postgresql.conf)
-- wal_level = logical
-- Create publication for the tables
CREATE PUBLICATION users_pub FOR TABLE users, profiles, user_sessions;
-- Verify publication
SELECT * FROM pg_publication_tables WHERE pubname = 'users_pub';
NATS JetStream Setup
# Start NATS with JetStream enabled
./dev.sh up
# Create the stream
./dev.sh nats-stream-add USERS 'users.>'
Running the Example
1. Set Environment Variables
export POSTGRES_DSN="postgres://user:password@localhost:5432/mydb"
export NATS_URL="nats://localhost:4222"
2. Start DeltaForge
cargo run -p runner -- --config postgres-nats.yaml
3. Insert Test Data
INSERT INTO users (name, email, created_at)
VALUES ('Alice', 'alice@example.com', NOW());
UPDATE users SET email = 'alice.new@example.com' WHERE name = 'Alice';
4. Verify in NATS
./dev.sh nats-sub 'users.>'
You should see CloudEvents formatted messages:
{
"specversion": "1.0",
"id": "550e8400-e29b-41d4-a716-446655440000",
"source": "deltaforge/users-postgres/public.users",
"type": "com.acme.users.created",
"time": "2025-01-15T10:30:00.000Z",
"datacontenttype": "application/json",
"subject": "public.users",
"data": {
"before": null,
"after": {
"id": 1,
"name": "Alice",
"email": "alice@example.com",
"created_at": "2025-01-15T10:30:00.000Z"
},
"op": "c"
}
}
Variations
With Debezium Envelope
For compatibility with existing Debezium consumers:
sinks:
- type: nats
config:
id: users-nats
url: ${NATS_URL}
subject: users.events
stream: USERS
envelope:
type: debezium
With Authentication
sinks:
- type: nats
config:
id: users-nats
url: ${NATS_URL}
subject: users.events
stream: USERS
envelope:
type: cloudevents
type_prefix: "com.acme.users"
credentials_file: /path/to/nats.creds
# Or use username/password:
# username: ${NATS_USER}
# password: ${NATS_PASS}
Starting from Latest
Skip existing data and only capture new changes:
source:
type: postgres
config:
id: users-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_users
publication: users_pub
tables:
- public.users
start_position: latest
Key Concepts Demonstrated
- PostgreSQL Logical Replication: Production-ready CDC with slot management
- CloudEvents Format: Standard envelope for cloud-native event routing
- JetStream Durability: Replay-capable event streams with consumer acknowledgment
- Transaction Preservation: `respect_source_tx: true` keeps related changes together
Related Documentation
- PostgreSQL Source - Replication setup and configuration
- NATS Sink - JetStream configuration and authentication
- Envelopes - Output format options
- Configuration Reference - Full spec documentation
Multi-Sink Fan-Out
This example demonstrates streaming changes to multiple destinations simultaneously, each with a different envelope format tailored to its consumers.
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript enrichment |
| Sinks | Kafka (Debezium) + Redis (Native) + NATS (CloudEvents) |
| Pattern | Fan-out with format adaptation |
Use Case
You have a MySQL database and need to:
- Send to Kafka Connect (requires Debezium format)
- Populate a Redis cache (native format for efficiency)
- Trigger serverless functions via NATS (CloudEvents format)
- Handle sink failures independently without blocking the pipeline
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-fan-out
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
processors:
- type: javascript
id: enrich
inline: |
function processBatch(events) {
return events.map(event => {
// Add routing hints via tags (tags is a valid Event field)
event.tags = event.tags || [];
if (event.table.includes('orders')) {
event.tags.push('entity:order');
// High-value orders get priority tag
if (event.after && event.after.total > 1000) {
event.tags.push('priority:high');
}
}
event.tags.push('enriched');
return event;
});
}
limits:
timeout_ms: 500
sinks:
# Primary: Kafka for data warehouse / Kafka Connect
# Uses Debezium format for ecosystem compatibility
- type: kafka
config:
id: warehouse-kafka
brokers: ${KAFKA_BROKERS}
topic: orders.cdc
envelope:
type: debezium
encoding: json
required: true # Must succeed for checkpoint
exactly_once: false
client_conf:
acks: "all"
# Secondary: Redis for real-time cache
# Uses native format for minimal overhead
- type: redis
config:
id: cache-redis
uri: ${REDIS_URI}
stream: orders:cache
envelope:
type: native
encoding: json
required: false # Best-effort, won't block pipeline
# Tertiary: NATS for serverless triggers
# Uses CloudEvents for Lambda/Functions compatibility
- type: nats
config:
id: serverless-nats
url: ${NATS_URL}
subject: orders.events
stream: ORDERS
envelope:
type: cloudevents
type_prefix: "com.acme.orders"
encoding: json
required: false # Best-effort
batch:
max_events: 500
max_bytes: 1048576
max_ms: 500
respect_source_tx: true
commit_policy:
mode: required # Only wait for required sinks (Kafka)
How It Works
Checkpoint Behavior
With commit_policy.mode: required:
Source Event
│
▼
┌─────────────────────────────────────────────────────┐
│ Parallel Sink Delivery │
├─────────────────┬─────────────────┬─────────────────┤
│ Kafka │ Redis │ NATS │
│ required: true │ required: false │ required: false │
│ │ │ │
│ ✓ Must succeed │ ✓ Best-effort │ ✓ Best-effort │
└────────┬────────┴────────┬────────┴──────┬──────────┘
│ │ │
▼ │ │
Checkpoint <───────────┘ │
advances │
(even if Redis/NATS fail) │
Output Formats
Kafka (Debezium):
{
"payload": {
"before": null,
"after": {"id": 1, "total": 1500.00, "status": "pending"},
"source": {"connector": "mysql", "db": "shop", "table": "orders"},
"op": "c",
"ts_ms": 1700000000000
}
}
Redis (Native):
{
"before": null,
"after": {"id": 1, "total": 1500.00, "status": "pending"},
"source": {"connector": "mysql", "db": "shop", "table": "orders"},
"op": "c",
"ts_ms": 1700000000000
}
NATS (CloudEvents):
{
"specversion": "1.0",
"id": "evt-123",
"source": "deltaforge/orders-mysql/shop.orders",
"type": "com.acme.orders.created",
"time": "2025-01-15T10:30:00.000Z",
"data": {
"before": null,
"after": {"id": 1, "total": 1500.00, "status": "pending"},
"op": "c"
}
}
Running the Example
1. Set Environment Variables
export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
export REDIS_URI="redis://localhost:6379"
export NATS_URL="nats://localhost:4222"
2. Start Infrastructure
./dev.sh up
./dev.sh k-create orders.cdc 6
./dev.sh nats-stream-add ORDERS 'orders.>'
3. Start DeltaForge
cargo run -p runner -- --config fan-out.yaml
4. Insert Test Data
INSERT INTO shop.orders (customer_id, total, status)
VALUES (1, 1500.00, 'pending');
5. Verify Each Sink
# Kafka
./dev.sh k-consume orders.cdc --from-beginning
# Redis
./dev.sh redis-read orders:cache 10
# NATS
./dev.sh nats-sub 'orders.>'
Variations
Quorum Mode
Require 2 of 3 sinks to succeed:
sinks:
- type: kafka
config:
id: kafka-1
required: true # Counts toward quorum
- type: redis
config:
id: redis-1
required: true # Counts toward quorum
- type: nats
config:
id: nats-1
required: true # Counts toward quorum
commit_policy:
mode: quorum
quorum: 2 # Any 2 must succeed
All-or-Nothing
Require all sinks to succeed (strongest consistency):
commit_policy:
mode: all
⚠️ Warning: `mode: all` means any sink failure blocks the entire pipeline. Use only when all destinations are equally critical.
Key Concepts Demonstrated
- Multi-Sink Fan-Out: Single source to multiple destinations
- Format Adaptation: Different envelope per consumer requirement
- Selective Checkpointing: `required` flag controls which sinks gate progress
- Failure Isolation: Non-critical sinks don’t block the pipeline
- Tag-Based Enrichment: Use `event.tags` for routing metadata
Processor Constraints: JavaScript processors can only modify `event.before`, `event.after`, and `event.tags`. Arbitrary top-level fields would be lost during serialization.
Related Documentation
- Sinks Overview - Multi-sink patterns and commit policies
- Envelopes - Output format options
- Commit Policy - Checkpoint gating modes
- Configuration Reference - Full spec documentation
Event Filtering with JavaScript
This example demonstrates using JavaScript processors to filter and selectively drop events before they reach sinks.
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript filter + redaction |
| Sink | Kafka |
| Pattern | Event filtering and transformation |
Use Case
You have a MySQL database and want to:
- Filter out events from certain tables or with specific conditions
- Drop low-value events to reduce downstream load
- Redact sensitive fields conditionally
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: filtered-orders
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
- shop.audit_log # We'll filter most of these out
processors:
- type: javascript
id: filter-and-redact
inline: |
function processBatch(events) {
return events
// 1. Filter out audit_log events except errors
.filter(event => {
if (event.table === 'shop.audit_log') {
// Only keep error-level audit events
return event.after && event.after.level === 'error';
}
return true;
})
// 2. Filter out soft-deleted records
.filter(event => {
if (event.after && event.after.deleted_at !== null) {
return false; // Drop soft-deleted records
}
return true;
})
// 3. Filter out test/staging data
.filter(event => {
if (event.after && event.after.email) {
// Drop test accounts
if (event.after.email.endsWith('@test.local')) {
return false;
}
}
return true;
})
// 4. Transform remaining events
.map(event => {
// Redact PII for non-admin tables
if (event.after) {
if (event.after.email) {
event.after.email = maskEmail(event.after.email);
}
if (event.after.phone) {
event.after.phone = '[redacted]';
}
}
// Add filter tag (tags is a valid Event field)
event.tags = (event.tags || []).concat(['filtered']);
return event;
});
}
// helper: mask email keeping domain
function maskEmail(email) {
const [local, domain] = email.split('@');
if (!domain) return '[invalid-email]';
const masked = local.charAt(0) + '***' + local.charAt(local.length - 1);
return masked + '@' + domain;
}
limits:
timeout_ms: 1000
mem_mb: 128
sinks:
- type: kafka
config:
id: filtered-kafka
brokers: ${KAFKA_BROKERS}
topic: orders.filtered
envelope:
type: native
encoding: json
required: true
batch:
max_events: 500
max_ms: 500
respect_source_tx: true
commit_policy:
mode: required
Filter Patterns
Drop Events (Return false from the Filter)
.filter(event => {
// return false to drop the event
if (event.table === 'internal_logs') {
return false;
}
return true;
})
Conditional Field-Based Filtering
.filter(event => {
// drop events where status is 'draft'
if (event.after && event.after.status === 'draft') {
return false;
}
return true;
})
Drop by Operation Type
.filter(event => {
// only keep inserts and updates, drop deletes
return event.op === 'c' || event.op === 'u';
})
Sample Events (Rate Limiting)
// keep only 10% of events (for high-volume tables)
.filter(event => {
if (event.table === 'high_volume_table') {
return Math.random() < 0.1;
}
return true;
})
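Math.random() sampling is simple but non-deterministic: the same row may be kept on one delivery and dropped on a retry. A hash-based variant keeps the decision stable per row. This is a sketch that assumes events carry a primary key in `after.id`:

```javascript
// FNV-1a hash over a stable key so a given row is always kept or always
// dropped, independent of retries or restarts.
function fnv1a(str) {
  let h = 0x811c9dc5;
  for (let i = 0; i < str.length; i++) {
    h ^= str.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h;
}

function keepSampled(event, fraction) {
  if (event.table !== 'high_volume_table') return true;
  const key = String(event.after && event.after.id);
  return fnv1a(key) % 1000 < fraction * 1000; // keep ~fraction of rows
}
```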
Running the Example
1. Set Environment Variables
export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
2. Start DeltaForge
cargo run -p runner -- --config filtered-orders.yaml
3. Insert Test Data
-- This will be captured and transformed
INSERT INTO shop.orders (customer_email, total, status)
VALUES ('alice@example.com', 99.99, 'pending');
-- This will be filtered out (test account)
INSERT INTO shop.orders (customer_email, total, status)
VALUES ('test@test.local', 50.00, 'pending');
-- This will be filtered out (soft-deleted)
INSERT INTO shop.orders (customer_email, total, status, deleted_at)
VALUES ('bob@example.com', 75.00, 'pending', NOW());
-- Audit log - will be filtered out (not error level)
INSERT INTO shop.audit_log (level, message)
VALUES ('info', 'User logged in');
-- Audit log - will be kept (error level)
INSERT INTO shop.audit_log (level, message)
VALUES ('error', 'Payment failed');
4. Verify Filtered Output
./dev.sh k-consume orders.filtered --from-beginning
You should only see:
- Alice’s order (with masked email: `a***e@example.com`)
- The error-level audit log entry
Performance Considerations
Tip: Filtering early reduces downstream load. If you’re filtering out 50% of events, your sinks process half the data.
processors:
- type: javascript
id: filter
inline: |
function processBatch(events) {
// Filter FIRST, then transform
return events
.filter(e => shouldKeep(e)) // Reduces array size
.map(e => transform(e)); // Processes fewer events
}
limits:
timeout_ms: 500 # Increase if filtering logic is complex
mem_mb: 128 # Increase for large batches
Key Concepts Demonstrated
- Event Filtering: Drop events before they reach sinks
- Conditional Logic: Filter based on table, operation, or field values
- PII Redaction: Mask sensitive data in remaining events
- Sampling: Rate-limit high-volume event streams
Processor Constraints: JavaScript processors can only modify `event.before`, `event.after`, and `event.tags`. Arbitrary top-level fields like `event.filtered_at` would be lost during serialization.
Related Documentation
- Processors - JavaScript processor configuration
- Envelopes - Output format options
- Configuration Reference - Full spec documentation
Schema Sensing and Drift Detection
This example demonstrates DeltaForge’s automatic schema inference for JSON columns and drift detection capabilities.
Overview
| Component | Configuration |
|---|---|
| Source | PostgreSQL logical replication |
| Processor | None |
| Sink | Kafka |
| Feature | Schema sensing with deep JSON inspection |
Use Case
You have a PostgreSQL database with JSON/JSONB columns and want to:
- Automatically discover the structure of JSON payloads
- Detect when JSON schemas change over time (drift)
- Export inferred schemas as JSON Schema for downstream validation
- Monitor schema evolution without manual tracking
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: products-with-sensing
tenant: acme
spec:
source:
type: postgres
config:
id: products-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_products
publication: products_pub
tables:
- public.products
- public.product_variants
start_position: earliest
sinks:
- type: kafka
config:
id: products-kafka
brokers: ${KAFKA_BROKERS}
topic: products.changes
envelope:
type: debezium
encoding: json
required: true
batch:
max_events: 500
max_ms: 1000
respect_source_tx: true
commit_policy:
mode: required
# Schema sensing configuration
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 5 # How deep to traverse nested JSON
max_sample_size: 500 # Events to analyze for deep inspection
sampling:
warmup_events: 100 # Analyze every event during warmup
sample_rate: 20 # After warmup, analyze 1 in 20 events
structure_cache: true # Cache seen structures to avoid re-analysis
structure_cache_size: 100
tracking:
detect_drift: true # Enable drift detection
drift_threshold: 0.1 # Alert if >10% of events have new fields
output:
emit_schemas: true # Include schema info in API responses
json_schema_format: draft-07
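To reason about how much analysis work these sampling settings imply, a rough model (an assumption for illustration, not the engine's exact scheduler) is: every event is analyzed during warmup, then one in `sample_rate` afterwards:

```javascript
// Rough estimate of events the sensor deep-inspects, assuming uniform
// 1-in-N sampling after the warmup window (simplified model).
function estimateAnalyzed(totalEvents, warmupEvents, sampleRate) {
  if (totalEvents <= warmupEvents) return totalEvents;
  return warmupEvents + Math.floor((totalEvents - warmupEvents) / sampleRate);
}

// With warmup_events: 100 and sample_rate: 20, 2100 events yield
// 100 (warmup) + 2000 / 20 = 200 analyzed events.
console.log(estimateAnalyzed(2100, 100, 20)); // 200
```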
Table Structure
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
sku TEXT UNIQUE,
price DECIMAL(10,2),
-- JSON columns that schema sensing will analyze
metadata JSONB, -- Product attributes, tags, etc.
specifications JSONB, -- Technical specs
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE product_variants (
id SERIAL PRIMARY KEY,
product_id INTEGER REFERENCES products(id),
variant_name TEXT,
-- Nested JSON with variable structure
attributes JSONB, -- Color, size, material, etc.
pricing JSONB, -- Regional pricing, discounts
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Create publication
CREATE PUBLICATION products_pub FOR TABLE products, product_variants;
Sample Data
-- Insert products with JSON metadata
INSERT INTO products (name, sku, price, metadata, specifications) VALUES
(
'Wireless Headphones',
'WH-1000',
299.99,
'{
"brand": "AudioTech",
"category": "Electronics",
"tags": ["wireless", "bluetooth", "noise-canceling"],
"ratings": {"average": 4.5, "count": 1250}
}',
'{
"battery_life_hours": 30,
"driver_size_mm": 40,
"frequency_response": {"min_hz": 20, "max_hz": 20000},
"connectivity": ["bluetooth", "aux"]
}'
);
-- Insert variant with nested attributes
INSERT INTO product_variants (product_id, variant_name, attributes, pricing) VALUES
(
1,
'Midnight Black',
'{
"color": {"name": "Midnight Black", "hex": "#1a1a2e"},
"material": "Premium Plastic",
"weight_grams": 250
}',
'{
"base_price": 299.99,
"regional": {
"US": {"price": 299.99, "currency": "USD"},
"EU": {"price": 279.99, "currency": "EUR"}
},
"discounts": [
{"code": "SAVE20", "percent": 20, "expires": "2025-12-31"}
]
}'
);
Running the Example
1. Set Environment Variables
export POSTGRES_DSN="postgres://user:password@localhost:5432/shop"
export KAFKA_BROKERS="localhost:9092"
2. Start DeltaForge
cargo run -p runner -- --config products-sensing.yaml
3. Insert Data and Let Sensing Analyze
-- Insert several products to build schema profile
INSERT INTO products (name, sku, price, metadata, specifications) VALUES
('Smart Watch', 'SW-200', 399.99,
'{"brand": "TechWear", "tags": ["fitness", "smart"]}',
'{"battery_days": 7, "water_resistant": true}');
Using the Schema Sensing API
List Inferred Schemas
curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas
Response:
{
"schemas": [
{
"table": "public.products",
"columns": {
"metadata": {
"type": "object",
"inferred_at": "2025-01-15T10:30:00Z",
"sample_count": 150
},
"specifications": {
"type": "object",
"inferred_at": "2025-01-15T10:30:00Z",
"sample_count": 150
}
}
}
]
}
Get Detailed Schema for a Table
curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas/public.products
Response:
{
"table": "public.products",
"json_columns": {
"metadata": {
"inferred_schema": {
"type": "object",
"properties": {
"brand": {"type": "string"},
"category": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
"ratings": {
"type": "object",
"properties": {
"average": {"type": "number"},
"count": {"type": "integer"}
}
}
}
},
"sample_count": 150,
"last_updated": "2025-01-15T10:35:00Z"
}
}
}
Export as JSON Schema
curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas/public.products/json-schema
Response:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "public.products.metadata",
"type": "object",
"properties": {
"brand": {"type": "string"},
"category": {"type": "string"},
"tags": {
"type": "array",
"items": {"type": "string"}
},
"ratings": {
"type": "object",
"properties": {
"average": {"type": "number"},
"count": {"type": "integer"}
}
}
}
}
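One use for the exported schema is lightweight validation in a consumer. Below is a minimal hand-rolled check against the draft-07 schema above; a real validator such as Ajv handles far more (required fields, additional properties, formats), so treat this as a sketch:

```javascript
// Hand-rolled type check mirroring the exported schema for
// public.products.metadata (types only, not full draft-07 semantics).
function matchesSchema(doc) {
  return typeof doc.brand === 'string'
    && typeof doc.category === 'string'
    && Array.isArray(doc.tags) && doc.tags.every(t => typeof t === 'string')
    && typeof doc.ratings === 'object' && doc.ratings !== null
    && typeof doc.ratings.average === 'number'
    && Number.isInteger(doc.ratings.count);
}

console.log(matchesSchema({
  brand: 'AudioTech',
  category: 'Electronics',
  tags: ['wireless'],
  ratings: { average: 4.5, count: 1250 },
})); // true
```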
Check Drift Detection
curl http://localhost:8080/pipelines/products-with-sensing/drift
Response:
{
"drift_detected": true,
"tables": {
"public.products": {
"metadata": {
"new_fields": ["promotion", "seasonal"],
"removed_fields": [],
"type_changes": [],
"drift_percentage": 0.15,
"first_seen": "2025-01-15T11:00:00Z"
}
}
}
}
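The `new_fields` and `removed_fields` lists come from comparing the tracked baseline against fields observed in incoming events. A simplified sketch of that set comparison (the engine's actual bookkeeping also tracks type changes and drift percentages):

```javascript
// Compare a baseline field set against fields seen in new events,
// mirroring the new_fields / removed_fields shape of the /drift response.
function diffFields(baseline, observed) {
  const base = new Set(baseline);
  const seen = new Set(observed);
  return {
    new_fields: [...seen].filter(f => !base.has(f)),
    removed_fields: [...base].filter(f => !seen.has(f)),
  };
}

const d = diffFields(
  ['brand', 'category', 'tags', 'ratings'],
  ['brand', 'category', 'tags', 'ratings', 'promotion', 'seasonal'],
);
console.log(d.new_fields); // [ 'promotion', 'seasonal' ]
```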
Get Sensing Statistics
curl http://localhost:8080/pipelines/products-with-sensing/sensing/stats
Response:
{
"total_events_analyzed": 1500,
"total_events_sampled": 250,
"cache_hits": 1250,
"cache_misses": 250,
"tables_tracked": 2,
"json_columns_tracked": 4
}
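A quick derivation from these numbers is the structure-cache hit rate, i.e. how often an event's JSON shape had already been analyzed:

```javascript
// Cache hit rate from the /sensing/stats payload above.
const stats = { cache_hits: 1250, cache_misses: 250 };
const hitRate = stats.cache_hits / (stats.cache_hits + stats.cache_misses);
console.log(hitRate); // ≈ 0.833
```

A high hit rate means most events reuse cached analysis; a low one suggests highly variable payloads, where a larger `structure_cache_size` may help.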
Performance Tuning
Performance tip: Schema sensing can be CPU-intensive. Tune based on your throughput needs.
High-Throughput Configuration
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 3 # Limit depth for faster processing
max_sample_size: 200 # Fewer samples
sampling:
warmup_events: 50 # Shorter warmup
sample_rate: 100 # Analyze 1 in 100 events
structure_cache: true
structure_cache_size: 200 # Larger cache
Development/Debugging Configuration
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 10 # Full depth
max_sample_size: 1000 # More samples
sampling:
warmup_events: 500 # Longer warmup
sample_rate: 1 # Analyze every event
Key Concepts Demonstrated
- Automatic Schema Inference: Discover JSON structure without manual definition
- Deep JSON Inspection: Traverse nested objects and arrays
- Drift Detection: Alert when schemas change unexpectedly
- JSON Schema Export: Generate standard schemas for validation
- Sampling Strategy: Balance accuracy vs. performance
Related Documentation
- Schema Sensing - Detailed schema sensing documentation
- PostgreSQL Source - Replication setup
- Configuration Reference - Full sensing options
- API Reference - All sensing endpoints
Production Kafka Configuration
This example demonstrates a production-ready Kafka sink configuration with authentication, high-availability settings, and performance tuning.
Overview
| Component | Configuration |
|---|---|
| Source | PostgreSQL logical replication |
| Processor | None |
| Sink | Kafka with SASL/SSL authentication |
| Pattern | Production-grade reliability |
Use Case
You’re deploying DeltaForge to production and need:
- Secure authentication (SASL/SCRAM or mTLS)
- High availability with proper acknowledgment settings
- Optimal batching and compression for throughput
- Exactly-once semantics for critical data
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-to-kafka-prod
tenant: acme
spec:
source:
type: postgres
config:
id: orders-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_orders
publication: orders_pub
tables:
- public.orders
- public.order_items
- public.payments
start_position: earliest
sinks:
- type: kafka
config:
id: orders-kafka
brokers: ${KAFKA_BROKERS}
topic: orders.cdc.events
envelope:
type: debezium
encoding: json
required: true
# enable exactly-once semantics
exactly_once: true
# timeout for individual sends
send_timeout_secs: 60
# librdkafka client configuration
client_conf:
# security - SASL/SCRAM authentication
security.protocol: "SASL_SSL"
sasl.mechanism: "SCRAM-SHA-512"
sasl.username: "${KAFKA_USERNAME}"
sasl.password: "${KAFKA_PASSWORD}"
# SSL/TLS configuration
ssl.ca.location: "/etc/ssl/certs/kafka-ca.pem"
ssl.endpoint.identification.algorithm: "https"
# reliability - wait for all replicas
acks: "all"
# idempotence (required for exactly-once)
enable.idempotence: "true"
# retries and timeouts
retries: "2147483647"
retry.backoff.ms: "100"
delivery.timeout.ms: "300000"
request.timeout.ms: "30000"
# kafka batching for throughput
batch.size: "65536"
linger.ms: "10"
# compression
compression.type: "lz4"
# buffer management
queue.buffering.max.messages: "100000"
queue.buffering.max.kbytes: "1048576"
batch:
max_events: 1000
max_bytes: 1048576
max_ms: 100
respect_source_tx: true
max_inflight: 4
commit_policy:
mode: required
Security Configurations
SASL/SCRAM (Username/Password)
client_conf:
security.protocol: "SASL_SSL"
sasl.mechanism: "SCRAM-SHA-512"
sasl.username: "${KAFKA_USERNAME}"
sasl.password: "${KAFKA_PASSWORD}"
ssl.ca.location: "/etc/ssl/certs/ca.pem"
mTLS (Mutual TLS)
client_conf:
security.protocol: "SSL"
ssl.ca.location: "/etc/ssl/certs/kafka-ca.pem"
ssl.certificate.location: "/etc/ssl/certs/client.pem"
ssl.key.location: "/etc/ssl/private/client.key"
ssl.key.password: "${SSL_KEY_PASSWORD}"
AWS MSK with IAM
client_conf:
security.protocol: "SASL_SSL"
sasl.mechanism: "AWS_MSK_IAM"
sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required;"
sasl.client.callback.handler.class: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
Confluent Cloud
client_conf:
security.protocol: "SASL_SSL"
sasl.mechanism: "PLAIN"
sasl.username: "${CONFLUENT_API_KEY}"
sasl.password: "${CONFLUENT_API_SECRET}"
Performance Tuning
High Throughput
Optimize for maximum events per second:
client_conf:
acks: "1" # Leader-only ack (faster, less safe)
batch.size: "131072" # 128KB batches
linger.ms: "50" # Wait longer to fill batches
compression.type: "lz4" # Fast compression
batch:
max_events: 5000 # Larger batches
max_ms: 200 # More time to accumulate
max_inflight: 8 # More concurrent requests
Low Latency
Optimize for minimal delay:
client_conf:
acks: "1" # Don't wait for replicas
batch.size: "16384" # Smaller batches
linger.ms: "0" # Send immediately
compression.type: "none" # Skip compression
batch:
max_events: 100 # Smaller batches
max_ms: 10 # Flush quickly
max_inflight: 2 # Limit in-flight
Maximum Durability
Optimize for zero data loss:
client_conf:
acks: "all" # All replicas must ack
enable.idempotence: "true" # Prevent duplicates
max.in.flight.requests.per.connection: "5" # Must be ≤ 5 when idempotence is enabled
retries: "2147483647" # Infinite retries
exactly_once: true # Transactional producer
batch:
respect_source_tx: true # Preserve source transactions
max_inflight: 1 # Strict ordering
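One way to sanity-check the batching knobs across these profiles is a back-of-envelope ceiling: events per flush times flushes per second times in-flight batches. This is only an upper bound under idealized assumptions (every batch fills, no broker backpressure or network limits):

```javascript
// Idealized throughput ceiling implied by the sink batch settings:
// assumes each batch fills max_events and flushes every max_ms.
function batchCeilingPerSec({ maxEvents, maxMs, maxInflight }) {
  return maxEvents * (1000 / maxMs) * maxInflight;
}

// High-throughput profile: 5000 events * 5 flushes/sec * 8 in flight.
console.log(batchCeilingPerSec({ maxEvents: 5000, maxMs: 200, maxInflight: 8 })); // 200000

// Maximum-durability profile trades this ceiling for strict ordering.
console.log(batchCeilingPerSec({ maxEvents: 1000, maxMs: 100, maxInflight: 1 })); // 10000
```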
Running the Example
1. Set Environment Variables
export POSTGRES_DSN="postgres://user:password@localhost:5432/orders"
export KAFKA_BROKERS="kafka1:9093,kafka2:9093,kafka3:9093"
export KAFKA_USERNAME="deltaforge"
export KAFKA_PASSWORD="secret"
2. Create Kafka Topic
kafka-topics.sh --create \
--topic orders.cdc.events \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--bootstrap-server ${KAFKA_BROKERS}
3. Start DeltaForge
cargo run -p runner --release -- --config kafka-prod.yaml
Monitoring
Key Metrics to Watch
- `deltaforge_sink_events_sent_total` — Events delivered
- `deltaforge_sink_send_latency_seconds` — Delivery latency
- `deltaforge_sink_errors_total` — Delivery failures
- `deltaforge_checkpoint_lag_events` — Events pending checkpoint
Health Check
curl http://localhost:8080/health
Pipeline Status
curl http://localhost:8080/pipelines/orders-to-kafka-prod
Key Concepts Demonstrated
- SASL/SSL Authentication: Secure broker connections
- Exactly-Once Semantics: Transactional producer for no duplicates
- Acknowledgment Modes: Trade-off between durability and latency
- Batching & Compression: Optimize throughput
- Production Tuning: Real-world configuration patterns
Related Documentation
- Kafka Sink - Full configuration reference
- Envelopes - Output format options
- Observability - Metrics and monitoring
- Configuration Reference - Full spec documentation
Redis Cache Invalidation
This example demonstrates streaming database changes to a Redis stream where a worker can consume them to invalidate cache entries, ensuring cache consistency without polling.
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript cache key generator |
| Sink | Redis Streams |
| Pattern | CDC-driven cache invalidation |
Use Case
You have a MySQL database with Redis caching and want to:
- Stream change events that trigger cache invalidation
- Avoid stale cache issues without TTL-based expiration
- Generate cache keys matching your application’s key format
Architecture
┌─────────┐ ┌─────────────┐ ┌────────────────┐ ┌─────────┐
│ MySQL │────>│ DeltaForge │────>│ Redis Stream │────>│ Worker │
│ │ │ │ │ (invalidations)│ │ │
└─────────┘ └─────────────┘ └────────────────┘ └────┬────┘
│
┌───────────────┐ │
│ Redis Cache │<──────────┘
│ (DEL keys) │
└───────────────┘
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: cache-invalidation
tenant: acme
spec:
source:
type: mysql
config:
id: app-mysql
dsn: ${MYSQL_DSN}
tables:
- app.users
- app.products
- app.orders
- app.inventory
processors:
- type: javascript
id: generate-cache-keys
inline: |
function processBatch(events) {
return events.map(event => {
const keys = generateCacheKeys(event);
const strategy = getStrategy(event);
// Store cache keys in event.after metadata
// (we can modify event.after since it's a JSON Value)
if (event.after) {
event.after._cache_keys = keys;
event.after._invalidation_strategy = strategy;
} else if (event.before) {
// For deletes, add to before
event.before._cache_keys = keys;
event.before._invalidation_strategy = strategy;
}
// Add tags for routing/filtering
event.tags = (event.tags || []).concat([
'cache:invalidate',
`strategy:${strategy}`,
`keys:${keys.length}`
]);
return event;
});
}
function generateCacheKeys(event) {
const table = event.table.split('.')[1];
const keys = [];
const record = event.after || event.before;
if (!record || !record.id) return keys;
const id = record.id;
switch (table) {
case 'users':
keys.push(`user:${id}`);
if (record.email) {
keys.push(`user:email:${record.email}`);
}
// If email changed, invalidate old email key
if (event.before && event.before.email &&
event.before.email !== record.email) {
keys.push(`user:email:${event.before.email}`);
}
keys.push(`user:${id}:orders`);
keys.push(`user:${id}:profile`);
break;
case 'products':
keys.push(`product:${id}`);
keys.push(`product:${id}:details`);
if (record.category_id) {
keys.push(`category:${record.category_id}:products`);
}
break;
case 'orders':
keys.push(`order:${id}`);
if (record.user_id) {
keys.push(`user:${record.user_id}:orders`);
}
break;
case 'inventory':
keys.push(`inventory:${record.product_id}`);
keys.push(`product:${record.product_id}:stock`);
break;
}
return keys;
}
function getStrategy(event) {
const table = event.table.split('.')[1];
if (table === 'inventory') return 'immediate';
if (event.op === 'd') return 'immediate';
return 'batched';
}
limits:
timeout_ms: 500
mem_mb: 128
sinks:
- type: redis
config:
id: invalidation-stream
uri: ${REDIS_URI}
stream: cache:invalidations
envelope:
type: native
encoding: json
required: true
batch:
max_events: 100
max_ms: 100
respect_source_tx: true
commit_policy:
mode: required
JavaScript Processor Constraints
Important: The processor stores cache keys in `event.after._cache_keys` (or `event.before._cache_keys` for deletes) because processors can only modify existing Event fields. Arbitrary top-level fields like `event.cache_keys` would be lost.
Cache Worker (Consumer)
// cache-invalidation-worker.js
const Redis = require('ioredis');
const redis = new Redis(process.env.REDIS_URI);
const streamKey = 'cache:invalidations';
const consumerGroup = 'cache-workers';
const consumerId = `worker-${process.pid}`;
async function processInvalidations() {
try {
await redis.xgroup('CREATE', streamKey, consumerGroup, '0', 'MKSTREAM');
} catch (e) {
if (!e.message.includes('BUSYGROUP')) throw e;
}
console.log(`Starting cache invalidation worker: ${consumerId}`);
while (true) {
try {
const results = await redis.xreadgroup(
'GROUP', consumerGroup, consumerId,
'COUNT', 100, 'BLOCK', 5000,
'STREAMS', streamKey, '>'
);
if (!results) continue;
for (const [stream, messages] of results) {
for (const [id, fields] of messages) {
const event = JSON.parse(fields[1]);
await invalidateKeys(event);
await redis.xack(streamKey, consumerGroup, id);
}
}
} catch (error) {
console.error('Worker error:', error);
await new Promise(r => setTimeout(r, 1000));
}
}
}
async function invalidateKeys(event) {
// Get cache keys from the event payload
const record = event.after || event.before;
const cacheKeys = record?._cache_keys || [];
if (cacheKeys.length === 0) return;
for (const key of cacheKeys) {
if (key.includes('*')) {
await scanAndDelete(key);
} else {
await redis.del(key);
}
}
console.log(`Invalidated ${cacheKeys.length} keys for ${event.table}`);
}
async function scanAndDelete(pattern) {
let cursor = '0';
do {
const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100);
cursor = nextCursor;
if (keys.length > 0) {
await redis.del(...keys);
}
} while (cursor !== '0');
}
processInvalidations().catch(console.error);
Running the Example
1. Set Environment Variables
export MYSQL_DSN="mysql://user:password@localhost:3306/app"
export REDIS_URI="redis://localhost:6379"
2. Start DeltaForge
cargo run -p runner -- --config cache-invalidation.yaml
3. Start Worker(s)
node cache-invalidation-worker.js
4. Test Invalidation
UPDATE app.users SET email = 'alice.new@example.com' WHERE id = 1;
Worker output:
Invalidated 5 keys for app.users
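The five keys come from the `users` branch of `generateCacheKeys`: the id key, the new and stale email keys, and the two derived keys. A standalone copy of that branch applied to the update above:

```javascript
// Standalone copy of the users-table branch of generateCacheKeys from the
// processor, applied to the email-change update in step 4.
function userCacheKeys(event) {
  const record = event.after || event.before;
  const keys = [`user:${record.id}`];
  if (record.email) keys.push(`user:email:${record.email}`);
  // If the email changed, also invalidate the stale key for the old email.
  if (event.before && event.before.email && event.before.email !== record.email) {
    keys.push(`user:email:${event.before.email}`);
  }
  keys.push(`user:${record.id}:orders`, `user:${record.id}:profile`);
  return keys;
}

const keys = userCacheKeys({
  op: 'u',
  before: { id: 1, email: 'alice@example.com' },
  after: { id: 1, email: 'alice.new@example.com' },
});
console.log(keys.length); // 5
```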
Key Concepts Demonstrated
- CDC to Stream: DeltaForge captures changes and writes to Redis Streams
- Custom Key Generation: Processor computes cache keys for downstream worker
- Consumer Groups: Scalable worker processing with acknowledgments
- Before/After Diffing: Compute invalidation keys for both old and new values
Note: DeltaForge streams events to Redis; a separate worker (shown above) consumes them and performs the actual cache invalidation.
Related Documentation
- MySQL Source - Binlog configuration
- Redis Sink - Stream configuration
- Processors - JavaScript processor options
Audit Trail and Compliance Logging
This example demonstrates building an immutable audit trail for compliance requirements (SOC2, HIPAA, GDPR) by capturing all database changes with full context.
Overview
| Component | Configuration |
|---|---|
| Source | PostgreSQL logical replication |
| Processor | JavaScript audit enrichment |
| Sink | Kafka (durable audit log) |
| Pattern | Compliance-grade change tracking |
Use Case
You need to meet compliance requirements and want to:
- Capture every change to sensitive tables with before/after values
- Add audit metadata (classification, retention, regulations)
- Redact sensitive fields while preserving change records
- Store immutable audit logs for required retention periods
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: compliance-audit-trail
tenant: acme
spec:
source:
type: postgres
config:
id: app-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_audit
publication: audit_pub
tables:
- public.users
- public.user_profiles
- public.payment_methods
- public.transactions
- public.roles
- public.permissions
- public.user_roles
start_position: earliest
processors:
- type: javascript
id: audit-enrichment
inline: |
function processBatch(events) {
return events.map(event => {
const table = event.table.split('.')[1];
// Build audit tags (event.tags is a valid Event field)
const auditTags = [
'audited',
`sensitivity:${classifySensitivity(table)}`,
`classification:${getDataClassification(table)}`,
`retention:${getRetentionPeriod(table)}d`
];
// Add regulation tags
for (const reg of getApplicableRegulations(table)) {
auditTags.push(`regulation:${reg}`);
}
// Track changed fields for updates
if (event.before && event.after) {
const changed = detectChangedFields(event.before, event.after);
for (const field of changed) {
auditTags.push(`changed:${field}`);
}
}
event.tags = (event.tags || []).concat(auditTags);
// Redact sensitive fields in before/after
if (event.before) {
event.before = sanitizeRecord(event.before, table);
}
if (event.after) {
event.after = sanitizeRecord(event.after, table);
}
return event;
});
}
function classifySensitivity(table) {
const highSensitivity = ['users', 'payment_methods', 'transactions'];
const mediumSensitivity = ['user_profiles', 'user_roles'];
if (highSensitivity.includes(table)) return 'HIGH';
if (mediumSensitivity.includes(table)) return 'MEDIUM';
return 'LOW';
}
function sanitizeRecord(record, table) {
if (!record) return null;
const sanitized = { ...record };
const sensitiveFields = {
'users': ['password_hash', 'ssn', 'tax_id'],
'payment_methods': ['card_number', 'cvv', 'account_number'],
'user_profiles': ['date_of_birth']
};
const fieldsToMask = sensitiveFields[table] || [];
for (const field of fieldsToMask) {
if (sanitized[field] !== undefined) {
sanitized[`_${field}_redacted`] = true;
sanitized[field] = '[REDACTED]';
}
}
return sanitized;
}
function detectChangedFields(before, after) {
const changed = [];
const allKeys = new Set([...Object.keys(before), ...Object.keys(after)]);
for (const key of allKeys) {
if (JSON.stringify(before[key]) !== JSON.stringify(after[key])) {
changed.push(key);
}
}
return changed;
}
function getRetentionPeriod(table) {
if (['transactions', 'payment_methods'].includes(table)) return 2555;
if (['users', 'user_profiles'].includes(table)) return 2190;
return 1095;
}
function getDataClassification(table) {
if (['payment_methods', 'transactions'].includes(table)) return 'PCI';
if (['users', 'user_profiles'].includes(table)) return 'PII';
return 'INTERNAL';
}
function getApplicableRegulations(table) {
const regs = [];
if (['users', 'user_profiles'].includes(table)) regs.push('GDPR', 'CCPA');
if (['payment_methods', 'transactions'].includes(table)) regs.push('PCI-DSS', 'SOX');
return regs;
}
limits:
timeout_ms: 1000
mem_mb: 256
sinks:
- type: kafka
config:
id: audit-kafka
brokers: ${KAFKA_BROKERS}
topic: audit.trail.events
envelope:
type: debezium
encoding: json
required: true
exactly_once: true
client_conf:
acks: "all"
enable.idempotence: "true"
compression.type: "gzip"
batch:
max_events: 500
max_bytes: 1048576
max_ms: 1000
respect_source_tx: true
commit_policy:
mode: required
JavaScript Processor Constraints
Important: The JavaScript processor can only modify fields that exist on DeltaForge’s `Event` struct. You can:
- Modify `event.before` and `event.after` values (JSON objects)
- Set `event.tags` (array of strings)
- Filter out events (return an empty array)

You cannot add arbitrary top-level fields like `event.audit` or `event.metadata`; they will be lost during serialization. This limitation will be addressed in a future release.
This example uses `event.tags` to store audit metadata as `key:value` strings that downstream systems can parse.
PostgreSQL Setup
-- Create publication for audited tables
CREATE PUBLICATION audit_pub FOR TABLE
public.users,
public.user_profiles,
public.payment_methods,
public.transactions,
public.roles,
public.permissions,
public.user_roles
WITH (publish = 'insert, update, delete');
-- Enable REPLICA IDENTITY FULL to capture before values on updates
ALTER TABLE public.users REPLICA IDENTITY FULL;
ALTER TABLE public.user_profiles REPLICA IDENTITY FULL;
ALTER TABLE public.payment_methods REPLICA IDENTITY FULL;
ALTER TABLE public.transactions REPLICA IDENTITY FULL;
Sample Audit Event Output
With Debezium envelope:
{
"payload": {
"before": {
"id": 42,
"email": "alice@old-domain.com",
"name": "Alice Smith",
"password_hash": "[REDACTED]",
"_password_hash_redacted": true
},
"after": {
"id": 42,
"email": "alice@new-domain.com",
"name": "Alice Smith",
"password_hash": "[REDACTED]",
"_password_hash_redacted": true
},
"source": {
"version": "0.1.0",
"connector": "postgres",
"name": "app-postgres",
"ts_ms": 1705312199123,
"db": "app",
"schema": "public",
"table": "users"
},
"op": "u",
"ts_ms": 1705312200000,
"tags": [
"audited",
"sensitivity:HIGH",
"classification:PII",
"retention:2190d",
"regulation:GDPR",
"regulation:CCPA",
"changed:email"
]
}
}
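Applied to the before/after images in this sample, the processor's `detectChangedFields` helper (copied here verbatim so it can run standalone) reports only the email change, which is how the `changed:email` tag was produced:

```javascript
// Standalone copy of detectChangedFields from the audit processor.
function detectChangedFields(before, after) {
  const changed = [];
  const allKeys = new Set([...Object.keys(before), ...Object.keys(after)]);
  for (const key of allKeys) {
    if (JSON.stringify(before[key]) !== JSON.stringify(after[key])) {
      changed.push(key);
    }
  }
  return changed;
}

const changed = detectChangedFields(
  { id: 42, email: 'alice@old-domain.com', name: 'Alice Smith' },
  { id: 42, email: 'alice@new-domain.com', name: 'Alice Smith' },
);
console.log(changed); // [ 'email' ]
```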
Parsing Audit Tags
Downstream consumers can parse the structured tags:
// Parse audit tags into an object
function parseAuditTags(tags) {
const audit = {};
for (const tag of tags || []) {
if (tag.includes(':')) {
const [key, value] = tag.split(':', 2);
if (audit[key]) {
if (!Array.isArray(audit[key])) {
audit[key] = [audit[key]];
}
audit[key].push(value);
} else {
audit[key] = value;
}
} else {
audit[tag] = true;
}
}
return audit;
}
// Example: parseAuditTags(event.tags)
// Returns: {
// audited: true,
// sensitivity: "HIGH",
// classification: "PII",
// retention: "2190d",
// regulation: ["GDPR", "CCPA"],
// changed: "email"
// }
Kafka Topic Configuration
kafka-topics.sh --create \
--topic audit.trail.events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=189216000000 \
--config cleanup.policy=delete \
--config min.insync.replicas=2 \
--config compression.type=gzip \
--bootstrap-server ${KAFKA_BROKERS}
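The `retention.ms` value maps back to the retention periods the processor assigns: 189216000000 ms is exactly 2190 days, the period `getRetentionPeriod` uses for PII tables. A quick conversion check:

```javascript
// Convert the processor's retention periods (days) to Kafka retention.ms.
const daysToMs = (days) => days * 24 * 60 * 60 * 1000;

console.log(daysToMs(2190)); // 189216000000 (PII tables: users, user_profiles)
console.log(daysToMs(2555)); // 220752000000 (PCI tables: payment_methods, transactions)
```

Since this single topic also carries PCI-classified tables, which the processor tags with the longer 2555-day period, you may want topic retention to match the longest applicable period instead.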
Running the Example
1. Set Environment Variables
export POSTGRES_DSN="postgres://user:password@localhost:5432/app"
export KAFKA_BROKERS="kafka1:9092,kafka2:9092,kafka3:9092"
2. Start DeltaForge
cargo run -p runner -- --config audit-trail.yaml
3. Verify Audit Events
./dev.sh k-consume audit.trail.events --from-beginning
Querying Audit Data
-- Find high-sensitivity changes
SELECT * FROM audit_events
WHERE ARRAY_CONTAINS(payload.tags, 'sensitivity:HIGH');
-- Find all email changes
SELECT * FROM audit_events
WHERE ARRAY_CONTAINS(payload.tags, 'changed:email');
-- Find PCI-regulated changes
SELECT * FROM audit_events
WHERE ARRAY_CONTAINS(payload.tags, 'regulation:PCI-DSS');
Key Concepts Demonstrated
- Full Change Capture: Before and after values with REPLICA IDENTITY FULL
- PII Redaction: Sensitive fields masked, presence tracked via `_field_redacted` flags
- Tag-Based Metadata: Audit info stored in `event.tags` as parseable strings
- Immutable Storage: Exactly-once delivery to an append-only Kafka log
- Compliance Tagging: Retention periods, classifications, regulations as tags
Related Documentation
- PostgreSQL Source - Logical replication setup
- Kafka Sink - Exactly-once and durability settings
- Processors - JavaScript processor constraints
- Envelopes - Output format options
Real-Time Analytics Preprocessing Pipeline
This example demonstrates preprocessing CDC events for real-time analytics by enriching them with dimensions, metrics, and routing tags before they reach downstream systems.
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript analytics enrichment |
| Sinks | Kafka (stream processing) + Redis (real-time counters) |
| Pattern | Analytics-ready event preparation |
Use Case
You have an e-commerce MySQL database and want to:
- Stream order events to real-time dashboards
- Feed a worker that maintains live counters in Redis
- Prepare events for stream processing (Flink, Spark Streaming)
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: ecommerce-analytics
tenant: acme
spec:
source:
type: mysql
config:
id: ecommerce-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
- shop.payments
- shop.cart_events
processors:
- type: javascript
id: analytics-enrichment
inline: |
function processBatch(events) {
return events.map(event => {
const table = event.table.split('.')[1];
const record = event.after || event.before;
// Add analytics metadata to event.after
// (we can modify event.after since it's a JSON Value)
if (event.after) {
event.after._analytics = {
event_type: `${table}.${mapOperation(event.op)}`,
dimensions: extractDimensions(table, record, event),
metrics: extractMetrics(table, record)
};
}
// Add routing tags
event.tags = generateTags(table, record, event);
return event;
});
}
function mapOperation(op) {
return { 'c': 'created', 'u': 'updated', 'd': 'deleted', 'r': 'snapshot' }[op] || op;
}
function extractDimensions(table, record, event) {
if (!record) return {};
const dims = {
hour_of_day: new Date(event.timestamp).getUTCHours(),
day_of_week: new Date(event.timestamp).getUTCDay()
};
switch (table) {
case 'orders':
dims.customer_id = record.customer_id;
dims.status = record.status;
dims.channel = record.channel || 'web';
break;
case 'order_items':
dims.order_id = record.order_id;
dims.product_id = record.product_id;
break;
case 'payments':
dims.order_id = record.order_id;
dims.payment_method = record.method;
break;
}
return dims;
}
function extractMetrics(table, record) {
if (!record) return {};
const metrics = { event_count: 1 };
switch (table) {
case 'orders':
metrics.order_total = parseFloat(record.total) || 0;
metrics.item_count = parseInt(record.item_count) || 0;
break;
case 'order_items':
metrics.quantity = parseInt(record.quantity) || 1;
metrics.line_total = parseFloat(record.line_total) || 0;
break;
case 'payments':
metrics.payment_amount = parseFloat(record.amount) || 0;
break;
}
return metrics;
}
function generateTags(table, record, event) {
const tags = [table, event.op];
if (!record) return tags;
if (table === 'orders' && record.total > 500) {
tags.push('high_value');
}
if (record.status) {
tags.push(`status:${record.status}`);
}
return tags;
}
limits:
timeout_ms: 500
mem_mb: 256
sinks:
- type: kafka
config:
id: analytics-kafka
brokers: ${KAFKA_BROKERS}
topic: analytics.events
envelope:
type: native
encoding: json
required: true
client_conf:
compression.type: "lz4"
- type: redis
config:
id: realtime-redis
uri: ${REDIS_URI}
stream: analytics:realtime
envelope:
type: native
encoding: json
required: false
batch:
max_events: 500
max_bytes: 1048576
max_ms: 100
respect_source_tx: false
commit_policy:
mode: required
JavaScript Processor Constraints
Important: Analytics metadata is stored in `event.after._analytics` because the processor can only modify existing Event fields (`before`, `after`, `tags`). Arbitrary top-level fields would be lost during serialization.
Sample Event Output
{
"before": null,
"after": {
"id": 98765,
"customer_id": 12345,
"total": 299.99,
"status": "pending",
"channel": "mobile",
"_analytics": {
"event_type": "orders.created",
"dimensions": {
"hour_of_day": 10,
"day_of_week": 3,
"customer_id": 12345,
"status": "pending",
"channel": "mobile"
},
"metrics": {
"event_count": 1,
"order_total": 299.99,
"item_count": 3
}
}
},
"source": {
"connector": "mysql",
"db": "shop",
"table": "orders"
},
"op": "c",
"ts_ms": 1705312200000,
"tags": ["orders", "c", "status:pending"]
}
Redis Counter Worker
const Redis = require('ioredis');
const redis = new Redis(process.env.REDIS_URI);
async function processAnalyticsEvents() {
let lastId = '0';
while (true) {
const results = await redis.xread(
'COUNT', 100, 'BLOCK', 1000,
'STREAMS', 'analytics:realtime', lastId
);
if (!results) continue;
for (const [stream, messages] of results) {
for (const [id, fields] of messages) {
const event = JSON.parse(fields[1]);
await updateCounters(event);
lastId = id;
}
}
}
}
async function updateCounters(event) {
const pipe = redis.pipeline();
const now = new Date();
const hourKey = `${now.getUTCFullYear()}:${now.getUTCMonth()+1}:${now.getUTCDate()}:${now.getUTCHours()}`;
const dayKey = `${now.getUTCFullYear()}:${now.getUTCMonth()+1}:${now.getUTCDate()}`;
// Get analytics from event.after
const analytics = event.after?._analytics || {};
const eventType = analytics.event_type || `${event.table}.${event.op}`;
const metrics = analytics.metrics || {};
const dimensions = analytics.dimensions || {};
// Event counts
pipe.hincrby(`stats:events:${hourKey}`, eventType, 1);
// Order-specific counters
if (eventType === 'orders.created') {
pipe.incr(`stats:orders:count:${hourKey}`);
pipe.incrbyfloat(`stats:orders:revenue:${hourKey}`, metrics.order_total || 0);
if (dimensions.channel) {
pipe.hincrby(`stats:orders:channel:${dayKey}`, dimensions.channel, 1);
}
}
// High-value alerts
if (event.tags?.includes('high_value')) {
pipe.lpush('alerts:high_value_orders', JSON.stringify({
order_id: event.after?.id,
total: metrics.order_total
}));
pipe.ltrim('alerts:high_value_orders', 0, 99);
}
pipe.expire(`stats:events:${hourKey}`, 90000);
await pipe.exec();
}
processAnalyticsEvents().catch(console.error);
Running the Example
1. Set Environment Variables
export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
export REDIS_URI="redis://localhost:6379"
2. Create Kafka Topic
./dev.sh k-create analytics.events 12
3. Start DeltaForge
cargo run -p runner -- --config analytics-pipeline.yaml
4. Start Counter Worker
node realtime-counter-worker.js
Key Concepts Demonstrated
- Event Enrichment: Add dimensions/metrics in event.after._analytics
- Tag-Based Filtering: Use event.tags for high-value order detection
- Multi-Sink Fan-Out: Kafka for stream processing + Redis for worker consumption
- Worker Pattern: Separate worker consumes the Redis stream to update counters
Related Documentation
- MySQL Source - Binlog configuration
- Kafka Sink - Producer settings
- Redis Sink - Stream configuration
Outbox Pattern
This example demonstrates the transactional outbox pattern - writing business data and a domain event in the same database transaction, then streaming the event to Kafka via DeltaForge.
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | Outbox (extract + route) |
| Sink | Kafka (per-aggregate topics) |
| Pattern | Transactional outbox with raw payload delivery |
Use Case
Your application writes orders and needs to publish OrderCreated, OrderShipped, etc. events to Kafka. You want:
- Atomicity: event published if and only if the transaction commits
- Clean payloads: consumers receive the application’s JSON, not CDC envelopes
- Per-aggregate routing: events land on Order.OrderCreated, Payment.PaymentReceived, etc.
- Zero polling: DeltaForge tails the binlog; no application-side outbox relay
Database Setup
-- Business table
CREATE TABLE orders (
id INT AUTO_INCREMENT PRIMARY KEY,
customer VARCHAR(64),
total DECIMAL(10,2),
status VARCHAR(32),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Outbox table (BLACKHOLE = binlog only, no disk storage)
CREATE TABLE outbox (
id INT AUTO_INCREMENT PRIMARY KEY,
aggregate_type VARCHAR(64),
aggregate_id VARCHAR(64),
event_type VARCHAR(64),
payload JSON
) ENGINE=BLACKHOLE;
Application Code
Write both in the same transaction:
BEGIN;
INSERT INTO orders (customer, total, status)
VALUES ('alice', 149.99, 'pending');
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES (
'Order',
LAST_INSERT_ID(),
'OrderCreated',
JSON_OBJECT('customer', 'alice', 'total', 149.99, 'status', 'pending')
);
COMMIT;
If the transaction rolls back, neither the order nor the event exists. If it commits, both do.
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-outbox
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.outbox
outbox:
tables: ["shop.outbox"]
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
default_topic: events.unrouted
raw_payload: true
sinks:
- type: kafka
config:
id: events-kafka
brokers: ${KAFKA_BROKERS}
topic: "cdc.${source.db}.${source.table}"
envelope:
type: debezium
encoding: json
required: true
batch:
max_events: 500
max_ms: 500
respect_source_tx: true
commit_policy:
mode: required
What lands on Kafka
Outbox event → topic Order.OrderCreated:
{"customer": "alice", "total": 149.99, "status": "pending"}
Raw payload, no envelope. Kafka headers carry metadata: df-aggregate-type: Order, df-aggregate-id: 1, df-event-type: OrderCreated.
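On the consumer side, that metadata can be recovered from the message headers. A minimal sketch, assuming a Node client such as kafkajs, which delivers header values as Buffers:

```javascript
// Sketch: recover outbox metadata from the df-* Kafka headers described
// above. Assumes a client (e.g. kafkajs) that exposes header values as
// Buffers; adapt to your consumer library.
function outboxMeta(headers) {
  const get = (k) => (headers[k] != null ? headers[k].toString() : undefined);
  return {
    aggregateType: get('df-aggregate-type'),
    aggregateId: get('df-aggregate-id'),
    eventType: get('df-event-type'),
  };
}
```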
CDC event → topic cdc.shop.orders (from the sink’s topic template):
{
"payload": {
"before": null,
"after": {"id": 1, "customer": "alice", "total": 149.99, "status": "pending"},
"source": {"connector": "mysql", "db": "shop", "table": "orders"},
"op": "c",
"ts_ms": 1700000000000
}
}
Full Debezium envelope. Both flow through the same pipeline - the outbox processor only touches events tagged as outbox.
Running the Example
1. Start Infrastructure
./dev.sh up
./dev.sh k-create Order.OrderCreated 3
./dev.sh k-create Order.OrderShipped 3
./dev.sh k-create cdc.shop.orders 3
2. Set Environment Variables
export MYSQL_DSN="mysql://deltaforge:dfpw@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
3. Start DeltaForge
cargo run -p runner -- --config outbox.yaml
4. Insert Test Data
BEGIN;
INSERT INTO shop.orders (customer, total, status) VALUES ('alice', 149.99, 'pending');
INSERT INTO shop.outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', LAST_INSERT_ID(), 'OrderCreated',
'{"customer":"alice","total":149.99,"status":"pending"}');
COMMIT;
5. Verify
# Outbox event (raw payload, per-aggregate topic)
./dev.sh k-inspect Order.OrderCreated
# CDC event (Debezium envelope, per-table topic)
./dev.sh k-inspect cdc.shop.orders
Variations
PostgreSQL with WAL Messages
PostgreSQL doesn’t need an outbox table - write directly to the WAL:
BEGIN;
INSERT INTO orders (customer, total, status) VALUES ('alice', 149.99, 'pending');
SELECT pg_logical_emit_message(
true, 'outbox',
'{"aggregate_type":"Order","aggregate_id":"1","event_type":"OrderCreated","payload":{"customer":"alice","total":149.99}}'
);
COMMIT;
source:
type: postgres
config:
id: orders-pg
dsn: ${POSTGRES_DSN}
slot: deltaforge_orders
publication: orders_pub
tables: [public.orders]
outbox:
prefixes: [outbox]
No table, no index, no vacuum - just a WAL entry.
Multiple Outbox Channels
Route order events and payment events to different topic hierarchies:
processors:
- type: outbox
tables: [orders_outbox]
topic: "orders.${event_type}"
raw_payload: true
- type: outbox
tables: [payments_outbox]
topic: "payments.${event_type}"
raw_payload: true
columns:
payload: data
Additional Headers for Tracing
Forward trace and correlation IDs as Kafka headers:
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
raw_payload: true
additional_headers:
x-trace-id: trace_id
x-correlation-id: correlation_id
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload, trace_id, correlation_id)
VALUES ('Order', '42', 'OrderCreated', '{"total":99.99}', 'abc-123', 'req-456');
Migrating from Debezium
If your outbox table uses Debezium’s column names (aggregatetype, aggregateid, type):
processors:
- type: outbox
topic: "${aggregatetype}.${type}"
raw_payload: true
columns:
aggregate_type: aggregatetype
aggregate_id: aggregateid
event_type: type
additional_headers:
x-trace-id: traceid
x-tenant: tenant
Column mappings control header extraction. The topic template uses raw column names directly.
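For illustration, template resolution against an outbox row can be sketched as follows. This mirrors the routing behavior described above, including the default_topic fallback, but is not DeltaForge's implementation:

```javascript
// Sketch: resolve a topic template like "${aggregatetype}.${type}"
// against an outbox row, falling back when a referenced column is
// missing. Illustrative only - not DeltaForge's resolver.
function resolveTopic(template, row, fallback) {
  let missing = false;
  const topic = template.replace(/\$\{([^}]+)\}/g, (_, key) => {
    if (row[key] == null) { missing = true; return ''; }
    return String(row[key]);
  });
  return missing ? fallback : topic;
}
```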
Key Concepts Demonstrated
- Transactional outbox: atomicity without distributed transactions
- BLACKHOLE engine: binlog-only storage for zero-cost outbox tables on MySQL
- Raw payload delivery: raw_payload: true bypasses envelope wrapping - consumers get exactly what the application wrote
- Mixed pipeline: outbox events and CDC events coexist in the same pipeline with different serialization
- Per-aggregate routing: topic template routes events by aggregate_type and event_type
Related Documentation
- Outbox Pattern - Full outbox reference with configuration details
- MySQL Source - Binlog prerequisites
- PostgreSQL Source - WAL message setup
- Dynamic Routing - Template syntax for topic/key resolution
- Envelopes - Native, Debezium, CloudEvents formats
Troubleshooting
Common issues and quick checks when running DeltaForge.
- 🩺 Health-first: start with /healthz and /readyz to pinpoint failing components.
Runner fails to start
- Confirm the config path passed to --config exists and is readable.
- Validate YAML syntax and check that required fields like metadata.name and spec.source are present.
- Ensure environment variables referenced in the spec are set (dsn, brokers, uri, etc.).
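These checks can be scripted as a pre-flight step; a sketch (paths and variable names follow the examples in this guide, adjust to your setup):

```shell
# Sketch: pre-flight checks before starting the runner.
check_config() {
  # the config file must exist and be readable by the runner process
  [ -r "$1" ] || { echo "config not readable: $1" >&2; return 1; }
}

check_env() {
  # every variable referenced in the spec must be set and non-empty
  for v in "$@"; do
    eval "val=\"\${$v:-}\""
    [ -n "$val" ] || { echo "missing env var: $v" >&2; return 1; }
  done
}

# Example: check_config outbox.yaml && check_env MYSQL_DSN KAFKA_BROKERS
```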
Pipelines remain unready
- Check the /readyz endpoint for per-pipeline status and error messages.
- Verify upstream credentials allow replication (MySQL binlog); other engines are experimental unless explicitly documented.
- Inspect sink connectivity; a required sink that cannot connect will block checkpoints.
Slow throughput
- Increase batch.max_events or batch.max_bytes to reduce flush frequency.
- Adjust max_inflight to allow more concurrent batches if sinks can handle parallelism.
- Reduce processor work or add guardrails (limits) to prevent slow JavaScript from stalling the pipeline.
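As a starting point, the batch knobs can be raised together; the values below are illustrative, and the exact placement of max_inflight should be confirmed against the pipeline spec reference:

```yaml
# Sketch: larger, less frequent flushes. Values are illustrative;
# tune against your sinks' capacity.
batch:
  max_events: 2000
  max_bytes: 4194304   # 4 MiB
  max_ms: 250
max_inflight: 4        # placement per the pipeline spec reference
```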
Checkpoints not advancing
- Review the commit policy: under mode: all, or when required sinks are unavailable, checkpoints will not advance.
- Look for sink-specific errors (for example, Kafka broker unreachability or Redis backpressure).
- Pause and resume the pipeline to force a clean restart after addressing the underlying issue.
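If an optional sink keeps blocking progress, one option is to gate checkpoints only on critical sinks. A sketch using the fields from the examples above (exact field placement should be confirmed against the pipeline reference):

```yaml
# Sketch: only required sinks gate checkpoint commits.
sinks:
  - type: kafka
    config:
      required: true    # gates checkpoints
  - type: redis
    config:
      required: false   # best-effort; won't block progress
commit_policy:
  mode: required
```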
Development Guide
Use this guide to build, test, and extend DeltaForge. It covers local workflows, optional dependency containers, and how to work with Docker images.
All contributions are welcome and highly appreciated.
Local prerequisites
- Rust toolchain 1.89+ (install via rustup).
- Optional: Docker or Podman for running the dev dependency stack and the container image.
Workspace layout
- crates/deltaforge-core: shared event model, pipeline engine, and checkpointing primitives.
- crates/deltaforge-config: YAML config parsing, environment variable expansion, and pipeline spec types.
- crates/sources: database CDC readers (MySQL binlog, Postgres logical replication) implemented as pluggable sources.
- crates/processors: JavaScript-based processors and support code for transforming batches.
- crates/sinks: sink implementations (Kafka producer, Redis streams, NATS JetStream) plus sink utilities.
- crates/rest-api: HTTP control plane with health/readiness and pipeline lifecycle endpoints.
- crates/runner: CLI entrypoint that wires the runtime, metrics, and control plane together.
Use these crate boundaries as reference points when adding new sources, sinks, or pipeline behaviors.
Start dev dependencies
Bring up the optional backing services (MySQL, Kafka, Redis) with Docker Compose:
docker compose -f docker-compose.dev.yml up -d
Each service is exposed on localhost for local runs (5432, 3306, 9092, 6379). The MySQL container seeds demo data from ./init-scripts and configures binlog settings required for CDC.
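You can confirm the binlog configuration from a MySQL shell; ROW-format binlog with full row images is the typical prerequisite for CDC readers (see the MySQL Source docs for the authoritative list):

```sql
-- Typical binlog prerequisites for row-based CDC:
SHOW VARIABLES LIKE 'log_bin';          -- expect ON
SHOW VARIABLES LIKE 'binlog_format';    -- expect ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- expect FULL
```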
Prefer the convenience dev.sh wrapper to keep common tasks consistent:
./dev.sh up # start the dependency stack
./dev.sh down # stop and remove it
./dev.sh ps # see container status
Build and test locally
Run the usual Rust workflow from the repo root:
cargo fmt --all
cargo clippy --workspace --all-targets --all-features
cargo test --workspace
Or use the helper script for a single command that mirrors CI expectations:
./dev.sh build # build project (debug)
./dev.sh build-release # build project (release)
./dev.sh run # run with examples/dev.yaml
./dev.sh fmt # format code
./dev.sh lint # clippy with warnings as errors
./dev.sh test # full test suite
./dev.sh check # fmt --check + clippy + tests (mirrors CI)
./dev.sh cov # generate coverage report
Docker images
Use pre-built images
Multi-arch images (amd64/arm64) are published to GHCR and Docker Hub:
# Minimal (~57MB, scratch-based, no shell)
docker pull ghcr.io/vnvo/deltaforge:latest
docker pull vnvohub/deltaforge:latest
# Debug (~140MB, includes shell for troubleshooting)
docker pull ghcr.io/vnvo/deltaforge:latest-debug
docker pull vnvohub/deltaforge:latest-debug
| Variant | Size | Base | Use case |
|---|---|---|---|
| latest | ~57MB | scratch | Production |
| latest-debug | ~140MB | debian-slim | Troubleshooting, has shell |
Build locally
Two Dockerfiles are provided:
# Minimal image (~57MB)
docker build -t deltaforge:local .
# Debug image (~140MB, includes shell)
docker build -t deltaforge:local-debug -f Dockerfile.debug .
Or use the dev helper:
./dev.sh docker # build minimal image
./dev.sh docker-debug # build debug image
./dev.sh docker-test # test minimal image runs
./dev.sh docker-test-debug # test debug image runs
./dev.sh docker-all # build and test all variants
./dev.sh docker-shell # open shell in debug container
Build multi-arch locally
To build for both amd64 and arm64:
./dev.sh docker-multi-setup # create buildx builder (once)
./dev.sh docker-multi # build both architectures
Note: Multi-arch builds use QEMU emulation and take ~30-35 minutes. The images are not loaded locally - use --push to push to a registry.
Run the image
Run the container by mounting your pipeline specs and exposing the API and metrics ports:
docker run --rm \
-p 8080:8080 -p 9000:9000 \
-v $(pwd)/examples/dev.yaml:/etc/deltaforge/pipeline.yaml:ro \
-v deltaforge-checkpoints:/app/data \
ghcr.io/vnvo/deltaforge:latest \
--config /etc/deltaforge/pipeline.yaml
Notes:
- The container listens on 0.0.0.0:8080 for the control plane API, with metrics on :9000.
- Checkpoints are written to /app/data/df_checkpoints.json; mount a volume to persist them across restarts.
- Environment variables inside the YAML are expanded before parsing.
- Pass any other runner flags as needed (e.g., --api-addr or --metrics-addr).
Debug a running container
Use the debug image to troubleshoot:
# Run with shell access
docker run --rm -it --entrypoint /bin/bash ghcr.io/vnvo/deltaforge:latest-debug
# Exec into a running container
docker exec -it <container_id> /bin/bash
Dev helper commands
The dev.sh script provides shortcuts for common tasks:
./dev.sh help # show all commands
Infrastructure
./dev.sh up # start MySQL, Kafka, Redis
./dev.sh down # stop and remove containers
./dev.sh ps # list running services
Kafka
./dev.sh k-list # list topics
./dev.sh k-create <topic> # create topic
./dev.sh k-consume <topic> --from-beginning
./dev.sh k-produce <topic> # interactive producer
Redis
./dev.sh redis-cli # open redis-cli
./dev.sh redis-read <stream> # read from stream
Database shells
./dev.sh pg-sh # psql into Postgres
./dev.sh mysql-sh # mysql into MySQL
Documentation
./dev.sh docs # serve docs locally (opens browser)
./dev.sh docs-build # build docs
Pre-release checks
./dev.sh release-check # run all checks + build all Docker variants
Contributing
- Fork the repository
- Create a branch from main (e.g., feature/new-sink, fix/checkpoint-bug)
- Make your changes
- Run ./dev.sh check to ensure CI will pass
- Submit a PR against main
main
Things to Remember
Tests
There are a few #[ignore] tests; run them when making deep changes to sources, pipeline coordination, or anything that affects core functionality.
Logging hygiene
- Include pipeline, tenant, source_id/sink_id, and batch_id fields on all warnings/errors to make traces joinable in log aggregation tools.
- Normalize retry/backoff logs so they include the attempt count and sleep duration; consider a structured reason field alongside error details for dashboards.
- Add info-level summaries on an interval (e.g., every N batches) reporting batches processed, average batch size, lag, and sink latency percentiles pulled from the metrics registry to create human-friendly breadcrumbs.
- Add metrics in a backward-compatible way: prefer new metric names over redefining existing ones to avoid breaking dashboards. Validate cardinality (bounded label sets) before merging.
- Gate noisy logs behind levels (debug for per-event traces, info for batch summaries, warn/error for retries and failures).
- Exercise the new metrics in integration tests by asserting counters change when sending synthetic events through pipelines.