Introduction
DeltaForge is a versatile, high-performance Change Data Capture (CDC) engine built in Rust. It streams database changes into downstream systems like Kafka, Redis, NATS, and HTTP endpoints, giving you full control over how events are routed, transformed, and delivered. It supports JSON and Avro encoding (with Confluent Schema Registry), end-to-end exactly-once delivery via Kafka transactions, and built-in schema discovery that automatically infers and tracks the shape of your data as it flows through.
Pipelines are defined declaratively in YAML, making it straightforward to onboard new use cases without custom code.
| Built with | Sources | Processors | Sinks | Output Formats |
|---|---|---|---|---|
| Rust | MySQL · PostgreSQL | JavaScript · Outbox | Kafka · Redis · NATS · HTTP | JSON · Avro |
Why DeltaForge?
Core Capabilities
- ⚡ Powered by Rust : Predictable performance, memory safety, and minimal resource footprint.
- 🔌 Pluggable architecture : Sources, processors, and sinks are modular and independently extensible.
- 🧩 Declarative pipelines : Define sources, transforms, sinks, and commit policies in version-controlled YAML with environment variable expansion for secrets.
- 📦 Reliable checkpointing : Per-sink independent checkpoints. Exactly-once delivery via Kafka transactions. At-least-once with dedup for NATS and Redis.
- 🔁 Avro encoding : Confluent wire format with Schema Registry. DDL-derived schemas with exact types and nullability. Safe defaults for unsigned integers, enums, and timestamps.
- 🪦 Dead letter queue : Poison events routed to DLQ instead of blocking the pipeline. REST API for inspection, filtering, and replay.
- 🔀 Dynamic routing : Route events to per-table topics, streams, or subjects using templates or JavaScript logic.
- 📤 Transactional outbox : Publish domain events atomically with database writes. Per-aggregate routing, raw payload delivery, zero polling.
- 🛠️ Cloud-native ready : Single binary, Docker images, JSON logs, Prometheus metrics, and liveness/readiness probes for Kubernetes.
Schema Intelligence
- 🔍 Schema sensing : Automatically infer and track schema from event payloads, including deep inspection of nested JSON structures.
- 🗺️ High-cardinality handling : Detect and normalize dynamic map keys (session IDs, trace IDs) to prevent false schema evolution events.
- 🏷️ Schema fingerprinting : SHA-256 based change detection with schema-to-checkpoint correlation for reliable replay.
- 🗃️ Source-owned semantics : Preserves native database types (PostgreSQL arrays, MySQL JSON, etc.) instead of normalizing to a universal type system.
Operational Features
- 🔄 Graceful failover : Handles source failover with automatic schema revalidation - no manual intervention needed.
- 🧬 Zero-downtime schema evolution : Detects DDL changes and reloads schemas automatically, no pipeline restart needed.
- 🎯 Flexible table selection : Wildcard patterns (`db.*`, `schema.prefix%`) for easy onboarding.
- 📀 Transaction boundaries : Optionally keep source transactions intact across batches.
- ⚙️ Commit policies : Control checkpoint behavior with `all`, `required`, or `quorum` modes across multiple sinks.
- 🔧 Live pipeline management : Pause, resume, patch, and inspect running pipelines via the REST API.
- 🗄️ Safe initial snapshot : Consistent parallel backfill of existing tables before streaming begins, with binlog/WAL retention validation, background guards, and crash-resume at table granularity.
Use Cases
DeltaForge is designed for:
- Real-time data synchronization : Keep caches, search indexes, and analytics systems in sync with your primary database.
- Event-driven architectures : Stream database changes to Kafka or NATS for downstream microservices.
- Transactional messaging : Use the outbox pattern to publish domain events atomically with database writes - no distributed transactions needed.
- Audit trails and compliance : Capture every mutation with full before/after images for SOC2, HIPAA, or GDPR requirements.
- Lightweight ETL : Transform, filter, and route data in-flight with JavaScript processors - no Spark or Flink cluster needed.
DeltaForge is not a DAG-based stream processor. It is a focused CDC engine meant to replace tools like Debezium when you need a lighter, cloud-native, and more customizable runtime.
Getting Started
| Guide | Description |
|---|---|
| Quickstart | Get DeltaForge running in minutes |
| CDC Overview | Understand Change Data Capture concepts |
| Configuration | Pipeline spec reference |
| Development | Build from source, contribute |
Quick Example
metadata:
name: orders-to-kafka
tenant: acme
spec:
source:
type: mysql
config:
dsn: ${MYSQL_DSN}
tables: [shop.orders]
processors:
- type: javascript
inline: |
(event) => {
event.processed_at = Date.now();
return [event];
}
sinks:
- type: kafka
config:
brokers: ${KAFKA_BROKERS}
topic: order-events
Installation
Docker (recommended):
docker pull ghcr.io/vnvo/deltaforge:latest
From source:
git clone https://github.com/vnvo/deltaforge.git
cd deltaforge
cargo build --release
See the Development Guide for detailed build instructions and available Docker image variants.
License
DeltaForge is dual-licensed under MIT and Apache 2.0.
Change Data Capture (CDC)
Change Data Capture (CDC) is the practice of streaming database mutations as ordered events so downstream systems can stay in sync without periodic full loads. Rather than asking “what does my data look like now?”, CDC answers “what changed, and when?”
DeltaForge is a CDC engine built to make log-based change streams reliable, observable, and operationally simple in modern, containerized environments. It focuses on row-level CDC from MySQL binlog and Postgres logical replication to keep consumers accurate and latency-aware while minimizing impact on source databases.
In short: DeltaForge tails committed transactions from MySQL and Postgres logs, preserves ordering and transaction boundaries, and delivers events to sinks such as Kafka, Redis, and NATS with checkpointed delivery, configurable batching, and Prometheus metrics - without requiring a JVM or distributed coordinator.
Why CDC matters
Traditional data integration relies on periodic batch jobs that query source systems, compare snapshots, and push differences downstream. This approach worked for decades, but modern architectures demand something better.
The batch ETL problem: A nightly sync means your analytics are always a day stale. An hourly sync still leaves gaps and hammers your production database with expensive SELECT * queries during business hours. As data volumes grow, these jobs take longer, fail more often, and compete for the same resources your customers need.
CDC flips the model. Instead of pulling data on a schedule, you subscribe to changes as they happen.
| Aspect | Batch ETL | CDC |
|---|---|---|
| Latency | Minutes to hours | Seconds to milliseconds |
| Source load | High (repeated scans) | Minimal (log tailing) |
| Data freshness | Stale between runs | Near real-time |
| Failure recovery | Re-run entire job | Resume from checkpoint |
| Change detection | Diff comparison | Native from source |
These benefits compound as systems scale and teams decentralize:
- Deterministic replay: Ordered events allow consumers to reconstruct state or power exactly-once delivery with checkpointing.
- Polyglot delivery: The same change feed can serve caches, queues, warehouses, and search indexes simultaneously without additional source queries.
How CDC works
All CDC implementations share a common goal: detect changes and emit them as events. The approaches differ in how they detect those changes.
Log-based CDC
Databases maintain transaction logs (MySQL binlog, Postgres WAL) that record every committed change for durability and replication. Log-based CDC reads these logs directly, capturing changes without touching application tables.
┌─────────────┐ commits ┌─────────────┐ tails ┌─────────────┐
│ Application │──────────────▶│ Database │─────────────▶│ CDC Engine │
└─────────────┘ │ + WAL │ └──────┬──────┘
└─────────────┘ │
▼
┌─────────────┐
│ Kafka / │
│ Redis │
└─────────────┘
Advantages:
- Zero impact on source table performance
- Captures all changes including those from triggers and stored procedures
- Preserves transaction boundaries and ordering
- Can capture deletes without soft-delete columns
Trade-offs:
- Requires database configuration (replication slots, binlog retention)
- Schema changes need careful handling
- Log retention limits how far back you can replay
DeltaForge uses log-based CDC exclusively. This allows DeltaForge to provide stronger ordering guarantees, lower source impact, and simpler operational semantics than hybrid approaches that mix log tailing with polling or triggers.
Trigger-based CDC
Database triggers fire on INSERT, UPDATE, and DELETE operations, writing change records to a shadow table that a separate process polls.
Advantages:
- Works on databases without accessible transaction logs
- Can capture application-level context unavailable in logs
Trade-offs:
- Adds write overhead to every transaction
- Triggers can be disabled or forgotten during schema migrations
- Shadow tables require maintenance and can grow unbounded
Polling-based CDC
A process periodically queries tables for rows modified since the last check, typically using an updated_at timestamp or incrementing ID.
Advantages:
- Simple to implement
- No special database configuration required
Trade-offs:
- Cannot reliably detect deletes
- Requires `updated_at` columns on every table
- Polling frequency trades off latency against database load
- Clock skew and transaction visibility can cause missed or duplicate events
Anatomy of a CDC event
A well-designed CDC event contains everything downstream consumers need to process the change correctly.
{
"id": "evt_01J7K9X2M3N4P5Q6R7S8T9U0V1",
"source": {
"database": "shop",
"table": "orders",
"server_id": "mysql-prod-1"
},
"operation": "update",
"timestamp": "2025-01-15T14:32:01.847Z",
"transaction": {
"id": "gtid:3E11FA47-71CA-11E1-9E33-C80AA9429562:42",
"position": 15847293
},
"before": {
"id": 12345,
"status": "pending",
"total": 99.99,
"updated_at": "2025-01-15T14:30:00.000Z"
},
"after": {
"id": 12345,
"status": "shipped",
"total": 99.99,
"updated_at": "2025-01-15T14:32:01.000Z"
},
"schema_version": "v3"
}
Key components:
| Field | Purpose |
|---|---|
| `operation` | INSERT, UPDATE, DELETE, or DDL |
| `before` / `after` | Row state before and after the change (enables diff logic) |
| `transaction` | Groups changes from the same database transaction |
| `timestamp` | When the change was committed at the source |
| `schema_version` | Helps consumers handle schema evolution |
Not all fields are present for every operation: `before` is omitted for INSERTs and `after` is omitted for DELETEs. The event envelope and metadata fields, however, are consistent across MySQL and Postgres sources.
Real-world use cases
Cache invalidation and population
Problem: Your Redis cache serves product catalog data, but cache invalidation logic is scattered across dozens of services. Some forget to invalidate, others invalidate too aggressively, and debugging staleness issues takes hours.
CDC solution: Stream changes from the products table to a message queue. A dedicated consumer reads the stream and updates or deletes cache keys based on the operation type. This centralizes invalidation logic, provides replay capability for cache rebuilds, and removes cache concerns from application code entirely.
┌──────────┐ CDC ┌─────────────┐ consumer ┌─────────────┐
│ MySQL │─────────────▶│ Stream │─────────────▶│ Cache Keys │
│ products │ │ (Kafka/ │ │ product:123 │
└──────────┘ │ Redis) │ └─────────────┘
└─────────────┘
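The consumer's routing decision is small. Here is a hedged JavaScript sketch of a hypothetical cache-updating consumer (not a DeltaForge component; the event shape follows the CDC event anatomy shown earlier, and the `product:<id>` key pattern matches the diagram):

```javascript
// Decide the cache action for a CDC event from the products table.
// Returns { action, key } for deletes and { action, key, value } otherwise.
function cacheActionFor(event) {
  // For deletes `after` is absent, so fall back to the `before` image.
  const id = (event.after || event.before).id;
  const key = `product:${id}`;
  if (event.operation === "delete") {
    // Row removed at the source: drop the cached entry.
    return { action: "del", key };
  }
  // Inserts and updates overwrite the cached value with the new row image.
  return { action: "set", key, value: JSON.stringify(event.after) };
}

// Example events, shaped like the CDC event anatomy above.
const update = { operation: "update", before: { id: 123 }, after: { id: 123, name: "Widget" } };
const del = { operation: "delete", before: { id: 123 }, after: null };
```

Centralizing this mapping in one consumer is what removes invalidation logic from application code.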
Event-driven microservices
Problem: Your order service needs to notify inventory, shipping, billing, and analytics whenever an order changes state. Direct service-to-service calls create tight coupling and cascade failures.
CDC solution: Publish order changes to a durable message queue. Each downstream service subscribes independently, processes at its own pace, and can replay events after failures or during onboarding. The order service doesn’t need to know about its consumers.
┌─────────────┐ CDC ┌─────────────┐
│ Orders │─────────────▶│ Message │
│ Database │ │ Queue │
└─────────────┘ └──────┬──────┘
┌───────────────┼───────────────┐
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Inventory │ │ Shipping │ │ Analytics │
└───────────┘ └───────────┘ └───────────┘
Search index synchronization
Problem: Your Elasticsearch index drifts from the source of truth. Full reindexing takes hours and blocks search updates during the process.
CDC solution: Stream changes continuously to keep the index synchronized. Use the before image to remove old documents and the after image to index new content.
Data warehouse incremental loads
Problem: Nightly full loads into your warehouse take 6 hours and block analysts until noon. Business users complain that dashboards show yesterday’s numbers.
CDC solution: Stream changes to a staging topic, then micro-batch into your warehouse every few minutes. Analysts get near real-time data without impacting source systems.
Audit logging and compliance
Problem: Regulations require you to maintain a complete history of changes to sensitive data. Application-level audit logging is inconsistent and can be bypassed.
CDC solution: The transaction log captures every committed change regardless of how it was made - application code, admin scripts, or direct SQL. Stream these events to immutable storage for compliance.
Cross-region replication
Problem: You need to replicate data to a secondary region for disaster recovery, but built-in replication doesn’t support the transformations you need.
CDC solution: Stream changes through a CDC pipeline that filters, transforms, and routes events to the target region’s databases or message queues.
Architecture patterns
The outbox pattern
When you need to update a database and publish an event atomically, the outbox pattern provides exactly-once semantics without distributed transactions.
┌─────────────────────────────────────────┐
│ Single Transaction │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ UPDATE │ │ INSERT INTO │ │
│ │ orders SET │ + │ outbox (event) │ │
│ │ status=... │ │ │ │
│ └─────────────┘ └─────────────────┘ │
└─────────────────────────────────────────┘
│
▼ CDC
┌─────────────┐
│ Kafka │
└─────────────┘
- The application writes business data and an event record in the same transaction.
- CDC tails the outbox table and publishes events to Kafka.
- A cleanup process (or CDC itself) removes processed outbox rows.
This guarantees that events are published if and only if the transaction commits.
When to skip the outbox: If your only requirement is to react to committed database state (not application intent), direct CDC from business tables is often simpler than introducing an outbox. The outbox pattern adds value when you need custom event payloads, explicit event versioning, or when the event schema differs significantly from table structure.
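In DeltaForge terms, the steps above map onto an outbox-tagged source plus the `outbox` processor. A minimal sketch (table names are illustrative; the source-level `outbox` field must also be configured so rows are tagged, and its shape is omitted here):

```yaml
spec:
  source:
    type: mysql
    config:
      dsn: ${MYSQL_DSN}
      tables: [shop.outbox]   # the outbox table written in the same transaction
  processors:
    - type: outbox
      topic: "${aggregate_type}.${event_type}"  # resolved against the raw payload
      default_topic: "events.unrouted"
      raw_payload: true                         # deliver stored payloads as-is
  sinks:
    - type: kafka
      config:
        brokers: ${KAFKA_BROKERS}
        topic: events.unrouted
```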
Event sourcing integration
CDC complements event sourcing by bridging legacy systems that weren’t built event-first. Stream changes from existing tables into event stores, then gradually migrate to native event sourcing.
CQRS (Command Query Responsibility Segregation)
CDC naturally supports CQRS by populating read-optimized projections from the write model. Changes flow through the CDC pipeline to update denormalized views, search indexes, or cache layers.
Challenges and solutions
Schema evolution
Databases change. Columns get added, renamed, or removed. Types change. CDC pipelines need to handle this gracefully.
Strategies:
- Schema registry: Store and version schemas centrally (e.g., Confluent Schema Registry with Avro/Protobuf).
- Forward compatibility: Add columns as nullable; avoid removing columns that consumers depend on.
- Consumer tolerance: Design consumers to ignore unknown fields and handle missing optional fields.
- Processor transforms: Use DeltaForge’s JavaScript processors to normalize schemas before events reach sinks.
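As an example of the last strategy, here is a hedged sketch of a JavaScript processor that smooths over a column rename (the `name`/`full_name` migration is hypothetical):

```javascript
// Hypothetical migration: the source table renamed `name` to `full_name`.
// This processor presents one stable field to downstream consumers,
// regardless of which schema version produced the event.
const normalize = (event) => {
  if (event.after && event.after.name !== undefined && event.after.full_name === undefined) {
    event.after.full_name = event.after.name;
    delete event.after.name;
  }
  return [event]; // processors return an array of events
};
```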
Ordering guarantees
Events must arrive in the correct order for consumers to reconstruct state accurately. A DELETE arriving before its corresponding INSERT would be catastrophic.
DeltaForge guarantees:
- Source-order preservation per table partition (always enabled)
- Transaction boundary preservation when `respect_source_tx: true` is configured in batch settings
The Kafka sink uses consistent partitioning by primary key, so consumers see ordered events within each partition.
Delivery guarantees
Network failures, process crashes, and consumer restarts can cause duplicates or gaps. End-to-end exactly-once requires coordination between source, pipeline, and sink.
DeltaForge approach:
- Checkpoints track the last committed position in the source log.
- Configurable commit policies (`all`, `required`, `quorum`) control when checkpoints advance.
- Kafka: end-to-end exactly-once via transactional producer (`exactly_once: true`). Consumers set `isolation.level=read_committed`.
- NATS: at-least-once with server-side dedup via the `Nats-Msg-Id` header within `duplicate_window`.
- Redis: at-least-once with consumer-side dedup via the `idempotency_key` field.
Default behavior: DeltaForge provides at-least-once delivery out of the box. End-to-end exactly-once is available for Kafka with `exactly_once: true`. See the Guarantees page for the full delivery tier matrix.
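As a concrete sketch of the Kafka exactly-once tier (field placement follows the complete examples in the Configuration section; verify against your version):

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
    required: true
    exactly_once: true   # transactional producer; pair with isolation.level=read_committed on consumers
commit_policy:
  mode: required         # checkpoints advance only after required sinks acknowledge
```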
High availability
Production CDC pipelines need to handle failures without data loss or extended downtime.
Best practices:
- Run multiple pipeline instances with leader election.
- Store checkpoints in durable storage (DeltaForge persists to local files, mountable volumes in containers).
- Monitor lag between source position and checkpoint position.
- Set up alerts for pipeline failures and excessive lag.
Expectations: DeltaForge checkpoints ensure no data loss on restart, but does not currently include built-in leader election. For HA deployments, use external coordination (Kubernetes leader election, etcd locks) or run active-passive with health-check-based failover.
Backpressure
When sinks can’t keep up with the change rate, pipelines need to slow down gracefully rather than dropping events or exhausting memory.
DeltaForge handles backpressure through:
- Configurable batch sizes (`max_events`, `max_bytes`, `max_ms`).
- In-flight limits (`max_inflight`) that bound concurrent sink writes.
- Blocking reads from the source when batches queue up.
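A `batch` section combining these knobs might look like this (a sketch using values from the complete examples elsewhere in this document):

```yaml
batch:
  max_events: 500      # flush after 500 events...
  max_bytes: 1048576   # ...or 1 MiB of payload...
  max_ms: 1000         # ...or 1 second, whichever limit is hit first
  max_inflight: 2      # at most two batches in flight to sinks at once
```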
Performance considerations
Batching trade-offs
| Setting | Low value | High value |
|---|---|---|
| `max_events` | Lower latency, more overhead | Higher throughput, more latency |
| `max_ms` | Faster flush, smaller batches | Larger batches, delayed flush |
| `max_bytes` | Memory-safe, frequent commits | Efficient for large rows |
Start with DeltaForge defaults and tune based on observed latency and throughput.
Source database impact
Log-based CDC has minimal impact, but consider:
- Replication slot retention: Paused pipelines cause WAL/binlog accumulation.
- Connection limits: Each pipeline holds a replication connection.
- Network bandwidth: High-volume tables generate significant log traffic.
Sink throughput
- Kafka: Tune `batch.size`, `linger.ms`, and compression in `client_conf`.
- Redis: Use pipelining and connection pooling for high-volume streams.
Monitoring and observability
CDC pipelines are long-running, stateful processes; without metrics and alerts, failures are silent by default. A healthy pipeline requires visibility into lag, throughput, and errors.
Key metrics to track
| Metric | Description | Alert threshold |
|---|---|---|
| `cdc_lag_seconds` | Time between event timestamp and processing | > 60s |
| `events_processed_total` | Throughput counter | Sudden drops |
| `checkpoint_lag_events` | Events since last checkpoint | > 10,000 |
| `sink_errors_total` | Failed sink writes | Any sustained errors |
| `batch_size_avg` | Events per batch | Outside expected range |
DeltaForge exposes Prometheus metrics on the configurable metrics endpoint (default :9000).
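For example, the lag threshold from the table above could be wired into a Prometheus alerting rule. This is only a sketch; confirm the exact metric names DeltaForge exports on your metrics endpoint before deploying:

```yaml
groups:
  - name: deltaforge-cdc
    rules:
      - alert: CdcLagHigh
        expr: cdc_lag_seconds > 60      # matches the suggested > 60s threshold
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "CDC pipeline lag exceeds 60 seconds"
```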
Health checks
- `GET /health`: Liveness probe - is the process running?
- `GET /ready`: Readiness probe - are pipelines connected and processing?
- `GET /pipelines`: Detailed status of each pipeline including configuration.
Choosing a CDC solution
When evaluating CDC tools, consider:
| Factor | Questions to ask |
|---|---|
| Source support | Does it support your databases? MySQL binlog? Postgres logical replication? |
| Sink flexibility | Can it write to your target systems? Kafka, Redis, HTTP, custom? |
| Transformation | Can you filter, enrich, or reshape events in-flight? |
| Operational overhead | How much infrastructure does it require? JVM? Distributed coordinator? |
| Resource efficiency | What’s the memory/CPU footprint per pipeline? |
| Cloud-native | Does it containerize cleanly? Support health checks? Emit metrics? |
Where DeltaForge fits
DeltaForge intentionally avoids the operational complexity of JVM-based CDC stacks (Kafka Connect-style deployments with Zookeeper, Connect workers, and converter configurations) while remaining compatible with Kafka-centric architectures.
DeltaForge is designed for teams that want:
- Lightweight runtime: Single binary, minimal memory footprint, no JVM warmup.
- Config-driven pipelines: YAML specs instead of code for common patterns.
- Inline transformation: JavaScript processors for custom logic without recompilation.
- Container-native operations: Built for Kubernetes with health endpoints and Prometheus metrics.
DeltaForge is not designed for:
- Complex DAG-based stream processing with windowed aggregations
- Stateful joins across multiple streams
- Sources beyond MySQL and Postgres (currently)
- Built-in schema registry integration (use external registries)
If you need those capabilities, consider dedicated stream processors or the broader Kafka ecosystem. DeltaForge excels at getting data out of databases and into your event infrastructure reliably and efficiently.
Getting started
Ready to try CDC with DeltaForge? Head to the Quickstart guide to run your first pipeline in minutes.
For production deployments, review the Development guide for container builds and operational best practices.
Quickstart
Get DeltaForge running in minutes.
1. Prepare a pipeline spec
Create a YAML file that defines your CDC pipeline. Environment variables are expanded at runtime, so secrets stay out of version control.
metadata:
name: orders-mysql-to-kafka
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
processors:
- type: javascript
id: transform
inline: |
(event) => {
event.tags = ["processed"];
return [event];
}
sinks:
- type: kafka
config:
id: orders-kafka
brokers: ${KAFKA_BROKERS}
topic: orders
required: true
batch:
max_events: 500
max_bytes: 1048576
max_ms: 1000
2. Start DeltaForge
Using Docker (recommended):
docker run --rm \
-p 8080:8080 -p 9000:9000 \
-e MYSQL_DSN="mysql://user:pass@host:3306/db" \
-e KAFKA_BROKERS="kafka:9092" \
-v $(pwd)/pipeline.yaml:/etc/deltaforge/pipeline.yaml:ro \
-v deltaforge-checkpoints:/app/data \
ghcr.io/vnvo/deltaforge:latest \
--config /etc/deltaforge/pipeline.yaml
From source:
cargo run -p runner -- --config ./pipeline.yaml
Runner options
| Flag | Default | Description |
|---|---|---|
| `--config` | (required) | Path to pipeline spec file or directory |
| `--api-addr` | `0.0.0.0:8080` | REST API address |
| `--metrics-addr` | `0.0.0.0:9095` | Prometheus metrics address |
3. Verify it’s running
Check health and pipeline status:
# Liveness probe
curl http://localhost:8080/health
# Readiness with pipeline status
curl http://localhost:8080/ready
# List all pipelines
curl http://localhost:8080/pipelines
4. Manage pipelines
Control pipelines via the REST API:
# Pause a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/pause
# Resume a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/resume
# Stop a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/stop
Next steps
- CDC Overview : Understand how Change Data Capture works
- Configuration : Full pipeline spec reference
- Sources : MySQL and Postgres setup
- Sinks : Kafka, Redis, and NATS configuration
- Dynamic Routing : Route events to per-table destinations
- Envelopes : Native, Debezium, and CloudEvents output formats
- Development : Build from source, run locally
Configuration
Pipelines are defined as YAML documents that map directly to the PipelineSpec type. Environment variables are expanded before parsing using ${VAR} syntax, so secrets and connection strings can be injected at runtime. Unknown variables (e.g. ${source.table}) pass through for use as routing templates.
Document structure
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: <pipeline-name>
tenant: <tenant-id>
spec:
source: { ... }
processors: [ ... ]
sinks: [ ... ]
batch: { ... }
commit_policy: { ... }
schema_sensing: { ... }
Metadata
| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Unique pipeline identifier. Used in API routes and metrics. |
| `tenant` | string | Yes | Business-oriented tenant label for multi-tenancy. |
Spec fields
| Field | Type | Required | Description |
|---|---|---|---|
| `source` | object | Yes | Database source configuration. See Sources. |
| `processors` | array | No | Ordered list of processors. See Processors. |
| `sinks` | array | Yes (at least one) | One or more sinks that receive each batch. See Sinks. |
| `sharding` | object | No | Optional hint for downstream distribution. |
| `connection_policy` | object | No | How the runtime establishes upstream connections. |
| `batch` | object | No | Commit unit thresholds. See Batching. |
| `commit_policy` | object | No | How sink acknowledgements gate checkpoints. See Commit policy. |
| `schema_sensing` | object | No | Automatic schema inference from event payloads. See Schema sensing. |
| `journal` | object | No | Event journal (DLQ). See Dead Letter Queue. |
Sources
MySQL
Captures row-level changes via binlog replication. See MySQL source documentation for prerequisites and detailed configuration.
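A minimal MySQL source sketch, mirroring the quickstart example (the `shop.order_%` pattern is illustrative):

```yaml
source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}     # expanded from the environment at runtime
    tables:
      - shop.orders       # exact match
      - shop.order_%      # LIKE-style prefix pattern
```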
Table patterns support SQL LIKE syntax:
- `db.table` - exact match
- `db.prefix%` - tables matching prefix
- `db.%` - all tables in database
PostgreSQL
Captures row-level changes via logical replication. See PostgreSQL source documentation for prerequisites and detailed configuration.
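A minimal PostgreSQL source sketch, mirroring the complete example later in this document:

```yaml
source:
  type: postgres
  config:
    id: users-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_users   # logical replication slot name
    publication: users_pub   # publication covering the captured tables
    tables:
      - public.users
```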
Note: The source-level `outbox` field only tags matching events with the `__outbox` sentinel. Routing and transformation are handled by the `outbox` processor.
Processors
Processors transform events between source and sinks. They run in order and can filter, enrich, or modify events.
JavaScript
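A typical JavaScript processor entry, assembled from the examples elsewhere in this document (the `limits` block is optional):

```yaml
processors:
  - type: javascript
    id: transform
    inline: |
      (event) => {
        event.processed_at = Date.now();
        return [event];   // processors return an array of events
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500
```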
Flatten
processors:
- type: flatten
id: flat
separator: "__"
max_depth: 3
on_collision: last
empty_object: preserve
lists: preserve
empty_list: drop
| Field | Type | Default | Description |
|---|---|---|---|
| `id` | string | `"flatten"` | Processor identifier |
| `separator` | string | `"__"` | Separator between path segments |
| `max_depth` | int | unlimited | Stop recursing at this depth; objects at the boundary kept as-is |
| `on_collision` | string | `last` | Key collision policy: `last`, `first`, or `error` |
| `empty_object` | string | `preserve` | Empty object policy: `preserve`, `drop`, or `null` |
| `lists` | string | `preserve` | Array policy: `preserve` or `index` |
| `empty_list` | string | `preserve` | Empty array policy: `preserve`, `drop`, or `null` |
Filter
processors:
- type: filter
id: only-active-orders
ops: [create, update]
tables:
include: ["shop.orders"]
exclude: ["*.tmp"]
fields:
- path: status
op: eq
value: "active"
- path: total
op: gte
value: 100
match: all
| Field | Type | Default | Description |
|---|---|---|---|
| `id` | string | `"filter"` | Processor identifier |
| `ops` | list | `[]` | Op types to keep. Empty = all. `create`, `update`, `delete`, `read`, `truncate` |
| `tables.include` | list | `[]` | Table glob patterns to include. Empty = all |
| `tables.exclude` | list | `[]` | Table glob patterns to exclude. Takes priority over include |
| `fields` | list | `[]` | Field predicates against `event.after`. See Filter operators |
| `match` | string | `all` | `all` - every predicate must match; `any` - at least one |
Outbox
Transforms raw outbox events into routed, sink-ready events. Requires the source to have outbox configured so events are tagged before reaching this processor. See Outbox pattern documentation for full details.
processors:
- type: outbox
id: outbox
topic: "${aggregate_type}.${event_type}"
default_topic: "events.unrouted"
raw_payload: true
columns:
payload: data
additional_headers:
x-trace-id: trace_id
| Field | Type | Default | Description |
|---|---|---|---|
id | string | "outbox" | Processor identifier |
tables | array | [] | Filter: only process outbox events matching these patterns. Empty = all outbox events. |
topic | string | — | Topic template resolved against the raw payload using ${field} placeholders |
default_topic | string | — | Fallback topic when template resolution fails and no topic column exists |
key | string | — | Key template resolved against raw payload. Default: aggregate_id value. |
raw_payload | bool | false | Deliver the extracted payload as-is to sinks, bypassing envelope wrapping |
strict | bool | false | Fail the batch if required fields are missing rather than silently falling back |
columns.payload | string | payload | Column containing the event payload |
columns.aggregate_type | string | aggregate_type | Column for aggregate type |
columns.aggregate_id | string | aggregate_id | Column for aggregate ID |
columns.event_type | string | event_type | Column for event type |
columns.topic | string | topic | Column for pre-computed topic override |
columns.event_id | string | id | Column extracted as df-event-id header |
additional_headers | map | {} | Forward extra payload fields as routing headers: header-name: column-name |
Sinks
Sinks deliver events to downstream systems. Each sink supports configurable envelope formats and wire encodings to match consumer expectations. See the Sinks documentation for detailed information on multi-sink patterns, commit policies, and failure handling.
Envelope and encoding
All sinks support these serialization options:
| Field | Type | Default | Description |
|---|---|---|---|
| `envelope` | object | `native` | Output structure format. See Envelopes. |
| `encoding` | string | `json` | Wire encoding format |
Envelope types:
- `native` - Direct Debezium payload structure (default, most efficient)
- `debezium` - Full `{"payload": ...}` wrapper
- `cloudevents` - CloudEvents 1.0 specification (requires `type_prefix`)
# Native (default)
envelope:
type: native
# Debezium wrapper
envelope:
type: debezium
# CloudEvents
envelope:
type: cloudevents
type_prefix: "com.example.cdc"
Kafka
See Kafka sink documentation for detailed configuration options and best practices.
CloudEvents example:
sinks:
- type: kafka
config:
id: events-kafka
brokers: ${KAFKA_BROKERS}
topic: events
envelope:
type: cloudevents
type_prefix: "com.acme.cdc"
encoding: json
Redis
See Redis sink documentation for detailed configuration options and best practices.
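A Redis sink sketch, mirroring the multi-sink example later in this document (exact field placement may differ; see the Redis sink documentation):

```yaml
sinks:
  - type: redis
    config:
      id: cache-redis
      uri: ${REDIS_URI}
      stream: orders-cache   # Redis stream that receives events
      envelope:
        type: native
    required: false
```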
NATS
See NATS sink documentation for detailed configuration options and best practices.
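A hedged NATS sink sketch: the `url` and `subject` field names are assumptions, and the `duplicate_window` placement follows the delivery-guarantees discussion; consult the NATS sink documentation for the real field names:

```yaml
sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}          # assumed field name
      subject: orders.events    # assumed field name
      duplicate_window: 120s    # JetStream server-side dedup window (assumed placement)
```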
Batching
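A `batch` sketch drawn from the complete examples below:

```yaml
batch:
  max_events: 500          # flush after 500 events
  max_bytes: 1048576       # or 1 MiB, whichever comes first
  max_ms: 1000             # or 1 second
  respect_source_tx: true  # keep source transactions intact across batches
  max_inflight: 2          # bound concurrent in-flight batches
```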
Commit policy
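A `commit_policy` sketch; the mode semantics in the comments are plausible readings of the three modes, so check the commit policy documentation for the authoritative definitions:

```yaml
commit_policy:
  mode: required   # checkpoint once all sinks marked `required: true` acknowledge
  # mode: all      # every sink must acknowledge before the checkpoint advances
  # mode: quorum   # a majority of sinks is sufficient
```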
Schema sensing
Schema sensing automatically infers and tracks schema from event payloads. See the Schema Sensing documentation for detailed information on how it works, drift detection, and API endpoints.
Performance tip: Schema sensing can be CPU-intensive, especially with deep JSON inspection. Consider your throughput requirements when configuring:
- Set `enabled: false` if you don’t need runtime schema inference
- Limit `deep_inspect.max_depth` to avoid traversing deeply nested structures
- Increase `sampling.sample_rate` to analyze fewer events (e.g., 1 in 100 instead of 1 in 10)
- Reduce `sampling.warmup_events` if you’re confident in schema stability
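A `schema_sensing` sketch assembled from the tuning knobs above (the nesting is assumed from the dotted field names):

```yaml
schema_sensing:
  enabled: true
  deep_inspect:
    max_depth: 3        # stop traversing nested JSON past this depth
  sampling:
    sample_rate: 100    # analyze roughly 1 in 100 events
    warmup_events: 500  # events inspected before sampling takes effect (assumed semantics)
```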
Complete examples
MySQL to Kafka with Debezium envelope
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-mysql-to-kafka
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
processors:
- type: javascript
id: transform
inline: |
function processBatch(events) {
return events.map(event => {
event.tags = (event.tags || []).concat(["normalized"]);
return event;
});
}
limits:
cpu_ms: 50
mem_mb: 128
timeout_ms: 500
sinks:
- type: kafka
config:
id: orders-kafka
brokers: ${KAFKA_BROKERS}
topic: orders
envelope:
type: debezium
encoding: json
required: true
exactly_once: false
client_conf:
message.timeout.ms: "5000"
batch:
max_events: 500
max_bytes: 1048576
max_ms: 1000
respect_source_tx: true
max_inflight: 2
commit_policy:
mode: required
PostgreSQL to Kafka with CloudEvents
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: users-postgres-to-kafka
tenant: acme
spec:
source:
type: postgres
config:
id: users-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_users
publication: users_pub
tables:
- public.users
- public.user_sessions
start_position: earliest
sinks:
- type: kafka
config:
id: users-kafka
brokers: ${KAFKA_BROKERS}
topic: user-events
envelope:
type: cloudevents
type_prefix: "com.acme.users"
encoding: json
required: true
batch:
max_events: 500
max_ms: 1000
respect_source_tx: true
commit_policy:
mode: required
Multi-sink with different formats
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-multi-sink
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
sinks:
# Kafka Connect expects Debezium format
- type: kafka
config:
id: connect-sink
brokers: ${KAFKA_BROKERS}
topic: connect-events
envelope:
type: debezium
required: true
# Lambda expects CloudEvents
- type: kafka
config:
id: lambda-sink
brokers: ${KAFKA_BROKERS}
topic: lambda-events
envelope:
type: cloudevents
type_prefix: "com.acme.cdc"
required: false
# Redis cache uses native format
- type: redis
config:
id: cache-redis
uri: ${REDIS_URI}
stream: orders-cache
envelope:
type: native
required: false
batch:
max_events: 500
max_ms: 1000
respect_source_tx: true
commit_policy:
mode: required
MySQL to NATS
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-mysql-to-nats
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
sinks:
- type: nats
config:
id: orders-nats
url: ${NATS_URL}
subject: orders.events
stream: ORDERS
envelope:
type: native
encoding: json
required: true
batch:
max_events: 500
max_ms: 1000
respect_source_tx: true
commit_policy:
mode: required
REST API Reference
DeltaForge exposes a REST API for health checks, pipeline management, schema inspection, and drift detection. All endpoints return JSON.
Base URL
Default: http://localhost:8080
Configure with --api-addr:
deltaforge --config pipelines.yaml --api-addr 0.0.0.0:9090
Health Endpoints
Liveness Probe
GET /health
Returns 200 OK when the process is running and all pipelines are healthy. Returns 503 if any pipeline has entered a failed state (e.g. position lost after failover, binlog purged, unrecoverable source error). Use for Kubernetes liveness probes — a 503 indicates the process should be restarted.
Response: 200 OK — all pipelines healthy
{"status": "healthy", "pipelines": 3}
Response: 503 Service Unavailable — one or more pipelines failed
{"status": "unhealthy", "failed_pipelines": ["orders-cdc"]}
Readiness Probe
GET /ready
Returns pipeline states. Use for Kubernetes readiness probes.
Response: 200 OK
{
"status": "ready",
"pipelines": [
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
]
}
Pipeline Management
List Pipelines
GET /pipelines
GET /pipelines?label=env:prod
GET /pipelines?label=env:prod&label=team:platform
Returns all pipelines with current status. Filter by labels with AND logic. Key-only filter (?label=env) matches any value.
Response: 200 OK
[
{
"name": "orders-cdc",
"status": "running",
"spec": {
"metadata": { "name": "orders-cdc", "tenant": "acme" },
"spec": { ... }
}
}
]
Get Pipeline
GET /pipelines/{name}
Returns a single pipeline by name with operational status.
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... },
"ops": {
"uptime_seconds": 3600.5,
"dlq_entries": 0,
"sink_errors": {},
"checkpoints": [
{"sink_id": "kafka-primary", "position": {"file": "mysql-bin.000005", "pos": 12345}, "age_seconds": 0.3}
]
}
}
Errors:
- `404 Not Found` - Pipeline doesn’t exist
Create Pipeline
POST /pipelines
Content-Type: application/json
Creates a new pipeline from a full spec.
Request:
{
"metadata": {
"name": "orders-cdc",
"tenant": "acme"
},
"spec": {
"source": {
"type": "mysql",
"config": {
"id": "mysql-1",
"dsn": "mysql://user:pass@host/db",
"tables": ["shop.orders"]
}
},
"processors": [],
"sinks": [
{
"type": "kafka",
"config": {
"id": "kafka-1",
"brokers": "localhost:9092",
"topic": "orders"
}
}
]
}
}
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
Errors:
- `409 Conflict` - Pipeline already exists
Update Pipeline
PATCH /pipelines/{name}
Content-Type: application/json
Applies a partial update to an existing pipeline. The spec is merged, the pipeline is restarted from its last saved checkpoint, and the new config takes effect immediately. Only the fields present in the request body are changed — omitted fields retain their current values.
If the pipeline is currently stopped, PATCH applies the new config and restarts it from the saved checkpoint. This is the recommended way to tune throughput settings before resuming after a planned stop.
Request:
{
"spec": {
"batch": {
"max_events": 1000,
"max_ms": 500
}
}
}
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
Errors:
- `404 Not Found` - Pipeline doesn’t exist
- `400 Bad Request` - Invalid field value or name mismatch in patch
Pause Pipeline
POST /pipelines/{name}/pause
Suspends event processing while keeping the source connection alive. No new events are consumed from the binlog/WAL. Resume restarts processing from exactly where it paused — no events are missed.
Response: 200 OK
{
"name": "orders-cdc",
"status": "paused",
"spec": { ... }
}
Resume Pipeline
POST /pipelines/{name}/resume
Resumes a paused or stopped pipeline.
- From paused — restarts event processing immediately; source connection was kept alive.
- From stopped — reconnects to the source and replays from the last saved checkpoint; any events written to the binlog/WAL while stopped are replayed in order.
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
Stop Pipeline
POST /pipelines/{name}/stop
Gracefully stops a pipeline: flushes in-flight events, saves the binlog/WAL
checkpoint, and disconnects from the source. The pipeline remains in the
registry and can be resumed with POST /pipelines/{name}/resume or by
issuing a PATCH with updated config.
Use stop (rather than delete) when you intend to restart the pipeline later — for example, before a planned maintenance window or when tuning config for a backlog drain.
Response: 200 OK
{
"name": "orders-cdc",
"status": "stopped",
"spec": { ... }
}
Delete Pipeline
DELETE /pipelines/{name}
Permanently removes a pipeline from the runtime. The checkpoint is not
preserved. Use stop first if you may want to restart the pipeline later.
Response: 204 No Content
Errors:
- `404 Not Found` - Pipeline doesn’t exist
Schema Management
List Database Schemas
GET /pipelines/{name}/schemas
Returns all tracked database schemas for a pipeline. These are the schemas loaded directly from the source database.
Response: 200 OK
[
{
"database": "shop",
"table": "orders",
"column_count": 5,
"primary_key": ["id"],
"fingerprint": "sha256:a1b2c3d4e5f6...",
"registry_version": 2
},
{
"database": "shop",
"table": "customers",
"column_count": 8,
"primary_key": ["id"],
"fingerprint": "sha256:f6e5d4c3b2a1...",
"registry_version": 1
}
]
Get Schema Details
GET /pipelines/{name}/schemas/{db}/{table}
Returns detailed schema information including all columns.
Response: 200 OK
{
"database": "shop",
"table": "orders",
"columns": [
{
"name": "id",
"type": "bigint(20) unsigned",
"nullable": false,
"default": null,
"extra": "auto_increment"
},
{
"name": "customer_id",
"type": "bigint(20)",
"nullable": false,
"default": null
}
],
"primary_key": ["id"],
"fingerprint": "sha256:a1b2c3d4..."
}
Schema Sensing
Schema sensing automatically infers schema structure from JSON event payloads. This is useful for sources that don’t provide schema metadata or for detecting schema evolution in JSON columns.
List Inferred Schemas
GET /pipelines/{name}/sensing/schemas
Returns all schemas inferred via sensing for a pipeline.
Response: 200 OK
[
{
"table": "orders",
"fingerprint": "sha256:abc123...",
"sequence": 3,
"event_count": 1500,
"stabilized": true,
"first_seen": "2025-01-15T10:30:00Z",
"last_seen": "2025-01-15T14:22:00Z"
}
]
| Field | Description |
|---|---|
| table | Table name (or table:column for JSON column sensing) |
| fingerprint | SHA-256 content hash of current schema |
| sequence | Monotonic version number (increments on evolution) |
| event_count | Total events observed |
| stabilized | Whether sampling has stopped because the structure is stable |
| first_seen | First observation timestamp |
| last_seen | Most recent observation timestamp |
Get Inferred Schema Details
GET /pipelines/{name}/sensing/schemas/{table}
Returns detailed inferred schema including all fields.
Response: 200 OK
{
"table": "orders",
"fingerprint": "sha256:abc123...",
"sequence": 3,
"event_count": 1500,
"stabilized": true,
"fields": [
{
"name": "id",
"types": ["integer"],
"nullable": false,
"optional": false
},
{
"name": "metadata",
"types": ["object"],
"nullable": true,
"optional": false,
"nested_field_count": 5
},
{
"name": "tags",
"types": ["array"],
"nullable": false,
"optional": true,
"array_element_types": ["string"]
}
],
"first_seen": "2025-01-15T10:30:00Z",
"last_seen": "2025-01-15T14:22:00Z"
}
Export JSON Schema
GET /pipelines/{name}/sensing/schemas/{table}/json-schema
Exports the inferred schema as a standard JSON Schema document.
Response: 200 OK
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "orders",
"type": "object",
"properties": {
"id": { "type": "integer" },
"metadata": { "type": ["object", "null"] },
"tags": {
"type": "array",
"items": { "type": "string" }
}
},
"required": ["id", "metadata"]
}
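A consumer can use the exported document for a lightweight required-field check without pulling in a full validator (a sketch only; real validation should use a proper JSON Schema library):

```javascript
// Return the required fields that are absent from a document,
// according to an exported JSON Schema's "required" array.
function checkRequired(schema, doc) {
  return (schema.required || []).filter(field => !(field in doc));
}

const schema = {
  required: ["id", "metadata"],
  properties: { id: { type: "integer" } }
};

// All required fields present (a null value still counts as present):
const missing = checkRequired(schema, { id: 1, metadata: null, tags: [] });
// → []
```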
Get Sensing Cache Statistics
GET /pipelines/{name}/sensing/stats
Returns cache performance statistics for schema sensing.
Response: 200 OK
{
"tables": [
{
"table": "orders",
"cached_structures": 3,
"max_cache_size": 100,
"cache_hits": 1450,
"cache_misses": 50
}
],
"total_cache_hits": 1450,
"total_cache_misses": 50,
"hit_rate": 0.9667
}
Drift Detection
Drift detection compares expected database schema against observed data patterns to detect mismatches, unexpected nulls, and type drift.
Get Drift Results
GET /pipelines/{name}/drift
Returns drift detection results for all tables in a pipeline.
Response: 200 OK
[
{
"table": "orders",
"has_drift": true,
"columns": [
{
"column": "amount",
"expected_type": "decimal(10,2)",
"observed_types": ["string"],
"mismatch_count": 42,
"examples": ["\"99.99\""]
}
],
"events_analyzed": 1500,
"events_with_drift": 42
}
]
Get Table Drift
GET /pipelines/{name}/drift/{table}
Returns drift detection results for a specific table.
Response: 200 OK
{
"table": "orders",
"has_drift": false,
"columns": [],
"events_analyzed": 1000,
"events_with_drift": 0
}
Errors:
- `404 Not Found` - Table not found or no drift data available
Dead Letter Queue
See the DLQ page for full documentation.
Peek DLQ Entries
GET /pipelines/{name}/journal/dlq?limit=50&sink_id=kafka-primary&error_kind=serialization
Returns DLQ entries (oldest first). All query params are optional.
DLQ Count
GET /pipelines/{name}/journal/dlq/count
Response: 200 OK
{"count": 42}
Acknowledge DLQ Entries
POST /pipelines/{name}/journal/dlq/ack
Content-Type: application/json
{"up_to_seq": 42}
Permanently removes entries from the head up to the given sequence number.
Response: 200 OK
{"acked": 12}
Purge DLQ
DELETE /pipelines/{name}/journal/dlq
Response: 200 OK
{"purged": 42}
Checkpoint Inspection
Get Checkpoints
GET /pipelines/{name}/checkpoints
Returns per-sink checkpoint positions and ages.
Response: 200 OK
[
{"sink_id": "kafka-primary", "position": {"file": "mysql-bin.000005", "pos": 12345}, "age_seconds": 0.3},
{"sink_id": "redis-cache", "position": {"file": "mysql-bin.000005", "pos": 11000}, "age_seconds": 2.1}
]
System Endpoints
Log Level
GET /log-level
Returns the current RUST_LOG value.
Response: 200 OK
{"level": "deltaforge=info,sources=info,sinks=info,warn"}
Validate Config
POST /validate
Content-Type: application/json
Dry-run validation of a pipeline config without creating it.
Response: 200 OK — config is valid
{"valid": true, "pipeline": "orders-cdc", "source_type": "mysql", "sink_count": 2}
Response: 400 Bad Request — config has errors
{"valid": false, "error": "spec: missing field `processors` at line 7 column 3"}
Error Responses
All error responses return structured JSON:
{
"code": "PIPELINE_NOT_FOUND",
"message": "pipeline orders-cdc not found"
}
| Status Code | Code | Meaning |
|---|---|---|
| 400 Bad Request | PIPELINE_NAME_MISMATCH | Invalid request body or name mismatch |
| 404 Not Found | PIPELINE_NOT_FOUND | Resource doesn’t exist |
| 409 Conflict | PIPELINE_ALREADY_EXISTS | Resource already exists |
| 500 Internal Server Error | INTERNAL_ERROR | Unexpected server error |
Pipelines
Each pipeline is created from a single PipelineSpec. The runtime spawns the source, processors, and sinks defined in the spec and coordinates them with batching and checkpointing.
- 🔄 Live control: pause, resume, or stop pipelines through the REST API without redeploying.
- 📦 Coordinated delivery: batching and commit policy keep sinks consistent even when multiple outputs are configured.
Lifecycle controls
The REST API addresses pipelines by metadata.name and returns PipeInfo records containing the live spec and status.
- `GET /health` - liveness probe.
- `GET /ready` - readiness with pipeline states.
- `GET /pipelines` - list pipelines.
- `POST /pipelines` - create from a full spec.
- `PATCH /pipelines/{name}` - merge a partial spec (for example, adjust batch thresholds) and restart the pipeline.
- `POST /pipelines/{name}/pause` - pause ingestion and coordination.
- `POST /pipelines/{name}/resume` - resume a paused pipeline.
- `POST /pipelines/{name}/stop` - stop a running pipeline.
Pausing halts both source ingestion and the coordinator. Resuming re-enables both ends so buffered events can drain cleanly.
Processors
Processors run in the declared order for each batch. The built-in processor type is JavaScript, powered by deno_core.
- `type: javascript`
- `id`: processor label.
- `inline`: JS source. Export a `processBatch(events)` function that returns the transformed batch.
- `limits` (optional): resource guardrails (`cpu_ms`, `mem_mb`, `timeout_ms`).
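As a concrete sketch, this inline processor drops delete events and tags the rest; the `op` and `tags` fields match the event shape used in the examples above, but the transform itself is illustrative:

```javascript
// Drop deletes, append a tag to everything else.
function processBatch(events) {
  return events
    .filter(event => event.op !== "delete")
    .map(event => {
      event.tags = (event.tags || []).concat(["enriched"]);
      return event;
    });
}
```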
Batching
The coordinator builds batches using soft thresholds:
- `max_events`: flush after this many events.
- `max_bytes`: flush after the serialized size reaches this limit.
- `max_ms`: flush after this much time has elapsed since the batch started.
- `respect_source_tx`: when true, never split a single source transaction across batches.
- `max_inflight`: cap the number of batches being processed concurrently.
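Putting the thresholds together, a throughput-oriented batch block might look like:

```yaml
batch:
  max_events: 1000         # flush after 1000 events...
  max_bytes: 4194304       # ...or 4 MiB of serialized data...
  max_ms: 500              # ...or 500 ms, whichever comes first
  respect_source_tx: true  # never split a source transaction
  max_inflight: 4          # up to 4 batches processed concurrently
```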
Commit policy
When multiple sinks are configured, checkpoints can wait for different acknowledgement rules:
- `all`: every sink must acknowledge.
- `required` (default): only sinks marked `required: true` must acknowledge; others are best-effort.
- `quorum`: checkpoint after at least `quorum` sinks acknowledge.
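For example, a policy that checkpoints once any two sinks have acknowledged (the `quorum` count field name is an assumption inferred from the mode description above):

```yaml
commit_policy:
  mode: quorum
  quorum: 2   # checkpoint once at least 2 sinks have acknowledged
```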
Sources
DeltaForge captures database changes through pluggable source connectors. Each source is configured under spec.source with a type field and a config object. Environment variables are expanded before parsing using ${VAR} syntax.
Supported Sources
| Source | Status | Description |
|---|---|---|
| mysql | ✅ Production | MySQL binlog CDC with GTID support |
| postgres | ✅ Production | PostgreSQL logical replication via pgoutput |
| turso | 🔧 Beta | Turso/libSQL CDC with multiple modes |
Common Behavior
All sources share these characteristics:
- Checkpointing: Progress is automatically saved and resumed on restart
- Schema tracking: Table schemas are loaded and fingerprinted for change detection
- At-least-once delivery: Events may be redelivered after failures; sinks should be idempotent
- Batching: Events are batched according to the pipeline’s `batch` configuration
- Transaction boundaries: `respect_source_tx: true` (default) keeps source transactions intact
Source Interface
Sources implement a common trait that provides:
trait Source {
    fn checkpoint_key(&self) -> &str;
    async fn run(&self, tx: Sender<Event>, checkpoint_store: Arc<dyn CheckpointStore>) -> SourceHandle;
}
The returned SourceHandle supports pause/resume and graceful cancellation.
Adding Custom Sources
The source interface is pluggable. To add a new source:
- Implement the `Source` trait
- Add configuration parsing in `deltaforge-config`
- Register the source type in the pipeline builder
See existing sources for implementation patterns.
MySQL source
DeltaForge tails the MySQL binlog to capture row-level changes with automatic checkpointing and schema tracking.
Prerequisites
MySQL Server Configuration
Ensure your MySQL server has binary logging enabled with row-based format:
-- Required server settings (my.cnf or SET GLOBAL)
log_bin = ON
binlog_format = ROW
binlog_row_image = FULL -- Recommended for complete before-images
If binlog_row_image is not FULL, DeltaForge will warn at startup and before-images on UPDATE/DELETE events may be incomplete.
User Privileges
Create a dedicated replication user with the required grants:
-- Create user with mysql_native_password (required by binlog connector)
CREATE USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'your_password';
-- Replication privileges (required)
GRANT REPLICATION REPLICA, REPLICATION CLIENT ON *.* TO 'deltaforge'@'%';
-- Schema introspection (required for table discovery)
GRANT SELECT, SHOW VIEW ON your_database.* TO 'deltaforge'@'%';
FLUSH PRIVILEGES;
For capturing all databases, grant SELECT on *.* instead.
Configuration
Set spec.source.type to mysql and provide a config object:
| Field | Type | Required | Description |
|---|---|---|---|
| id | string | Yes | Unique identifier used for checkpoints, server_id derivation, and metrics |
| dsn | string | Yes | MySQL connection string with replication privileges |
| tables | array | No | Table patterns to capture; omit or leave empty to capture all user tables |
Table Patterns
The tables field supports flexible pattern matching using SQL LIKE syntax:
tables:
- shop.orders # exact match: database "shop", table "orders"
- shop.order_% # LIKE pattern: tables starting with "order_" in "shop"
- analytics.* # wildcard: all tables in "analytics" database
- %.audit_log # cross-database: "audit_log" table in any database
# omit entirely to capture all user tables (excludes system schemas)
System schemas (mysql, information_schema, performance_schema, sys) are always excluded.
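As an illustration of these semantics (not DeltaForge's actual matcher), a `db.table` pattern can be checked by translating the LIKE wildcards into a regular expression:

```javascript
// Sketch of the pattern semantics described above. SQL LIKE wildcards:
// % matches any run of characters, _ matches one character; * is treated
// like % for the "all tables" form. Patterns are assumed to be "db.table".
function tableMatches(pattern, db, table) {
  const toRegex = (p) =>
    new RegExp(
      "^" +
        p.replace(/[.+?^${}()|[\]\\]/g, "\\$&") // escape regex metacharacters
         .replace(/[%*]/g, ".*")                // LIKE % and glob * -> any run
         .replace(/_/g, ".") +                  // LIKE _ -> any single char
        "$"
    );
  const dot = pattern.indexOf(".");
  const pdb = pattern.slice(0, dot);
  const ptable = pattern.slice(dot + 1);
  return toRegex(pdb).test(db) && toRegex(ptable).test(table);
}
```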
Example
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
Resume Behavior
DeltaForge automatically checkpoints progress and resumes from the last position on restart. The resume strategy follows this priority:
- GTID: Preferred if the MySQL server has GTID enabled. Provides the most reliable resume across binlog rotations and failovers.
- File:position: Used when GTID is not available. Resumes from the exact binlog file and byte offset.
- Binlog tail: On first run with no checkpoint, starts from the current end of the binlog (no historical replay).
Checkpoints are stored using the id field as the key.
Snapshot (Initial Load)
DeltaForge can perform a consistent initial snapshot of existing table data before starting binlog streaming. This is the recommended approach for migrating existing tables without manual backfills.
How it works
DeltaForge opens all worker connections simultaneously, each with
START TRANSACTION WITH CONSISTENT SNAPSHOT. The binlog position is captured
after all workers have started - InnoDB guarantees every visible row was committed
at or before that position, so CDC streaming from there has no gaps.
No FLUSH TABLES WITH READ LOCK or RELOAD privilege is required.
Additional privileges
-- Required for snapshot (SELECT is already needed for introspection)
GRANT SELECT ON your_database.* TO 'deltaforge'@'%';
Configuration
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
snapshot:
mode: initial # initial | always | never (default: never)
max_parallel_tables: 8 # tables snapshotted concurrently
chunk_size: 10000 # rows per chunk for integer-PK tables
| Field | Default | Description |
|---|---|---|
| mode | never | initial: run once if no checkpoint exists; always: re-snapshot on every restart; never: skip |
| max_parallel_tables | 8 | Tables snapshotted concurrently |
| chunk_size | 10000 | Rows per range chunk (integer single-column PK tables only; others do a full scan) |
Snapshot events
Snapshot rows are emitted as Op::Read events (Debezium op: "r"), distinguishable
from live CDC Op::Create events. The binlog position captured at snapshot time becomes
the CDC resume point, so no rows are missed or duplicated.
Resume after interruption
If the snapshot is interrupted, DeltaForge resumes at table granularity on the next restart - already-completed tables are skipped.
Binlog retention safety
DeltaForge validates binlog retention before starting a snapshot and monitors it throughout. This prevents the silent failure mode where a long snapshot completes successfully but CDC startup then fails because the captured binlog position was purged.
Preflight checks (before any rows are read):
- Fails hard if `log_bin=0` or `binlog_format != ROW`
- Estimates snapshot duration from table sizes and `max_parallel_tables`
- Warns at ≥50% of `binlog_expire_logs_seconds` usage; HIGH RISK at ≥80%
During snapshot:
- Background task polls `SHOW BINARY LOGS` every 30s
- Cancels the snapshot immediately if the captured file is purged
After all tables complete:
- Synchronous final check before writing `finished=true`
- `finished=true` means the position is confirmed valid for CDC resume, not just that rows were emitted
If you see retention risk warnings, the recommended actions are:
- Increase `binlog_expire_logs_seconds` to cover the estimated snapshot duration
- Use a read replica as the snapshot source to avoid load on the primary
Server ID Handling
MySQL replication requires each replica to have a unique server_id. DeltaForge derives this automatically from the source id using a CRC32 hash:
server_id = 1 + (CRC32(id) % 4,000,000,000)
When running multiple DeltaForge instances against the same MySQL server, ensure each has a unique id to avoid server_id conflicts.
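The derivation can be sketched as follows, assuming the standard IEEE CRC-32 polynomial (the docs say only "CRC32" without naming a variant):

```javascript
// Bitwise CRC-32 (IEEE polynomial, reflected form) over the source id string.
function crc32(str) {
  let crc = 0xFFFFFFFF;
  for (let i = 0; i < str.length; i++) {
    crc ^= str.charCodeAt(i);
    for (let j = 0; j < 8; j++) {
      crc = (crc >>> 1) ^ (0xEDB88320 & -(crc & 1));
    }
  }
  return (crc ^ 0xFFFFFFFF) >>> 0; // force unsigned 32-bit result
}

// server_id = 1 + (CRC32(id) % 4,000,000,000), per the formula above.
function serverId(sourceId) {
  return 1 + (crc32(sourceId) % 4_000_000_000);
}
```

Because the mapping is deterministic, two pipelines with the same `id` would collide; giving each instance a distinct `id` avoids that.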
Schema Tracking
DeltaForge has a built-in schema registry to track table schemas, per source. For MySQL source:
- Schemas are preloaded at startup by querying `INFORMATION_SCHEMA`
- Each schema is fingerprinted using SHA-256 for change detection
- Events carry `schema_version` (fingerprint) and `schema_sequence` (monotonic counter)
- Schema-to-checkpoint correlation enables reliable replay
Schema changes (DDL) trigger automatic reload of affected table schemas.
Timeouts and Heartbeats
| Behavior | Value | Description |
|---|---|---|
| Heartbeat interval | 15s | Server sends heartbeat if no events |
| Read timeout | 90s | Maximum wait for next binlog event |
| Inactivity timeout | 60s | Triggers reconnect if no data received |
| Connect timeout | 30s | Maximum time to establish connection |
These values are currently fixed. Reconnection uses exponential backoff with jitter.
Event Format
Each captured row change produces an event with:
- `op`: `insert`, `update`, or `delete`
- `before`: Previous row state (updates and deletes only, requires `binlog_row_image = FULL`)
- `after`: New row state (inserts and updates only)
- `table`: Fully qualified table name (`database.table`)
- `tx_id`: GTID if available
- `checkpoint`: Binlog position for resume
- `schema_version`: Schema fingerprint
- `schema_sequence`: Monotonic sequence for schema correlation
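Putting these fields together, an update event might look like this (values are invented for illustration; the layout is a sketch based on the fields listed above and the checkpoint shape shown in the REST examples):

```json
{
  "op": "update",
  "table": "shop.orders",
  "before": { "id": 42, "status": "pending" },
  "after": { "id": 42, "status": "shipped" },
  "tx_id": "3e11fa47-71ca-11e1-9e33-c80aa9429562:23",
  "checkpoint": { "file": "mysql-bin.000005", "pos": 12345 },
  "schema_version": "sha256:a1b2c3d4...",
  "schema_sequence": 7
}
```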
Troubleshooting
Connection Issues
If you see authentication errors mentioning mysql_native_password:
ALTER USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'password';
Missing Before-Images
If UPDATE/DELETE events have incomplete before data:
SET GLOBAL binlog_row_image = 'FULL';
Binlog Not Enabled
Check binary logging status:
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW BINARY LOG STATUS; -- or SHOW MASTER STATUS on older versions
Privilege Issues
Verify grants for your user:
SHOW GRANTS FOR 'deltaforge'@'%';
Required grants include REPLICATION REPLICA and REPLICATION CLIENT.
PostgreSQL source
DeltaForge captures row-level changes from PostgreSQL using logical replication with the pgoutput plugin.
Prerequisites
PostgreSQL Server Configuration
Enable logical replication in postgresql.conf:
# Required settings
wal_level = logical
max_replication_slots = 10 # At least 1 per DeltaForge pipeline
max_wal_senders = 10 # At least 1 per DeltaForge pipeline
Restart PostgreSQL after changing these settings.
User Privileges
Create a replication user with the required privileges:
-- Create user with replication capability
CREATE ROLE deltaforge WITH LOGIN REPLICATION PASSWORD 'your_password';
-- Grant connect access
GRANT CONNECT ON DATABASE your_database TO deltaforge;
-- Grant schema usage and table access for schema introspection
GRANT USAGE ON SCHEMA public TO deltaforge;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO deltaforge;
-- For automatic publication/slot creation (optional)
-- If you prefer manual setup, skip this and create them yourself
ALTER ROLE deltaforge SUPERUSER; -- Or use manual setup below
pg_hba.conf
Ensure your pg_hba.conf allows replication connections:
# TYPE DATABASE USER ADDRESS METHOD
host replication deltaforge 0.0.0.0/0 scram-sha-256
host your_database deltaforge 0.0.0.0/0 scram-sha-256
Replication Slot and Publication
DeltaForge can automatically create the replication slot and publication on first run. Alternatively, create them manually:
-- Create publication for specific tables
CREATE PUBLICATION my_pub FOR TABLE public.orders, public.order_items;
-- Or for all tables
CREATE PUBLICATION my_pub FOR ALL TABLES;
-- Create replication slot
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');
Replica Identity
For complete before-images on UPDATE and DELETE operations, set tables to REPLICA IDENTITY FULL:
ALTER TABLE public.orders REPLICA IDENTITY FULL;
ALTER TABLE public.order_items REPLICA IDENTITY FULL;
The table’s replica identity determines what appears in before-images:
- FULL: Complete row data in before-images
- DEFAULT (primary key): Only primary key columns in before-images
- NOTHING: No before-images at all
DeltaForge warns at startup if tables don’t have REPLICA IDENTITY FULL.
Configuration
Set spec.source.type to postgres and provide a config object:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| id | string | Yes | — | Unique identifier for checkpoints and metrics |
| dsn | string | Yes | — | PostgreSQL connection string |
| slot | string | Yes | — | Replication slot name |
| publication | string | Yes | — | Publication name |
| tables | array | Yes | — | Table patterns to capture |
| start_position | string/object | No | earliest | Where to start when no checkpoint exists |
DSN Formats
DeltaForge accepts both URL-style and key=value DSN formats:
# URL style
dsn: "postgres://user:pass@localhost:5432/mydb"
# Key=value style
dsn: "host=localhost port=5432 user=deltaforge password=pass dbname=mydb"
Table Patterns
The tables field supports flexible pattern matching:
tables:
- public.orders # exact match: schema "public", table "orders"
- public.order_% # LIKE pattern: tables starting with "order_"
- myschema.* # wildcard: all tables in "myschema"
- %.audit_log # cross-schema: "audit_log" table in any schema
- orders # defaults to public schema: "public.orders"
System schemas (pg_catalog, information_schema, pg_toast) are always excluded.
Start Position
Controls where replication begins when no checkpoint exists:
# Start from the earliest available position (slot's restart_lsn)
start_position: earliest
# Start from current WAL position (skip existing data)
start_position: latest
# Start from a specific LSN
start_position:
lsn: "0/16B6C50"
Example
source:
type: postgres
config:
id: orders-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_orders
publication: orders_pub
tables:
- public.orders
- public.order_items
start_position: earliest
Resume Behavior
DeltaForge checkpoints progress using PostgreSQL’s LSN (Log Sequence Number):
- With checkpoint: Resumes from the stored LSN
- Without checkpoint: Uses the slot’s `confirmed_flush_lsn` or `restart_lsn`
- New slot: Starts from `pg_current_wal_lsn()` or the configured `start_position`
Checkpoints are stored using the id field as the key.
Snapshot (Initial Load)
DeltaForge performs a consistent initial snapshot using PostgreSQL’s exported snapshot mechanism before starting logical replication.
How it works
A coordinator connection exports a snapshot and captures the current WAL LSN
in a single round trip. Worker connections each import the shared snapshot into
their own REPEATABLE READ transaction - all workers see the same consistent
DB state with no locks held on the source.
Tables with a single integer primary key use PK-range chunking. All others fall back to ctid page-range chunking.
Configuration
source:
type: postgres
config:
id: orders-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_orders
publication: orders_pub
tables:
- public.orders
snapshot:
mode: initial # initial | always | never (default: never)
max_parallel_tables: 8 # tables snapshotted concurrently
chunk_size: 10000 # rows per chunk for integer-PK tables
| Field | Default | Description |
|---|---|---|
| mode | never | initial: run once if no checkpoint exists; always: re-snapshot on every restart; never: skip |
| max_parallel_tables | 8 | Tables snapshotted concurrently |
| chunk_size | 10000 | Rows per range chunk (integer PK tables only; others use ctid chunking) |
Snapshot events
Snapshot rows are emitted as Op::Read events (Debezium op: "r"),
distinguishable from live CDC Op::Create events. The WAL LSN captured at
snapshot time becomes the CDC resume point - no rows are missed or duplicated.
Resume after interruption
If the snapshot is interrupted, DeltaForge resumes at table granularity on the next restart - already-completed tables are skipped.
WAL slot retention safety
DeltaForge validates replication slot health before starting a snapshot and monitors it throughout. This prevents the slot from being invalidated during a long snapshot, which would make the captured LSN unreachable for CDC resume.
Preflight checks (before any rows are read):
- Fails hard if the slot does not exist or is already invalidated
- Fails hard if `wal_status=lost`
- Warns if `wal_status=unreserved` (WAL retention no longer guaranteed)
- Estimates WAL generated during snapshot (~2× data size) against `max_slot_wal_keep_size`; warns at ≥50%, HIGH RISK at ≥80%
During snapshot:
- Background task polls `pg_replication_slots` every 30s
- Cancels immediately on slot invalidation or disappearance
- Warns but continues on `wal_status=unreserved`
After all tables complete:
- Synchronous final check before writing `finished=true`
- `finished=true` means the position is confirmed valid for CDC resume, not just that rows were emitted
If you see WAL retention risk warnings:
ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
SELECT pg_reload_conf();
Type Handling
DeltaForge preserves PostgreSQL’s native type semantics:
| PostgreSQL Type | JSON Representation |
|---|---|
| boolean | true / false |
| integer, bigint | JSON number |
| real, double precision | JSON number |
| numeric | JSON string (preserves precision) |
| text, varchar | JSON string |
| json, jsonb | Parsed JSON object/array |
| bytea | {"_base64": "..."} |
| uuid | JSON string |
| timestamp, date, time | ISO 8601 string |
| Arrays (int[], text[], etc.) | JSON array |
| TOAST unchanged | {"_unchanged": true} |
Event Format
Each captured row change produces an event with:
- `op`: `insert`, `update`, `delete`, or `truncate`
- `before`: Previous row state (updates and deletes; requires appropriate replica identity)
- `after`: New row state (inserts and updates)
- `table`: Fully qualified table name (`schema.table`)
- `tx_id`: PostgreSQL transaction ID (xid)
- `checkpoint`: LSN position for resume
- `schema_version`: Schema fingerprint
- `schema_sequence`: Monotonic sequence for schema correlation
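Put together, an update event looks roughly like this (all values invented for illustration; the exact serialization depends on the configured envelope):

```json
{
  "op": "update",
  "before": { "id": 42, "status": "pending" },
  "after": { "id": 42, "status": "shipped" },
  "table": "public.orders",
  "tx_id": 123456,
  "checkpoint": "0/1A2B3C4D",
  "schema_version": "9f2c41ab",
  "schema_sequence": 7
}
```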
WAL Management
Logical replication slots prevent WAL segments from being recycled until the consumer confirms receipt. To avoid disk space issues:
- Monitor slot lag: Check `pg_replication_slots.restart_lsn` vs `pg_current_wal_lsn()`
- Set retention limits: Configure `max_slot_wal_keep_size` (PostgreSQL 13+)
- Handle stale slots: Drop unused slots with `pg_drop_replication_slot('slot_name')`
-- Check slot status and lag
SELECT slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;
Troubleshooting
Connection Issues
If you see authentication errors:
-- Verify user has replication privilege
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'deltaforge';
-- Check pg_hba.conf allows replication connections
-- Ensure the line type includes "replication" database
Missing Before-Images
If UPDATE/DELETE events have incomplete before data:
-- Check current replica identity
SELECT relname, relreplident
FROM pg_class
WHERE relname = 'your_table';
-- d = default, n = nothing, f = full, i = index
-- Set to FULL for complete before-images
ALTER TABLE your_table REPLICA IDENTITY FULL;
Slot/Publication Not Found
-- List existing publications
SELECT * FROM pg_publication;
-- List existing slots
SELECT * FROM pg_replication_slots;
-- Create if missing
CREATE PUBLICATION my_pub FOR TABLE public.orders;
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');
WAL Disk Usage Growing
-- Check slot lag
SELECT slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;
-- If slot is inactive and not needed, drop it
SELECT pg_drop_replication_slot('unused_slot');
Logical Replication Not Enabled
-- Check wal_level
SHOW wal_level; -- Should be 'logical'
-- If not, update postgresql.conf and restart PostgreSQL
-- wal_level = logical
Turso Source
⚠️ STATUS: EXPERIMENTAL / PAUSED
The Turso source is not yet ready for production use. Native CDC in Turso/libSQL is still evolving and has limitations:
- CDC is per-connection (only changes from the enabling connection are captured)
- File locking prevents concurrent access
- sqld Docker image doesn’t have CDC support yet
This documentation is retained for reference. The code exists but is not officially supported.
The Turso source captures changes from Turso and libSQL databases. It supports multiple CDC modes to work with different database configurations and Turso versions.
CDC Modes
DeltaForge supports four CDC modes for Turso/libSQL:
| Mode | Description | Requirements |
|---|---|---|
native | Uses Turso’s built-in CDC via turso_cdc table | Turso v0.1.2+ with CDC enabled |
triggers | Shadow tables populated by database triggers | Standard SQLite/libSQL |
polling | Tracks changes via rowid comparison | Any SQLite/libSQL (inserts only) |
auto | Automatic fallback: native → triggers → polling | Any |
Native Mode
Native mode uses Turso’s built-in CDC capabilities. This is the most efficient mode when available.
Requirements:
- Turso database with CDC enabled
- Turso server v0.1.2 or later
How it works:
- Queries the `turso_cdc` system table for changes
- Uses `bin_record_json_object()` to extract row data as JSON
- Tracks position via change ID in checkpoints
Triggers Mode
Triggers mode uses shadow tables and database triggers to capture changes. This works with standard SQLite and libSQL without requiring native CDC support.
How it works:
- Creates shadow tables (`_df_cdc_{table}`) for each tracked table
- Installs INSERT/UPDATE/DELETE triggers that write to shadow tables
- Polls shadow tables for new change records
- Cleans up processed records periodically
Polling Mode
Polling mode uses rowid tracking to detect new rows. This is the simplest mode but only captures inserts (not updates or deletes).
How it works:
- Tracks the maximum rowid seen per table
- Queries for rows with rowid greater than last seen
- Emits insert events for new rows
Limitations:
- Only captures INSERT operations
- Cannot detect UPDATE or DELETE
- Requires tables to have accessible rowid (not WITHOUT ROWID tables)
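The mechanism is easy to picture with plain SQLite. A self-contained Python sketch (illustrative only, not DeltaForge's implementation):

```python
import sqlite3

# Track the highest rowid seen and fetch anything newer on each poll.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE events (payload TEXT)")
con.executemany("INSERT INTO events (payload) VALUES (?)", [("a",), ("b",)])

last_seen = 0  # would come from the checkpoint's table_positions
rows = con.execute(
    "SELECT rowid, payload FROM events WHERE rowid > ? ORDER BY rowid",
    (last_seen,),
).fetchall()
inserts = [{"op": "insert", "after": {"payload": p}} for _, p in rows]
last_seen = rows[-1][0] if rows else last_seen

assert [e["after"]["payload"] for e in inserts] == ["a", "b"]
assert last_seen == 2  # next poll resumes after rowid 2
```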
Auto Mode
Auto mode tries each CDC mode in order and uses the first one that works:
- Try native mode (check for `turso_cdc` table)
- Try triggers mode (check for existing CDC triggers)
- Fall back to polling mode
This is useful for deployments where the database capabilities may vary.
Configuration
source:
type: turso
config:
id: turso-main
url: "libsql://your-db.turso.io"
auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["users", "orders", "order_items"]
cdc_mode: auto
poll_interval_ms: 1000
native_cdc:
level: data
Configuration Options
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
id | string | Yes | — | Logical identifier for metrics and logging |
url | string | Yes | — | Database URL (libsql://, http://, or file path) |
auth_token | string | No | — | Authentication token for Turso cloud |
tables | array | Yes | — | Tables to track (supports wildcards) |
cdc_mode | string | No | auto | CDC mode: native, triggers, polling, auto |
poll_interval_ms | integer | No | 1000 | Polling interval in milliseconds |
native_cdc.level | string | No | data | Native CDC level: binlog or data |
Table Patterns
The tables field supports patterns:
tables:
- users # Exact match
- order% # Prefix match (order, orders, order_items)
- "*" # All tables (excluding system tables)
System tables and DeltaForge infrastructure tables are automatically excluded:
- `sqlite_*` — SQLite system tables
- `_df_*` — DeltaForge CDC shadow tables
- `_litestream*` — Litestream replication tables
- `_turso*` — Turso internal tables
- `turso_cdc` — Turso CDC system table
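How the patterns combine with the exclusion list can be sketched as follows. This is an assumption-laden illustration: the exact semantics of `%` (treated here as a SQL-LIKE-style wildcard) and the precedence of the exclusions are inferred from the description above:

```python
import fnmatch

EXCLUDED_PREFIXES = ("sqlite_", "_df_", "_litestream", "_turso", "turso_cdc")

def table_matches(name: str, patterns: list) -> bool:
    # System and infrastructure tables are always excluded.
    if any(name.startswith(p) for p in EXCLUDED_PREFIXES):
        return False
    for pat in patterns:
        # Translate SQL-style '%' to glob '*' for fnmatch ('*' matches all).
        if fnmatch.fnmatch(name, pat.replace("%", "*")):
            return True
    return False

assert table_matches("users", ["users"])           # exact match
assert table_matches("order_items", ["order%"])    # prefix match
assert not table_matches("_df_cdc_users", ["*"])   # shadow table excluded
```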
Native CDC Levels
When using native mode, you can choose the CDC level:
| Level | Description |
|---|---|
data | Only row data changes (default, more efficient) |
binlog | Full binlog-style events with additional metadata |
Examples
Local Development
source:
type: turso
config:
id: local-dev
url: "http://127.0.0.1:8080"
tables: ["users", "orders"]
cdc_mode: auto
poll_interval_ms: 500
Turso Cloud
source:
type: turso
config:
id: turso-prod
url: "libsql://mydb-myorg.turso.io"
auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["*"]
cdc_mode: native
poll_interval_ms: 1000
SQLite File (Polling Only)
source:
type: turso
config:
id: sqlite-file
url: "file:./data/myapp.db"
tables: ["events", "audit_log"]
cdc_mode: polling
poll_interval_ms: 2000
Checkpoints
Turso checkpoints track position differently depending on the CDC mode. For native mode, a single change ID is tracked:
{
"mode": "native",
"last_change_id": 12345,
"table_positions": {}
}
For polling mode, positions are tracked per-table:
{
"mode": "polling",
"last_change_id": null,
"table_positions": {
"users": 1000,
"orders": 2500
}
}
Schema Loading
The Turso source includes a schema loader that:
- Queries `PRAGMA table_info()` for column metadata
- Detects SQLite type affinities (INTEGER, TEXT, REAL, BLOB, NUMERIC)
- Identifies primary keys and autoincrement columns
- Handles WITHOUT ROWID tables
- Checks for existing CDC triggers
Schema information is available via the REST API at /pipelines/{name}/schemas.
Notes
- WITHOUT ROWID tables: Polling mode cannot track WITHOUT ROWID tables. Use triggers or native mode instead.
- Type affinity: SQLite uses type affinity rather than strict types. The schema loader maps declared types to SQLite affinities.
- Trigger cleanup: In triggers mode, processed change records are cleaned up automatically based on checkpoint position.
- Connection handling: The source maintains a single connection and reconnects automatically on failure.
Processors
Processors can transform, modify or take extra action per event batch between the source and sinks. They run in the order listed in spec.processors, and each receives the output of the previous one. A processor can modify events, filter them out, or add routing metadata.
Available Processors
| Processor | Description |
|---|---|
| javascript | Custom transformations using V8-powered JavaScript |
| outbox | Transactional outbox pattern - extracts payload, resolves topic, sets routing headers |
| flatten | Flatten nested JSON objects into combined keys |
| filter | Drop events by op type, table pattern, or field value |
JavaScript
Run arbitrary JavaScript against each event batch. Uses the V8 engine via deno_core for near-native speed with configurable resource limits.
processors:
- type: javascript
id: enrich
inline: |
function processBatch(events) {
return events.map(e => {
e.tags = ["processed"];
return e;
});
}
limits:
cpu_ms: 50
mem_mb: 128
timeout_ms: 500
| Field | Type | Default | Description |
|---|---|---|---|
id | string | (required) | Processor identifier |
inline | string | (required) | JavaScript source code |
limits.cpu_ms | int | 50 | CPU time limit per batch |
limits.mem_mb | int | 128 | V8 heap memory limit |
limits.timeout_ms | int | 500 | Wall-clock timeout per batch |
Performance
JavaScript processing adds a fixed overhead per batch plus linear scaling per event. Benchmarks show ~70K+ events/sec throughput with typical transforms — roughly 61× slower than native Rust processors, but sufficient for most workloads. If you need higher throughput, keep transform logic simple or move heavy processing downstream.
Tips
- Return an empty array to drop all events in a batch.
- Return multiple copies of an event to fan out.
- JavaScript numbers are `f64` — integer columns wider than 53 bits may lose precision. Use string representation for such values.
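The 53-bit boundary is easy to demonstrate; the same loss happens when a wide BIGINT round-trips through a JavaScript number:

```python
# IEEE-754 doubles (JavaScript's only number type) hold integers exactly
# only up to 2**53; beyond that, adjacent integers collapse.
big = 2**53 + 1                        # 9007199254740993
assert float(big) == float(2**53)      # the +1 is silently lost as f64
assert int(float(big)) == 9007199254740992
assert str(big) == "9007199254740993"  # a string representation is lossless
```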
Outbox
The outbox processor transforms events captured by the outbox pattern into routed, sink-ready events. It extracts business fields from the raw outbox payload, resolves the destination topic, and passes through all non-outbox events unchanged.
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
default_topic: "events.unrouted"
| Field | Type | Default | Description |
|---|---|---|---|
id | string | "outbox" | Processor identifier |
tables | array | [] | Only process outbox events matching these patterns. Empty = all. |
topic | string | — | Topic template with ${field} placeholders (resolved against raw payload columns) |
default_topic | string | — | Fallback when template resolution fails |
columns | object | (defaults below) | Field name mappings |
additional_headers | map | {} | Forward extra payload fields as routing headers. Key = header name, value = column name. |
raw_payload | bool | false | Deliver payload as-is, bypassing envelope wrapping |
Column Defaults
| Key | Default | Description |
|---|---|---|
payload | "payload" | Field containing the event body |
aggregate_type | "aggregate_type" | Domain aggregate type |
aggregate_id | "aggregate_id" | Aggregate identifier |
event_type | "event_type" | Domain event type |
topic | "topic" | Optional explicit topic override in payload |
See the Outbox Pattern guide for source configuration, complete examples, and multi-outbox routing.
Flatten
Flattens nested JSON objects in event payloads into top-level keys joined by a configurable separator. Works on every object-valued field present on the event without assuming any particular envelope structure - CDC before/after, outbox business payloads, or any custom fields introduced by upstream processors are all handled uniformly.
processors:
- type: flatten
id: flat
separator: "__"
max_depth: 3
on_collision: last
empty_object: preserve
lists: preserve
empty_list: drop
Input:
{
"after": {
"order_id": "abc",
"customer": {
"id": 1,
"address": { "city": "Berlin", "zip": "10115" }
},
"tags": ["vip"],
"meta": {}
}
}
Output (with defaults — separator: "__", empty_object: preserve, lists: preserve):
{
"after": {
"order_id": "abc",
"customer__id": 1,
"customer__address__city": "Berlin",
"customer__address__zip": "10115",
"tags": ["vip"],
"meta": {}
}
}
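The transformation above can be reproduced with a short recursive function. A minimal Python sketch with the default policies (separator `__`, empty objects and lists preserved); not the actual implementation:

```python
def flatten(obj: dict, sep: str = "__", prefix: str = "") -> dict:
    out = {}
    for key, value in obj.items():
        path = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict) and value:
            out.update(flatten(value, sep, path))  # recurse into non-empty objects
        else:
            out[path] = value  # scalars, lists, and {} are kept as-is
    return out

after = {
    "order_id": "abc",
    "customer": {"id": 1, "address": {"city": "Berlin", "zip": "10115"}},
    "tags": ["vip"],
    "meta": {},
}
assert flatten(after) == {
    "order_id": "abc",
    "customer__id": 1,
    "customer__address__city": "Berlin",
    "customer__address__zip": "10115",
    "tags": ["vip"],
    "meta": {},
}
```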
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
id | string | "flatten" | Processor identifier |
separator | string | "__" | Separator inserted between path segments |
max_depth | int | unlimited | Stop recursing at this depth; objects at the boundary are kept as opaque leaves |
on_collision | string | last | What to do when two paths produce the same key. last, first, or error |
empty_object | string | preserve | How to handle {} values. preserve, drop, or null |
lists | string | preserve | How to handle array values. preserve (keep as-is) or index (expand to field__0, field__1, …) |
empty_list | string | preserve | How to handle [] values. preserve, drop, or null |
max_depth in practice
max_depth counts nesting levels from the top of the payload object. Objects at the boundary are treated as opaque leaves rather than expanded further, still subject to empty_object policy.
# max_depth: 2
depth 0: customer -> object, recurse
depth 1: customer__address -> object, recurse
depth 2: customer__address__geo -> STOP, kept as leaf {"lat": 52.5, "lng": 13.4}
Without max_depth, a deeply nested or recursive payload could produce a large number of keys. Setting a limit is recommended for payloads with variable or unknown nesting depth.
Collision policy
A collision occurs when two input paths produce the same flattened key - typically when a payload already contains a pre-flattened key (e.g. "a__b": 1) alongside a nested object ("a": {"b": 2}).
- `last` - the last path to write a key wins (default, never fails)
- `first` - the first path to write a key wins, subsequent writes are ignored
- `error` - the batch fails immediately, useful in strict pipelines where collisions indicate a schema problem
Working with outbox payloads
After the outbox processor runs, event.after holds the extracted business payload - there is no before. The flatten processor handles this naturally since it operates on whatever fields are present:
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
- type: flatten
id: flat
separator: "."
empty_list: drop
Envelope interaction
The flatten processor runs on the raw Event struct before sink delivery. Envelope wrapping happens inside the sink, after all processors have run. This means the envelope always wraps already-flattened data, no special configuration needed.
Source → [flatten processor] → Sink (envelope → bytes)
All envelope formats work as expected:
// Native
{ "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" }, "op": "c" }
// Debezium
{ "payload": { "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" }, "op": "c" } }
// CloudEvents
{ "specversion": "1.0", ..., "data": { "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" } } }
Outbox + raw_payload: true
When the outbox processor is configured with raw_payload: true, the sink delivers event.after directly, bypassing the envelope entirely. If the flatten processor runs after outbox, the raw payload delivered to the sink is the flattened object — which is the intended behavior for analytics sinks that can’t handle nested JSON.
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
raw_payload: true # sink delivers event.after directly, no envelope
- type: flatten
id: flat
separator: "__"
empty_list: drop
The flatten processor runs second, so by the time the sink delivers the raw payload it is already flat.
Analytics sink example
When sending to column-oriented sinks (ClickHouse, BigQuery, S3 Parquet) that don’t handle nested JSON:
processors:
- type: flatten
id: flat
separator: "__"
lists: index # expand arrays to indexed columns
empty_object: drop # remove sparse marker objects
empty_list: drop # remove empty arrays
Filter
Drops events that do not pass configured criteria. Each criterion is independent — omit any of them to skip that check entirely. An event must pass all configured checks to be forwarded.
processors:
- type: filter
id: only-active-orders
ops: [create, update]
tables:
include: ["shop.orders"]
exclude: ["*.tmp"]
fields:
- path: status
op: eq
value: "active"
- path: total
op: gte
value: 100
match: all
| Field | Type | Default | Description |
|---|---|---|---|
id | string | "filter" | Processor identifier |
ops | list | [] (all) | Operation types to keep: create, update, delete, read, truncate |
tables.include | list | [] (all) | Table patterns to include. Uses AllowList glob syntax: db.table, shop.*, *.orders |
tables.exclude | list | [] (none) | Table patterns to exclude. Applied after include; takes priority |
fields | list | [] (skip) | Predicates evaluated against event.after |
match | string | all | How to combine multiple field predicates: all (every predicate must pass) or any (at least one must pass) |
Field operators
The path field is a dot-separated path into event.after, e.g. "status" or "order.total".
| Op | value | Description |
|---|---|---|
eq | scalar | Field equals value |
ne | scalar | Field does not equal value |
exists | — | Field is present and non-null |
not_exists | — | Field is absent or null |
gt / gte | number or string | Greater than / greater than or equal |
lt / lte | number or string | Less than / less than or equal |
in | array | Field value is one of the items in the array |
not_in | array | Field value is not in the array |
contains | scalar | String field contains the substring, or array field contains the element |
changed | — | Field value differs between event.before and event.after. Creates and Deletes always pass (no pair to compare) |
regex | string | String field matches the regex pattern. Compiled once at startup; invalid patterns fail pipeline initialization |
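Dot-path resolution and predicate evaluation can be sketched like this (illustrative Python covering three of the operators; numeric equality is by value, as the Notes explain):

```python
def get_path(obj, path: str):
    """Resolve a dot-separated path like "order.total" against a nested dict."""
    for part in path.split("."):
        if not isinstance(obj, dict):
            return None
        obj = obj.get(part)
    return obj

def passes(after: dict, path: str, op: str, value=None) -> bool:
    field = get_path(after, path)
    if op == "eq":
        return field == value  # 42 == 42.0 passes (by-value comparison)
    if op == "gte":
        return field is not None and field >= value
    if op == "exists":
        return field is not None
    raise ValueError(f"unsupported op: {op}")

event = {"status": "active", "order": {"total": 150.0}}
assert passes(event, "status", "eq", "active")
assert passes(event, "order.total", "gte", 100)
assert not passes(event, "missing.field", "exists")
assert passes(event, "order.total", "eq", 150)  # int vs float, equal by value
```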
Notes
Numeric equality — eq and in compare integers and floats by value, so 42 and 42.0 are considered equal. This matters when events have passed through the JavaScript processor, which converts all numbers to f64.
not_in with a missing field — if the field is absent from the event, the event passes. Absence is not membership in any set.
regex on non-string fields — silently does not match. Use exists first if the field may be absent or non-string.
changed and the before image — the predicate reads event.before, which is only populated for update operations from sources configured with full row images. Verify your source has REPLICA IDENTITY FULL (PostgreSQL) or binlog_row_image = FULL (MySQL) before using changed in production.
Performance
The filter processor is pure Rust with no serialization overhead. Op and table checks are O(1) to O(patterns). Field predicate evaluation reads event.after directly. Regex patterns are compiled once at construction time. Put a filter early in the processor chain to reduce the batch size before heavier processors (JavaScript, flatten) run.
Examples
Drop deletes and snapshot reads:
- type: filter
id: no-deletes
ops: [create, update]
Alert stream - pass if status is failed OR retry count exhausted:
- type: filter
id: alert-worthy
match: any
fields:
- path: status
op: eq
value: "failed"
- path: retry_count
op: gte
value: 3
Filter before enrichment to avoid paying JavaScript overhead on the full stream:
processors:
- type: filter
id: low-stock
ops: [create, update]
tables:
include: ["inventory.products"]
fields:
- path: stock_qty
op: lt
value: 10
- type: javascript
id: enrich
inline: |
function processBatch(events) {
return events.map(e => {
e.after.alert_level = e.after.stock_qty === 0 ? "critical" : "warning";
return e;
});
}
Only forward rows where a status column actually changed value (suppresses no-op updates):
- type: filter
id: real-status-changes
ops: [update]
fields:
- path: status
op: changed
Processor Chain
Processors execute in order. Events flow through each processor sequentially:
Source → [Processor 1] → [Processor 2] → ... → Sinks
Each processor receives a Vec<Event> and returns a Vec<Event>. This means processors can:
- Transform: Modify event fields in place
- Filter: Return a subset of events (drop unwanted ones)
- Fan-out: Return more events than received
- Route: Set `event.routing.topic` to override sink defaults
- Enrich: Add headers via `event.routing.headers`
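The contract boils down to folding a batch through a list of functions. A Python sketch of the chain semantics (the real interface is an async Rust trait):

```python
def run_chain(processors, events):
    """Apply each processor in order; the output of one feeds the next."""
    for proc in processors:
        events = proc(events)
    return events

def drop_deletes(events):  # filter: return a subset
    return [e for e in events if e["op"] != "delete"]

def tag(events):           # transform: modify events in place
    for e in events:
        e["tags"] = ["processed"]
    return events

batch = [{"op": "create"}, {"op": "delete"}, {"op": "update"}]
out = run_chain([drop_deletes, tag], batch)
assert [e["op"] for e in out] == ["create", "update"]
assert all(e["tags"] == ["processed"] for e in out)
```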
Non-outbox events always pass through the outbox processor untouched, so it is safe to combine it with JavaScript processors in any order.
Adding Custom Processors
The processor interface is a simple trait:
#[async_trait]
pub trait Processor: Send + Sync {
    fn id(&self) -> &str;
    async fn process(&self, events: Vec<Event>) -> Result<Vec<Event>>;
}
To add a new processor:
- Implement the `Processor` trait in `crates/processors`
- Add a config variant to `ProcessorCfg` in `deltaforge-config`
- Register the build step in `build_processors()`
Sinks
Sinks receive batches from the coordinator after processors run. Each sink lives under spec.sinks and can be marked as required or best-effort via the required flag. Checkpoint behavior is governed by the pipeline’s commit policy.
Envelope and Encoding
All sinks support configurable envelope formats and wire encodings. See the Envelopes and Encodings page for detailed documentation.
| Option | Values | Default | Description |
|---|---|---|---|
envelope | native, debezium, cloudevents | native | Output JSON structure |
encoding | json, avro | json | Wire format (avro requires Schema Registry) |
Quick example:
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
envelope:
type: cloudevents
type_prefix: "com.example.cdc"
encoding: json
Available Sinks
| Sink | Description | |
|---|---|---|
kafka | Kafka producer sink | |
nats | NATS JetStream sink | |
redis | Redis stream sink | |
http | HTTP/Webhook sink |
Multiple sinks in one pipeline
You can combine multiple sinks in one pipeline to fan out events to different destinations. However, multi-sink pipelines introduce complexity that requires careful consideration.
Why multiple sinks are challenging
Different performance characteristics: Kafka might handle 100K events/sec while a downstream HTTP webhook processes 100/sec. The slowest sink becomes the bottleneck for the entire pipeline.
Independent failure modes: Each sink can fail independently. Redis might be healthy while Kafka experiences broker failures. Without proper handling, a single sink failure could block the entire pipeline or cause data loss.
No distributed transactions: DeltaForge cannot atomically commit across heterogeneous systems. If Kafka succeeds but Redis fails mid-batch, you face a choice: retry Redis (risking duplicates in Kafka) or skip Redis (losing data there).
Checkpoint semantics: The checkpoint represents “how far we’ve processed from the source.” With multiple sinks, when is it safe to advance? After one sink succeeds? All of them? A majority?
Read the required and commit_policy sections below for options to manage these challenges.
The required flag
The required flag on each sink determines whether that sink must acknowledge successful delivery before the checkpoint advances:
sinks:
- type: kafka
config:
id: primary-kafka
required: true # Must succeed for checkpoint to advance
- type: redis
config:
id: cache-redis
required: false # Best-effort; failures don't block checkpoint
When required: true (default): The sink must acknowledge the batch before the checkpoint can advance. If this sink fails, the pipeline blocks and retries until it succeeds or the operator intervenes.
When required: false: The sink is best-effort. Failures are logged but don’t prevent the checkpoint from advancing. Use this for non-critical destinations where some data loss is acceptable.
Commit policy
The commit_policy works with the required flag to determine checkpoint behavior:
| Policy | Behavior |
|---|---|
all | Every sink (regardless of required flag) must acknowledge |
required | Only sinks with required: true must acknowledge (default) |
quorum | At least N sinks must acknowledge |
commit_policy:
mode: required # Only wait for required sinks
sinks:
- type: kafka
config:
required: true # Checkpoint waits for this
- type: redis
config:
required: false # Checkpoint doesn't wait for this
- type: nats
config:
required: true # Checkpoint waits for this
Per-sink independent checkpoints
Each sink maintains its own checkpoint, committed independently after successful delivery. This means:
- Faster sinks are not held back by slower ones — each sink advances its own checkpoint
- The source replays from the minimum checkpoint across all sinks, so a slow sink only causes replay for itself, not re-delivery to sinks that are already ahead
- Adding a new sink to an existing pipeline triggers replay from the source’s earliest position for that sink only; existing sinks are unaffected
- Removing a sink cleans up its checkpoint automatically on the next pipeline patch
This architecture avoids the common CDC pitfall where the slowest sink becomes a bottleneck for all other sinks.
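The resume rule can be stated in a few lines. A hedged Python sketch (checkpoint values are invented positions, not real LSNs):

```python
# Each sink commits its own checkpoint; the source resumes from the minimum.
sink_checkpoints = {"kafka": 1200, "redis": 950, "nats": 1100}

resume_from = min(sink_checkpoints.values())
assert resume_from == 950  # only redis is behind

# During replay, sinks already past a position skip re-delivery:
def should_deliver(sink: str, position: int) -> bool:
    return position > sink_checkpoints[sink]

assert not should_deliver("kafka", 1000)  # kafka already committed past 1000
assert should_deliver("redis", 1000)      # redis still needs it
```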
Delivery guarantee tiers
| Sink | Guarantee | Mechanism | Consumer action |
|---|---|---|---|
Kafka (exactly_once: true) | End-to-end exactly-once | Kafka transactions (two-phase commit) | Set isolation.level=read_committed |
Kafka (exactly_once: false) | At-least-once (idempotent) | Retries deduped; crash-replay produces duplicates | Dedup by event ID |
| NATS JetStream | At-least-once + server dedup | Nats-Msg-Id header within duplicate_window | Configure duplicate_window |
| Redis Streams | At-least-once + consumer dedup | idempotency_key field in XADD payload | Check key before processing |
| HTTP/Webhook | At-least-once | Retry on 5xx/timeout; no server-side dedup | Consumer must be idempotent (use event id) |
“Exactly-once” means DeltaForge guarantees no duplicates without consumer cooperation. All other sinks are “at-least-once” with a stated dedup mechanism.
Practical patterns
Primary + secondary: One critical sink (Kafka for durability) marked required: true, with secondary sinks (Redis for caching, testing or experimentation) marked required: false.
Quorum for redundancy: Three sinks with commit_policy.mode: quorum and quorum: 2. Checkpoint advances when any two succeed, providing fault tolerance.
All-or-nothing: Use commit_policy.mode: all when every destination is critical and you need the strongest consistency guarantee, at the cost of overall delivery throughput.
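How the three modes combine per-sink results into a commit decision can be sketched as follows (illustrative Python, not DeltaForge's actual code):

```python
def checkpoint_may_advance(results, required, mode, quorum=0):
    """results: sink -> delivery succeeded; required: sink -> required flag."""
    if mode == "all":
        return all(results.values())
    if mode == "required":
        return all(ok for sink, ok in results.items() if required.get(sink))
    if mode == "quorum":
        return sum(results.values()) >= quorum
    raise ValueError(f"unknown mode: {mode}")

results = {"kafka": True, "redis": False, "nats": True}
required = {"kafka": True, "redis": False, "nats": True}
assert checkpoint_may_advance(results, required, "required")   # required sinks ok
assert not checkpoint_may_advance(results, required, "all")    # redis failed
assert checkpoint_may_advance(results, required, "quorum", 2)  # 2 of 3 acked
```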
Multi-format fan-out
For sending the same events to different consumers that expect different formats:
sinks:
# Kafka Connect expects Debezium format
- type: kafka
config:
id: connect-sink
brokers: ${KAFKA_BROKERS}
topic: connect-events
envelope:
type: debezium
required: true
# Lambda expects CloudEvents
- type: kafka
config:
id: lambda-sink
brokers: ${KAFKA_BROKERS}
topic: lambda-events
envelope:
type: cloudevents
type_prefix: "com.acme.cdc"
required: false
# Analytics wants raw events
- type: redis
config:
id: analytics-redis
uri: ${REDIS_URI}
stream: analytics
envelope:
type: native
required: false
This allows each consumer to receive events in their preferred format without post-processing.
Redis sink
The Redis sink publishes events to a Redis Stream for real-time consumption with consumer groups.
When to use Redis
Redis Streams shine when you need low-latency event delivery with simple operational requirements and built-in consumer group support.
Real-world applications
| Use Case | Description |
|---|---|
| Real-time notifications | Push database changes instantly to WebSocket servers for live UI updates |
| Cache invalidation | Trigger cache eviction when source records change; keep Redis cache consistent |
| Session synchronization | Replicate user session changes across application instances in real-time |
| Rate limiting state | Stream counter updates for distributed rate limiting decisions |
| Live dashboards | Feed real-time metrics and KPIs to dashboard backends |
| Job queuing | Use CDC events to trigger background job processing with consumer groups |
| Feature flags | Propagate feature flag changes instantly across all application instances |
Pros and cons
| Pros | Cons |
|---|---|
| ✅ Ultra-low latency - Sub-millisecond publish; ideal for real-time apps | ❌ Memory-bound - All data in RAM; expensive for high-volume retention |
| ✅ Simple operations - Single binary, minimal configuration | ❌ Limited retention - Not designed for long-term event storage |
| ✅ Consumer groups - Built-in competing consumers with acknowledgements | ❌ Durability trade-offs - AOF/RDB persistence has limitations |
| ✅ Familiar tooling - redis-cli, widespread client library support | ❌ Single-threaded - CPU-bound for very high throughput |
| ✅ Versatile - Combine with caching, pub/sub, and data structures | ❌ No native replay - XRANGE exists but no offset management |
| ✅ Atomic operations - MULTI/EXEC for transactional guarantees | ❌ Cluster complexity - Sharding requires careful key design |
Configuration
Idempotency
Every XADD command includes an idempotency_key field in the stream entry payload. This key is deterministic — the same source event always produces the same key, even across replays.
Redis Streams does not deduplicate server-side (entry IDs are auto-generated), so consumers are responsible for dedup using this field. A simple approach:
# Inside the consumer loop: check idempotency_key before processing
key = data[b"idempotency_key"]
if r.sismember("processed_keys", key):
    r.xack(stream, group, msg_id)  # Skip duplicate
else:
    process(event)
    r.sadd("processed_keys", key)
This makes Redis a Tier 2 delivery guarantee: at-least-once with idempotency keys for consumer-side dedup.
Consuming events
Consumer groups (recommended)
# Create consumer group
redis-cli XGROUP CREATE orders.events mygroup $ MKSTREAM
# Read as consumer (blocking)
redis-cli XREADGROUP GROUP mygroup consumer1 BLOCK 5000 COUNT 10 STREAMS orders.events >
# Acknowledge processing
redis-cli XACK orders.events mygroup 1234567890123-0
# Check pending (unacknowledged) messages
redis-cli XPENDING orders.events mygroup
Simple subscription
# Read latest entries
redis-cli XREAD COUNT 10 STREAMS orders.events 0-0
# Block for new entries
redis-cli XREAD BLOCK 5000 STREAMS orders.events $
Go consumer example
import "github.com/redis/go-redis/v9"
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
// Create consumer group (once)
rdb.XGroupCreateMkStream(ctx, "orders.events", "mygroup", "0")
for {
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "mygroup",
Consumer: "worker1",
Streams: []string{"orders.events", ">"},
Count: 10,
Block: 5 * time.Second,
}).Result()
if err != nil {
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
var event Event
json.Unmarshal([]byte(msg.Values["event"].(string)), &event)
process(event)
rdb.XAck(ctx, "orders.events", "mygroup", msg.ID)
}
}
}
Rust consumer example
use redis::AsyncCommands;
use redis::streams::{StreamReadOptions, StreamReadReply};

let client = redis::Client::open("redis://localhost:6379")?;
let mut con = client.get_async_connection().await?;

// Create consumer group (ignore the error if it already exists)
let _: () = redis::cmd("XGROUP")
    .arg("CREATE").arg("orders.events").arg("mygroup").arg("0").arg("MKSTREAM")
    .query_async(&mut con).await.unwrap_or(());

loop {
    let reply: StreamReadReply = con.xread_options(
        &["orders.events"],
        &[">"],
        &StreamReadOptions::default()
            .group("mygroup", "worker1")
            .count(10)
            .block(5000),
    ).await?;

    for stream in reply.keys {
        for msg in stream.ids {
            let raw: String = msg.get("event").unwrap_or_default();
            let event: Event = serde_json::from_str(&raw)?;
            process(event);
            let _: () = con.xack("orders.events", "mygroup", &[&msg.id]).await?;
        }
    }
}
Python consumer example
import redis
import json
r = redis.Redis.from_url("redis://localhost:6379")
# Create consumer group (once)
try:
r.xgroup_create("orders.events", "mygroup", id="0", mkstream=True)
except redis.ResponseError:
pass # Group already exists
# Consume events
while True:
events = r.xreadgroup("mygroup", "worker1", {"orders.events": ">"}, count=10, block=5000)
for stream, messages in events:
for msg_id, data in messages:
event = json.loads(data[b"event"])
process(event)
r.xack("orders.events", "mygroup", msg_id)
Failure modes
| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore Redis; check network |
| Authentication failure | NOAUTH / WRONGPASS | Fails fast, no retry | Fix auth details in URI |
| OOM (Out of Memory) | OOM command not allowed | Fails batch; retries | Increase maxmemory; enable eviction or trim streams |
| Stream doesn’t exist | None | Auto-created by XADD; no failure | N/A |
| Connection timeout | Command hangs | Timeout after configured duration | Check network; increase timeout |
| Cluster MOVED/ASK | Redirect errors | Automatic redirect (if cluster mode) | Ensure cluster client configured |
| Replication lag | Writes to replica fail | Fails with READONLY | Write to master only |
| Max stream length | If MAXLEN enforced | Oldest entries trimmed | Expected behavior; not a failure |
| Network partition | Intermittent timeouts | Retries; may have gaps | Restore network |
Failure scenarios and data guarantees
Redis OOM during batch delivery
- DeltaForge sends batch of 100 events via pipeline
- 50 events written, Redis hits maxmemory
- Pipeline fails atomically (all or nothing per pipeline)
- DeltaForge retries entire batch
- If OOM persists: batch blocked until memory available
- Checkpoint only saved after ALL events acknowledged
DeltaForge crash after XADD, before checkpoint
- Batch written to Redis stream successfully
- DeltaForge crashes before saving checkpoint
- On restart: replays from last checkpoint
- Result: Duplicate events in stream (at-least-once)
- Consumer must handle idempotently (check event.id)
Redis failover (Sentinel/Cluster)
- Master fails, Sentinel promotes replica
- In-flight XADD may fail with connection error
- DeltaForge reconnects to new master
- Retries failed batch
- Possible duplicates if original write succeeded
Handling duplicates in consumers
# Idempotent consumer using event ID
processed_ids = set() # Or use Redis SET for distributed dedup
for msg_id, data in messages:
event = json.loads(data[b"event"])
event_id = event["id"]
if event_id in processed_ids:
r.xack("orders.events", "mygroup", msg_id)
continue # Skip duplicate
process(event)
processed_ids.add(event_id)
r.xack("orders.events", "mygroup", msg_id)
Monitoring
DeltaForge exposes these metrics for Redis sink monitoring:
# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink} # Events delivered
deltaforge_sink_batch_total{pipeline,sink} # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink} # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"} # Stage timing
For Redis server visibility, use Redis’s built-in monitoring:
# Monitor commands in real-time
redis-cli MONITOR
# Get server stats
redis-cli INFO stats
# Check memory usage
redis-cli INFO memory
# Stream-specific info
redis-cli XINFO STREAM orders.events
redis-cli XINFO GROUPS orders.events
Stream management
# Check stream length
redis-cli XLEN orders.events
# Trim to last 10000 entries (approximate)
redis-cli XTRIM orders.events MAXLEN ~ 10000
# Trim to exact length
redis-cli XTRIM orders.events MAXLEN 10000
# View consumer group info
redis-cli XINFO GROUPS orders.events
# Check pending messages
redis-cli XPENDING orders.events mygroup
# Claim stuck messages (after 60 seconds)
redis-cli XCLAIM orders.events mygroup worker2 60000 <message-id>
# Delete processed messages (careful!)
redis-cli XDEL orders.events <message-id>
Notes
- Redis Streams provide at-least-once delivery with consumer group acknowledgements
- Use MAXLEN ~ trimming to prevent unbounded memory growth (approximate trimming is faster)
- Consider Redis Cluster for horizontal scaling with multiple streams
- Combine with Redis pub/sub for fan-out to ephemeral subscribers
- For durability, enable AOF persistence with appendfsync everysec or always
- Monitor memory usage closely; Redis will reject writes when maxmemory is reached
Kafka sink
The Kafka sink publishes batches to a Kafka topic using rdkafka.
When to use Kafka
Kafka excels as the backbone for event-driven architectures where durability, ordering, and replay capabilities are critical.
Real-world applications
| Use Case | Description |
|---|---|
| Event sourcing | Store all state changes as an immutable log; rebuild application state by replaying events |
| Microservices integration | Decouple services with async messaging; each service consumes relevant topics |
| Real-time analytics pipelines | Feed CDC events to Spark, Flink, or ksqlDB for streaming transformations |
| Data lake ingestion | Stream database changes to S3/HDFS via Kafka Connect for analytics and ML |
| Audit logging | Capture every database mutation for compliance, debugging, and forensics |
| Cross-datacenter replication | Use MirrorMaker 2 to replicate topics across regions for DR |
Pros and cons
| Pros | Cons |
|---|---|
| ✅ Durability - Configurable replication ensures no data loss | ❌ Operational complexity - Requires ZooKeeper/KRaft, careful tuning |
| ✅ Ordering guarantees - Per-partition ordering with consumer groups | ❌ Latency - Batching and replication add milliseconds of delay |
| ✅ Replay capability - Configurable retention allows reprocessing | ❌ Resource intensive - High disk I/O and memory requirements |
| ✅ Ecosystem - Connect, Streams, Schema Registry, ksqlDB | ❌ Learning curve - Partitioning, offsets, consumer groups to master |
| ✅ Throughput - Handles millions of messages per second | ❌ Cold start - Cluster setup and topic configuration overhead |
| ✅ Exactly-once semantics - Transactions for critical workloads | ❌ Cost - Managed services can be expensive at scale |
Configuration
Exactly-once semantics
When exactly_once: true, the Kafka sink uses a transactional producer. Each batch is wrapped in a Kafka transaction (begin_transaction / commit_transaction), so either all events in the batch are committed atomically or none are.
sinks:
- type: kafka
config:
id: orders-kafka
brokers: ${KAFKA_BROKERS}
topic: orders
exactly_once: true
How it works
- DeltaForge assigns a stable transactional.id per pipeline-sink pair (deltaforge-{pipeline}-{sink_id})
- On startup, init_transactions() registers with the broker and fences any zombie producer from a previous instance
- Each batch: begin_transaction() → produce messages → commit_transaction()
- If the commit fails, the transaction is aborted and the batch retried
- Consumers using isolation.level=read_committed only see committed batches
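The all-or-nothing batch cycle above can be sketched with a hypothetical in-memory producer (FakeProducer is invented here as a stand-in for the real transactional producer; the begin/produce/commit/abort names mirror the Kafka API):

```python
class FakeProducer:
    """In-memory stand-in for a transactional Kafka producer."""
    def __init__(self):
        self.committed, self._pending = [], []

    def begin_transaction(self):
        self._pending = []

    def produce(self, msg):
        self._pending.append(msg)

    def commit_transaction(self):
        # Atomic: all pending messages become visible at once
        self.committed.extend(self._pending)
        self._pending = []

    def abort_transaction(self):
        self._pending = []  # nothing becomes visible

def deliver_batch(producer, batch, commit_ok=True):
    producer.begin_transaction()
    for event in batch:
        producer.produce(event)
    if commit_ok:
        producer.commit_transaction()
        return True
    producer.abort_transaction()  # on failure the whole batch is retried
    return False

p = FakeProducer()
deliver_batch(p, ["e1", "e2"], commit_ok=False)  # aborted: nothing visible
deliver_batch(p, ["e1", "e2"])                   # retried: both visible
print(p.committed)  # ['e1', 'e2']
```

Consumers reading with read_committed never observe the aborted attempt, only the committed retry.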
Requirements
- Kafka 2.5+ (transaction support)
- isolation.level=read_committed on consumers (otherwise they see uncommitted messages)
- Only one DeltaForge instance per pipeline at a time (the broker will fence duplicates)
Transactional overrides
When exactly_once: true, DeltaForge automatically configures:
| Setting | Value | Why |
|---|---|---|
transactional.id | deltaforge-{pipeline}-{sink_id} | Broker fencing |
transaction.timeout.ms | 60000 | Max transaction duration |
enable.idempotence | true | Required for transactions |
acks | all | Required for transactions |
message.timeout.ms | 30000 | Must be ≤ transaction.timeout.ms |
delivery.timeout.ms | 30000 | Must be ≤ transaction.timeout.ms |
You do not need to set these in client_conf — they are applied automatically and cannot be overridden.
Fatal errors
If the broker fences the producer (another instance started with the same transactional.id), DeltaForge treats this as a fatal error and stops the pipeline. This is not retryable — resolve the duplicate instance before restarting.
Performance impact
Transactions add ~1-3ms overhead per batch for the two-phase commit. With properly sized batches (max_events=16000, max_bytes=16MB), throughput impact is ~7-11%. See the Performance guide for tuning details and benchmark results.
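To see why the impact stays in single-digit percentages, amortize the per-batch commit cost over a full batch (rough arithmetic using the numbers above):

```python
# Back-of-envelope check: the per-batch commit overhead spreads across
# many events at the recommended batch sizes.
batch_events = 16_000          # max_events from the tuning guidance above
overhead_ms = 3                # upper end of the 1-3 ms two-phase commit cost
per_event_us = overhead_ms / batch_events * 1000
print(round(per_event_us, 4))  # 0.1875 microseconds per event
```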
Recommended client_conf settings
| Setting | Recommended | Description |
|---|---|---|
acks | all | Wait for all replicas for durability |
message.timeout.ms | 30000 | Total time to deliver a message |
retries | 2147483647 | Retry indefinitely (with backoff) |
enable.idempotence | true | Prevent duplicates on retry |
compression.type | lz4 | Balance between CPU and bandwidth |
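Assuming the table above maps directly onto librdkafka property names passed through client_conf, a pipeline definition applying them might look like this (a sketch; adjust to your actual sink config):

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      client_conf:
        acks: all
        message.timeout.ms: "30000"
        retries: "2147483647"
        enable.idempotence: "true"
        compression.type: lz4
```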
Consuming events
Kafka CLI
# Consume from beginning
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning
# Consume with consumer group
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--group deltaforge-consumers
Go consumer example
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
group, _ := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
for {
    err := group.Consume(ctx, []string{"orders"}, &handler{})
    if err != nil {
        log.Printf("Consumer error: %v", err)
    }
}

// handler must implement all three methods of sarama.ConsumerGroupHandler
type handler struct{}

func (h *handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        var event Event
        json.Unmarshal(msg.Value, &event)
        process(event)
        session.MarkMessage(msg, "")
    }
    return nil
}
Rust consumer example
#![allow(unused)]
fn main() {
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::Message;

let consumer: StreamConsumer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("group.id", "my-group")
    .set("auto.offset.reset", "earliest")
    .create()?;
consumer.subscribe(&["orders"])?;
loop {
    match consumer.recv().await {
        Ok(msg) => {
            if let Some(Ok(payload)) = msg.payload_view::<str>() {
                let event: Event = serde_json::from_str(payload)?;
                process(event);
                consumer.commit_message(&msg, CommitMode::Async)?;
            }
        }
        Err(e) => eprintln!("Kafka error: {}", e),
    }
}
}
Python consumer example
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # commit manually after processing
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
    event = message.value
    process(event)
    consumer.commit()
Failure modes
| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Broker unavailable | Connection refused, timeout | Retries with backoff; blocks checkpoint | Restore broker; check network |
| Topic not found | UnknownTopicOrPartition | Fails batch; retries | Create topic or enable auto-create |
| Authentication failure | SaslAuthenticationFailed | Fails fast, no retry | Fix credentials in config |
| Authorization failure | TopicAuthorizationFailed | Fails fast, no retry | Grant ACLs for producer |
| Message too large | MessageSizeTooLarge | Fails message permanently | Increase message.max.bytes or filter large events |
| Leader election | NotLeaderForPartition | Automatic retry after metadata refresh | Wait for election; usually transient |
| Disk full | KafkaStorageException | Retries indefinitely | Add disk space; purge old segments |
| Network partition | Timeouts, partial failures | Retries; may produce duplicates | Restore network; idempotence prevents dups |
| Producer fenced | ProducerFenced error | Fatal — pipeline stops immediately | Ensure only one instance per pipeline; restart after resolving |
| Transaction timeout | transaction.timeout.ms exceeded | Transaction aborted; batch retried | Increase timeout or reduce batch size |
| SSL/TLS errors | Handshake failures | Fails fast | Fix certificates, verify truststore |
Failure scenarios and data guarantees
Broker failure during batch delivery
- DeltaForge sends batch of 100 events
- 50 events delivered, broker crashes
- rdkafka detects failure, retries remaining 50
- If idempotence enabled: no duplicates
- If not: possible duplicates of events near failure point
- Checkpoint only saved after ALL events acknowledged
DeltaForge crash after Kafka ack, before checkpoint
- Batch delivered to Kafka successfully
- DeltaForge crashes before saving checkpoint
- On restart: replays from last checkpoint
- Without exactly_once: duplicate events in Kafka (at-least-once); consumer must handle idempotently
- With exactly_once: the replayed batch gets a new transaction; consumers with read_committed see duplicates, but each batch is atomic
Monitoring recommendations
DeltaForge exposes these metrics for Kafka sink monitoring:
# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink} # Events delivered
deltaforge_sink_batch_total{pipeline,sink} # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink} # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"} # Stage timing
For deeper Kafka broker visibility, monitor your Kafka cluster directly:
- Broker metrics via JMX or Kafka’s built-in metrics
- Consumer lag via kafka-consumer-groups.sh
- Topic throughput via broker dashboards

Note: Internal rdkafka producer statistics (message queues, broker RTT, etc.) are not currently exposed by DeltaForge. This is a potential future enhancement.
Notes
- Combine Kafka with other sinks to fan out data; use commit policy to control checkpoint behavior
- For exactly-once semantics, set exactly_once: true and ensure your Kafka cluster supports transactions (2.5+)
- With exactly_once: false (default), idempotent production is still enabled — duplicates are prevented during retries but not across DeltaForge restarts
- Adjust client_conf for durability (acks=all) or performance based on your requirements
- Consider partitioning strategy for ordering guarantees within partitions
NATS sink
The NATS sink publishes events to a NATS JetStream stream for durable, at-least-once delivery.
When to use NATS
NATS JetStream is ideal when you need a lightweight, high-performance messaging system with persistence, without the operational overhead of Kafka.
Real-world applications
| Use Case | Description |
|---|---|
| Edge computing | Lightweight footprint perfect for IoT gateways and edge nodes syncing to cloud |
| Microservices mesh | Request-reply and pub/sub patterns with automatic load balancing |
| Multi-cloud sync | Leaf nodes and superclusters for seamless cross-cloud data replication |
| Kubernetes-native events | NATS Operator for cloud-native deployment; sidecar-friendly architecture |
| Real-time gaming | Low-latency state synchronization for multiplayer game servers |
| Financial data feeds | Stream market data with subject-based routing and wildcards |
| Command and control | Distribute configuration changes and commands to distributed systems |
Pros and cons
| Pros | Cons |
|---|---|
| ✅ Lightweight - Single binary ~20MB; minimal resource footprint | ❌ Smaller ecosystem - Fewer connectors and integrations than Kafka |
| ✅ Simple operations - Zero external dependencies; easy clustering | ❌ Younger persistence - JetStream newer than Kafka’s battle-tested log |
| ✅ Low latency - Sub-millisecond message delivery | ❌ Community size - Smaller community than Kafka or Redis |
| ✅ Flexible patterns - Pub/sub, queues, request-reply, streams | ❌ Tooling maturity - Fewer monitoring and management tools |
| ✅ Subject hierarchy - Powerful wildcard routing (orders.>, *.events) | ❌ Learning curve - JetStream concepts differ from traditional queues |
| ✅ Multi-tenancy - Built-in accounts and security isolation | ❌ Less enterprise adoption - Fewer case studies at massive scale |
| ✅ Cloud-native - Designed for Kubernetes and distributed systems | |
Configuration
Authentication options
# Credentials file
credentials_file: /etc/nats/creds/user.creds
# Username/password
username: ${NATS_USER}
password: ${NATS_PASSWORD}
# Token
token: ${NATS_TOKEN}
Deduplication
DeltaForge sets the Nats-Msg-Id header on every published message using a deterministic idempotency key derived from the event’s source, transaction, and row identity. JetStream uses this header for server-side deduplication within the stream’s duplicate_window.
This means: if DeltaForge replays a batch after a crash, duplicate messages are automatically discarded by the server — no consumer-side dedup needed (within the dedup window).
Configure duplicate_window on the stream to match your maximum expected replay window:
nats stream add ORDERS \
--subjects "orders.>" \
--duplicate-window 5m # Dedup messages replayed within 5 minutes
The default duplicate_window is 2 minutes. Increase this if DeltaForge might be down longer before restarting.
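The interplay of the idempotency key and the dedup window can be sketched in Python (the key derivation shown is an assumption; the real format is internal to DeltaForge, but any deterministic hash of source, transaction, and row identity behaves the same way):

```python
import hashlib

def idempotency_key(source: str, txn_id: str, row_id: str) -> str:
    # Hypothetical derivation: deterministic, so a replayed event always
    # produces the same Nats-Msg-Id
    return hashlib.sha256(f"{source}|{txn_id}|{row_id}".encode()).hexdigest()

class DedupWindow:
    """Mimics JetStream's duplicate_window: drop repeated Nats-Msg-Id values."""
    def __init__(self, window_secs: float):
        self.window = window_secs
        self.seen = {}  # msg_id -> first-seen timestamp

    def publish(self, msg_id: str, now: float) -> bool:
        # Expire entries older than the window, then check for a duplicate
        self.seen = {k: t for k, t in self.seen.items() if now - t < self.window}
        if msg_id in self.seen:
            return False  # duplicate discarded server-side
        self.seen[msg_id] = now
        return True

w = DedupWindow(window_secs=120)
key = idempotency_key("orders-db", "txn-42", "orders:1001")
print(w.publish(key, now=0.0))    # True  (first delivery)
print(w.publish(key, now=10.0))   # False (replay within window: discarded)
print(w.publish(key, now=200.0))  # True  (outside window: accepted again)
```

The last case is why the window should exceed your worst-case restart delay: a replay that lands outside it is delivered again.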
JetStream setup
Before using the NATS sink with JetStream, create a stream that captures your subject:
# Using NATS CLI
nats stream add ORDERS \
--subjects "orders.>" \
--retention limits \
--storage file \
--replicas 3 \
--max-age 7d
# Verify stream
nats stream info ORDERS
Consuming events
NATS CLI
# Subscribe to subject (ephemeral)
nats sub "orders.>"
# Create durable consumer
nats consumer add ORDERS orders-processor \
--pull \
--ack explicit \
--deliver all \
--max-deliver 3 \
--filter "orders.events"
# Consume messages
nats consumer next ORDERS orders-processor --count 10
Go consumer example
nc, _ := nats.Connect("nats://localhost:4222")
js, _ := nc.JetStream()
// Create or bind to a durable pull consumer (the durable name is the
// second argument; do not also pass nats.Durable, or PullSubscribe errors)
sub, _ := js.PullSubscribe("orders.events", "orders-processor",
    nats.AckExplicit(),
)
for {
    msgs, _ := sub.Fetch(10, nats.MaxWait(5*time.Second))
    for _, msg := range msgs {
        var event Event
        json.Unmarshal(msg.Data, &event)
        process(event)
        msg.Ack()
    }
}
Rust consumer example
#![allow(unused)]
fn main() {
use async_nats::jetstream;
use futures::StreamExt;

let client = async_nats::connect("nats://localhost:4222").await?;
let js = jetstream::new(client);
let stream = js.get_stream("ORDERS").await?;
let consumer: jetstream::consumer::PullConsumer =
    stream.get_consumer("orders-processor").await?;
let mut messages = consumer.messages().await?;
while let Some(msg) = messages.next().await {
    let msg = msg?;
    let event: Event = serde_json::from_slice(&msg.payload)?;
    process(event);
    msg.ack().await?;
}
}
Monitoring
DeltaForge exposes these metrics for NATS sink monitoring:
# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink} # Events delivered
deltaforge_sink_batch_total{pipeline,sink} # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink} # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"} # Stage timing
For NATS server visibility, use the NATS CLI or monitoring endpoint:
# Server info
nats server info
# JetStream account info
nats account info
# Stream statistics
nats stream info ORDERS
# Consumer statistics
nats consumer info ORDERS orders-processor
# Real-time event monitoring
nats events
NATS also exposes a monitoring endpoint (default :8222) with JSON stats:
- http://localhost:8222/varz - General server stats
- http://localhost:8222/jsz - JetStream stats
- http://localhost:8222/connz - Connection stats
Subject design patterns
| Pattern | Example | Use Case |
|---|---|---|
| Hierarchical | orders.us.created | Regional routing |
| Wildcard single | orders.*.created | Any region, specific event |
| Wildcard multi | orders.> | All order events |
| Versioned | v1.orders.events | API versioning |
Failure modes
| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore NATS; check network |
| Stream not found | stream not found error | Fails batch; no retry | Create stream or remove stream config |
| Authentication failure | authorization violation | Fails fast, no retry | Fix credentials |
| Subject mismatch | no responders (core NATS) | Fails if no subscribers | Add subscribers or use JetStream |
| JetStream disabled | jetstream not enabled | Fails fast | Enable JetStream on server |
| Storage full | insufficient resources | Retries; eventually fails | Add storage; adjust retention |
| Message too large | message size exceeds maximum | Fails message permanently | Increase max_payload or filter large events |
| Cluster partition | Intermittent failures | Retries with backoff | Restore network; wait for quorum |
| Slow consumer | Publish backpressure | Slows down; may timeout | Scale consumers; increase buffer |
| TLS errors | Handshake failures | Fails fast | Fix certificates |
Failure scenarios and data guarantees
NATS server restart during batch delivery
- DeltaForge sends batch of 100 events
- 50 events published, server restarts
- async_nats detects disconnect, starts reconnecting
- After reconnect, DeltaForge retries remaining 50
- JetStream deduplication prevents duplicates (if enabled)
- Checkpoint only saved after ALL events acknowledged
DeltaForge crash after JetStream ack, before checkpoint
- Batch published to JetStream successfully
- DeltaForge crashes before saving checkpoint
- On restart: replays from last checkpoint
- Result: Duplicate events in stream (at-least-once)
- Consumer must handle idempotently (check event.id)
Stream storage exhausted
- JetStream stream hits max_bytes or max_msgs limit
- With discard: old → oldest messages removed, publish succeeds
- With discard: new → publish rejected
- DeltaForge retries on rejection
- Resolution: increase limits or enable discard: old
JetStream acknowledgement levels
# Stream configuration affects durability:
#   --replicas 3        R=3 for production
#   --retention limits  or 'workqueue' for a single consumer
#   --discard old       remove oldest when full
#   --max-age 7d        auto-expire after 7 days
#   --storage file      persistent (vs memory)
nats stream add ORDERS \
  --replicas 3 \
  --retention limits \
  --discard old \
  --max-age 7d \
  --storage file
| Replicas | Guarantee | Use Case |
|---|---|---|
| R=1 | Single node; lost if node fails | Development, non-critical |
| R=3 | Survives 1 node failure | Production default |
| R=5 | Survives 2 node failures | Critical data |
Handling duplicates in consumers
// Use event ID for idempotency
processedIDs := make(map[string]bool) // Or use Redis/DB
for _, msg := range msgs {
var event Event
json.Unmarshal(msg.Data, &event)
if processedIDs[event.ID] {
msg.Ack() // Already processed
continue
}
if err := process(event); err == nil {
processedIDs[event.ID] = true
}
msg.Ack()
}
Notes
- When stream is specified, the sink verifies the stream exists at connection time
- Without stream, events are published to core NATS (no persistence guarantees)
- Connection pooling ensures efficient reuse across batches
- Use replicated streams (--replicas 3) for production durability
- Combine with other sinks to fan out data; use commit policy to control checkpoint behavior
- JetStream provides at-least-once delivery with server-side deduplication via Nats-Msg-Id within the configured duplicate_window
HTTP/Webhook sink
The HTTP sink delivers CDC events via HTTP POST (or PUT) to any URL — internal services, serverless functions, third-party APIs, webhooks.
When to use HTTP
HTTP is the universal integration point. Use it when:
- Your consumer doesn’t run Kafka/NATS/Redis
- You need to call a REST API or webhook on every database change
- You want the simplest possible setup (DeltaForge + any HTTP server)
- You’re integrating with serverless functions (AWS Lambda, Cloud Functions)
Pros and cons
| Pros | Cons |
|---|---|
| Works with any HTTP server | Higher latency than message queues |
| No infrastructure dependencies | No built-in replay (consumer must be idempotent) |
| Simple auth (Bearer, Basic, headers) | No consumer groups or partitioning |
| URL templates for per-table routing | One request per event (unless batch mode) |
Configuration
Authentication
All auth is handled via the headers map. Values support ${ENV_VAR} environment variable expansion.
# Bearer token
headers:
Authorization: "Bearer ${API_TOKEN}"
# Basic auth
headers:
Authorization: "Basic ${BASIC_AUTH_B64}"
# Custom API key
headers:
X-API-Key: "${MY_API_KEY}"
# HMAC signature (computed externally, injected via env)
headers:
X-Signature: "${WEBHOOK_SIGNATURE}"
URL templates
Route events to different URLs based on event fields:
# Per-table endpoint
url: "https://api.example.com/cdc/${source.table}"
# → https://api.example.com/cdc/orders
# → https://api.example.com/cdc/customers
# Per-database endpoint
url: "https://${source.db}.api.internal/events"
# Static URL (most common)
url: "https://api.example.com/webhook"
Batch mode
By default, the sink sends one HTTP request per event. Enable batch_mode: true to send a JSON array of events in a single request:
# Per-event mode (default): one POST per event
batch_mode: false
# Body: {"id": "...", "op": "c", "after": {...}}
# Batch mode: one POST with JSON array
batch_mode: true
# Body: [{"id": "...", "op": "c", ...}, {"id": "...", "op": "u", ...}]
Batch mode reduces HTTP overhead but means the consumer must handle arrays.
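A consumer can accept both modes with a small normalization step (a sketch; parse_webhook_body is a name invented here):

```python
import json

def parse_webhook_body(body: str) -> list:
    """Normalize an HTTP sink request body to a list of events.

    Handles both per-event mode (a single JSON object) and batch mode
    (a JSON array), so one handler works with either setting.
    """
    payload = json.loads(body)
    return payload if isinstance(payload, list) else [payload]

print(parse_webhook_body('{"id": "1", "op": "c"}'))
print(parse_webhook_body('[{"id": "1", "op": "c"}, {"id": "2", "op": "u"}]'))
```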
Retry behavior
| Condition | Behavior |
|---|---|
| 2xx response | Success |
| 408, 429 | Retry with backoff (100ms → 10s, 3 attempts) |
| 5xx | Retry with backoff |
| Connection error | Retry with backoff |
| Timeout | Retry with backoff |
| 401, 403 | Auth error — fail immediately, no retry |
| Other 4xx | Permanent failure — fail batch |
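The retry policy in the table can be sketched as two small functions (the exponential growth factor is an assumption; the table only fixes the 100ms floor, the 10s cap, and the 3-attempt limit):

```python
from typing import Optional

def backoff_delays(base=0.1, cap=10.0, attempts=3, factor=10.0):
    # One delay (seconds) per attempt, growing exponentially from `base`
    # up to `cap`; the growth factor is an assumption
    return [min(base * factor**i, cap) for i in range(attempts)]

def should_retry(status: Optional[int]) -> bool:
    # None models a connection error or timeout
    if status is None or status in (408, 429) or 500 <= status < 600:
        return True
    return False  # 2xx succeeds; 401/403 and other 4xx fail without retry

print(backoff_delays())  # [0.1, 1.0, 10.0]
```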
Failure modes
| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Endpoint unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore endpoint |
| Authentication failure | 401/403 response | Fails fast, no retry | Fix credentials in headers |
| Rate limited | 429 response | Retries with backoff | Reduce throughput or increase rate limit |
| Timeout | Request exceeds send_timeout_secs | Retries | Increase timeout or fix slow endpoint |
| URL template error | Template resolves to empty | Event → DLQ (if enabled) | Fix template or event data |
Consuming events
Node.js / Express
app.post('/webhook', (req, res) => {
const event = req.body;
console.log(`${event.op} on ${event.source.table}: ${JSON.stringify(event.after)}`);
res.sendStatus(200);
});
Python / Flask
@app.route('/webhook', methods=['POST'])
def webhook():
event = request.json
process(event)
return '', 200
Go
http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
var event Event
json.NewDecoder(r.Body).Decode(&event)
process(event)
w.WriteHeader(http.StatusOK)
})
Notes
- Connection pooling is automatic — reqwest reuses TCP connections to the same host
- The Content-Type header is set to application/json by default
- At-least-once delivery: on crash, events may be re-sent; consumers should be idempotent
- For per-event dedup, use the id field in the event payload (UUID v7, stable across replays)
Envelopes and Encodings
DeltaForge supports configurable envelope formats and wire encodings for sink output. This allows you to match the output format expected by your downstream consumers without forcing code changes on them.
Overview
Every CDC event flows through two stages before being written to a sink:
Event -> Envelope (structure) -> Encoding (bytes) -> Sink
- Envelope: Controls the JSON structure of the output (what fields exist, how they’re nested)
- Encoding: Controls the wire format (JSON bytes, Avro binary with Schema Registry)
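The two stages compose as plain functions. A minimal sketch with the JSON encoding and two envelope shapes (function names are invented here):

```python
import json

def to_native_envelope(event: dict) -> dict:
    return event  # native: the event is serialized directly

def to_debezium_envelope(event: dict) -> dict:
    # Schemaless Debezium wrapper: {"schema": null, "payload": ...}
    return {"schema": None, "payload": event}

def encode_json(envelope: dict) -> bytes:
    return json.dumps(envelope).encode("utf-8")

event = {"op": "c", "after": {"id": 1}}
wire = encode_json(to_debezium_envelope(event))  # Event -> Envelope -> bytes
print(wire)
```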
Envelope Formats
Native (default)
The native envelope serializes events directly with minimal overhead. This is DeltaForge’s own format, optimized for efficiency and practical use cases.
Note
The native envelope format may evolve over time as we adapt to user needs and optimize for the lowest possible overhead. If you need a stable, standardized format, consider using the debezium or cloudevents envelopes, which follow their respective established specifications.
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
envelope:
type: native
Output:
{
"before": null,
"after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
"source": {
"version": "0.1.0",
"connector": "mysql",
"name": "orders-db",
"ts_ms": 1700000000000,
"db": "shop",
"table": "customers",
"server_id": 1,
"file": "mysql-bin.000003",
"pos": 12345
},
"op": "c",
"ts_ms": 1700000000000
}
When to use:
- Maximum performance with lowest overhead
- Custom consumers that parse the payload directly
- When format stability is less important than efficiency
- Internal systems where you control both producer and consumer
Debezium
The Debezium envelope wraps the event in a {"schema": null, "payload": ...} structure,
following the Debezium event format specification.
This uses schemaless mode (schema: null), which is equivalent to Debezium’s
JsonConverter with schemas.enable=false. This is the recommended configuration
for most production deployments as it avoids the overhead of inline schemas.
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
envelope:
type: debezium
Output:
{
"schema": null,
"payload": {
"before": null,
"after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
"source": {
"version": "0.1.0",
"connector": "mysql",
"name": "orders-db",
"ts_ms": 1700000000000,
"db": "shop",
"table": "customers"
},
"op": "c",
"ts_ms": 1700000000000
}
}
When to use:
- Kafka Connect consumers expecting full Debezium format
- Existing Debezium-based pipelines you’re migrating from
- Tools that specifically parse the payload wrapper
- When you need a stable, well-documented format with broad ecosystem support
Note: When using Avro encoding with Schema Registry, schema handling is at the encoding layer — schema IDs are embedded in the Confluent wire format.
CloudEvents
The CloudEvents envelope restructures events to the CloudEvents 1.0 specification, a CNCF project that defines a vendor-neutral format for event data. This format strictly follows the CloudEvents spec and is guaranteed to remain compliant.
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
envelope:
type: cloudevents
type_prefix: "com.example.cdc"
Output:
{
"specversion": "1.0",
"id": "550e8400-e29b-41d4-a716-446655440000",
"source": "deltaforge/orders-db/shop.customers",
"type": "com.example.cdc.created",
"time": "2024-01-15T10:30:00.000Z",
"datacontenttype": "application/json",
"subject": "shop.customers",
"data": {
"before": null,
"after": {"id": 1, "name": "Alice", "email": "alice@example.com"},
"op": "c"
}
}
The type field is constructed from your type_prefix plus the operation:
- com.example.cdc.created (INSERT)
- com.example.cdc.updated (UPDATE)
- com.example.cdc.deleted (DELETE)
- com.example.cdc.snapshot (READ/snapshot)
- com.example.cdc.truncated (TRUNCATE)
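The mapping can be expressed as a small lookup (the op letters "c" and "u" appear in this guide's examples; "d", "r", and "t" follow the common Debezium-style codes and are assumptions here):

```python
# CDC op code -> CloudEvents type suffix, mirroring the list above
OP_SUFFIX = {"c": "created", "u": "updated", "d": "deleted",
             "r": "snapshot", "t": "truncated"}

def cloudevents_type(type_prefix: str, op: str) -> str:
    # type = "{type_prefix}.{suffix}"
    return f"{type_prefix}.{OP_SUFFIX[op]}"

print(cloudevents_type("com.example.cdc", "c"))  # com.example.cdc.created
```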
When to use:
- AWS EventBridge, Azure Event Grid, or other CloudEvents-native platforms
- Serverless architectures (Lambda, Cloud Functions)
- Event-driven microservices using CloudEvents SDKs
- Standardized event routing based on the type field
- When you need a vendor-neutral, CNCF-backed standard format
Wire Encodings
JSON (default)
Standard UTF-8 JSON encoding. Human-readable and widely supported.
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
encoding: json
Content-Type: application/json
When to use:
- Development and debugging
- Consumers that expect JSON
- When human readability matters
- Most use cases (good default)
Avro (with Schema Registry)
Avro encoding produces compact binary payloads using the Confluent wire format:
[0x00][4-byte schema ID (big-endian)][Avro binary payload]
This format is natively understood by Kafka Connect, ksqlDB, Apache Flink, and any Confluent-compatible consumer.
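The framing is easy to produce or parse by hand; a minimal sketch of the 5-byte header:

```python
import struct

def encode_confluent(schema_id: int, avro_payload: bytes) -> bytes:
    # Magic byte 0x00, then the schema ID as a 4-byte big-endian integer,
    # then the raw Avro-encoded body
    return b"\x00" + struct.pack(">I", schema_id) + avro_payload

def decode_confluent(frame: bytes) -> tuple:
    assert frame[0] == 0, "not Confluent wire format"
    (schema_id,) = struct.unpack(">I", frame[1:5])
    return schema_id, frame[5:]

frame = encode_confluent(42, b"avro-bytes")
print(decode_confluent(frame))  # (42, b'avro-bytes')
```

Deserializing the body still requires fetching the schema for that ID from the registry; Confluent-compatible consumers do this automatically.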
sinks:
- type: kafka
config:
id: events-kafka
brokers: localhost:9092
topic: events
encoding:
type: avro
schema_registry_url: "http://schema-registry:8081"
subject_strategy: topic_name # default
Content-Type: application/avro
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
schema_registry_url | string | — | Confluent-compatible Schema Registry URL |
subject_strategy | string | topic_name | Subject naming strategy (see below) |
username | string | — | Basic auth username for Schema Registry |
password | string | — | Basic auth password for Schema Registry |
unsigned_bigint_mode | string | string | How to map MySQL BIGINT UNSIGNED (string or long) |
enum_mode | string | string | How to map ENUM types (string or enum) |
naive_timestamp_mode | string | string | How to map naive timestamps (string or timestamp) |
Subject naming strategies
| Strategy | Subject pattern | Use case |
|---|---|---|
topic_name | {topic}-value | One schema per Kafka topic (default, most common) |
record_name | {record_name} | One schema per record type, shared across topics |
topic_record_name | {topic}-{record_name} | Per-topic, per-record schema |
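The three strategies reduce to a small naming function. `subjectFor` is a sketch of the table above, not DeltaForge's internal API:

```javascript
// Derive the Schema Registry subject from the configured strategy.
function subjectFor(strategy, topic, recordName) {
  switch (strategy) {
    case "topic_name":        return `${topic}-value`;
    case "record_name":       return recordName;
    case "topic_record_name": return `${topic}-${recordName}`;
    default: throw new Error(`unknown strategy: ${strategy}`);
  }
}

// subjectFor("topic_name", "events", "Order") -> "events-value"
```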
Schema source
When source DDL is available (MySQL INFORMATION_SCHEMA, PostgreSQL pg_catalog), DeltaForge derives precise Avro schemas from the actual column types and nullability. This is the recommended path for production.
When DDL is not available (e.g., processor-created synthetic events), DeltaForge falls back to inferring the Avro schema from the JSON event structure. This is less precise (no distinction between int/bigint, all fields nullable).
Schema IDs are cached per subject — only the first event per table triggers a Schema Registry HTTP call.
Type conversion policies
DeltaForge defaults to safe type mappings that prioritize correctness over convenience:
| Source type | Default Avro type | Why | Override |
|---|---|---|---|
MySQL BIGINT UNSIGNED | string | Values ≥ 2^63 overflow Avro long | unsigned_bigint_mode: long |
MySQL/PG ENUM | string | Avro enum symbol changes break compatibility | enum_mode: enum |
MySQL DATETIME | string (ISO-8601) | Not a UTC instant — Avro timestamp-millis is semantically wrong | naive_timestamp_mode: timestamp |
PG timestamp (no tz) | string (ISO-8601) | Same as above — naive local time, not an instant | naive_timestamp_mode: timestamp |
MySQL TIMESTAMP | timestamp-millis | Stored as UTC — safe to use logical type | — |
PG timestamptz | timestamp-micros | Stored as UTC — safe to use logical type | — |
DECIMAL(p,s) | decimal logical type | Uses exact precision/scale from DDL | — |
PG numeric (no precision) | string | Unbounded precision can’t be expressed in Avro decimal | — |
Full type mapping tables for MySQL and PostgreSQL are documented in the Avro Schema Registry RFC.
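The BIGINT UNSIGNED default is worth making concrete: MySQL's maximum unsigned value does not fit in Avro's signed 64-bit long, which is why the safe default is string. A quick BigInt check:

```javascript
// Why BIGINT UNSIGNED defaults to string: MySQL's max unsigned value
// (2^64 - 1) exceeds Avro long's max (2^63 - 1), so mapping to long
// would overflow for large values. BigInt makes the comparison exact.
const AVRO_LONG_MAX = 2n ** 63n - 1n;      // 9223372036854775807
const MYSQL_UBIGINT_MAX = 2n ** 64n - 1n;  // 18446744073709551615

const overflows = MYSQL_UBIGINT_MAX > AVRO_LONG_MAX; // true
```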
Schema evolution
When the source table schema changes (column added/removed/altered), DeltaForge derives a new Avro schema and registers it as a new version. The Schema Registry’s compatibility rules control acceptance:
- Default: BACKWARD — new schema can read old data (consumers can upgrade first)
- DeltaForge respects existing subject compatibility settings in the Schema Registry
Schema Registry failure handling
| Condition | Behavior |
|---|---|
| SR unavailable, schema cached | Continue encoding with cached schema ID. Metric: deltaforge_avro_sr_cache_fallback_total |
| SR unavailable, no cache | Fail the batch — cannot encode without a schema ID |
| SR rejects new schema (compatibility) | Try encoding with cached schema; if encoding fails → DLQ |
When to use
- Kafka Connect sinks expecting Avro (JDBC Sink, S3 Sink, Elasticsearch Sink)
- ksqlDB streams and tables
- Apache Flink CDC consumers
- When you need compact binary payloads (~40-60% smaller than JSON)
- When you want schema evolution enforcement via Schema Registry compatibility rules
- Production pipelines where schema governance matters
Configuration Examples
Kafka with CloudEvents
sinks:
- type: kafka
config:
id: orders-kafka
brokers: ${KAFKA_BROKERS}
topic: order-events
envelope:
type: cloudevents
type_prefix: "com.acme.orders"
encoding: json
required: true
Redis with Debezium envelope
sinks:
- type: redis
config:
id: orders-redis
uri: ${REDIS_URI}
stream: orders
envelope:
type: debezium
encoding: json
NATS with native envelope
sinks:
- type: nats
config:
id: orders-nats
url: ${NATS_URL}
subject: orders.events
stream: ORDERS
envelope:
type: native
encoding: json
Kafka with Avro encoding
sinks:
- type: kafka
config:
id: events-avro
brokers: ${KAFKA_BROKERS}
topic: cdc-events
envelope:
type: debezium
encoding:
type: avro
schema_registry_url: "http://schema-registry:8081"
subject_strategy: topic_name
required: true
Multi-sink with different formats
Different consumers may expect different formats. Configure each sink independently:
sinks:
# Kafka Connect expects Debezium format
- type: kafka
config:
id: connect-sink
brokers: ${KAFKA_BROKERS}
topic: connect-events
envelope:
type: debezium
required: true
# Lambda expects CloudEvents
- type: kafka
config:
id: lambda-sink
brokers: ${KAFKA_BROKERS}
topic: lambda-events
envelope:
type: cloudevents
type_prefix: "com.acme.cdc"
required: false
# Analytics wants raw events
- type: redis
config:
id: analytics-redis
uri: ${REDIS_URI}
stream: analytics
envelope:
type: native
Operation Mapping
DeltaForge uses Debezium-compatible operation codes:
| Operation | Code | Description |
|---|---|---|
| Create/Insert | c | New row inserted |
| Update | u | Existing row modified |
| Delete | d | Row deleted |
| Read | r | Snapshot read (initial load) |
| Truncate | t | Table truncated |
These codes appear in the op field regardless of envelope format.
Performance Considerations
| Envelope | Overhead | Format Stability | Use Case |
|---|---|---|---|
| Native | Baseline (minimal) | May evolve | High-throughput, internal systems |
| Debezium | ~14 bytes | Stable (follows Debezium spec) | Kafka Connect, Debezium ecosystem |
| CloudEvents | ~150-200 bytes | Stable (follows CNCF spec) | Serverless, event-driven architectures |
| Encoding | Size vs JSON | CPU cost | Schema governance |
|---|---|---|---|
| JSON | Baseline | Lowest | None |
| Avro | ~40-60% smaller | Moderate (schema lookup + binary encoding) | Schema Registry enforced |
The native envelope is recommended for maximum throughput when you control both ends of the pipeline. For interoperability with external systems or when format stability is critical, use debezium or cloudevents.
Defaults
If not specified, sinks use:
- Envelope: native
- Encoding: json
The native envelope provides the lowest overhead for high-throughput scenarios. If you need format stability guarantees, use debezium or cloudevents which adhere to their respective established specifications.
For Kafka pipelines where schema governance and compact payloads matter, use avro encoding with the Debezium envelope — this is the standard pattern for production Kafka Connect integration.
Dynamic Routing
Dynamic routing controls where each CDC event is delivered - which Kafka topic, Redis stream, or NATS subject receives it. By default, all events go to the single destination configured in the sink (static routing). With dynamic routing, events can be split across destinations based on event content, pipeline attributes, or other event metadata.
Overview
There are two routing mechanisms, and they compose naturally:
- Template strings in sink config - resolve per-event from event fields
- JavaScript ev.route() - programmatic per-event routing in processors
When both are used, ev.route() overrides take highest priority, then template resolution, then the static config value.
Template Routing
Replace static topic/stream/subject strings with templates containing ${...} variables. Templates are compiled once at startup and resolved per-event with zero regex overhead.
Kafka
sinks:
- type: kafka
config:
id: kafka-routed
brokers: ${KAFKA_BROKERS}
topic: "cdc.${source.db}.${source.table}"
key: "${after.customer_id}"
envelope:
type: debezium
Events from shop.orders -> topic cdc.shop.orders, partitioned by customer_id.
Redis
sinks:
- type: redis
config:
id: redis-routed
uri: ${REDIS_URI}
stream: "events:${source.table}"
key: "${after.id}"
Events from orders -> stream events:orders. The key value appears as the df-key field in each stream entry.
NATS
sinks:
- type: nats
config:
id: nats-routed
url: ${NATS_URL}
subject: "cdc.${source.db}.${source.table}"
key: "${after.id}"
stream: CDC
Events from shop.orders -> subject cdc.shop.orders. The key value appears as the df-key NATS header.
Available Variables
| Variable | Description | Example value |
|---|---|---|
${source.table} | Table name | orders |
${source.db} | Database name | shop |
${source.schema} | Schema name (PostgreSQL) | public |
${source.connector} | Source type | mysql |
${op} | Operation code | c, u, d, r, t |
${after.<field>} | Field from after image | 42, cust-abc |
${before.<field>} | Field from before image | old-value |
${tenant_id} | Pipeline tenant ID | acme |
Missing fields resolve to an empty string. A warning is logged once per unique template, not per event.
Static strings (no ${...}) are detected at parse time and have zero overhead on the hot path - no allocation, no resolution.
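Template resolution can be sketched in a few lines. This illustrative version uses a regex per call; as noted above, DeltaForge compiles templates once at startup, so treat this as a semantic model only:

```javascript
// Resolve ${...} variables against an event; dotted paths walk nested
// objects, and missing fields resolve to an empty string.
function resolveTemplate(template, event) {
  return template.replace(/\$\{([^}]+)\}/g, (_, path) => {
    const value = path.split(".").reduce(
      (obj, key) => (obj == null ? undefined : obj[key]), event);
    return value == null ? "" : String(value);
  });
}

// resolveTemplate("cdc.${source.db}.${source.table}",
//   { source: { db: "shop", table: "orders" } }) -> "cdc.shop.orders"
```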
Env Vars vs Templates
Both use ${...} syntax. The config loader expands environment variables first. Unknown variables pass through as templates for runtime resolution:
brokers: ${KAFKA_BROKERS} # env var - expanded at load time
topic: "cdc.${source.table}" # template - passed through to runtime
key: "${after.customer_id}" # template - resolved per-event
JavaScript Routing
For routing logic that goes beyond field substitution, use ev.route() in a JavaScript processor. This lets you make conditional routing decisions based on event content.
processors:
- type: javascript
id: smart-router
inline: |
function processBatch(events) {
for (const ev of events) {
if (!ev.after) continue;
if (ev.after.total_amount > 10000) {
ev.route({
topic: "orders.priority",
key: String(ev.after.customer_id),
headers: {
"x-tier": "high-value",
"x-amount": String(ev.after.total_amount)
}
});
} else {
ev.route({
topic: "orders.standard",
key: String(ev.after.customer_id)
});
}
}
return events;
}
ev.route() fields
| Field | Type | Description |
|---|---|---|
topic | string | Override destination (topic, stream, or subject) |
key | string | Override message/partition key |
headers | object | Key-value pairs added to the message |
All fields are optional. Only set fields override; omitted fields fall through to config templates or static values.
Calling ev.route() replaces any previous routing on that event - it does not merge.
How headers are delivered
| Sink | Key delivery | Header delivery |
|---|---|---|
| Kafka | Kafka message key | Kafka message headers |
| Redis | df-key field in stream entry | df-headers field (JSON) |
| NATS | df-key NATS header | Individual NATS headers |
Resolution Order
For each event, the destination is resolved in priority order:
ev.route() override → config template → static config value
Specifically:
- If the event has routing.topic set (via ev.route() or programmatically), use it
- If the sink config contains a template (has ${...}), resolve it from event fields
- Otherwise, use the static config string
The same order applies independently to key and headers.
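The priority chain above can be sketched as a small function. `resolveDestination` is a hypothetical helper, assuming that ev.route() stores its override on event.routing; the template resolver is injected for illustration:

```javascript
// Resolve a destination in priority order:
// ev.route() override -> config template -> static config value.
function resolveDestination(event, configValue, resolveTemplate) {
  if (event.routing && event.routing.topic) {
    return event.routing.topic;                    // 1. ev.route() override
  }
  if (configValue.includes("${")) {
    return resolveTemplate(configValue, event);    // 2. config template
  }
  return configValue;                              // 3. static string
}
```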
Examples
See the complete example configurations:
- Dynamic Routing - template-based routing across Kafka, Redis, and NATS
- JavaScript Routing - conditional routing with ev.route() based on business logic
Related
- Configuration Reference — full sink config fields
- JavaScript Processors — processor API reference
- Sinks Overview — multi-sink patterns and commit policies
Outbox Pattern
The transactional outbox pattern guarantees that domain events are published whenever the corresponding database change is committed - no two-phase commit required. DeltaForge supports this natively for both MySQL and PostgreSQL with zero application-side polling.
How It Works
┌─────────────────┐ ┌──────────────┐ ┌──────────────────┐ ┌──────┐
│ Application │────>│ Database │────>│ DeltaForge │────>│ Sink │
│ (writes data │ │ (outbox │ │ (captures + │ │ │
│ + outbox msg) │ │ table/WAL) │ │ transforms) │ │ │
└─────────────────┘ └──────────────┘ └──────────────────┘ └──────┘
- Your application writes business data and an outbox message in the same transaction.
- DeltaForge captures the outbox event through the database’s native replication stream.
- The OutboxProcessor extracts the payload, resolves the destination topic, and sets routing headers.
- The transformed event flows to sinks like any other CDC event.
Because the outbox write is part of the application transaction, the event is guaranteed to be published if and only if the transaction commits.
Source Configuration
Each database uses its native mechanism — there is nothing to install or poll.
PostgreSQL — WAL Messages
PostgreSQL uses pg_logical_emit_message() to write messages directly into the WAL. No table is needed.
-- In your application transaction:
BEGIN;
INSERT INTO orders (id, total) VALUES (42, 99.99);
SELECT pg_logical_emit_message(
true, -- transactional: tied to the enclosing TX
'outbox', -- prefix: matched by DeltaForge
'{"aggregate_type":"Order","aggregate_id":"42","event_type":"OrderCreated","payload":{"total":99.99}}'
);
COMMIT;
Source config:
source:
type: postgres
config:
id: orders-pg
dsn: ${POSTGRES_DSN}
slot: deltaforge_orders
publication: orders_pub
tables: [public.orders]
outbox:
prefixes: [outbox]
| Field | Type | Description |
|---|---|---|
outbox.prefixes | array | WAL message prefixes to capture. Supports glob patterns: outbox (exact), outbox_% (prefix match), * (all). |
The message prefix becomes source.table on the event, so processors can filter by prefix when multiple outbox channels share a pipeline.
MySQL — Table Inserts
MySQL uses a regular table. For production, use the BLACKHOLE storage engine so rows are written to the binlog but never stored on disk.
-- Create outbox table (BLACKHOLE = no disk storage, binlog only)
CREATE TABLE outbox (
id INT AUTO_INCREMENT PRIMARY KEY,
aggregate_type VARCHAR(64),
aggregate_id VARCHAR(64),
event_type VARCHAR(64),
payload JSON
) ENGINE=BLACKHOLE;
-- In your application transaction:
BEGIN;
INSERT INTO orders (id, total) VALUES (42, 99.99);
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', '42', 'OrderCreated', '{"total": 99.99}');
COMMIT;
Source config:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.outbox
outbox:
tables: ["shop.outbox"]
| Field | Type | Description |
|---|---|---|
outbox.tables | array | Table patterns to tag as outbox events. Supports globs: shop.outbox (exact), *.outbox (any database), shop.outbox_% (prefix). |
Note: The outbox table must be included in the source’s tables list so DeltaForge subscribes to its binlog events. Only INSERTs are captured - UPDATE and DELETE on the outbox table are ignored.
Outbox Processor
The OutboxProcessor transforms raw outbox events into routed, sink-ready events. Add it to your processors list:
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
default_topic: "events.unrouted"
| Field | Type | Default | Description |
|---|---|---|---|
id | string | "outbox" | Processor identifier |
tables | array | [] | Filter: only process outbox events matching these patterns. Empty = all outbox events. |
topic | string | - | Topic template resolved against the raw payload using ${field} placeholders |
default_topic | string | - | Fallback topic when template resolution fails and no topic column exists |
columns | object | (see below) | Column name mappings for extracting outbox fields |
additional_headers | map | {} | Forward extra payload fields as routing headers. Key = header name, value = column name. |
raw_payload | bool | false | When true, deliver the extracted payload as-is to sinks, bypassing envelope wrapping (native/debezium/cloudevents). Metadata is still available via routing headers. |
key | string | - | Key template resolved against raw payload. Sets routing.key for sink partitioning. Default: aggregate_id value. |
strict | bool | false | When true, fail the batch if required fields are missing (topic, payload, aggregate_type, aggregate_id, event_type). When false, missing fields are silently skipped. |
Column Mappings
Column mappings control header extraction and payload rewriting - they tell the processor which fields correspond to aggregate_type, aggregate_id, etc. for setting df-* headers. The topic template resolves directly against the raw payload, so you reference your actual column names there.
| Column | Default | Header | Description |
|---|---|---|---|
payload | "payload" | - | Event body. Extracted and promoted to event.after. |
aggregate_type | "aggregate_type" | df-aggregate-type | Aggregate root type (e.g. Order). |
aggregate_id | "aggregate_id" | df-aggregate-id | Aggregate root ID. Also used as default routing key. |
event_type | "event_type" | df-event-type | Domain event type (e.g. OrderCreated). |
topic | "topic" | - | Per-row topic override (used when template is absent). |
event_id | "id" | df-event-id | Event identity for idempotency/dedup. |
If your outbox payload uses non-default field names, override them:
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
columns:
payload: data # default: "payload"
aggregate_type: type # default: "aggregate_type"
aggregate_id: key # default: "aggregate_id"
event_type: action # default: "event_type"
topic: destination # default: "topic"
event_id: uuid # default: "id"
Additional Headers
Forward arbitrary payload fields as routing headers. This is useful when migrating from Debezium’s table.fields.additional.placement or when downstream consumers need extra metadata:
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
additional_headers:
x-trace-id: trace_id
x-correlation-id: correlation_id
x-source-region: region
Each key becomes a header name, each value is the column name in the outbox payload. Missing columns are silently skipped - no error if a row doesn’t contain the field.
Typed Extraction
Header values are extracted as strings regardless of the source JSON type. Numeric IDs, booleans, and string values are all stringified automatically:
| JSON value | Header value |
|---|---|
"abc-123" | abc-123 |
42 | 42 |
true | true |
null / missing | (skipped) |
{} / [] | (skipped) |
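The extraction rule in the table reduces to one predicate. A minimal sketch, where `headerValue` is an illustrative name and `undefined` stands for "skipped":

```javascript
// Stringify scalar payload values for headers; skip null/missing values
// and containers ({} / []), matching the table above.
function headerValue(v) {
  if (v === null || v === undefined) return undefined; // null / missing: skipped
  if (typeof v === "object") return undefined;         // {} / []: skipped
  return String(v);                                    // "abc-123", 42, true
}
```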
What the Processor Does
- Identifies outbox events by the __outbox sentinel on source.schema (set by the source).
- Extracts aggregate_type, aggregate_id, event_type, and payload from event.after.
- Resolves the topic using a three-step cascade:
  1. Template resolved against the raw payload (e.g. ${domain}.${action} — use your actual column names)
  2. Column value (a topic field in the payload, configurable via columns.topic)
  3. default_topic fallback
- Rewrites event.after to just the payload content.
- Sets routing headers: df-event-id, df-aggregate-type, df-aggregate-id, df-event-type, plus any additional_headers mappings.
- Sets routing key using the key template (or falls back to aggregate_id).
- Marks raw delivery if raw_payload: true — sinks serialize event.after directly, skipping envelope wrapping.
- Clears the __outbox sentinel so the event looks like a normal CDC event to sinks.
- Drops non-INSERT outbox events (UPDATE/DELETE on the outbox table are meaningless).
- Validates in strict mode (strict: true): fails the batch with an error if any required field is missing (topic, payload, aggregate_type, aggregate_id, event_type). The error names the missing fields so operators can fix the schema. In lenient mode (default), missing fields are silently skipped.
- Passes through all non-outbox events unchanged.
Multi-Outbox Routing
When a source captures multiple outbox channels, use the tables filter to scope each processor:
processors:
- type: outbox
tables: [orders_outbox]
topic: "orders.${event_type}"
- type: outbox
tables: [payments_outbox]
topic: "payments.${event_type}"
columns:
payload: data
Complete Examples
PostgreSQL → Kafka
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: order-events
tenant: acme
spec:
source:
type: postgres
config:
id: pg1
dsn: ${POSTGRES_DSN}
publication: orders_pub
slot: orders_slot
tables: [public.orders]
outbox:
prefixes: [outbox]
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
default_topic: "events.unrouted"
raw_payload: true
sinks:
- type: kafka
config:
id: k1
brokers: ${KAFKA_BROKERS}
topic: "events.fallback"
With raw_payload: true, outbox events hit the wire as the extracted payload JSON. Regular CDC events (from public.orders) still use the sink’s configured envelope.
MySQL → Kafka (Multi-Outbox)
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: shop-events
tenant: acme
spec:
source:
type: mysql
config:
id: m1
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.orders_outbox
- shop.payments_outbox
outbox:
tables: ["shop.*_outbox"]
processors:
- type: outbox
tables: [orders_outbox]
topic: "orders.${event_type}"
raw_payload: true
- type: outbox
tables: [payments_outbox]
topic: "payments.${event_type}"
raw_payload: true
sinks:
- type: kafka
config:
id: k1
brokers: ${KAFKA_BROKERS}
topic: "events.default"
Migrating from Debezium
If you’re using Debezium’s outbox event router with a custom schema, DeltaForge’s column mappings and additional_headers map directly. For example, given this Debezium-style outbox table:
CREATE TABLE outbox_events (
id UUID PRIMARY KEY,
aggregatetype VARCHAR(64),
aggregateid VARCHAR(64),
type VARCHAR(64),
payload JSONB,
traceid VARCHAR(64),
tenant VARCHAR(32)
);
Debezium config:
transforms.outbox.table.field.event.id=id
transforms.outbox.table.field.event.key=aggregateid
transforms.outbox.table.field.event.type=type
transforms.outbox.table.field.event.payload=payload
transforms.outbox.table.fields.additional.placement=traceid:header,tenant:header
DeltaForge equivalent:
processors:
- type: outbox
topic: "${aggregatetype}.${type}"
key: "${aggregateid}"
raw_payload: true
columns:
event_id: id
aggregate_type: aggregatetype
aggregate_id: aggregateid
event_type: type
payload: payload
additional_headers:
x-trace-id: traceid
x-tenant: tenant
Key differences from Debezium:
- Topic template references raw column names directly — ${aggregatetype}, not ${aggregate_type}
- raw_payload: true delivers the extracted payload as-is — same behavior as Debezium’s outbox event router
- Column mappings only affect header extraction (df-* headers) and payload rewriting
- additional_headers replaces table.fields.additional.placement
- No SMT chain — everything is in one processor config
Observability
The outbox processor emits Prometheus-compatible metrics:
| Metric | Labels | Description |
|---|---|---|
deltaforge_outbox_transformed_total | - | Events successfully transformed |
deltaforge_outbox_dropped_total | reason | Events dropped or rejected |
Drop reasons:
| Reason | Meaning |
|---|---|
non_insert | UPDATE/DELETE on outbox table (expected, harmless) |
non_object | event.after is not a JSON object |
null_payload | event.after is null |
strict_missing_fields | Strict mode: required field missing (batch fails) |
In strict mode, strict_missing_fields increments the counter and returns an error that halts the batch - the pipeline will not silently lose events.
Tips
- PostgreSQL WAL messages are lightweight. No table, no index, no vacuum - just a single WAL entry per message. Prefer this over table-based outbox when using PostgreSQL.
- MySQL BLACKHOLE engine avoids storing outbox rows on disk. The row is written to the binlog and immediately discarded. Use it in production to avoid unbounded table growth.
- Outbox events coexist with normal CDC. The processor passes through all non-outbox events untouched, so you can mix regular table capture and outbox in the same pipeline.
- Topic templates use ${field} syntax, same as dynamic routing. The template resolves directly against the raw outbox payload columns - use your actual column names like ${domain}.${action}, no remapping needed.
- At-least-once delivery applies to outbox events just like regular CDC events. Downstream consumers should be idempotent - use the df-event-id header for idempotency, or aggregate_id + event_type as a composite dedup key.
- Malformed events are logged and dropped by default. Non-INSERT operations, null payloads, and non-object payloads produce a WARN-level log with the table name and reason. They do not propagate to sinks. Enable strict: true to fail the batch instead of dropping - this ensures operators are alerted to schema issues before events are lost.
Guarantees & Correctness
This page defines DeltaForge’s data delivery guarantees, ordering model, transaction semantics, failure handling, and operational boundaries. Every claim here is backed by the implementation — no aspirational statements.
Delivery Guarantees
Per-sink delivery tiers
| Sink | Delivery guarantee | Dedup mechanism | Consumer action required |
|---|---|---|---|
Kafka (exactly_once: true) | End-to-end exactly-once | Kafka two-phase commit per batch | Set isolation.level=read_committed |
| Kafka (default) | At-least-once (idempotent producer) | Retries are deduped by rdkafka; crash-replay produces duplicates | Dedup by event ID or idempotency key |
| NATS JetStream | At-least-once + server-side dedup | Nats-Msg-Id header within duplicate_window | Configure duplicate_window on stream |
| Redis Streams | At-least-once + consumer-side dedup | idempotency_key field in XADD payload | Check idempotency_key before processing |
Terminology rule: “exactly-once” is used only when DeltaForge guarantees no duplicates without consumer cooperation. All other sinks are “at-least-once” with a stated dedup mechanism. This distinction matters — calling NATS or Redis “exactly-once” would be misleading because dedup depends on server configuration or consumer behavior outside DeltaForge’s control.
What “at-least-once” means
- No data loss: every event from the source is delivered to the sink at least once. Checkpoints are saved only after the sink acknowledges delivery — never before.
- Duplicates on crash recovery: if DeltaForge crashes after delivering a batch but before saving the checkpoint, that batch is replayed on restart. Consumers must handle duplicates (see Consumer Guidance below).
- No silent drops: events are never discarded. If delivery fails, the batch is retried with exponential backoff until it succeeds or a fatal error stops the pipeline.
What “exactly-once” means (Kafka)
With exactly_once: true, each batch is wrapped in a Kafka transaction (begin_transaction / commit_transaction). Consumers using isolation.level=read_committed only see committed batches — no partial deliveries. If a transaction fails, it is aborted and retried from the same checkpoint position.
Exactly-once overhead is ~7-11% with properly tuned batch sizes. See the Performance guide for benchmark details.
Ordering Model
Within a source
Events are emitted in the source’s native order:
- MySQL: binlog file + position order. WriteRowsEvent batches preserve row order within each binlog event.
- PostgreSQL: LSN (Log Sequence Number) order. One WAL message per row change.
- Turso: change_id order (monotonically increasing).
DeltaForge does not reorder events. The source order is preserved through the pipeline.
Within a batch
All events in a batch maintain their source order. The delivery task processes batches in FIFO order from a bounded channel — no reordering between batches.
Per-primary-key ordering (the core guarantee)
DeltaForge guarantees per-primary-key ordering within a table under non-sharded operation. This means: for any single row identified by its primary key, all changes (INSERT, UPDATE, DELETE) are delivered to the sink in the exact order they occurred in the source database.
For Kafka specifically: the default message key is the serialized primary key, so events for the same row always go to the same partition and arrive in order. With dynamic routing (key template), ordering follows the resolved key — events with the same key are ordered; different keys may land in different partitions.
Cross-table ordering
There is no global ordering across tables. Events from different tables may be interleaved across batches. This is by design — enforcing global ordering would require single-threaded delivery, which would cap throughput.
However, when batch.respect_source_tx: true (the default), all rows from a single database transaction are kept in the same batch (see Transaction Boundaries below). This preserves causal ordering within a transaction.
Ordering under retries
When a batch delivery fails and is retried, the batch is re-delivered as a unit in the same order. No reordering occurs within or across retries. The max_inflight=1 setting (default) ensures strict ordering; with max_inflight > 1, batches are still delivered in FIFO order by the single-threaded delivery task.
Cross-sink ordering
All sinks receive the same batch simultaneously. The relative order of events is identical across all sinks.
Transaction Boundaries
How it works
When batch.respect_source_tx: true (the default), the coordinator checks each event’s tx_end flag before splitting a batch:
- MySQL: tx_end is set on the last row of each XID (transaction commit) event.
- PostgreSQL: tx_end is set on the COMMIT WAL record.
- Turso: each change is its own transaction (tx_end always true).
The batch accumulator will not split a batch at a point that would separate rows from the same transaction. If the batch limit (max_events or max_bytes) is reached mid-transaction, the batch grows beyond the limit to include all remaining rows in that transaction.
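The accumulator's split rule can be modeled simply: a batch may only close on an event whose tx_end flag is set, so it can grow past max_events to finish an in-flight transaction. An illustrative sketch (not the real accumulator):

```javascript
// Split events into batches of roughly maxEvents, but never cut a
// source transaction in half: only close a batch where tx_end is true.
function splitBatches(events, maxEvents) {
  const batches = [];
  let current = [];
  for (const ev of events) {
    current.push(ev);
    if (current.length >= maxEvents && ev.tx_end) {
      batches.push(current);
      current = [];
    }
  }
  if (current.length) batches.push(current); // trailing partial batch
  return batches;
}
```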
What this guarantees
- All rows from one database transaction appear in the same batch.
- Each batch is delivered atomically to each sink (all events in the batch succeed or fail together).
- With Kafka exactly_once: true, the entire batch is committed as a single Kafka transaction — consumers see all rows from the DB transaction atomically.
- Cross-table transactions: a transaction spanning tables A and B is emitted as a single batch containing events for both tables, tagged with the same tx_id. The batch is delivered atomically. This is stronger than “tagged but not grouped” — all events from one DB transaction are in one batch and delivered as a unit.
Precise transaction semantics
To avoid ambiguity, here is exactly what DeltaForge guarantees about transactions:
- Events from one source transaction are emitted contiguously within a single batch.
- Multi-table transactions preserve commit grouping — all rows from tables A and B in one DB transaction appear in the same batch.
- Within a single sink, events from one transaction are delivered atomically (the batch succeeds or fails as a unit).
- Across heterogeneous sinks, DeltaForge does not guarantee atomic commit. Kafka may commit a transaction while Redis is still retrying. Each sink’s checkpoint tracks its own progress independently.
- Retries do not break transaction grouping — a retried batch contains the same events in the same order.
- Under non-sharded operation, no sink may observe partial progress within a source transaction (the batch is the commit unit).
Edge cases
- A single database transaction that exceeds max_events or max_bytes is still kept in one batch. The limits are exceeded rather than the transaction being split.
- With respect_source_tx: false, batches are split purely by size/time limits regardless of transaction boundaries. Cross-table transaction atomicity is not preserved in this mode.
Failure Isolation
Per-sink independence
All sinks deliver concurrently. One sink’s failure does not block other sinks:
- The coordinator dispatches the same batch to all sinks simultaneously.
- Each sink’s delivery result is collected independently.
- Only sinks that delivered successfully get their checkpoints advanced.
- Failed sinks remain at their prior checkpoint position — they will receive the same batch again on retry or restart.
Required vs. optional sinks
Each sink is marked required: true (default) or required: false:
- Required: must succeed for the pipeline to consider the batch delivered. If a required sink fails, no checkpoint advances for any sink.
- Optional (best-effort): failures are logged but don’t prevent the pipeline from advancing. Optional sinks that fail will catch up on restart via replay from their own checkpoint.
Commit policy
The commit policy determines when checkpoints advance:
| Policy | Behavior |
|---|---|
| `required` (default) | All `required: true` sinks must acknowledge |
| `all` | Every sink (required and optional) must acknowledge |
| `quorum` | At least N sinks must acknowledge |
The policy is checked before any checkpoint is committed. If the policy isn’t satisfied, no sink advances — this prevents optional sinks from getting ahead of failed required sinks.
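As a concrete sketch, a two-sink pipeline with the default policy might be declared as follows. The `sinks` layout shown here is illustrative; `required` and the `commit_policy` values are the knobs documented above:

```yaml
spec:
  commit_policy: required   # default; alternatives: all, quorum
  sinks:
    - id: kafka-primary
      type: kafka
      required: true        # must ack before any checkpoint advances
    - id: redis-cache
      type: redis
      required: false       # best-effort; catches up from its own checkpoint
```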
Per-sink checkpoints
Each sink maintains its own checkpoint key ({source_id}::sink::{sink_id}). On restart, the source replays from the minimum checkpoint across all sinks. This means:
- A fast sink is never held back by a slow one during normal operation.
- A slow or failed sink only causes replay for itself, not re-delivery to sinks that are already ahead.
- Adding a new sink triggers replay from the source’s earliest available position for that sink only.
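The resume rule above can be sketched as a minimum over per-sink positions, using a plain `u64` as a stand-in for the real (richer) checkpoint type:

```rust
/// Sketch of the restart rule: the source replays from the minimum
/// checkpoint across all sinks. `u64` stands in for a comparable
/// source position (e.g. a binlog offset).
fn resume_position(sink_checkpoints: &[(&str, u64)]) -> Option<u64> {
    sink_checkpoints.iter().map(|(_, pos)| *pos).min()
}

fn main() {
    let checkpoints = [("kafka-primary", 12_400), ("redis-cache", 12_100)];
    // Redis is behind, so the source resumes from 12_100; Kafka will see
    // the range 12_100..12_400 again and must handle it idempotently.
    assert_eq!(resume_position(&checkpoints), Some(12_100));
}
```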
Fatal errors
Some errors are unrecoverable and stop the pipeline immediately:
- Kafka ProducerFenced: another producer instance started with the same `transactional.id`. The broker fences the old producer permanently.
- Permanent auth revocation: credentials are invalid and retrying won’t help.
Fatal errors return SinkError::Fatal and are not retried. The pipeline stops and requires operator intervention.
Error Classification & Retry
Retry behavior by sink
All sinks use exponential backoff with jitter. The classification determines whether an error is retried:
Kafka:
| Error | Classification | Behavior |
|---|---|---|
| Queue full | Retryable | Backoff, retry (100ms base, 10s max, 3 attempts) |
| Message timeout | Retryable | Backoff, retry |
| Broker connection failure | Retryable | Backoff, retry |
| Authentication failure | Non-retryable | Fail immediately |
| Message too large | Non-retryable | Fail immediately |
| Producer fenced | Fatal | Pipeline stops |
| Transaction commit failure (fatal) | Fatal | Pipeline stops |
NATS:
| Error | Classification | Behavior |
|---|---|---|
| Connection failure | Retryable | Backoff, retry (50ms base, 5s max, 3 attempts) |
| Publish timeout | Retryable | Backoff, retry |
| Authentication failure | Non-retryable | Fail immediately |
| No responders | Non-retryable | Fail immediately |
Redis:
| Error | Classification | Behavior |
|---|---|---|
| Connection failure | Retryable | Backoff, retry (50ms base, 5s max, 3 attempts) |
| Command timeout | Retryable | Backoff, retry |
| NOAUTH / WRONGPASS | Non-retryable | Fail immediately |
| Permission denied | Non-retryable | Fail immediately |
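The backoff schedules in the tables above (for example Redis and NATS: 50ms base, 5s cap, 3 attempts) can be sketched as follows; the exact jitter strategy is an assumption and is noted only in a comment:

```rust
/// Sketch of the exponential backoff quoted above: base * 2^attempt,
/// capped at max. A real implementation would add random jitter on top
/// (e.g. full jitter: sleep a uniform random duration up to this cap).
fn backoff_delay_ms(attempt: u32, base_ms: u64, max_ms: u64) -> u64 {
    let exp = base_ms.saturating_mul(1u64 << attempt.min(20));
    exp.min(max_ms)
}

fn main() {
    // Redis/NATS defaults: 50ms base, 5s max, 3 attempts.
    for attempt in 0..3 {
        let delay = backoff_delay_ms(attempt, 50, 5_000);
        println!("attempt {attempt}: wait up to {delay}ms (plus jitter)");
    }
}
```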
After retry exhaustion
If all retry attempts fail for a retryable error, the error is propagated to the coordinator. The coordinator’s behavior depends on the commit policy:
- Required sink: the batch is not committed, and the pipeline will retry the entire batch on the next cycle.
- Optional sink: the failure is logged, and the pipeline continues with other sinks.
Checkpoint Semantics
When checkpoints are saved
The checkpoint commit follows a strict sequence:
1. Accumulate events from source into a batch
2. Run processors (transform, filter)
3. Deliver batch to ALL sinks concurrently
4. Check commit policy (required/all/quorum)
5. Commit per-sink checkpoints (only for successful sinks)
Key invariant: a checkpoint is saved only after the sink has acknowledged delivery AND the commit policy is satisfied. This is the foundation of at-least-once delivery.
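Steps 4 and 5 of the sequence above can be sketched in a few lines; the types and names here are illustrative stand-ins, not DeltaForge's internal API:

```rust
// Sketch of the commit gate: checkpoints advance only for successful
// sinks, and only if the (default) `required` policy is satisfied.
struct SinkResult {
    sink_id: &'static str,
    required: bool,
    ok: bool,
}

/// Default `required` policy: every required sink must have acknowledged.
fn policy_satisfied(results: &[SinkResult]) -> bool {
    results.iter().filter(|r| r.required).all(|r| r.ok)
}

/// Sinks whose checkpoints may advance this cycle.
fn checkpoints_to_commit(results: &[SinkResult]) -> Vec<&'static str> {
    if !policy_satisfied(results) {
        return Vec::new(); // a required sink failed: no sink advances
    }
    results.iter().filter(|r| r.ok).map(|r| r.sink_id).collect()
}

fn main() {
    let results = [
        SinkResult { sink_id: "kafka-primary", required: true, ok: true },
        SinkResult { sink_id: "redis-cache", required: false, ok: false },
    ];
    // Optional sink failed: policy still satisfied, Kafka advances alone.
    assert_eq!(checkpoints_to_commit(&results), vec!["kafka-primary"]);
}
```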
On crash recovery
- DeltaForge reads per-sink checkpoints from the checkpoint store.
- The source resumes from the minimum checkpoint across all sinks.
- Sinks that were already ahead of the minimum position receive duplicate events — they must handle these idempotently (or use exactly-once mode).
- Sinks that were behind receive their missing events.
Checkpoint storage
Checkpoints are stored in SQLite (default) with WAL mode and synchronous=NORMAL for durability. The checkpoint store survives SIGKILL — no graceful shutdown required for checkpoint safety.
Backpressure
DeltaForge implements end-to-end backpressure without dropping events:
Source → [event channel] → Accumulator → [batch channel (max_inflight)] → Delivery → Sinks
- Sink slow: delivery task blocks waiting for sink acknowledgement.
- Batch channel full: accumulator blocks waiting to enqueue the next batch (bounded by `max_inflight`).
- Event channel full: source blocks waiting to enqueue the next event.
- Source slows: the database connection idles until the channel has capacity.
No events are dropped at any stage. Backpressure propagates from the slowest sink all the way back to the source connection.
max_inflight controls the pipeline depth: higher values allow overlapping batch building with delivery (better throughput), lower values reduce memory usage and latency.
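Putting the batching and depth knobs together, a hedged configuration sketch follows; the field names are the ones documented in this section, but the exact nesting under `spec` is an assumption:

```yaml
spec:
  batch:
    max_events: 1000          # flush after this many events
    max_bytes: 1048576        # ...or this many serialized bytes
    max_ms: 200               # ...or this much time since the batch started
    respect_source_tx: true   # never split a source transaction across batches
  max_inflight: 4             # batches buffered between accumulator and delivery
```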
Consumer Guidance
Idempotency key
Every event has a deterministic idempotency key in the format:
{tenant}|{db}.{table}|{tx_id}|{event_id}
This key is identical across replays — the same source event always produces the same key.
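Assembling the key is a straightforward string join; this sketch mirrors the format string above:

```rust
/// Builds the idempotency key in the documented format:
/// {tenant}|{db}.{table}|{tx_id}|{event_id}
fn idempotency_key(tenant: &str, db: &str, table: &str, tx_id: u64, event_id: &str) -> String {
    format!("{tenant}|{db}.{table}|{tx_id}|{event_id}")
}

fn main() {
    let key = idempotency_key("acme", "shop", "orders", 991, "01961234-5678");
    assert_eq!(key, "acme|shop.orders|991|01961234-5678");
}
```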
- Kafka with `exactly_once: true`: set `isolation.level=read_committed` on consumers. No application-level dedup needed.
- Kafka without `exactly_once`: use the event’s `id` field (UUID v7) or the idempotency key to detect duplicates.
- NATS JetStream: server-side dedup via `Nats-Msg-Id` header. Configure `duplicate_window` on the stream to cover your maximum expected downtime (default: 2 minutes).
- Redis Streams: check the `idempotency_key` field in the stream entry before processing. Use a Redis SET or application-level tracking to remember processed keys.
Dedup window
How long should consumers remember processed event IDs? Match your maximum expected DeltaForge downtime:
| Scenario | Recommended window |
|---|---|
| Normal operation (no crashes) | No dedup needed (at-most-once per run) |
| Planned restarts | 5 minutes |
| Unplanned crashes with auto-restart | 15-30 minutes |
| Disaster recovery | Match your RPO |
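A minimal consumer-side dedup window, keyed by the idempotency key, might look like the sketch below. It is in-memory only; a real consumer would typically persist keys in Redis or a database to survive its own restarts:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Sketch of a time-bounded dedup window keyed by DeltaForge's
/// idempotency key. Expired entries are evicted lazily on each check.
struct DedupWindow {
    seen: HashMap<String, Instant>,
    window: Duration,
}

impl DedupWindow {
    fn new(window: Duration) -> Self {
        Self { seen: HashMap::new(), window }
    }

    /// Returns true if the key is new (process it), false if duplicate.
    fn check_and_insert(&mut self, key: &str) -> bool {
        let now = Instant::now();
        let window = self.window;
        // Lazy eviction of entries older than the window.
        self.seen.retain(|_, t| now.duration_since(*t) < window);
        if self.seen.contains_key(key) {
            return false;
        }
        self.seen.insert(key.to_string(), now);
        true
    }
}

fn main() {
    // 15 minutes, matching the "unplanned crashes" row above.
    let mut dedup = DedupWindow::new(Duration::from_secs(15 * 60));
    let key = "acme|shop.orders|991|01961234";
    assert!(dedup.check_and_insert(key));  // first delivery: process
    assert!(!dedup.check_and_insert(key)); // replayed delivery: skip
}
```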
Correctness Test Matrix
Every guarantee is backed by a test. This matrix maps guarantees to their verification:
| Guarantee | Test | Type | Status |
|---|---|---|---|
| No data loss (at-least-once) | crash_recovery chaos scenario | Chaos | Exists |
| Kafka end-to-end exactly-once | exactly_once chaos scenario + kafka_sink_exactly_once_* | Chaos + Integration | Exists |
| Producer fencing detection | kafka_sink_exactly_once_producer_fencing | Integration | Exists |
| Per-primary-key ordering | Events keyed by PK → same Kafka partition | By design | Verified via Kafka partition assignment |
| Transaction boundary preservation | respect_source_tx + check_and_split coordinator logic | Unit | Exists |
| Per-sink checkpoint independence | test_per_sink_checkpoint_only_advances_on_success | Unit | Exists |
| Per-sink checkpoint legacy fallback | per_sink_proxy_falls_back_to_legacy_key | Unit | Exists |
| Commit policy gate before checkpoint | test_per_sink_checkpoint_only_advances_on_success | Unit | Exists |
| DLQ routes per-event failures | test_dlq_routes_failed_events_and_pipeline_continues | Unit | Exists |
| DLQ all-fail batch | test_dlq_all_events_fail_no_send | Unit | Exists |
| DLQ overflow (drop_oldest) | dlq::overflow_drop_oldest | Unit | Exists |
| DLQ overflow (reject) | dlq::overflow_reject_drops_new | Unit | Exists |
| DLQ overflow (block) | dlq::overflow_block_waits_for_ack | Unit | Exists |
| DLQ cleanup expired | dlq::cleanup_expired_removes_old_entries | Unit | Exists |
| Partial batch timer flush | test_partial_batch_flushed_by_timer | Unit | Exists |
| Network partition recovery | network_partition chaos scenario | Chaos | Exists |
| Sink outage recovery | sink_outage chaos scenario | Chaos | Exists |
| Schema drift handling | schema_drift chaos scenario | Chaos | Exists |
| MySQL failover detection | failover chaos scenario | Chaos | Exists |
| Postgres failover detection | pg_failover chaos scenario | Chaos | Exists |
| Binlog purge detection | binlog_purge chaos scenario | Chaos | Exists |
| Replication slot drop detection | slot_dropped chaos scenario | Chaos | Exists |
| NATS dedup within window | Verify Nats-Msg-Id prevents duplicates | Integration | Planned |
| Redis idempotency key | Verify consumer-side dedup via key | Integration | Planned |
| Snapshot → CDC handoff | No gaps or duplicates at boundary | Integration | Planned |
Limitations
These are not guaranteed and are documented honestly:
- No cross-table global ordering — events from different tables may be interleaved. This is by design; enforcing global order would require single-threaded delivery and cap throughput. Use `respect_source_tx: true` to preserve ordering within database transactions.
- No stateful stream processing — DeltaForge does not support joins, aggregations, or windowing. For stateful processing, consume DeltaForge’s output with Apache Flink, ksqlDB, or Kafka Streams.
- Dead letter queue — when `journal.enabled: true`, poison events (serialization/routing failures) are routed to a DLQ instead of blocking the pipeline. Without DLQ enabled, a single bad event will still block. See the DLQ page.
- No schema registry integration (yet) — schema sensing detects structural drift and can halt on breaking changes, but there is no Confluent Schema Registry or Avro/Protobuf encoding support. Planned for a future release.
- Snapshot consistency — initial snapshots use lock-free parallel reads. The snapshot is eventually consistent with the CDC stream; there may be a brief overlap period where both snapshot rows and CDC events for the same row are delivered. Consumers should use the event timestamp or idempotency key to resolve.
Dead Letter Queue
The Dead Letter Queue (DLQ) routes poison events — events that fail serialization, exceed size limits, or have invalid routing — to a durable queue instead of blocking the pipeline.
How it works
- The coordinator dispatches a batch to each sink
- Each sink pre-serializes events individually. If an event fails serialization or routing, it is flagged as a DLQ failure instead of failing the entire batch
- The remaining (healthy) events are sent to the sink normally
- Failed events are written to the DLQ queue with error context
- The pipeline continues — one bad event does not block thousands of good ones
Only per-event attributable failures go to the DLQ:
| Error | DLQ eligible | Why |
|---|---|---|
| Serialization failure | Yes | Event can’t be encoded — specific to this event’s data |
| Routing failure | Yes | Template resolves to empty/invalid for this event’s fields |
| Message too large | Yes | This specific event exceeds the sink’s max message size |
| Connection failure | No | Entire sink is down — not caused by one event |
| Auth failure | No | Credentials invalid — affects all events |
| Timeout / backpressure | No | Transient — will resolve with retry |
| Producer fenced | No | Fatal — pipeline stops |
Configuration
DLQ is opt-in. It is configured under journal in the pipeline spec — the journal is DeltaForge’s internal event storage system that backs the DLQ (and will support replay in a future release). Enabling the journal with a dlq stream activates per-event failure routing:
spec:
journal:
enabled: true
max_event_bytes: 262144 # 256KB — truncate larger payloads
dlq:
max_entries: 10000 # bounded queue size
max_age_secs: 604800 # 7 days — auto-purge older entries
overflow_policy: drop_oldest
Overflow policies
When the DLQ reaches max_entries:
| Policy | Behavior |
|---|---|
| `drop_oldest` (default) | Evict the oldest DLQ entry to make room for the new one. Most recent failures are usually most valuable for investigation. |
| `reject` | Reject the new DLQ entry. The failed event is lost — it is not stored in the DLQ, not retried, and not delivered to the sink. The pipeline continues processing other events normally. An error is logged and `deltaforge_dlq_rejected_total` is incremented. |
| `block` | Block the pipeline until space is available. No events (good or bad) are processed until the operator acks DLQ entries via the REST API. Visible as degraded on the health endpoint. |
Payload truncation
Events that caused MessageTooLarge may also be too large for the DLQ. If the event payload exceeds max_event_bytes (default 256KB), the before and after fields are truncated and payload_truncated: true is set. All event metadata (source, table, op, id, timestamp) is always preserved.
REST API
| Endpoint | Method | Description |
|---|---|---|
| `/pipelines/{name}/journal/dlq` | GET | Peek entries (oldest first). Params: `?limit=50&sink_id=...&error_kind=...` |
| `/pipelines/{name}/journal/dlq/count` | GET | Count of unacked entries |
| `/pipelines/{name}/journal/dlq/ack` | POST | Dismiss (remove) entries up to seq. Body: `{"up_to_seq": 42}`. Dismissed entries are permanently deleted — they are not retried or reprocessed. |
| `/pipelines/{name}/journal/dlq` | DELETE | Purge all entries |
Filters (sink_id, error_kind) affect listing only. Ack is always cumulative from the queue head — it removes all entries up to up_to_seq, regardless of filters used when viewing.
Example: inspect DLQ entries
# Peek the first 10 DLQ entries
curl -s http://localhost:8080/pipelines/my-pipeline/journal/dlq?limit=10 | jq .
# Filter by sink
curl -s "http://localhost:8080/pipelines/my-pipeline/journal/dlq?sink_id=kafka-primary&limit=5"
# Check DLQ size
curl -s http://localhost:8080/pipelines/my-pipeline/journal/dlq/count
# {"count": 42}
# Ack (remove) entries up to sequence 100
curl -s -X POST http://localhost:8080/pipelines/my-pipeline/journal/dlq/ack \
-H "Content-Type: application/json" \
-d '{"up_to_seq": 100}'
# {"acked": 12}
# Purge all entries
curl -s -X DELETE http://localhost:8080/pipelines/my-pipeline/journal/dlq
# {"purged": 42}
Example DLQ entry
{
"seq": 42,
"timestamp": 1743350400,
"pipeline": "orders-pipeline",
"stream": "dlq",
"event_id": "01961234-5678-7abc-def0-123456789abc",
"source_cursor": {"file": "mysql-bin.000005", "pos": 12345},
"payload_truncated": false,
"event": {
"id": "01961234-5678-7abc-def0-123456789abc",
"source": {"db": "orders", "table": "events"},
"op": "c",
"after": {"id": 99, "metadata": "<invalid bytes>"}
},
"meta": {
"sink_id": "kafka-primary",
"error_kind": "serialization error",
"error_message": "failed to serialize field 'metadata': invalid UTF-8 sequence",
"attempts": 1
}
}
Metrics
| Metric | Type | Labels | Purpose |
|---|---|---|---|
| `deltaforge_dlq_events_total` | counter | pipeline, sink, error_kind | Events routed to DLQ |
| `deltaforge_dlq_entries` | gauge | pipeline | Current unacked entries |
| `deltaforge_dlq_evicted_total` | counter | pipeline | Evicted by drop_oldest |
| `deltaforge_dlq_rejected_total` | counter | pipeline | Rejected by reject policy |
| `deltaforge_dlq_write_failures_total` | counter | pipeline | DLQ storage write failures |
| `deltaforge_dlq_saturation_ratio` | gauge | pipeline | Current / max_entries (0.0-1.0) |
Health signals:
- Warning log at 80% saturation
- Error log at 95% saturation
Storage
The DLQ is built on DeltaForge’s existing StorageBackend queue primitives (queue_push, queue_peek, queue_ack). It automatically uses whatever storage backend your pipeline is configured with (SQLite, PostgreSQL, or memory). No additional infrastructure is needed.
Background Cleanup
When max_age_secs is configured (default: 7 days), a background task runs every 60 seconds and removes entries older than the threshold. Age is calculated from insertion time, not source event time. A best-effort startup cleanup pass (bounded to 5 seconds) runs when the pipeline starts.
Cleanup and overflow eviction are independent — drop_oldest may remove entries before age-based cleanup fires.
Operator Workflow
DLQ events are not retried automatically. The intended workflow is:
1. Alert: monitor `deltaforge_dlq_events_total` or `deltaforge_dlq_entries` in Grafana
2. Inspect: `GET /pipelines/{name}/journal/dlq?sink_id=...` to see what failed and why
3. Fix: resolve the root cause (schema mismatch, oversized column, broken routing template)
4. Dismiss: `POST /pipelines/{name}/journal/dlq/ack` to remove reviewed entries — dismissed entries are permanently deleted
Future versions may add a retry endpoint (POST .../dlq/retry) that re-injects events into the pipeline, but this raises ordering and idempotency concerns and is deferred.
Limitations
- DLQ is per-pipeline, not per-sink. Use the `sink_id` filter to view entries for a specific sink.
- The `block` overflow policy blocks the entire pipeline, not just the failing sink.
Architecture
This document describes DeltaForge’s internal architecture, design decisions, and how the major components interact.
Design Principles
Source-Owned Semantics
DeltaForge avoids imposing a universal data model on all sources. Instead, each database source defines and owns its schema semantics:
- MySQL captures MySQL-specific types, collations, and engine information
- PostgreSQL captures PostgreSQL-specific types, OIDs, and replica identity
- Future sources (MongoDB, ClickHouse, TiDB) will capture their native semantics
This approach means downstream consumers receive schemas that accurately reflect the source database rather than a lowest-common-denominator normalization.
Delivery Guarantees First
The checkpoint system is designed around a single invariant:
Checkpoints are only saved after events have been successfully delivered.
This ordering guarantees at-least-once delivery. A crash between checkpoint and delivery would lose events; DeltaForge prevents this by always checkpointing after sink acknowledgment.
Configuration Over Code
Pipelines are defined declaratively in YAML. This enables:
- Version-controlled pipeline definitions
- Environment-specific configuration via variable expansion
- Rapid iteration without recompilation
Component Overview
┌─────────────────────────────────────────────────────────────────┐
│ DeltaForge Runtime │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│ Sources │ Schema │ Coordinator │ Sinks │ Control │
│ │ Registry │ + Batch │ │ Plane │
├─────────────┼─────────────┼─────────────┼─────────────┼─────────┤
│ MySQL │ Durable │ Batching │ Kafka │ REST API│
│ PostgreSQL │ Schema │ Commit │ Redis │ Metrics │
│ │ Registry │ Policy │ NATS │ Health │
└─────────────┴──────┬──────┴─────────────┴─────────────┴─────────┘
│
┌──────────┴──────────┐
│ Storage Backend │
│ (SQLite / PG / │
│ Memory) │
├─────────────────────┤
│ KV · Log · Slot │
│ Queue │
└─────────────────────┘
Data Flow
Event Lifecycle
1. Source reads from database log (binlog/WAL)
│
▼
2. Schema loader maps table_id to schema
│
▼
3. Event constructed with before/after images
│
▼
4. Event sent to coordinator via channel
│
▼
5. Coordinator batches events
│
▼
6. Processors transform batch (JavaScript)
│
▼
7. Sinks deliver batch concurrently
│
▼
8. Commit policy evaluated
│
▼
9. Checkpoint saved (if policy satisfied)
Event Structure
Every CDC event shares a common structure:
pub struct Event {
    pub source_id: String,                  // Source identifier
    pub database: String,                   // Database name
    pub table: String,                      // Table name
    pub op: Op,                             // Insert, Update, Delete, Ddl
    pub tx_id: Option<u64>,                 // Source transaction ID
    pub before: Option<Value>,              // Previous row state
    pub after: Option<Value>,               // New row state
    pub schema_version: Option<String>,     // Schema fingerprint
    pub schema_sequence: Option<u64>,       // For replay lookups
    pub ddl: Option<Value>,                 // DDL payload if op == Ddl
    pub timestamp: DateTime<Utc>,           // Event timestamp
    pub checkpoint: Option<CheckpointMeta>, // Position info
    pub size_bytes: usize,                  // For batching
}
Schema Registry
Role
The schema registry serves three purposes:
- Map table IDs to schemas: Binlog events reference tables by ID; the registry resolves these to full schema metadata
- Detect schema changes: Fingerprint comparison identifies when DDL has modified a table
- Enable replay: Sequence numbers correlate events with the schema active when they were produced
Schema Sensing
At startup, the schema loader auto-discovers tables via pattern expansion and loads their schemas from the live database catalog before any CDC events arrive.
Pattern expansion supports wildcards:
| Pattern | Matches |
|---|---|
| `db.table` | Exact table |
| `db.*` | All tables in database |
| `db.prefix%` | Tables matching prefix |
| `%.table` | Table in any database |
DDL detection works through cache invalidation. When the binlog delivers a QueryEvent (DDL), the affected database’s cache is cleared. On the next row event for that table, the schema is re-fetched from INFORMATION_SCHEMA, fingerprinted, and registered as a new version. No separate DDL history log is maintained — the live catalog is always the source of truth.
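The invalidation flow can be sketched as follows; the types and method names are illustrative, and the catalog fetch is stubbed:

```rust
use std::collections::HashMap;

// Sketch of DDL-driven cache invalidation: a DDL QueryEvent clears the
// affected database's cached schemas, and the next row event re-fetches
// from the live catalog (stand-in closure here, not INFORMATION_SCHEMA).
struct SchemaCache {
    by_db: HashMap<String, HashMap<String, String>>,
}

impl SchemaCache {
    fn on_ddl(&mut self, db: &str) {
        // Invalidate every cached table schema for this database.
        self.by_db.remove(db);
    }

    fn is_cached(&self, db: &str, table: &str) -> bool {
        self.by_db.get(db).map_or(false, |t| t.contains_key(table))
    }

    fn get_or_fetch(&mut self, db: &str, table: &str) -> &str {
        self.by_db
            .entry(db.to_string())
            .or_default()
            .entry(table.to_string())
            // Stand-in for the catalog fetch + fingerprint + registration.
            .or_insert_with(|| format!("fetched:{db}.{table}"))
    }
}

fn main() {
    let mut cache = SchemaCache { by_db: HashMap::new() };
    cache.get_or_fetch("shop", "orders");
    assert!(cache.is_cached("shop", "orders"));
    cache.on_ddl("shop"); // ALTER TABLE seen in the binlog
    assert!(!cache.is_cached("shop", "orders")); // next row event re-fetches
}
```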
Failover reconciliation (verifying schemas against a new primary after server identity change) is planned but not yet implemented.
Schema versions are persisted via DurableSchemaRegistry, which uses the StorageBackend Log primitive. On startup the log is replayed to populate an in-memory cache, so hot-path reads have the same performance as the previous in-memory implementation. Cold-start reconstruction is always possible from the log alone.
Schema Registration Flow
1. Schema loader fetches from INFORMATION_SCHEMA
│
▼
2. Compute fingerprint (SHA-256 of structure)
│
▼
3. Check registry for existing schema with same fingerprint
│
├── Found: Return existing version (idempotent)
│
└── Not found: Allocate new version number
│
▼
4. Store with: version, fingerprint, JSON, timestamp, sequence, checkpoint
Sequence Numbers
The registry maintains a global monotonic counter. Each schema version receives a sequence number at registration. Events carry this sequence, enabling accurate schema lookup during replay:
Timeline:
─────────────────────────────────────────────────────────────►
│ │ │
Schema v1 Schema v2 Schema v3
(seq=1) (seq=15) (seq=42)
│ │ │
└──events 1-14─┘──events 15-41─────┘──events 42+──►
Replay at seq=20: Use schema v2 (registered at seq=15, before seq=42)
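The replay lookup above can be sketched as a range query over registered sequence numbers; `BTreeMap` is an illustrative choice, and the registry's real data structures may differ:

```rust
use std::collections::BTreeMap;

/// Given the global sequence carried by an event, pick the newest schema
/// registered at or before that sequence.
fn schema_for_seq<'a>(schemas: &'a BTreeMap<u64, &str>, event_seq: u64) -> Option<&'a str> {
    schemas.range(..=event_seq).next_back().map(|(_, v)| *v)
}

fn main() {
    let mut schemas = BTreeMap::new();
    schemas.insert(1, "v1");
    schemas.insert(15, "v2");
    schemas.insert(42, "v3");
    // Matches the timeline above: replay at seq=20 uses schema v2.
    assert_eq!(schema_for_seq(&schemas, 20), Some("v2"));
}
```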
Checkpoint Store
Timing Guarantee
The checkpoint is saved only after sinks acknowledge delivery:
┌────────┐ events ┌────────┐ ack ┌────────────┐
│ Source │ ─────────▶ │ Sink │ ───────▶ │ Checkpoint │
└────────┘ └────────┘ │ Store │
└────────────┘
If the process crashes after sending to sink but before checkpoint, events will be replayed. This is the “at-least-once” guarantee — duplicates are possible, but loss is not.
Storage Backends
Checkpoints are stored via BackendCheckpointStore, a thin adapter over the StorageBackend KV primitive. See Storage for backend configuration and the full namespace map.
| Backend | Persistence | Use Case |
|---|---|---|
| `SqliteStorageBackend` | SQLite file | Single-instance production |
| `MemoryStorageBackend` | None | Testing, ephemeral deployments |
| `PostgresStorageBackend` | External DB | HA, multi-instance |
Checkpoint-Schema Correlation
When registering schemas, the current checkpoint can be attached:
registry.register_with_checkpoint(
    tenant, db, table,
    &fingerprint,
    &schema_json,
    Some(&checkpoint_bytes), // Current binlog position
).await?;
This creates a link between schema versions and source positions, enabling accurate schema lookup during replay.
Coordinator
The coordinator orchestrates event flow between source and sinks:
Batching
Events are accumulated until a threshold triggers flush:
- `max_events`: Event count limit
- `max_bytes`: Total serialized size limit
- `max_ms`: Time since batch started
- `respect_source_tx`: Never split source transactions
Commit Policy
When multiple sinks are configured, the commit policy determines when the checkpoint advances:
match policy {
    All => required_acks == total_sinks,
    Required => required_acks == sinks.filter(|s| s.required).count(),
    Quorum(n) => required_acks >= n,
}
Processor Pipeline
Processors run in declared order, transforming batches:
events ──▶ Processor 1 ──▶ Processor 2 ──▶ ... ──▶ transformed events
Each processor can filter, transform, or enrich events. The JavaScript processor uses deno_core for sandboxed execution.
Hot Paths
Critical performance paths have been optimized:
- Event construction - Minimal allocations, reuse buffers
- Checkpoint serialization - Opaque bytes avoid repeated JSON encoding
- Sink delivery - Batch operations reduce round trips
- Schema lookup - In-memory cache with stable fingerprints
Benchmarking
Performance is tracked via:
- Micro-benchmarks for specific operations
- End-to-end benchmarks using the Coordinator component
- Regression detection in CI
Future Architecture
Planned enhancements:
- Initial snapshot/backfill using the Slot primitive for cursor tracking
- Event store: time-based replay and schema evolution using the Log primitive
- Distributed coordination: leader election via the Slot primitive with TTL-based leases
- Additional sources: MongoDB, SQL Server, TiDB
- PostgreSQL storage validation: chaos/recovery testing to bring it to production parity with SQLite
Checkpoints
Checkpoints record pipeline progress so ingestion can resume from the last successfully delivered position. DeltaForge guarantees at-least-once delivery by saving checkpoints only after events have been acknowledged by sinks.
Core Guarantee: At-Least-Once Delivery
Checkpoints are only saved after events have been successfully delivered to sinks.
If a checkpoint were saved before delivery, a crash between those two points would silently lose events. DeltaForge prevents this by always checkpointing after sink acknowledgment.
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Source │────▶│ Processor │────▶│ Sink │────▶│ Checkpoint │
│ (read) │ │ (transform) │ │ (deliver) │ │ (save) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
On crash: events since the last checkpoint are replayed. Consumers should be idempotent or use deduplication.
Storage
Checkpoints are stored via the unified StorageBackend - see Storage for backend configuration. All pipelines share the same storage backend; each pipeline’s checkpoint is keyed by its source ID.
| Backend | Persistence | Use Case |
|---|---|---|
| `SqliteStorageBackend` | SQLite file on disk | Single-instance production |
| `MemoryStorageBackend` | None (lost on restart) | Testing |
| `PostgresStorageBackend` | External database | HA, multi-instance |
Checkpoint Contents
MySQL
Tracks binlog position:
file: binlog.000042
pos: 12345
gtid_set: (optional, if GTID replication is enabled)
PostgreSQL
Tracks replication stream LSN:
lsn: 0/1A2B3C4D
tx_id: (optional)
Checkpoint-Schema Correlation
When a schema change is detected, the current checkpoint position is recorded alongside the new schema version. This ensures that during replay, each event is interpreted with the schema that was active when it was produced - even if the table has since been altered.
Commit Policy
When multiple sinks are configured, the commit policy controls when the checkpoint advances:
| Policy | Behaviour |
|---|---|
| `all` | All sinks must acknowledge |
| `required` | Only sinks marked `required: true` must acknowledge |
| `quorum(n)` | At least n sinks must acknowledge |
Set required: true only on sinks where delivery is mandatory for correctness. Optional sinks can fail without blocking the checkpoint.
Operations
Inspecting Checkpoints
sqlite3 ./data/deltaforge.db \
"SELECT key, length(value) FROM kv WHERE namespace = 'checkpoints';"
Resetting a Pipeline
To force a pipeline to re-read from the beginning, delete its checkpoint:
# Via API
curl -X DELETE http://localhost:8080/pipelines/{name}/checkpoint
# Directly in SQLite
sqlite3 ./data/deltaforge.db \
"DELETE FROM kv WHERE namespace = 'checkpoints' AND key = '{source-id}';"
Best Practices
- Back up `deltaforge.db` regularly - it contains both checkpoints and schema history
- Monitor checkpoint lag to detect stuck pipelines
- Use smaller batch sizes for more frequent checkpoints at the cost of throughput
- Test recovery by killing the process and verifying no events are lost or duplicated
Storage
DeltaForge uses a unified storage layer that backs all runtime state: checkpoints, schema registry, FSM state, leases, and quarantine queues. A single StorageBackend instance is shared across subsystems, so there is one operational concern instead of many.
Backends
| Backend | Persistence | Use Case |
|---|---|---|
| `MemoryStorageBackend` | None (lost on restart) | Testing, ephemeral deployments |
| `SqliteStorageBackend` | SQLite file on disk | Single-instance production |
| `PostgresStorageBackend` | External database | HA, multi-instance deployments |
Configuration
The backend is selected via CLI flags:
# SQLite (default for production)
deltaforge --config pipeline.yaml --storage-backend sqlite --storage-path ./data/deltaforge.db
# In-memory (testing only — all state lost on restart)
deltaforge --config pipeline.yaml --storage-backend memory
Or via the config file:
storage:
backend: sqlite
path: ./data/deltaforge.db
# or:
storage:
backend: postgres
dsn: "host=localhost dbname=deltaforge_storage user=df password=secret"
The SQLite backend creates the database file and parent directories automatically on first start. The PostgreSQL backend runs schema migrations on first connect - no manual table creation is needed.
The PostgreSQL backend is implemented and available under the `postgres` feature flag, but has not yet received the same chaos/recovery validation as the SQLite backend. Treat it as beta for production use.
Primitives
The StorageBackend trait exposes four primitives. All operations are namespaced - different subsystems share the same backend without key collisions.
KV (Key-Value with optional TTL)
General-purpose persistent key-value store. Used for checkpoints, FSM state, and leases.
backend.kv_put("checkpoints", "mysql-pipe1", &bytes).await?;
backend.kv_put_with_ttl("leases", "pipe1", b"alive", ttl_secs).await?;
backend.kv_get("checkpoints", "mysql-pipe1").await?;
backend.kv_delete("checkpoints", "mysql-pipe1").await?;
backend.kv_list("checkpoints", Some("mysql-")).await?; // prefix filter
Log (Append-only, globally sequenced)
Immutable append-only log with a global monotonic sequence counter. Used for the schema registry. The sequence is global across all keys in the namespace - interleaved appends to different keys produce strictly increasing sequence numbers.
let seq = backend.log_append("schemas", "acme/shop/orders", &entry).await?;
let all = backend.log_list("schemas", "acme/shop/orders").await?;
let tail = backend.log_since("schemas", "acme/shop/orders", since_seq).await?;
Slot (Versioned, compare-and-swap)
Single versioned value with CAS semantics. Used for snapshot cursors and leader election. Concurrent CAS operations with the same expected version are serialized - exactly one wins.
let ver = backend.slot_upsert("snapshots", "pipe/orders", &state).await?;
let won = backend.slot_cas("snapshots", "pipe/orders", ver, &new_state).await?;
let cur = backend.slot_get("snapshots", "pipe/orders").await?; // (version, bytes)
Queue (Ordered, ack-based)
Ordered queue with explicit acknowledgement. Used for quarantine and DLQ. Items are not removed until acked; oldest items can be dropped when the queue is full.
let id = backend.queue_push("quarantine", "pipe/orders", &event_bytes).await?;
let items = backend.queue_peek("quarantine", "pipe/orders", 10).await?;
backend.queue_ack("quarantine", "pipe/orders", id).await?;
backend.queue_drop_oldest("quarantine", "pipe/orders", n).await?;
let len = backend.queue_len("quarantine", "pipe/orders").await?;
Namespace Map
Each subsystem owns a fixed namespace. Keys within a namespace never collide across subsystems.
| Namespace | Primitive | Used by | Key format |
|---|---|---|---|
| `checkpoints` | KV | BackendCheckpointStore | `{source-id}` |
| `schemas` | Log | DurableSchemaRegistry | `{tenant}/{db}/{table}` |
| `schemas` | KV | DurableSchemaRegistry | `{tenant}/{db}/{table}` (index) |
| `snapshots` | Slot | Snapshot/backfill (planned) | `{pipeline}/{table}` |
| `fsm` | KV | FSM state (planned) | `{pipeline}` |
| `leases` | KV + TTL | Leader election (planned) | `{pipeline}` |
| `quarantine` | Queue | Quarantine (planned) | `{pipeline}/{table}` |
| `dlq` | Queue | Dead-letter queue (planned) | `{pipeline}` |
Adapters
Existing pipeline code uses higher-level interfaces - it never touches the StorageBackend directly.
BackendCheckpointStore
Implements the CheckpointStore trait on top of KV. Drop-in replacement for the old FileCheckpointStore and SqliteCheckpointStore.
```rust
let store = BackendCheckpointStore::new(Arc::clone(&backend));
// CheckpointStore trait methods work unchanged
store.put_raw("mysql-pipe1", &bytes).await?;
store.get_raw("mysql-pipe1").await?;
```
DurableSchemaRegistry
Implements the schema registry on top of the Log primitive. On startup it replays the log to populate an in-memory cache, so hot-path reads are identical in performance to the old InMemoryRegistry. Writes go through the log, enabling cold-start reconstruction.
```rust
// Production: async construction with log replay
let registry = DurableSchemaRegistry::new(Arc::clone(&backend)).await?;

// Tests: sync construction, no replay (MemoryStorageBackend, empty log)
let registry = DurableSchemaRegistry::for_testing();
```
PipelineManager Wiring
PipelineManager::with_backend() wires both subsystems from a single backend:
```rust
let backend: ArcStorageBackend = SqliteStorageBackend::open("./data/deltaforge.db")?;
let manager = PipelineManager::with_backend(backend).await?;
```
Internally this creates a BackendCheckpointStore and a DurableSchemaRegistry from the same backend instance. All pipelines share the same storage file.
Schema Registry
DeltaForge’s schema registry tracks table schemas across time, enabling accurate event interpretation during replay and providing change detection for DDL operations.
Design Philosophy: Source-Owned Schemas
DeltaForge takes a fundamentally different approach to schema handling than many CDC tools. Rather than normalizing all database schemas into a universal type system, each source defines and owns its schema semantics.
This means:
- MySQL schemas capture MySQL semantics - column types like `bigint(20) unsigned`, `varchar(255)`, and `json` are preserved exactly as MySQL defines them
- PostgreSQL schemas capture PostgreSQL semantics - arrays, custom types, and pg-specific attributes remain intact
- No lossy normalization - you don’t lose precision or database-specific information by forcing everything into a common format
This design avoids the maintenance burden of keeping a universal type system synchronized across all databases, and it ensures that downstream consumers receive schemas that accurately reflect the source database’s capabilities and constraints.
The SourceSchema Trait
Every CDC source implements the SourceSchema trait, which provides a common interface for fingerprinting and column access while allowing source-specific schema representations:
```rust
pub trait SourceSchema: Serialize + DeserializeOwned + Send + Sync {
    /// Source type identifier (e.g., "mysql", "postgres", "mongodb").
    fn source_kind(&self) -> &'static str;

    /// Content-addressable fingerprint for change detection.
    /// Two schemas with the same fingerprint are identical.
    fn fingerprint(&self) -> String;

    /// Column/field names in ordinal order.
    fn column_names(&self) -> Vec<&str>;

    /// Primary key column names.
    fn primary_key(&self) -> Vec<&str>;

    /// Human-readable description.
    fn describe(&self) -> String;
}
```
Fingerprinting
Schema fingerprints use SHA-256 hashing over JSON-serialized content to provide:
- Stability - the same schema always produces the same fingerprint
- Change detection - any structural change produces a different fingerprint
- Content-addressability - fingerprints can be used as cache keys or deduplication identifiers
```rust
pub fn compute_fingerprint<T: Serialize>(value: &T) -> String {
    let json = serde_json::to_vec(value).unwrap_or_default();
    let hash = Sha256::digest(&json);
    format!("sha256:{}", hex::encode(hash))
}
```
The fingerprint only includes structurally significant fields. For MySQL, this means columns and primary key are included, but engine and charset are excluded since they don’t affect how CDC events should be interpreted.
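The exclusion rule can be illustrated with a toy fingerprint. This sketch uses the standard library's `DefaultHasher` as a stand-in for SHA-256 over serialized JSON (the real implementation uses `serde_json` and `Sha256`); the point is what enters the hash input, not the hash algorithm:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Structurally significant fields only: columns and primary key.
// Engine and charset are deliberately absent, mirroring the rule above.
#[derive(Hash)]
struct StructuralView<'a> {
    columns: &'a [(String, String, bool)], // (name, column_type, nullable)
    primary_key: &'a [String],
}

fn fingerprint(columns: &[(String, String, bool)], primary_key: &[String]) -> String {
    // DefaultHasher stands in for SHA-256 over canonical JSON.
    let mut h = DefaultHasher::new();
    StructuralView { columns, primary_key }.hash(&mut h);
    format!("h:{:016x}", h.finish())
}

fn main() {
    let cols = vec![
        ("id".to_string(), "bigint(20) unsigned".to_string(), false),
        ("total".to_string(), "decimal(10,2)".to_string(), true),
    ];
    let pk = vec!["id".to_string()];

    // Same structure => same fingerprint, regardless of engine/charset,
    // because those fields never enter the hash input.
    let a = fingerprint(&cols, &pk);
    assert_eq!(a, fingerprint(&cols, &pk));

    // Any structural change (here a nullability flip) changes it.
    let mut cols2 = cols.clone();
    cols2[1].2 = false;
    assert_ne!(a, fingerprint(&cols2, &pk));
}
```

Because the hash input is a projection of the schema, changing `engine` from InnoDB to MyISAM leaves the fingerprint untouched, so no spurious schema version is registered.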
MySQL Schema Implementation
The MySqlTableSchema struct captures comprehensive MySQL table metadata:
```rust
pub struct MySqlTableSchema {
    /// Columns in ordinal order
    pub columns: Vec<MySqlColumn>,
    /// Primary key column names
    pub primary_key: Vec<String>,
    /// Storage engine (InnoDB, MyISAM, etc.)
    pub engine: Option<String>,
    /// Default charset
    pub charset: Option<String>,
    /// Default collation
    pub collation: Option<String>,
}

pub struct MySqlColumn {
    pub name: String,
    pub column_type: String,   // e.g., "bigint(20) unsigned"
    pub data_type: String,     // e.g., "bigint"
    pub nullable: bool,
    pub ordinal_position: u32,
    pub default_value: Option<String>,
    pub extra: Option<String>, // e.g., "auto_increment"
    pub comment: Option<String>,
    pub char_max_length: Option<i64>,
    pub numeric_precision: Option<i64>,
    pub numeric_scale: Option<i64>,
}
```
Schema information is fetched from `INFORMATION_SCHEMA` at startup and cached; entries remain valid until a DDL event or a manual reload invalidates them.
Schema Registry Architecture
The schema registry serves three core functions:
- Version tracking - maintains a history of schema versions per table
- Change detection - compares fingerprints to detect DDL changes
- Replay correlation - associates schemas with checkpoint positions for accurate replay
Schema Versions
Each registered schema version includes:
| Field | Description |
|---|---|
| `version` | Per-table version number (starts at 1) |
| `hash` | Content fingerprint for deduplication |
| `schema_json` | Full schema as JSON |
| `registered_at` | Registration timestamp |
| `sequence` | Global monotonic sequence number |
| `checkpoint` | Source checkpoint at registration time |
Sequence Numbers for Replay
The registry maintains a global monotonic sequence counter. When a schema is registered, it receives the next sequence number. Events carry this sequence number, enabling the replay engine to look up the correct schema version:
```rust
// During replay: find schema active at event's sequence
let schema = registry.get_at_sequence(tenant, db, table, event.schema_sequence);
```
This ensures events are always interpreted with the schema that was active when they were produced, even if the table structure has since changed.
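The lookup rule can be sketched in isolation. This is an illustrative stand-in for the registry's actual method, assuming versions are stored sorted by ascending sequence: the active schema is the latest version registered at or before the event's sequence number.

```rust
// Hypothetical storage shape: (sequence, schema_json), sorted ascending
// in registration order.
fn get_at_sequence<'a>(versions: &'a [(u64, &'a str)], event_seq: u64) -> Option<&'a str> {
    versions
        .iter()
        .take_while(|(seq, _)| *seq <= event_seq) // versions registered up to the event
        .last()                                    // most recent of those wins
        .map(|(_, schema)| *schema)
}

fn main() {
    let versions = [(3, "v1"), (7, "v2"), (12, "v3")];
    assert_eq!(get_at_sequence(&versions, 7), Some("v2"));  // exact hit
    assert_eq!(get_at_sequence(&versions, 10), Some("v2")); // between two DDLs
    assert_eq!(get_at_sequence(&versions, 99), Some("v3")); // after the last DDL
    assert_eq!(get_at_sequence(&versions, 1), None);        // before first registration
}
```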
Checkpoint Correlation
Schemas can be registered with an associated checkpoint, creating a correlation between schema versions and source positions:
```rust
registry.register_with_checkpoint(
    tenant, db, table,
    &fingerprint,
    &schema_json,
    Some(checkpoint_bytes), // Optional: binlog position when schema was observed
).await?;
```
This correlation supports scenarios like:
- Replaying events from a specific checkpoint with the correct schema
- Determining which schema was active at a particular binlog position
- Rolling back schema state along with checkpoint rollback
Schema Loader
The MySqlSchemaLoader handles schema discovery and caching:
Pattern Expansion
Tables are specified using patterns that support wildcards:
| Pattern | Description |
|---|---|
| `db.table` | Exact match |
| `db.*` | All tables in database |
| `db.prefix%` | Tables starting with `prefix` |
| `%.table` | Table in any database |
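The semantics in the table can be sketched as a small matcher. This is illustrative only; the real loader expands patterns against `INFORMATION_SCHEMA` rather than testing one name at a time.

```rust
// `*` matches any table name, a trailing `%` matches a table-name prefix,
// and `%` as the database part matches any database.
fn matches(pattern: &str, db: &str, table: &str) -> bool {
    let Some((pdb, ptable)) = pattern.split_once('.') else {
        return false; // patterns are always db.table
    };
    let db_ok = pdb == "%" || pdb == db;
    let table_ok = match ptable {
        "*" => true,
        p if p.ends_with('%') => table.starts_with(&p[..p.len() - 1]),
        p => p == table,
    };
    db_ok && table_ok
}

fn main() {
    assert!(matches("shop.orders", "shop", "orders"));       // exact
    assert!(matches("shop.*", "shop", "payments"));          // all tables in db
    assert!(matches("shop.order_%", "shop", "order_items")); // prefix
    assert!(matches("%.orders", "archive", "orders"));       // any database
    assert!(!matches("shop.order_%", "shop", "payments"));
}
```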
Preloading
At startup, the loader expands patterns and preloads all matching schemas:
```rust
let schema_loader = MySqlSchemaLoader::new(dsn, registry, tenant);
let tracked_tables = schema_loader.preload(&["shop.orders", "shop.order_%"]).await?;
```
This ensures schemas are available before the first CDC event arrives.
Caching
Loaded schemas are cached to avoid repeated INFORMATION_SCHEMA queries:
```rust
// Fast path: return cached schema
if let Some(cached) = cache.get(&(db, table)) {
    return Ok(cached.clone());
}

// Slow path: fetch from database, register, cache
let schema = fetch_schema(db, table).await?;
let version = registry.register(...).await?;
cache.insert((db, table), loaded_schema);
```
DDL Handling
When the binlog contains DDL events, the schema loader responds:
| DDL Type | Action |
|---|---|
| `CREATE TABLE` | Schema loaded on first row event |
| `ALTER TABLE` | Cache invalidated, reloaded on next row |
| `DROP TABLE` | Cache entry removed |
| `TRUNCATE` | No schema change |
| `RENAME TABLE` | Old removed, new loaded on first row |
DDL detection uses the `QueryEvent` type in the binlog. On DDL, the entire database's schema cache is invalidated, since MySQL doesn't always specify the exact table in DDL events.
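The database-wide invalidation can be sketched with a plain `HashMap` standing in for the loader's cache (an illustrative shape, not the actual internal type):

```rust
use std::collections::HashMap;

// On DDL, drop every cached entry for the affected database, because the
// binlog QueryEvent may not name the exact table that changed.
fn invalidate_database(cache: &mut HashMap<(String, String), String>, db: &str) {
    cache.retain(|(cached_db, _table), _schema| cached_db.as_str() != db);
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert(("shop".into(), "orders".into()), "schema-a".to_string());
    cache.insert(("shop".into(), "items".into()), "schema-b".to_string());
    cache.insert(("crm".into(), "leads".into()), "schema-c".to_string());

    invalidate_database(&mut cache, "shop");

    // Only the untouched database survives; shop tables reload lazily
    // on their next row event.
    assert_eq!(cache.len(), 1);
    assert!(cache.contains_key(&("crm".to_string(), "leads".to_string())));
}
```

The trade-off is deliberate: over-invalidating one database costs a few `INFORMATION_SCHEMA` round trips, whereas under-invalidating would decode rows against a stale column layout.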
API Endpoints
Reload Schemas
Force reload schemas from the database:
```sh
curl -X POST http://localhost:8080/pipelines/{name}/schemas/reload
```
This clears the cache and re-fetches schemas for all tracked tables.
List Cached Schemas
View currently cached schemas:
```sh
curl http://localhost:8080/pipelines/{name}/schemas
```
Limitations
- In-memory registry - Schema versions are lost on restart. Persistent backends (SQLite, then PostgreSQL for HA) are planned.
- No cross-pipeline sharing - Each pipeline maintains its own registry instance
- Pattern expansion at startup - New tables matching patterns require pipeline restart or reload
Best Practices
- Use explicit table patterns in production to avoid accidentally capturing unwanted tables
- Monitor schema reload times - slow reloads may indicate overly broad patterns
- Trigger schema reload after DDL if your deployment process modifies schemas
- Include schema version in downstream events for consumers that need schema evolution awareness
Troubleshooting
Unknown table_id Errors
```text
WARN write_rows for unknown table_id, table_id=42
```
The binlog contains row events for a table not in the `table_map`. This happens when:
- A table was created after the CDC stream started
- Table patterns don’t match the table
Solution: Trigger a schema reload via the REST API.
Schema Fetch Returned 0 Columns
```text
WARN schema fetch returned 0 columns, db=shop, table=orders
```
Usually indicates:
- Table doesn’t exist
- MySQL user lacks `SELECT` privilege on `INFORMATION_SCHEMA`
- Table was dropped between detection and schema load
Slow Schema Loading
```text
WARN slow schema fetch, db=shop, table=orders, ms=350
```
Consider:
- Narrowing table patterns to reduce the number of tables
- Using exact table names instead of wildcards
- Verifying network latency to the MySQL server
Confluent Schema Registry Integration
DeltaForge supports the Confluent Schema Registry as the external schema store for Avro encoding. This is separate from DeltaForge’s internal schema registry (described above) which tracks source table schemas.
How it works
When a sink is configured with `encoding: { type: avro, schema_registry_url: "..." }`:
- Schema derivation: DeltaForge derives an Avro schema from the first event’s JSON structure
- Registration: The schema is registered with the Confluent Schema Registry (idempotent — same schema returns the same ID)
- Caching: The schema ID is cached per subject, so subsequent events skip the HTTP call
- Wire format: Events are encoded as `[0x00][schema_id:4][avro_binary]` (Confluent wire format)
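The framing itself is mechanical and can be sketched directly. The Avro body here is an opaque byte slice; real encoding goes through an Avro library, but the envelope is exactly one magic byte, a 4-byte big-endian schema ID, then the payload:

```rust
// Confluent wire format: [0x00][schema_id as 4-byte big-endian][avro binary].
fn frame(schema_id: u32, avro_body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + avro_body.len());
    out.push(0x00);                                  // magic byte
    out.extend_from_slice(&schema_id.to_be_bytes()); // schema ID, big-endian
    out.extend_from_slice(avro_body);                // Avro-encoded payload
    out
}

fn main() {
    let framed = frame(42, &[0xde, 0xad]);
    // 5-byte header followed by the body, byte for byte.
    assert_eq!(framed, vec![0x00, 0, 0, 0, 42, 0xde, 0xad]);
}
```

Consumers (Kafka Connect, ksqlDB) read the first five bytes, fetch schema 42 from the registry, and decode the remainder as Avro binary.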
Relationship to internal schema registry
| Aspect | DeltaForge Internal Registry | Confluent Schema Registry |
|---|---|---|
| Purpose | Track source table schema evolution | Store Avro schemas for consumers |
| Schemas | Source-owned (MySQL/Postgres types) | Avro schemas (derived from event structure) |
| Storage | Embedded (SQLite/Postgres) | External HTTP service |
| Used by | Schema sensing, replay correlation | Kafka Connect, ksqlDB, Flink |
The two registries serve complementary purposes — the internal registry tracks what the source looks like, while the Confluent registry stores what consumers need to decode the wire format.
Configuration
See the Envelopes and Encodings page for full Avro encoding configuration, including subject naming strategies and authentication.
Schema Sensing
Schema sensing automatically infers and tracks schema structure from JSON event payloads. This complements the schema registry by discovering schema from data rather than database metadata.
When to Use Schema Sensing
Schema sensing is useful when:
- Source doesn’t provide schema: Some sources emit JSON without metadata
- JSON columns: Database JSON/JSONB columns have dynamic structure
- Schema evolution tracking: Detect when payload structure changes over time
- Downstream integration: Generate JSON Schema for consumers
- Dynamic map keys: Session IDs, trace IDs, or other high-cardinality keys in JSON
How It Works
```text
┌──────────────┐      ┌─────────────────┐      ┌──────────────────┐
│    Event     │─────▶│  Schema Sensor  │─────▶│  Inferred Schema │
│   Payload    │      │   (sampling)    │      │  + Fingerprint   │
└──────────────┘      └─────────────────┘      └──────────────────┘
                               │
                               ▼
                      ┌─────────────────┐
                      │ Structure Cache │
                      │ + HC Classifier │
                      └─────────────────┘
```
- Observation: Events flow through the sensor during batch processing
- Sampling: Not every event is fully analyzed (configurable rate)
- Deep inspection: Nested JSON structures are recursively analyzed
- High-cardinality detection: Dynamic map keys are classified and normalized
- Fingerprinting: Schema changes are detected via SHA-256 fingerprints
- Caching: Repeated structures skip full analysis for performance
High-Cardinality Key Handling
JSON payloads often contain dynamic keys like session IDs, trace IDs, or user-generated identifiers:
```json
{
  "id": 1,
  "sessions": {
    "sess_abc123": {"user_id": 42, "started_at": 1700000000},
    "sess_xyz789": {"user_id": 43, "started_at": 1700000001}
  }
}
```
Without special handling, each unique key (`sess_abc123`, `sess_xyz789`) triggers a "schema evolution" event, causing:
- 0% cache hit rate
- Constant false evolution alerts
- Unbounded schema growth
How It Works
DeltaForge uses probabilistic data structures (HyperLogLog, SpaceSaving) to classify fields:
| Classification | Description | Example |
|---|---|---|
| Stable fields | Appear in most events | id, type, timestamp |
| Dynamic fields | Unique per event, high cardinality | sess_*, trace_*, uuid_* |
When dynamic fields are detected, the schema sensor:
- Normalizes keys: replaces `sess_abc123` with a `<dynamic>` placeholder
- Uses adaptive hashing: the structure cache ignores dynamic key names
- Produces stable fingerprints: Same schema despite different keys
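A simplified sketch of the classification and normalization step, using exact counts and a share-of-events threshold in place of the HyperLogLog/SpaceSaving sketches (`stable_threshold` mirrors the config option of the same name; the exact collapsing rule is an assumption for illustration):

```rust
use std::collections::BTreeMap;

// A key seen in at least `stable_threshold` of all observed events is
// "stable"; everything else collapses into a single "<dynamic>" entry,
// so the fingerprint input no longer varies per event.
fn normalize_keys(
    key_counts: &BTreeMap<String, u64>,
    total_events: u64,
    stable_threshold: f64,
) -> Vec<String> {
    let mut out = Vec::new();
    let mut saw_dynamic = false;
    for (key, count) in key_counts {
        if *count as f64 / total_events as f64 >= stable_threshold {
            out.push(key.clone());
        } else if !saw_dynamic {
            out.push("<dynamic>".to_string()); // all dynamic keys collapse to one
            saw_dynamic = true;
        }
    }
    out
}

fn main() {
    let mut counts = BTreeMap::new();
    counts.insert("id".to_string(), 100);          // present in every event
    counts.insert("sess_abc123".to_string(), 1);   // unique per event
    counts.insert("sess_xyz789".to_string(), 1);

    let keys = normalize_keys(&counts, 100, 0.5);
    // Stable key survives; both session IDs collapse to one placeholder,
    // so the structure fingerprint is identical across events.
    assert_eq!(keys, vec!["id".to_string(), "<dynamic>".to_string()]);
}
```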
Results
| Scenario | Without HC | With HC |
|---|---|---|
| Nested dynamic keys | 100% evolution rate | <1% evolution rate |
| Top-level dynamic keys | 0% cache hits | >99% cache hits |
| Stable structs | Baseline | ~20% overhead during warmup, then ~0% |
Configuration
Schema sensing is configured per pipeline under the `sampling` and `high_cardinality` blocks. The Recommended Settings section below shows complete examples for common workloads.
Inferred Types
Schema sensing infers these JSON types:
| Type | Description |
|---|---|
| `null` | JSON null value |
| `boolean` | `true`/`false` |
| `integer` | Whole numbers |
| `number` | Floating-point numbers |
| `string` | Text values |
| `array` | JSON arrays (element types tracked) |
| `object` | Nested objects (fields recursively analyzed) |
For fields with varying types across events, all observed types are recorded.
Schema Evolution
When schema structure changes, the sensor:
- Detects change: Fingerprint differs from previous version
- Increments sequence: Monotonic version number increases
- Logs evolution: Emits structured log with old/new fingerprints
- Updates cache: New structure becomes current
Evolution events are available via the REST API and can trigger alerts.
Stabilization
After observing enough events, a schema “stabilizes”:
- Warmup phase completes
- Structure stops changing
- Sampling rate takes effect
- Cache hit rate increases
Stabilized schemas have `stabilized: true` in API responses.
API Access
List Inferred Schemas
```sh
curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas
```
Get Schema Details
```sh
curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders
```
Export as JSON Schema
```sh
curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/json-schema
```
Cache Statistics
```sh
curl http://localhost:8080/pipelines/my-pipeline/sensing/stats
```
Dynamic Map Classifications
```sh
curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/classifications
```
Response:
```json
{
  "table": "orders",
  "paths": {
    "": {"stable_fields": ["id", "type", "timestamp"], "has_dynamic_fields": false},
    "sessions": {"stable_fields": [], "has_dynamic_fields": true, "unique_keys": 1523},
    "metadata": {"stable_fields": ["version"], "has_dynamic_fields": true, "unique_keys": 42}
  }
}
```
Drift Detection
Schema sensing integrates with drift detection to compare:
- Expected schema: From database metadata (schema registry)
- Observed schema: From event payloads (schema sensing)
When mismatches occur, drift events are recorded:
| Drift Type | Description |
|---|---|
| `unexpected_null` | Non-nullable column has null values |
| `type_mismatch` | Observed type differs from declared type |
| `undeclared_column` | Field in data not in schema |
| `missing_column` | Schema field never seen in data |
| `json_structure_change` | JSON column structure changed |
Access drift data via:
```sh
curl http://localhost:8080/pipelines/my-pipeline/drift
```
Performance Considerations
Sampling Tradeoffs
| Setting | Effect |
|---|---|
| Higher `warmup_events` | Better initial accuracy, slower stabilization |
| Higher `sample_rate` | Lower CPU usage, slower evolution detection |
| Larger `structure_cache_size` | More memory, better hit rate |
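The warmup/sampling interaction reduces to a single predicate. The exact rule is an assumption for illustration, reusing the `warmup_events` and `sample_rate` config names: analyze every event during warmup, then only every `sample_rate`-th event afterwards.

```rust
// event_index is the 0-based count of events seen for a table.
fn should_analyze(event_index: u64, warmup_events: u64, sample_rate: u64) -> bool {
    event_index < warmup_events      // warmup: analyze everything
        || event_index % sample_rate == 0 // steady state: every Nth event
}

fn main() {
    // warmup_events: 3, sample_rate: 10
    assert!(should_analyze(0, 3, 10));  // warmup
    assert!(should_analyze(2, 3, 10));  // warmup
    assert!(!should_analyze(4, 3, 10)); // sampled out
    assert!(should_analyze(10, 3, 10)); // every 10th afterwards
}
```

This is why higher `sample_rate` lowers CPU but delays evolution detection: a structural change landing between sampled events is only noticed at the next sampled one.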
Recommended Settings
High-throughput pipelines (>10k events/sec):
```yaml
sampling:
  warmup_events: 100
  sample_rate: 10
  structure_cache: true
  structure_cache_size: 100
high_cardinality:
  enabled: true
  min_events: 200
```
Schema evolution monitoring:
```yaml
sampling:
  warmup_events: 25
  sample_rate: 2
  structure_cache: true
high_cardinality:
  enabled: true
  min_events: 50
```
Payloads with dynamic keys (session stores, feature flags):
```yaml
sampling:
  structure_cache: true
  structure_cache_size: 50
high_cardinality:
  enabled: true
  min_events: 100
  min_dynamic_fields: 3
  stable_threshold: 0.5
```
Development/debugging:
```yaml
sampling:
  warmup_events: 10
  sample_rate: 1 # Analyze every event
high_cardinality:
  enabled: true
  min_events: 20 # Faster classification
```
Example: JSON Column Sensing
For tables with JSON columns, sensing reveals the internal structure:
```yaml
# Database schema shows: metadata JSON
# Sensing reveals:
fields:
  - name: "metadata.user_agent"
    types: ["string"]
    nullable: false
  - name: "metadata.ip_address"
    types: ["string"]
    nullable: true
  - name: "metadata.tags"
    types: ["array"]
    array_element_types: ["string"]
```
This enables downstream consumers to understand JSON column structure without manual documentation.
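The dotted-path flattening can be sketched with a minimal JSON stand-in (no `serde_json` assumed here; the type labels match the Inferred Types table above):

```rust
use std::collections::BTreeMap;

// Minimal JSON value for illustration; the real sensor walks full JSON.
enum Json {
    Null,
    Str(String),
    Arr(Vec<Json>),
    Obj(BTreeMap<String, Json>),
}

// Recursively flatten nested objects into dotted field paths with a type label.
fn flatten(prefix: &str, value: &Json, out: &mut Vec<(String, &'static str)>) {
    match value {
        Json::Obj(fields) => {
            for (k, v) in fields {
                let path = if prefix.is_empty() { k.clone() } else { format!("{prefix}.{k}") };
                flatten(&path, v, out);
            }
        }
        Json::Arr(_) => out.push((prefix.to_string(), "array")),
        Json::Str(_) => out.push((prefix.to_string(), "string")),
        Json::Null => out.push((prefix.to_string(), "null")),
    }
}

fn main() {
    let mut meta = BTreeMap::new();
    meta.insert("user_agent".to_string(), Json::Str("curl/8".into()));
    meta.insert("tags".to_string(), Json::Arr(vec![Json::Str("a".into())]));
    let mut root = BTreeMap::new();
    root.insert("metadata".to_string(), Json::Obj(meta));

    let mut fields = Vec::new();
    flatten("", &Json::Obj(root), &mut fields);
    // BTreeMap iteration is key-ordered: "tags" sorts before "user_agent".
    assert_eq!(fields, vec![
        ("metadata.tags".to_string(), "array"),
        ("metadata.user_agent".to_string(), "string"),
    ]);
}
```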
Metrics
Schema sensing emits these Prometheus metrics:
| Metric | Type | Labels | Description |
|---|---|---|---|
| `deltaforge_schema_events_total` | Counter | `table` | Total events observed |
| `deltaforge_schema_cache_hits_total` | Counter | `table` | Structure cache hits |
| `deltaforge_schema_cache_misses_total` | Counter | `table` | Structure cache misses |
| `deltaforge_schema_evolutions_total` | Counter | `table` | Schema evolutions detected |
| `deltaforge_schema_tables_total` | Gauge | - | Tables with detected schemas |
| `deltaforge_schema_dynamic_maps_total` | Gauge | - | Paths classified as dynamic maps |
| `deltaforge_schema_sensing_seconds` | Histogram | `table` | Per-event sensing latency |
Example Queries
```promql
# Cache hit rate per table
sum(rate(deltaforge_schema_cache_hits_total[5m])) by (table)
/
sum(rate(deltaforge_schema_events_total[5m])) by (table)

# Schema evolution rate (should be near zero after warmup)
sum(rate(deltaforge_schema_evolutions_total[5m])) by (table)

# P99 sensing latency
histogram_quantile(0.99, rate(deltaforge_schema_sensing_seconds_bucket[5m]))
```
Failover Handling
DeltaForge detects database failover automatically and resumes streaming on the new primary without operator intervention. This page explains how detection works, what happens during reconciliation, and how to configure behaviour when the new primary has a different schema.
How Detection Works
Every time the source reconnects - at startup or after a transient error - it queries the server’s stable identity:
- MySQL: `@@server_uuid` from `performance_schema.replication_group_members`
- PostgreSQL: `system_identifier` from `pg_control_system()`
The result is compared against the value stored in DeltaForge’s storage backend. Three outcomes are possible:
| Result | Meaning | Action |
|---|---|---|
| `FirstSeen` | No identity stored yet | Store and continue |
| `Same` | Same server as before | Verify checkpoint GTID is still reachable, then continue |
| `Changed` | Server identity differs | Run failover reconciliation |
Identity is written to the durable storage backend (SQLite or PostgreSQL), so it survives process restarts and is correctly preserved across pipeline reloads.
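The three outcomes reduce to a plain decision over the stored identity versus the identity the server reports on reconnect (the UUID values below are illustrative):

```rust
#[derive(Debug, PartialEq)]
enum IdentityCheck {
    FirstSeen, // store and continue
    Same,      // verify checkpoint reachability, then continue
    Changed,   // run failover reconciliation
}

fn check_identity(stored: Option<&str>, live: &str) -> IdentityCheck {
    match stored {
        None => IdentityCheck::FirstSeen,
        Some(prev) if prev == live => IdentityCheck::Same,
        Some(_) => IdentityCheck::Changed,
    }
}

fn main() {
    let uuid_a = "3e11fa47-71ca-11e1-9e33-c80aa9429562";
    let uuid_b = "9f22ab58-82db-22f2-af44-d91bb0530673";
    assert_eq!(check_identity(None, uuid_a), IdentityCheck::FirstSeen);
    assert_eq!(check_identity(Some(uuid_a), uuid_a), IdentityCheck::Same);
    assert_eq!(check_identity(Some(uuid_a), uuid_b), IdentityCheck::Changed);
}
```

Because the stored identity is durable, a `Changed` verdict survives process crashes: the comparison is against what the backend persisted, not in-memory state.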
What Happens During Failover
When a Changed identity is detected, DeltaForge runs reconciliation before allowing any events to flow. Reconciliation is idempotent - if the process dies mid-run, it will re-execute correctly on the next startup.
1. Position reachability check
DeltaForge verifies that the checkpoint position from the old primary still exists on the new primary:
- MySQL: checks whether the GTID set from the last checkpoint is present in B’s executed GTID history or purged range
- PostgreSQL: checks whether the replication slot’s `confirmed_flush_lsn` is reachable
If the position is confirmed lost, the source stops immediately with an error and `/health` returns 503. This covers two distinct cases:
- Server changed (`Changed`): B’s GTID history does not contain A’s checkpoint (e.g. B was a lagging replica).
- Same server, history wiped (`Same`): `RESET BINARY LOGS AND GTIDS` was run on the same server, clearing all GTID state without changing the server UUID. DeltaForge detects this on the first reconnect by checking `GTID_SUBSET(checkpoint, @@gtid_executed)`.
In both cases the error message is:
```text
position lost: <reason>. Re-snapshot required.
```
Silently skipping data is worse than halting. Restart the pipeline with a fresh snapshot to recover.
If reachability cannot be determined (e.g. the health query fails transiently), DeltaForge logs a warning and continues — it does not halt on uncertainty.
2. Schema drift detection
DeltaForge compares the schema last registered from the old primary against the live catalog on the new primary. Any column additions, removals, or renames are recorded as a ReconcileRecord in the storage backend.
If drift is found, the schema cache is invalidated so the next row event triggers a fresh load with the correct column mapping.
3. Resume
After reconciliation, DeltaForge stores B’s identity and resumes streaming. The first events from B use the updated schema.
Position Adjustment
A subtle but critical detail: simply reconnecting at A’s checkpoint position can cause data loss on its own, before reconciliation even runs.
MySQL: B rejects A’s GTID set at the protocol level with “purged required binary logs”. DeltaForge detects the identity change before opening the binlog stream, resolves B’s current binlog tail via SHOW BINARY LOG STATUS, and connects there instead. A’s original GTID checkpoint is preserved separately for the reachability check.
PostgreSQL: START_REPLICATION at A’s LSN immediately advances the slot’s confirmed_flush_lsn to max(A_checkpoint, slot_lsn). If B’s slot was created at an LSN behind A’s checkpoint, any changes B committed in that gap are permanently discarded - even if you reconnect at the correct LSN afterwards. DeltaForge detects the identity change before opening the replication stream and fetches the slot’s actual confirmed_flush_lsn to use as the start position instead.
In both cases the original checkpoint is preserved for the reachability check, separate from the adjusted streaming position.
Schema Drift Policy
By default DeltaForge adapts to the new primary’s schema and continues streaming. This is safe for additive drift (B has a new column A didn’t have) but can be risky if B is missing columns that A had - row events encoded against A’s schema may decode incorrectly against B’s.
The on_schema_drift field controls this behaviour:
```yaml
source:
  type: mysql
  config:
    id: my-pipeline
    dsn: ${MYSQL_DSN}
    tables: [shop.orders]
    on_schema_drift: halt # default: adapt
```
| Value | Behaviour |
|---|---|
| `adapt` | Record drift, reload schema cache, continue streaming. Default. |
| `halt` | Stop the source when any schema drift is detected. Requires operator intervention. |
When halt fires, the reconciliation record is persisted before the source stops - you can inspect what changed before restarting:
```text
schema drift detected after failover and on_schema_drift=halt.
Verify B's schema and apply any missing migrations before restarting.
```
Use halt when your failover environments do not guarantee DDL sync to replicas before promotion.
What DeltaForge Does Not Handle
DSN switching is external. DeltaForge detects a new server by comparing identities, not by monitoring cluster topology. The DSN must already point to B before the pipeline reconnects - this is typically handled by a load balancer VIP, DNS failover, or connection proxy. If the DSN still resolves to A, the pipeline will retry A’s dead connection rather than discovering B.
Data loss from replica lag is not recoverable. If B was a lagging replica and never received transactions that A committed before failing, those rows are gone at the database level. DeltaForge can detect the position gap but cannot reconstruct missing data. A re-snapshot from B is required in this case.
Mid-flight DDL during active streaming is handled separately by the normal schema reload mechanism, not by failover reconciliation. Failover reconciliation only runs when the server identity changes.
Infrastructure Requirements
For clean automatic failover:
- MySQL: GTID mode must be enabled (`gtid_mode=ON`, `enforce_gtid_consistency=ON`). Without GTID, DeltaForge falls back to file/position coordinates, which are meaningless across servers.
- PostgreSQL: The replication slot must exist on B before the pipeline connects to it. Slots are not automatically transferred during failover - use a slot-aware HA tool (e.g. Patroni with `permanent_slots`) or pre-create the slot on standbys.
- Both: The CDC user must exist on B with the same privileges as on A.
Deployment
Docker
```sh
docker run --rm \
  -e MYSQL_USER=cdc_user \
  -e MYSQL_PASSWORD=s3cret \
  -v $(pwd)/pipeline.yaml:/etc/deltaforge/pipeline.yaml:ro \
  ghcr.io/vnvo/deltaforge:latest \
  --config /etc/deltaforge/pipeline.yaml
```
Docker Compose
See the chaos testing environment for a full Docker Compose example with MySQL, Kafka, Prometheus, Grafana, and Loki.
Kubernetes (Helm)
Install
```sh
helm install deltaforge ./deploy/helm/deltaforge \
  --set secrets.create=true \
  --set secrets.data.MYSQL_USER=cdc_user \
  --set secrets.data.MYSQL_PASSWORD=s3cret
```
What it deploys
| Resource | Purpose |
|---|---|
| StatefulSet | DeltaForge pod with stable identity |
| ConfigMap | Pipeline YAML configuration |
| PVC | Persistent storage for checkpoints + DLQ |
| Service | ClusterIP with API (8080) and metrics (9000) ports |
| ServiceAccount | Pod identity |
| ServiceMonitor | Prometheus Operator integration (optional) |
| Secret | Credentials (optional, dev only) |
Secrets
Secrets contain only credentials (username, password, tokens). Connection details (host, port, topic) stay in the pipeline config. Pipeline config uses ${VAR_NAME} shell expansion at startup:
```yaml
# Pipeline config (in ConfigMap)
dsn: "mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mysql-primary:3306/orders"
brokers: "kafka:9092" # not secret — stays in config
```
For production, create K8s Secrets separately and reference them:
```yaml
secrets:
  existingSecrets:
    - name: mysql-creds      # keys: MYSQL_USER, MYSQL_PASSWORD
    - name: kafka-sasl-creds # keys: KAFKA_SASL_USER, KAFKA_SASL_PASSWORD
```
Health probes
The chart configures liveness and readiness probes automatically:
- Liveness (`/health`): returns 200 when healthy, 503 when a pipeline has failed. Triggers pod restart on prolonged failure.
- Readiness (`/ready`): returns 200 with pipeline status JSON. Controls traffic routing.
Monitoring
Prometheus annotations are enabled by default (prometheus.io/scrape: "true"). For Prometheus Operator, enable the ServiceMonitor:
```yaml
serviceMonitor:
  enabled: true
  interval: 15s
```
Storage
Development/testing: SQLite on a PersistentVolume (default). Simple, no external dependencies.
Production: PostgreSQL is recommended. Benefits:
- Survives pod rescheduling without PVC migration
- Proper backup/restore via `pg_dump`
- Supports multiple replicas sharing state (future operator/sharding)
- Better concurrency under high checkpoint commit rates
```yaml
storage:
  backend: postgres
persistence:
  enabled: false # no PVC needed with Postgres
secrets:
  existingSecrets:
    - name: deltaforge-storage # must contain key: STORAGE_DSN
```
The STORAGE_DSN Secret should contain a PostgreSQL connection string:
```text
postgresql://deltaforge:password@postgres.infra:5432/deltaforge
```
Create the database and user beforehand:
```sql
CREATE USER deltaforge WITH PASSWORD 'password';
CREATE DATABASE deltaforge OWNER deltaforge;
```
DeltaForge creates its tables automatically on first connection.
Full values reference
See the Helm chart README for all configurable values.
Observability playbook
DeltaForge already ships a Prometheus exporter, structured logging, and a panic hook. The runtime now emits source ingress counters, batching/processor histograms, and sink latency/throughput so operators can build production dashboards immediately. The tables below capture what is wired today and the remaining gaps to make the platform production ready for data and infra engineers.
What exists today
- Prometheus endpoint served at `/metrics` (default `0.0.0.0:9000`) with descriptors for pipeline counts, source/sink counters, and a stage latency histogram. The recorder is installed automatically when metrics are enabled.
- Structured logging via `tracing_subscriber` with JSON output by default, optional targets, and support for `RUST_LOG` overrides.
- Panic hook increments a `deltaforge_panics_total` counter and logs captured panics before delegating to the default hook.
Instrumentation gaps and recommendations
The sections below call out concrete metrics and log events to add per component. All metrics should include pipeline, tenant, and component identifiers where applicable so users can aggregate across fleets.
Sources (MySQL/Postgres)
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_source_events_total{pipeline,source,table} counter increments when MySQL events are handed to the coordinator. | Surfaces ingress per table and pipeline. |
| ✅ Implemented | deltaforge_source_reconnects_total{pipeline,source} counter when binlog reads reconnect. | Makes retry storms visible. |
| ✅ Implemented | deltaforge_source_lag_seconds{pipeline} gauge — replication lag based on last event timestamp vs. wall clock. | Alert when sources fall behind. |
| ✅ Implemented | deltaforge_source_table_lag_seconds{pipeline,table} gauge — per-table replication lag within each batch. | Identify which tables are lagging. |
| 🚧 Gap | deltaforge_source_idle_seconds{pipeline,source} gauge updated when no events arrive within the inactivity window. | Catch stuck readers before downstream backlogs form. |
Coordinator and batching
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_batch_events{pipeline} and deltaforge_batch_bytes{pipeline} histograms in Coordinator::process_deliver_and_maybe_commit. | Tune batching policies with data. |
| ✅ Implemented | deltaforge_bytes_total{pipeline} counter — cumulative bytes processed through the pipeline. rate() gives bytes/s throughput. | Monitor data volume and bandwidth utilization. |
| ✅ Implemented | deltaforge_source_bytes_total{pipeline,source} counter — cumulative raw bytes received from the source (WAL/binlog). | Track source-side data ingestion rate. |
| ✅ Implemented | deltaforge_stage_latency_seconds{pipeline,stage,trigger} histogram for processor stage. | Provides batch timing per trigger (timer/limits/shutdown). |
| ✅ Implemented | deltaforge_processor_latency_seconds{pipeline,processor} histogram around every processor invocation. | Identify slow user functions. |
| 🚧 Gap | deltaforge_pipeline_channel_depth{pipeline} gauge from mpsc::Sender::capacity()/len(). | Detect backpressure between sources and coordinator. |
| ✅ Implemented | deltaforge_checkpoints_total{pipeline} counter — successful checkpoint commits. | Monitor checkpoint throughput. |
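To tune batching with the histograms above, the usual Prometheus conventions apply (`_sum`/`_count`/`_bucket` series per histogram). Example queries, assuming standard histogram naming:

```promql
# Average batch size per pipeline (events per batch)
rate(deltaforge_batch_events_sum[5m]) / rate(deltaforge_batch_events_count[5m])

# p95 stage latency per pipeline
histogram_quantile(0.95,
  sum by (pipeline, le) (rate(deltaforge_stage_latency_seconds_bucket[5m])))
```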
Sinks (Kafka/Redis/custom)
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_sink_events_total{pipeline,sink} counter and deltaforge_sink_latency_seconds{pipeline,sink} histogram around each send. | Throughput and responsiveness per sink. |
| ✅ Implemented | deltaforge_sink_batch_total{pipeline,sink} counter incremented once per batch send. | Number of batches sent per sink. |
| ✅ Implemented | deltaforge_sink_errors_total{pipeline,sink} counter with per-sink error tracking. | Alert on sink failures. |
| ✅ Implemented | deltaforge_sink_txn_commits_total{pipeline,sink} counter — Kafka transaction commits/s. | Track exactly-once throughput. |
| ✅ Implemented | deltaforge_sink_txn_aborts_total{pipeline,sink} counter — Kafka transaction aborts/s. Should be ~0. | Detect fencing or broker issues. |
| ✅ Implemented | deltaforge_sink_checkpoint_status{pipeline,sink} gauge (1=ok, 0=behind). | Per-sink checkpoint health. |
| ✅ Implemented | deltaforge_sink_last_checkpoint_ts{pipeline,sink} epoch timestamp. | Per-sink checkpoint age. |
| 🚧 Gap | Backpressure gauge for client buffers (rdkafka queue, Redis pipeline depth). | Early signal before errors occur. |
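For sink health, the error and abort counters are the primary alert signals. A sketch (thresholds and windows are illustrative):

```promql
# Any sink errors in the last 5 minutes
rate(deltaforge_sink_errors_total[5m]) > 0

# Kafka transaction aborts should be ~0; alert on any
increase(deltaforge_sink_txn_aborts_total[10m]) > 0
```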
Avro encoding (when encoding: avro is configured)
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_avro_encode_total{path} counter — path=ddl (DDL-derived schema) or path=inferred (JSON fallback). | Track which schema source is being used. DDL is preferred. |
| ✅ Implemented | deltaforge_avro_schema_registrations_total counter — successful Schema Registry registrations. | Monitor schema registration activity. |
| ✅ Implemented | deltaforge_avro_sr_cache_fallback_total counter — events encoded with cached schema because SR was unavailable. | Alert on SR connectivity issues. |
| ✅ Implemented | deltaforge_avro_encode_failure_total{reason} counter — reason=sr_unavailable (no cache, can’t encode) or reason=schema_mismatch (DDL changed, old schema can’t encode event). | Alert on encoding failures; events with these errors are routed to DLQ. |
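Since the DDL path is preferred, it is worth watching what fraction of events fall back to inferred schemas, and alerting on any encode failures (which are DLQ-bound). A sketch using the counters above:

```promql
# Fraction of Avro encodes using JSON-inferred schemas (lower is better)
sum(rate(deltaforge_avro_encode_total{path="inferred"}[5m]))
  / sum(rate(deltaforge_avro_encode_total[5m]))

# Any encoding failures route events to the DLQ; alert immediately
increase(deltaforge_avro_encode_failure_total[5m]) > 0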
Pipeline lifecycle
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_pipeline_status{pipeline} gauge reflecting the current lifecycle state of each pipeline. | Single gauge to alert on stopped or failed pipelines and drive dashboards. |
| ✅ Implemented | deltaforge_e2e_latency_seconds{pipeline} histogram measuring wall-clock time from when an event was received by DeltaForge to when it was delivered to the sink. | Measures pipeline delivery latency independently of source clock precision. |
| ✅ Implemented | deltaforge_source_lag_seconds{pipeline} gauge — replication lag based on event timestamp vs. wall clock. | Alert when the source is behind real time. |
| ✅ Implemented | deltaforge_checkpoints_total{pipeline} counter — checkpoint commits/s. | Monitor checkpoint throughput. |
| ✅ Implemented | deltaforge_last_checkpoint_ts{pipeline} epoch timestamp — pipeline-level checkpoint age. | Alert on stale checkpoints. |
deltaforge_pipeline_status value semantics
The gauge uses a numeric encoding so operators can alert on any non-running state with a single threshold:
| Value | State | Meaning |
|---|---|---|
| 1.0 | running | Pipeline is active and processing events |
| 0.5 | paused | Source connection alive; event processing suspended |
| 0.0 | stopped | Disconnected from source; checkpoint saved; resumable |
| < 0 | failed | Unrecoverable error (position lost, server changed, etc.) |
Example PromQL:
# Count pipelines in each state
count(deltaforge_pipeline_status == 1) # running
count(deltaforge_pipeline_status == 0.5) # paused
count(deltaforge_pipeline_status == 0) # stopped
count(deltaforge_pipeline_status < 0) # failed
# Alert if any pipeline is not running
count(deltaforge_pipeline_status != 1) > 0
deltaforge_e2e_latency_seconds note
E2E latency is measured from the wall-clock time the event was received and parsed by DeltaForge, not from the binlog header.timestamp. MySQL binlog timestamps have one-second precision, which would introduce up to 1 s of phantom latency in the histogram. Using the internal receive time gives sub-millisecond accuracy regardless of source clock granularity.
The replication lag metric (separate from E2E latency) uses the binlog timestamp and measures how far behind the source is relative to real time — that one-second precision is acceptable for lag alerting.
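Putting the two metrics side by side in queries makes the distinction concrete. A sketch, assuming standard Prometheus histogram naming for the E2E latency metric:

```promql
# p95 internal delivery latency per pipeline (sub-millisecond precision)
histogram_quantile(0.95,
  sum by (pipeline, le) (rate(deltaforge_e2e_latency_seconds_bucket[5m])))

# Source replication lag (one-second precision, from the binlog timestamp)
max by (pipeline) (deltaforge_source_lag_seconds)
```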
Dead Letter Queue
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_dlq_events_total{pipeline,sink,error_kind} counter. | Track rate of events routed to DLQ. |
| ✅ Implemented | deltaforge_dlq_entries{pipeline} gauge — current unacked entries. | Monitor DLQ backlog size. |
| ✅ Implemented | deltaforge_dlq_saturation_ratio{pipeline} gauge (0.0-1.0). | Alert at 80% (warning) and 95% (critical). |
| ✅ Implemented | deltaforge_dlq_evicted_total{pipeline} counter — entries lost to drop_oldest overflow. | Track data loss from overflow. |
| ✅ Implemented | deltaforge_dlq_rejected_total{pipeline} counter — entries lost to reject overflow. | Track data loss from rejection. |
| ✅ Implemented | deltaforge_dlq_write_failures_total{pipeline} counter — DLQ storage failures. | Alert on DLQ infrastructure issues. |
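The saturation thresholds from the table above (80% warning, 95% critical) map directly onto alert expressions, and any overflow eviction or rejection is data loss worth paging on:

```promql
# Warning at 80% DLQ saturation, critical at 95%
deltaforge_dlq_saturation_ratio > 0.80
deltaforge_dlq_saturation_ratio > 0.95

# Entries lost to overflow (evicted or rejected) over the last 10 minutes
increase(deltaforge_dlq_evicted_total[10m])
  + increase(deltaforge_dlq_rejected_total[10m]) > 0
```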
Control plane and health endpoints
| Need | Suggested metric/log | Rationale |
|---|---|---|
| API request accounting | deltaforge_api_requests_total{route,method,status} counter and latency histogram using Axum middleware. | Production-grade visibility of operator actions. |
| Ready/Liveness transitions | Logs with pipeline counts and per-pipeline status when readiness changes. | Explain probe failures in log aggregation. |
| Pipeline lifecycle counters | Counters for create/patch/stop/resume actions with success/error labels. | Auditable control-plane operations. |
Grafana Dashboard
A production-ready Grafana dashboard is included in the repository, optimized for fleet operations with hundreds of pipelines:
Import it via Grafana UI → Dashboards → Import → Upload JSON file.
What’s included
| Row | Panels | Purpose |
|---|---|---|
| Fleet Overview | Running/unhealthy count, total events/s, total data/s, max lag, DLQ total, reconnects, txn aborts, sink errors | One-glance health across all pipelines |
| Top Pipelines | Top 10 laggiest, top 10 throughput, top 10 DLQ backlogs | Identify outliers without drowning in 300 series |
| Throughput | Aggregate events/s, per-pipeline events/s, data throughput | Capacity planning and anomaly detection |
| Latency & Lag | E2E latency p50/p95, source lag, per-table lag (top 10) | SLA monitoring, identify slow tables |
| Checkpoints & EOS | Per-sink status, commit rate, txn commits/aborts | Exactly-once health, checkpoint freshness |
| Dead Letter Queue | Entries, events/s, saturation, overflow rate | DLQ monitoring and alerting |
| Errors & Reliability | Sink errors, reconnects, pipeline state timeline | Incident detection |
| Batching & Kafka | Batch size, batch bytes, sink latency (collapsed) | Tuning reference |
| Infrastructure | Container CPU, memory (collapsed) | Resource monitoring |
Template variables
The dashboard includes dropdown filters at the top:
- Instance — select DeltaForge instances
- Tenant — filter by tenant (from the `deltaforge_pipeline_info` metric)
- Pipeline — select specific pipelines
- Sink — filter by sink
Prerequisites
- Prometheus scraping DeltaForge metrics on port 9000 (`/metrics`)
- A Prometheus datasource configured in Grafana as "Prometheus"
- For container metrics: cAdvisor scraping enabled
Performance Tuning
This guide covers throughput optimization for DeltaForge CDC pipelines, based on profiling and benchmarking with the chaos test suite.
Note: These results and recommendations are a starting point. Every deployment has unique requirements — hardware, network topology, database workload patterns, event sizes, and downstream consumer capacity all affect real-world throughput. Profile your own workload and iterate.
Benchmark Results
Measured on Docker containers on a single developer machine (not dedicated infrastructure), draining a 1-10M row backlog to a single-node Kafka broker.
With tuned batching (recommended)
batch.max_events=16000, batch.max_bytes=16MB, batch.max_inflight=4, linger.ms=0
| Source | Mode | Avg (events/s) | Peak (events/s) |
|---|---|---|---|
| MySQL | at-least-once | 151K | 159K |
| MySQL | exactly-once | 134K | 143K |
| Postgres | at-least-once | 57K | 64K |
| Postgres | exactly-once | 53K | 55K |
Exactly-once overhead is ~7-11% when batch sizes are properly tuned.
Why max_bytes matters
These results show the impact of a small max_bytes (3MB) with max_events=8000:
| Source | Mode | Avg (events/s) | Peak (events/s) |
|---|---|---|---|
| MySQL | at-least-once | 110K | 122K |
| MySQL | exactly-once | 48K | 57K |
A 3MB byte limit caps batches at ~6,000 events regardless of max_events, making transaction commits proportionally expensive. The default max_bytes is 16MB — sufficient for batches up to ~32K events at typical event sizes.
Your numbers will differ based on hardware, network latency, event size, and Kafka/database configuration.
Key Tuning Parameters
Batch Size (batch.max_events + batch.max_bytes)
The single most impactful setting. Larger batches amortize per-batch overhead (Kafka produce, transaction commit, checkpoint write, metrics recording) across more events.
Important: max_events and max_bytes both cap batch size — whichever triggers first wins. If you set max_bytes too low, it will silently cap your batches regardless of max_events. The default (16MB) accommodates batch sizes up to ~32K events at typical event sizes.
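A quick back-of-envelope check makes the interaction visible. The sketch below computes which limit binds first; the 500-byte average event size is an assumption for illustration, not a measured value:

```python
def effective_batch_cap(max_events: int, max_bytes: int, avg_event_bytes: int) -> int:
    """Whichever limit triggers first caps the batch."""
    return min(max_events, max_bytes // avg_event_bytes)

# A 3MB byte budget silently caps an 8000-event batch at ~6K events:
print(effective_batch_cap(8000, 3 * 1024 * 1024, 500))    # → 6291

# A 16MB budget leaves headroom, so max_events binds instead:
print(effective_batch_cap(16000, 16 * 1024 * 1024, 500))  # → 16000
```

This matches the benchmark observation above: with `max_bytes=3MB`, batches top out near 6,000 events no matter how high `max_events` is set.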
Recommended for high-throughput drain/catch-up:
spec:
batch:
max_events: 16000
max_bytes: 16777216 # 16MB — room for 16K events
max_ms: 100
max_inflight: 4
For steady-state pipelines with lower latency requirements, the defaults (max_events=2000, max_bytes=3MB) are fine.
Kafka Linger (linger.ms)
Controls how long rdkafka waits before sending a produce request. This is the most common throughput bottleneck when left at high values.
- `linger.ms=20` — each small batch waits 20ms before sending, capping throughput
- `linger.ms=5` (sink built-in default) — good balance for steady-state
- `linger.ms=0` — maximum throughput for drain/catch-up/high-intensity workloads
The internal coordinator enqueues entire batches (hundreds to thousands of messages) in a tight loop, so rdkafka batches naturally without needing linger time. Higher linger values only add idle wait per produce.
Override via client_conf in the sink config:
sinks:
- type: kafka
config:
client_conf:
linger.ms: "0"
Batch Pipelining (batch.max_inflight)
Controls how many batches can be queued between the accumulation loop and the delivery task. Higher values overlap batch building with Kafka delivery.
- `max_inflight=1` — sequential delivery (default)
- `max_inflight=4` — recommended for high-throughput workloads (adjust as needed)
The delivery task processes batches in FIFO order, so checkpoint and event delivery ordering is always preserved regardless of the inflight setting. In effect, this setting lets the source read continue without waiting for processing elsewhere in the pipeline.
spec:
batch:
max_events: 4000
max_ms: 100
max_inflight: 4
Schema Sensing
Disable schema sensing during drain/catch-up for maximum throughput:
spec:
schema_sensing:
enabled: false
Re-enable it for steady-state operation when schema tracking is needed. Be mindful: schema sensing is a CPU-intensive task.
Proxy Bypass - Chaos/Bench Testing
When running with Toxiproxy (chaos testing), use --no-proxy to bypass the proxy for direct database and Kafka connections. The proxy adds measurable overhead to throughput.
Source-Specific Tuning
MySQL
MySQL binlog is inherently efficient because WriteRowsEvent batches multiple rows into a single event.
MySQL server settings that affect CDC throughput:
- `binlog-row-image=FULL` — required for CDC but sends all columns per row. If your use case allows it, `MINIMAL` reduces binlog event size significantly (only changed columns are sent).
- `binlog_transaction_dependency_tracking=WRITESET` — enables parallel replication metadata. While DeltaForge reads sequentially, this can reduce replication lag on replicas feeding DeltaForge.
- `max_allowed_packet` — increase if you have large blob/text columns. The default (64MB) is usually sufficient.
- `binlog_expire_logs_seconds` — set high enough that DeltaForge can recover from outages without losing its checkpoint position. 7 days is a safe starting point.
DeltaForge settings for MySQL:
- `tables` — be specific. Subscribing to `*.*` forces DeltaForge to process table map events for every table, even those it discards.
- `snapshot.chunk_size` — for initial snapshots of large tables, increase the chunk size (default 10,000) to reduce round trips.
PostgreSQL
Postgres logical replication (pgoutput) sends one WAL message per row change, making it more per-message intensive than MySQL binlog.
PostgreSQL server settings that affect CDC throughput:
- `wal_level=logical` — required. No throughput impact vs. `replica`.
- `max_wal_senders` — ensure enough slots for DeltaForge plus any replicas. The default (10) is usually sufficient.
- `wal_sender_timeout` — increase from the default (60s) if DeltaForge pauses processing for extended periods (e.g., during pipeline restarts). `300s` is a safer value.
- `wal_keep_size` — set large enough to cover outage windows. If DeltaForge disconnects and WAL is recycled, the replication slot becomes invalid and requires a re-snapshot.
- Replica identity — `ALTER TABLE ... REPLICA IDENTITY FULL` sends full row images for updates/deletes. `DEFAULT` (primary key only) reduces WAL message size but limits the `before` image in CDC events.
- Publication scope — create publications with explicit table lists (`FOR TABLE ...`) rather than `FOR ALL TABLES` to reduce WAL decoding overhead on the server.
DeltaForge settings for PostgreSQL:
- Batch writes in transactions — if your writer can group inserts into `BEGIN; INSERT ...; INSERT ...; COMMIT`, the server sends fewer BEGIN/COMMIT WAL messages, reducing per-event overhead.
- `tables` — use specific patterns. Broad patterns force schema loading and filtering for unneeded tables.
The throughput gap between Postgres and MySQL is primarily due to protocol-level differences (one WAL message per row vs. batched rows), not code inefficiency.
Exactly-Once Delivery Overhead
Enabling exactly_once: true on sinks adds per-batch transaction overhead:
Kafka Transactional Producer
Each batch is wrapped in begin_transaction() / commit_transaction(). The transaction commit adds a constant ~1-3ms per batch (broker-side two-phase commit), so larger batches amortize the cost better.
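A rough model shows how larger batches amortize this cost. The sketch below assumes a 2ms commit (the midpoint of the 1-3ms range above); the batch delivery times are illustrative assumptions, not measurements:

```python
def commit_overhead_fraction(batch_time_ms: float, commit_ms: float = 2.0) -> float:
    """Fraction of wall-clock time spent in the per-batch transaction commit."""
    return commit_ms / (batch_time_ms + commit_ms)

# Small batch delivered in ~40ms vs a large batch delivered in ~105ms:
print(round(commit_overhead_fraction(40.0), 3))   # → 0.048
print(round(commit_overhead_fraction(105.0), 3))  # → 0.019
```

The constant commit cost shrinks as a fraction of total batch time, which is why the tuned 16K-event batches show single-digit exactly-once overhead.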
Measured overhead (Docker containers, single developer machine, 1-10M row drain):
| Source | at-least-once | exactly-once | Overhead |
|---|---|---|---|
| MySQL | 151K events/s | 134K events/s | ~11% |
| Postgres | 57K events/s | 53K events/s | ~7% |
These numbers use tuned batch settings (max_events=16000, max_bytes=16MB). With a small max_bytes (e.g. 3MB), batches are capped at ~6K events regardless of max_events, making the transaction commit disproportionately expensive. The default max_bytes (16MB) is sufficient for most workloads.
Key considerations:
- `transaction.timeout.ms` (set to 60s by DeltaForge) — if a batch takes longer than this to deliver, the broker aborts the transaction. Increase for very large batches or high-latency networks.
- `transactional.id` — must be unique per pipeline-sink pair. DeltaForge sets this automatically to `deltaforge-{pipeline_id}-{sink_id}`.
- Producer fencing — if two producers share the same `transactional.id`, the broker fences (kills) the older one. This is detected as a `Fatal` error and stops the pipeline. Ensure only one instance of each pipeline runs at a time.
NATS JetStream
Deduplication uses the Nats-Msg-Id header (server-side). No client-side transaction overhead, but the server maintains a dedup window. Configure duplicate_window on the stream to match your replay window (default 2 minutes).
Redis Streams
Idempotency keys are embedded in the XADD payload. No transaction overhead on the Redis side — deduplication is the consumer’s responsibility. This is a Tier 2 guarantee (at-least-once with idempotency keys for consumer-side dedup).
Per-Sink Checkpoints
Each sink maintains its own checkpoint, committed independently after successful delivery. The source replays from the minimum checkpoint across all sinks. This means:
- Faster sinks are not held back by slower ones — they advance their own checkpoints independently.
- Adding a new sink to an existing pipeline triggers a replay from the source’s earliest available position for that sink only.
- Checkpoint storage overhead scales linearly with the number of sinks (one key per sink per source).
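The per-sink checkpoint timestamps exposed in the sink metrics make stale checkpoints easy to catch. A sketch (the 5-minute threshold is illustrative):

```promql
# Alert if any sink's checkpoint is older than 5 minutes
time() - deltaforge_sink_last_checkpoint_ts > 300
```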
Profiling
Use the chaos UI’s built-in CPU profiler to capture flamegraphs during drain runs:
- Start a drain scenario from the chaos UI
- Once the drain phase starts (step 5/6), click Record on the target container
- The generated flamegraph SVG includes pipeline config, batch settings, and connection mode in the subtitle automatically
Or from the command line:
# Start drain in terminal 1
cargo run -p chaos --release -- --scenario backlog-drain --source mysql --no-proxy
# Capture flamegraph in terminal 2 (after drain phase starts)
docker exec <container-name> perf record -F 99 -p 1 -g --call-graph dwarf -o /tmp/perf.data -- sleep 30
Requires the profiling image (deltaforge:dev-profile) which includes perf and debug symbols.
Key areas to watch in flamegraphs:
| Area | What it means |
|---|---|
serialize_event / format_escaped_str | JSON serialization — consider smaller batches if dominant |
recv / [unknown] kernel stacks | I/O wait for source data — protocol-bound |
_rjem_je_* | jemalloc allocation pressure — large batches increase this |
rd_kafka_* / LZ4_compress | Kafka produce and compression overhead |
check_and_split | Coordinator batch accumulation |
epoll_wait / park_timeout | Idle time — pipeline is I/O bound, not CPU bound |
Running the Drain Benchmark
The backlog drain benchmark measures catch-up throughput: how fast DeltaForge replays a pre-built backlog of 1M rows.
# MySQL — requires the soak compose profile
docker compose -f docker-compose.chaos.yml --profile soak up -d
cargo run -p chaos --release -- --scenario backlog-drain --source mysql --no-proxy \
--drain-max-events 4000 --drain-max-ms 100 --drain-kafka-conf linger.ms=0
# Postgres — requires the pg-soak compose profile
docker compose -f docker-compose.chaos.yml --profile pg-soak up -d
cargo run -p chaos --release -- --scenario backlog-drain --source postgres --no-proxy \
--drain-max-events 4000 --drain-max-ms 100 --drain-kafka-conf linger.ms=0
The benchmark:
- Stops the pipeline and saves its checkpoint
- Writes 1M rows to the source database using 32 concurrent writers
- Resumes the pipeline and measures how fast events appear in Kafka
- Reports avg/p50/peak events/s with full configuration in the output
Tune --drain-max-events, --drain-max-ms, and --drain-kafka-conf to experiment with different settings. The chaos UI also exposes these as form fields for interactive tuning.
Avro Encoding Performance
Avro encoding trades slightly higher producer CPU for significantly smaller payloads.
Expected impact
| Metric | JSON (baseline) | Avro | Notes |
|---|---|---|---|
| Payload size | Baseline | ~40-60% smaller | No field names, binary encoding |
| Producer CPU | Baseline | ~10-20% higher | Schema lookup + Avro binary encoding |
| Kafka broker disk/network | Baseline | ~40-60% less | Significant at scale |
| First-event latency | Instant | +5-50ms | One-time Schema Registry call per table |
| Steady-state latency | Baseline | Comparable | Schema cached after first event |
The system-level throughput (events/sec end-to-end) usually stays the same or improves because the bottleneck is typically network/disk I/O, not producer CPU. Smaller payloads reduce broker-side pressure.
Comparing JSON vs Avro
Run both soak tests side by side (different containers, same source):
docker compose -f docker-compose.chaos.yml \
--profile base --profile mysql-infra --profile kafka-infra \
--profile soak --profile avro-soak up -d
# JSON baseline
cargo run -p chaos -- --scenario soak-stable --duration-mins 30
# Avro comparison
cargo run -p chaos -- --scenario soak-stable-avro --duration-mins 30
Compare in Grafana: rate(deltaforge_sink_events_total[1m]) filtered by instance port 9001 (JSON) vs 9006 (Avro).
Avro-specific flamegraph areas
| Area | What it means |
|---|---|
encode_event / encode_with_schema | Avro encoding path — schema lookup + binary encoding |
register_schema | Schema Registry HTTP call (only on first event per table or DDL change) |
json_to_avro / to_avro_datum | JSON → Avro value conversion + binary serialization |
resolve (in avro module) | Schema cache lookup — should be near-instant |
Examples
Complete pipeline configurations demonstrating common DeltaForge use cases. Each example is ready to run with minimal modifications.
Available Examples
| Example | Source | Sink(s) | Key Features |
|---|---|---|---|
| MySQL to Redis | MySQL | Redis | JavaScript processor, PII redaction |
| Turso to Kafka | Turso/libSQL | Kafka | Native CDC, CloudEvents envelope |
| PostgreSQL to NATS | PostgreSQL | NATS | Logical replication, CloudEvents |
| Multi-Sink Fan-Out | MySQL | Kafka + Redis + NATS | Multiple envelopes, selective checkpointing |
| Event Filtering | MySQL | Kafka | JavaScript filtering, PII redaction |
| Schema Sensing | PostgreSQL | Kafka | JSON schema inference, drift detection |
| Production Kafka | PostgreSQL | Kafka | SASL/SSL auth, exactly-once, tuning |
| Cache Invalidation | MySQL | Redis | CDC stream for cache invalidation workers |
| Audit Trail | PostgreSQL | Kafka | Compliance logging, PII redaction |
| Analytics Preprocessing | MySQL | Kafka + Redis | Metrics enrichment, analytics stream |
| Outbox Pattern | MySQL | Kafka | Transactional outbox, raw payload, per-aggregate routing |
Quick Start
- Set environment variables for your database and sink connections
- Copy the example to a `.yaml` file
- Run DeltaForge:
cargo run -p runner -- --config your-pipeline.yaml
Examples by Category
Getting Started
| Example | Description |
|---|---|
| MySQL to Redis | Simple pipeline with JavaScript transformation |
| Turso to Kafka | Edge database to Kafka with CloudEvents |
| PostgreSQL to NATS | PostgreSQL logical replication to NATS |
Production Patterns
| Example | Description |
|---|---|
| Production Kafka | Authentication, exactly-once, performance tuning |
| Multi-Sink Fan-Out | Multiple sinks with different formats |
| Cache Invalidation | CDC stream for cache invalidation |
| Outbox Pattern | Transactional outbox with raw payload delivery |
Data Processing
| Example | Description |
|---|---|
| Event Filtering | Filter, drop, and redact events |
| Schema Sensing | Automatic JSON schema discovery |
| Analytics Preprocessing | Prepare events for analytics platforms |
Compliance & Auditing
| Example | Description |
|---|---|
| Audit Trail | SOC2/HIPAA/GDPR-compliant change tracking |
Examples by Feature
Envelope Formats
| Format | Example |
|---|---|
| Native | MySQL to Redis, Event Filtering, Cache Invalidation |
| Debezium | Schema Sensing, Multi-Sink Fan-Out, Production Kafka |
| CloudEvents | Turso to Kafka, PostgreSQL to NATS |
JavaScript Processors
| Use Case | Example |
|---|---|
| PII Redaction | MySQL to Redis, Audit Trail |
| Event Filtering | Event Filtering |
| Enrichment | Turso to Kafka, Analytics Preprocessing |
| Cache Key Generation | Cache Invalidation |
| Audit Metadata | Audit Trail |
Multi-Sink Patterns
| Pattern | Example |
|---|---|
| Fan-out with different formats | Multi-Sink Fan-Out |
| Primary + best-effort secondary | Multi-Sink Fan-Out, Analytics Preprocessing |
Customizing Examples
Change Envelope Format
All sinks support configurable envelopes. Add to any sink config:
sinks:
- type: kafka
config:
# ... other config
envelope:
type: cloudevents # or: native, debezium
type_prefix: "com.example" # required for cloudevents
encoding: json
See Envelopes and Encodings for details.
Add Multiple Sinks
Fan out to multiple destinations with different formats:
sinks:
- type: kafka
config:
id: primary-kafka
envelope:
type: debezium
required: true # Must succeed for checkpoint
- type: redis
config:
id: cache-redis
envelope:
type: native
required: false # Best-effort, won't block checkpoint
See Sinks documentation for multi-sink patterns.
Enable Schema Sensing
Automatically discover JSON structure in your data:
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 3
sampling:
warmup_events: 100
sample_rate: 10
See Schema Sensing for configuration options.
More Resources
- Configuration Reference - Full spec documentation
- Quickstart Guide - Get running in minutes
- Troubleshooting - Common issues and solutions
MySQL to Redis
This example streams MySQL binlog events into a Redis stream with an inline JavaScript transformation for PII redaction.
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript email redaction |
| Sink | Redis Streams |
| Envelope | Native (configurable) |
Pipeline Configuration
metadata:
name: orders-mysql-to-redis
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
processors:
- type: javascript
id: redact-email
inline: |
function processBatch(events) {
return events.map((event) => {
if (event.after && event.after.email) {
event.after.email = "[redacted]";
}
return event;
});
}
limits:
timeout_ms: 500
sinks:
- type: redis
config:
id: orders-redis
uri: ${REDIS_URI}
stream: orders
envelope:
type: native
encoding: json
required: true
batch:
max_events: 500
max_bytes: 1048576
max_ms: 1000
commit_policy:
mode: required
Running the Example
1. Set Environment Variables
export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export REDIS_URI="redis://localhost:6379"
2. Start DeltaForge
# Save config as mysql-redis.yaml
cargo run -p runner -- --config mysql-redis.yaml
3. Insert Test Data
INSERT INTO shop.orders (email, total, status)
VALUES ('alice@example.com', 99.99, 'pending');
4. Verify in Redis
./dev.sh redis-read orders 10
You should see the event with the email redacted:
{
"before": null,
"after": {
"id": 1,
"email": "[redacted]",
"total": 99.99,
"status": "pending"
},
"op": "c",
"ts_ms": 1700000000000
}
Variations
With Debezium Envelope
For Kafka Connect compatibility downstream:
sinks:
- type: redis
config:
id: orders-redis
uri: ${REDIS_URI}
stream: orders
envelope:
type: debezium
Multi-Sink Fan-Out
Add Kafka alongside Redis for durability:
sinks:
- type: kafka
config:
id: orders-kafka
brokers: ${KAFKA_BROKERS}
topic: orders
envelope:
type: debezium
required: true # Critical path
- type: redis
config:
id: orders-redis
uri: ${REDIS_URI}
stream: orders
envelope:
type: native
required: false # Best-effort
With this configuration, checkpoints only wait for Kafka. Redis failures won’t block the pipeline.
With Schema Sensing
Automatically track schema changes:
spec:
# ... source and sinks config ...
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 3
sampling:
warmup_events: 100
sample_rate: 10
Key Concepts Demonstrated
- JavaScript Processors: Transform events in-flight with custom logic
- PII Redaction: Mask sensitive data before it reaches downstream systems
- Envelope Configuration: Choose output format based on consumer needs
- Commit Policy: Control checkpoint behavior with the `required` flag
Related Documentation
- MySQL Source - Prerequisites and configuration
- Redis Sink - Connection options and batching
- Envelopes - Output format options
- Configuration Reference - Full spec documentation
Example: Turso to Kafka
This example demonstrates streaming changes from a Turso database to Kafka with schema sensing enabled.
Use Case
You have a Turso database (or local libSQL) and want to:
- Stream table changes to a Kafka topic
- Automatically detect schema structure from JSON payloads
- Transform events with JavaScript before publishing
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: turso2kafka
tenant: acme
spec:
source:
type: turso
config:
id: turso-main
# Local libSQL for development
url: "http://127.0.0.1:8080"
# For Turso cloud:
# url: "libsql://your-db.turso.io"
# auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["users", "orders", "order_items"]
poll_interval_ms: 1000
cdc_mode: auto
processors:
- type: javascript
id: enrich
inline: |
function processBatch(events) {
return events.map(event => {
// Add custom metadata to events
event.source_type = "turso";
event.processed_at = new Date().toISOString();
return event;
});
}
sinks:
- type: kafka
config:
id: kafka-main
brokers: "${KAFKA_BROKERS}"
topic: turso.changes
required: true
exactly_once: false
client_conf:
message.timeout.ms: "5000"
acks: "all"
batch:
max_events: 100
max_bytes: 1048576
max_ms: 500
respect_source_tx: false
max_inflight: 2
commit_policy:
mode: required
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 3
sampling:
warmup_events: 50
sample_rate: 5
structure_cache: true
Running the Example
1. Start Infrastructure
# Start Kafka and other services
./dev.sh up
# Create the target topic
./dev.sh k-create turso.changes 6
2. Start Local libSQL (Optional)
For local development without Turso cloud:
# Using sqld (libSQL server)
sqld --http-listen-addr 127.0.0.1:8080
# Or with Docker
docker run -p 8080:8080 ghcr.io/libsql/sqld:latest
3. Create Test Tables
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE,
metadata TEXT -- JSON column
);
CREATE TABLE orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
total REAL NOT NULL,
status TEXT DEFAULT 'pending',
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
4. Run DeltaForge
# Save config as turso-kafka.yaml
cargo run -p runner -- --config turso-kafka.yaml
5. Insert Test Data
INSERT INTO users (name, email, metadata)
VALUES ('Alice', 'alice@example.com', '{"role": "admin", "tags": ["vip"]}');
INSERT INTO orders (user_id, total, status)
VALUES (1, 99.99, 'completed');
6. Verify Events in Kafka
./dev.sh k-consume turso.changes --from-beginning
You should see events like:
{
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"tenant_id": "acme",
"table": "users",
"op": "insert",
"after": {
"id": 1,
"name": "Alice",
"email": "alice@example.com",
"metadata": "{\"role\": \"admin\", \"tags\": [\"vip\"]}"
},
"source_type": "turso",
"processed_at": "2025-01-15T10:30:00.000Z",
"timestamp": "2025-01-15T10:30:00.123Z"
}
Monitoring
Check Pipeline Status
curl http://localhost:8080/pipelines/turso2kafka
View Inferred Schemas
# List all inferred schemas
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas
# Get details for users table
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users
# Export as JSON Schema
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users/json-schema
Check Drift Detection
curl http://localhost:8080/pipelines/turso2kafka/drift
Turso Cloud Configuration
For production with Turso cloud:
source:
type: turso
config:
id: turso-prod
url: "libsql://mydb-myorg.turso.io"
auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["*"]
cdc_mode: native
poll_interval_ms: 1000
native_cdc:
level: data
Set the auth token via environment variable:
export TURSO_AUTH_TOKEN="your-token-here"
Notes
- CDC Mode: `auto` tries native CDC first, then falls back to triggers or polling
- Poll Interval: Lower values reduce latency but increase database load
- Schema Sensing: Automatically discovers JSON structure in text columns
- Exactly Once: Set to `false` for higher throughput; use `true` if your Kafka cluster supports EOS
PostgreSQL to NATS
This example streams PostgreSQL logical replication changes to NATS JetStream using the CloudEvents envelope format, which suits serverless and event-driven architectures.
Overview
| Component | Configuration |
|---|---|
| Source | PostgreSQL logical replication |
| Processor | None (passthrough) |
| Sink | NATS JetStream |
| Envelope | CloudEvents 1.0 |
Use Case
You have a PostgreSQL database and want to:
- Stream changes to NATS for event-driven microservices
- Use CloudEvents format for AWS Lambda, Azure Functions, or Knative
- Leverage JetStream for durable, replay-capable event streams
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: users-postgres-to-nats
tenant: acme
spec:
source:
type: postgres
config:
id: users-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_users
publication: users_pub
tables:
- public.users
- public.profiles
- public.user_sessions
start_position: earliest
sinks:
- type: nats
config:
id: users-nats
url: ${NATS_URL}
subject: users.events
stream: USERS
envelope:
type: cloudevents
type_prefix: "com.acme.users"
encoding: json
required: true
send_timeout_secs: 5
batch_timeout_secs: 30
batch:
max_events: 500
max_ms: 500
respect_source_tx: true
commit_policy:
mode: required
Prerequisites
PostgreSQL Setup
-- Enable logical replication (postgresql.conf)
-- wal_level = logical
-- Create publication for the tables
CREATE PUBLICATION users_pub FOR TABLE users, profiles, user_sessions;
-- Verify publication
SELECT * FROM pg_publication_tables WHERE pubname = 'users_pub';
NATS JetStream Setup
# Start NATS with JetStream enabled
./dev.sh up
# Create the stream
./dev.sh nats-stream-add USERS 'users.>'
Running the Example
1. Set Environment Variables
export POSTGRES_DSN="postgres://user:password@localhost:5432/mydb"
export NATS_URL="nats://localhost:4222"
2. Start DeltaForge
cargo run -p runner -- --config postgres-nats.yaml
3. Insert Test Data
INSERT INTO users (name, email, created_at)
VALUES ('Alice', 'alice@example.com', NOW());
UPDATE users SET email = 'alice.new@example.com' WHERE name = 'Alice';
4. Verify in NATS
./dev.sh nats-sub 'users.>'
You should see CloudEvents formatted messages:
{
"specversion": "1.0",
"id": "550e8400-e29b-41d4-a716-446655440000",
"source": "deltaforge/users-postgres/public.users",
"type": "com.acme.users.created",
"time": "2025-01-15T10:30:00.000Z",
"datacontenttype": "application/json",
"subject": "public.users",
"data": {
"before": null,
"after": {
"id": 1,
"name": "Alice",
"email": "alice@example.com",
"created_at": "2025-01-15T10:30:00.000Z"
},
"op": "c"
}
}
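The type attribute combines the sink's type_prefix with a suffix derived from the operation. A minimal sketch of that mapping, inferred from the sample output above (the helper name and the fallback suffix are ours, not part of DeltaForge):

```javascript
// Map a change-event op code to a CloudEvents type string, as seen in the
// sample above. The created/updated/deleted suffixes follow the example
// output; "changed" is an assumed fallback for unrecognized ops.
function cloudEventType(typePrefix, op) {
  const suffix = { c: 'created', u: 'updated', d: 'deleted' }[op] || 'changed';
  return `${typePrefix}.${suffix}`;
}
```

For example, an insert on public.users with type_prefix "com.acme.users" yields "com.acme.users.created".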
Variations
With Debezium Envelope
For compatibility with existing Debezium consumers:
sinks:
- type: nats
config:
id: users-nats
url: ${NATS_URL}
subject: users.events
stream: USERS
envelope:
type: debezium
With Authentication
sinks:
- type: nats
config:
id: users-nats
url: ${NATS_URL}
subject: users.events
stream: USERS
envelope:
type: cloudevents
type_prefix: "com.acme.users"
credentials_file: /path/to/nats.creds
# Or use username/password:
# username: ${NATS_USER}
# password: ${NATS_PASS}
Starting from Latest
Skip existing data and only capture new changes:
source:
type: postgres
config:
id: users-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_users
publication: users_pub
tables:
- public.users
start_position: latest
Key Concepts Demonstrated
- PostgreSQL Logical Replication: Production-ready CDC with slot management
- CloudEvents Format: Standard envelope for cloud-native event routing
- JetStream Durability: Replay-capable event streams with consumer acknowledgment
- Transaction Preservation: respect_source_tx: true keeps related changes together
Related Documentation
- PostgreSQL Source - Replication setup and configuration
- NATS Sink - JetStream configuration and authentication
- Envelopes - Output format options
- Configuration Reference - Full spec documentation
Multi-Sink Fan-Out
This example demonstrates streaming changes to multiple destinations simultaneously, each with a different envelope format tailored to its consumers.
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript enrichment |
| Sinks | Kafka (Debezium) + Redis (Native) + NATS (CloudEvents) |
| Pattern | Fan-out with format adaptation |
Use Case
You have a MySQL database and need to:
- Send to Kafka Connect (requires Debezium format)
- Populate a Redis cache (native format for efficiency)
- Trigger serverless functions via NATS (CloudEvents format)
- Handle sink failures independently without blocking the pipeline
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-fan-out
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
processors:
- type: javascript
id: enrich
inline: |
function processBatch(events) {
return events.map(event => {
// Add routing hints via tags (tags is a valid Event field)
event.tags = event.tags || [];
if (event.table.includes('orders')) {
event.tags.push('entity:order');
// High-value orders get priority tag
if (event.after && event.after.total > 1000) {
event.tags.push('priority:high');
}
}
event.tags.push('enriched');
return event;
});
}
limits:
timeout_ms: 500
sinks:
# Primary: Kafka for data warehouse / Kafka Connect
# Uses Debezium format for ecosystem compatibility
- type: kafka
config:
id: warehouse-kafka
brokers: ${KAFKA_BROKERS}
topic: orders.cdc
envelope:
type: debezium
encoding: json
required: true # Must succeed for checkpoint
exactly_once: false
client_conf:
acks: "all"
# Secondary: Redis for real-time cache
# Uses native format for minimal overhead
- type: redis
config:
id: cache-redis
uri: ${REDIS_URI}
stream: orders:cache
envelope:
type: native
encoding: json
required: false # Best-effort, won't block pipeline
# Tertiary: NATS for serverless triggers
# Uses CloudEvents for Lambda/Functions compatibility
- type: nats
config:
id: serverless-nats
url: ${NATS_URL}
subject: orders.events
stream: ORDERS
envelope:
type: cloudevents
type_prefix: "com.acme.orders"
encoding: json
required: false # Best-effort
batch:
max_events: 500
max_bytes: 1048576
max_ms: 500
respect_source_tx: true
commit_policy:
mode: required # Only wait for required sinks (Kafka)
How It Works
Checkpoint Behavior
With commit_policy.mode: required:
Source Event
│
▼
┌─────────────────────────────────────────────────────┐
│ Parallel Sink Delivery │
├─────────────────┬─────────────────┬─────────────────┤
│ Kafka │ Redis │ NATS │
│ required: true │ required: false │ required: false │
│ │ │ │
│ ✓ Must succeed │ ✓ Best-effort │ ✓ Best-effort │
└────────┬────────┴────────┬────────┴──────┬──────────┘
│ │ │
▼ │ │
Checkpoint <───────────┘ │
advances │
(even if Redis/NATS fail) │
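The gating rule in the diagram can be expressed as a small pure function. This is an illustrative sketch, not DeltaForge internals; the results map of sink id to delivery outcome is a hypothetical shape:

```javascript
// Under commit_policy.mode: required, only sinks flagged required gate the
// checkpoint; best-effort sinks are ignored when deciding whether to advance.
function canAdvanceCheckpoint(sinks, results) {
  return sinks
    .filter(sink => sink.required)
    .every(sink => results[sink.id] === 'ok');
}
```

With the configuration above, a Redis or NATS failure leaves the checkpoint free to advance, while a Kafka failure blocks it.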
Output Formats
Kafka (Debezium):
{
"payload": {
"before": null,
"after": {"id": 1, "total": 1500.00, "status": "pending"},
"source": {"connector": "mysql", "db": "shop", "table": "orders"},
"op": "c",
"ts_ms": 1700000000000
}
}
Redis (Native):
{
"before": null,
"after": {"id": 1, "total": 1500.00, "status": "pending"},
"source": {"connector": "mysql", "db": "shop", "table": "orders"},
"op": "c",
"ts_ms": 1700000000000
}
NATS (CloudEvents):
{
"specversion": "1.0",
"id": "evt-123",
"source": "deltaforge/orders-mysql/shop.orders",
"type": "com.acme.orders.created",
"time": "2025-01-15T10:30:00.000Z",
"data": {
"before": null,
"after": {"id": 1, "total": 1500.00, "status": "pending"},
"op": "c"
}
}
Running the Example
1. Set Environment Variables
export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
export REDIS_URI="redis://localhost:6379"
export NATS_URL="nats://localhost:4222"
2. Start Infrastructure
./dev.sh up
./dev.sh k-create orders.cdc 6
./dev.sh nats-stream-add ORDERS 'orders.>'
3. Start DeltaForge
cargo run -p runner -- --config fan-out.yaml
4. Insert Test Data
INSERT INTO shop.orders (customer_id, total, status)
VALUES (1, 1500.00, 'pending');
5. Verify Each Sink
# Kafka
./dev.sh k-consume orders.cdc --from-beginning
# Redis
./dev.sh redis-read orders:cache 10
# NATS
./dev.sh nats-sub 'orders.>'
Variations
Quorum Mode
Require 2 of 3 sinks to succeed:
sinks:
- type: kafka
config:
id: kafka-1
required: true # Counts toward quorum
- type: redis
config:
id: redis-1
required: true # Counts toward quorum
- type: nats
config:
id: nats-1
required: true # Counts toward quorum
commit_policy:
mode: quorum
quorum: 2 # Any 2 must succeed
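The quorum decision can be sketched the same way as the required mode. Names are illustrative, under the assumption that only required sinks count toward the quorum (as the comments above indicate):

```javascript
// Quorum gating: the checkpoint advances once at least `quorum` of the
// required sinks report success for the batch.
function quorumMet(sinks, results, quorum) {
  const successes = sinks
    .filter(sink => sink.required)
    .filter(sink => results[sink.id] === 'ok')
    .length;
  return successes >= quorum;
}
```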
All-or-Nothing
Require all sinks to succeed (strongest consistency):
commit_policy:
mode: all
⚠️ Warning: mode: all means any sink failure blocks the entire pipeline. Use it only when all destinations are equally critical.
Key Concepts Demonstrated
- Multi-Sink Fan-Out: Single source to multiple destinations
- Format Adaptation: Different envelope per consumer requirement
- Selective Checkpointing: The required flag controls which sinks gate progress
- Failure Isolation: Non-critical sinks don’t block the pipeline
- Tag-Based Enrichment: Use event.tags for routing metadata
Processor Constraints: JavaScript processors can only modify event.before, event.after, and event.tags. Arbitrary top-level fields would be lost during serialization.
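The enrichment logic can be exercised outside DeltaForge. The function below is a standalone copy of the processor from the pipeline configuration above; the event objects are simplified test fixtures:

```javascript
// Standalone copy of the fan-out pipeline's enrichment processor: tags order
// events, marks high-value orders, and stamps every event as enriched.
function processBatch(events) {
  return events.map(event => {
    event.tags = event.tags || [];
    if (event.table.includes('orders')) {
      event.tags.push('entity:order');
      // High-value orders get a priority tag
      if (event.after && event.after.total > 1000) {
        event.tags.push('priority:high');
      }
    }
    event.tags.push('enriched');
    return event;
  });
}
```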
Related Documentation
- Sinks Overview - Multi-sink patterns and commit policies
- Envelopes - Output format options
- Commit Policy - Checkpoint gating modes
- Configuration Reference - Full spec documentation
Event Filtering with JavaScript
This example demonstrates using JavaScript processors to filter and selectively drop events before they reach sinks.
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript filter + redaction |
| Sink | Kafka |
| Pattern | Event filtering and transformation |
Use Case
You have a MySQL database and want to:
- Filter out events from certain tables or with specific conditions
- Drop low-value events to reduce downstream load
- Redact sensitive fields conditionally
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: filtered-orders
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
- shop.audit_log # We'll filter most of these out
processors:
- type: javascript
id: filter-and-redact
inline: |
function processBatch(events) {
return events
// 1. Filter out audit_log events except errors
.filter(event => {
if (event.table === 'shop.audit_log') {
// Only keep error-level audit events
return event.after && event.after.level === 'error';
}
return true;
})
// 2. Filter out soft-deleted records
.filter(event => {
if (event.after && event.after.deleted_at !== null) {
return false; // Drop soft-deleted records
}
return true;
})
// 3. Filter out test/staging data
.filter(event => {
if (event.after && event.after.email) {
// Drop test accounts
if (event.after.email.endsWith('@test.local')) {
return false;
}
}
return true;
})
// 4. Transform remaining events
.map(event => {
// Redact PII for non-admin tables
if (event.after) {
if (event.after.email) {
event.after.email = maskEmail(event.after.email);
}
if (event.after.phone) {
event.after.phone = '[redacted]';
}
}
// Add filter tag (tags is a valid Event field)
event.tags = (event.tags || []).concat(['filtered']);
return event;
});
}
// helper: mask email keeping domain
function maskEmail(email) {
const [local, domain] = email.split('@');
if (!domain) return '[invalid-email]';
const masked = local.charAt(0) + '***' + local.charAt(local.length - 1);
return masked + '@' + domain;
}
limits:
timeout_ms: 1000
mem_mb: 128
sinks:
- type: kafka
config:
id: filtered-kafka
brokers: ${KAFKA_BROKERS}
topic: orders.filtered
envelope:
type: native
encoding: json
required: true
batch:
max_events: 500
max_ms: 500
respect_source_tx: true
commit_policy:
mode: required
Filter Patterns
Drop Events (Return false from the Filter)
.filter(event => {
// return false to drop the event
if (event.table === 'internal_logs') {
return false;
}
return true;
})
Conditional Field-Based Filtering
.filter(event => {
// drop events where status is 'draft'
if (event.after && event.after.status === 'draft') {
return false;
}
return true;
})
Drop by Operation Type
.filter(event => {
// only keep inserts and updates, drop deletes
return event.op === 'c' || event.op === 'u';
})
Sample Events (Rate Limiting)
// keep only 10% of events (for high-volume tables)
.filter(event => {
if (event.table === 'high_volume_table') {
return Math.random() < 0.1;
}
return true;
})
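The four patterns can be composed into a single batch function. Table names and the sampling rate below are illustrative:

```javascript
// The filter patterns above, chained: drop by table, by field value, by
// operation type, then sample a high-volume table at 10%.
function filterBatch(events) {
  return events
    .filter(e => e.table !== 'internal_logs')                 // drop by table
    .filter(e => !(e.after && e.after.status === 'draft'))    // drop drafts
    .filter(e => e.op === 'c' || e.op === 'u')                // drop deletes
    .filter(e => e.table !== 'high_volume_table'
              || Math.random() < 0.1);                        // sample 10%
}
```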
Running the Example
1. Set Environment Variables
export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
2. Start DeltaForge
cargo run -p runner -- --config filtered-orders.yaml
3. Insert Test Data
-- This will be captured and transformed
INSERT INTO shop.orders (customer_email, total, status)
VALUES ('alice@example.com', 99.99, 'pending');
-- This will be filtered out (test account)
INSERT INTO shop.orders (customer_email, total, status)
VALUES ('test@test.local', 50.00, 'pending');
-- This will be filtered out (soft-deleted)
INSERT INTO shop.orders (customer_email, total, status, deleted_at)
VALUES ('bob@example.com', 75.00, 'pending', NOW());
-- Audit log - will be filtered out (not error level)
INSERT INTO shop.audit_log (level, message)
VALUES ('info', 'User logged in');
-- Audit log - will be kept (error level)
INSERT INTO shop.audit_log (level, message)
VALUES ('error', 'Payment failed');
4. Verify Filtered Output
./dev.sh k-consume orders.filtered --from-beginning
You should only see:
- Alice’s order (with masked email: a***e@example.com)
- The error-level audit log entry
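The masking behavior can be checked outside the pipeline. The helper below is reproduced from the processor configuration above:

```javascript
// Reproduced from the pipeline's maskEmail helper: keeps the first and last
// character of the local part and the full domain.
function maskEmail(email) {
  const [local, domain] = email.split('@');
  if (!domain) return '[invalid-email]';
  const masked = local.charAt(0) + '***' + local.charAt(local.length - 1);
  return masked + '@' + domain;
}
```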
Performance Considerations
Tip: Filtering early reduces downstream load. If you’re filtering out 50% of events, your sinks process half the data.
processors:
- type: javascript
id: filter
inline: |
function processBatch(events) {
// Filter FIRST, then transform
return events
.filter(e => shouldKeep(e)) // Reduces array size
.map(e => transform(e)); // Processes fewer events
}
limits:
timeout_ms: 500 # Increase if filtering logic is complex
mem_mb: 128 # Increase for large batches
Key Concepts Demonstrated
- Event Filtering: Drop events before they reach sinks
- Conditional Logic: Filter based on table, operation, or field values
- PII Redaction: Mask sensitive data in remaining events
- Sampling: Rate-limit high-volume event streams
Processor Constraints: JavaScript processors can only modify event.before, event.after, and event.tags. Arbitrary top-level fields like event.filtered_at would be lost during serialization.
Related Documentation
- Processors - JavaScript processor configuration
- Envelopes - Output format options
- Configuration Reference - Full spec documentation
Schema Sensing and Drift Detection
This example demonstrates DeltaForge’s automatic schema inference for JSON columns and drift detection capabilities.
Overview
| Component | Configuration |
|---|---|
| Source | PostgreSQL logical replication |
| Processor | None |
| Sink | Kafka |
| Feature | Schema sensing with deep JSON inspection |
Use Case
You have a PostgreSQL database with JSON/JSONB columns and want to:
- Automatically discover the structure of JSON payloads
- Detect when JSON schemas change over time (drift)
- Export inferred schemas as JSON Schema for downstream validation
- Monitor schema evolution without manual tracking
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: products-with-sensing
tenant: acme
spec:
source:
type: postgres
config:
id: products-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_products
publication: products_pub
tables:
- public.products
- public.product_variants
start_position: earliest
sinks:
- type: kafka
config:
id: products-kafka
brokers: ${KAFKA_BROKERS}
topic: products.changes
envelope:
type: debezium
encoding: json
required: true
batch:
max_events: 500
max_ms: 1000
respect_source_tx: true
commit_policy:
mode: required
# Schema sensing configuration
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 5 # How deep to traverse nested JSON
max_sample_size: 500 # Events to analyze for deep inspection
sampling:
warmup_events: 100 # Analyze every event during warmup
sample_rate: 20 # After warmup, analyze 1 in 20 events
structure_cache: true # Cache seen structures to avoid re-analysis
structure_cache_size: 100
tracking:
detect_drift: true # Enable drift detection
drift_threshold: 0.1 # Alert if >10% of events have new fields
output:
emit_schemas: true # Include schema info in API responses
json_schema_format: draft-07
Table Structure
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
sku TEXT UNIQUE,
price DECIMAL(10,2),
-- JSON columns that schema sensing will analyze
metadata JSONB, -- Product attributes, tags, etc.
specifications JSONB, -- Technical specs
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE product_variants (
id SERIAL PRIMARY KEY,
product_id INTEGER REFERENCES products(id),
variant_name TEXT,
-- Nested JSON with variable structure
attributes JSONB, -- Color, size, material, etc.
pricing JSONB, -- Regional pricing, discounts
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Create publication
CREATE PUBLICATION products_pub FOR TABLE products, product_variants;
Sample Data
-- Insert products with JSON metadata
INSERT INTO products (name, sku, price, metadata, specifications) VALUES
(
'Wireless Headphones',
'WH-1000',
299.99,
'{
"brand": "AudioTech",
"category": "Electronics",
"tags": ["wireless", "bluetooth", "noise-canceling"],
"ratings": {"average": 4.5, "count": 1250}
}',
'{
"battery_life_hours": 30,
"driver_size_mm": 40,
"frequency_response": {"min_hz": 20, "max_hz": 20000},
"connectivity": ["bluetooth", "aux"]
}'
);
-- Insert variant with nested attributes
INSERT INTO product_variants (product_id, variant_name, attributes, pricing) VALUES
(
1,
'Midnight Black',
'{
"color": {"name": "Midnight Black", "hex": "#1a1a2e"},
"material": "Premium Plastic",
"weight_grams": 250
}',
'{
"base_price": 299.99,
"regional": {
"US": {"price": 299.99, "currency": "USD"},
"EU": {"price": 279.99, "currency": "EUR"}
},
"discounts": [
{"code": "SAVE20", "percent": 20, "expires": "2025-12-31"}
]
}'
);
Running the Example
1. Set Environment Variables
export POSTGRES_DSN="postgres://user:password@localhost:5432/shop"
export KAFKA_BROKERS="localhost:9092"
2. Start DeltaForge
cargo run -p runner -- --config products-sensing.yaml
3. Insert Data and Let Sensing Analyze
-- Insert several products to build schema profile
INSERT INTO products (name, sku, price, metadata, specifications) VALUES
('Smart Watch', 'SW-200', 399.99,
'{"brand": "TechWear", "tags": ["fitness", "smart"]}',
'{"battery_days": 7, "water_resistant": true}');
Using the Schema Sensing API
List Inferred Schemas
curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas
Response:
{
"schemas": [
{
"table": "public.products",
"columns": {
"metadata": {
"type": "object",
"inferred_at": "2025-01-15T10:30:00Z",
"sample_count": 150
},
"specifications": {
"type": "object",
"inferred_at": "2025-01-15T10:30:00Z",
"sample_count": 150
}
}
}
]
}
Get Detailed Schema for a Table
curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas/public.products
Response:
{
"table": "public.products",
"json_columns": {
"metadata": {
"inferred_schema": {
"type": "object",
"properties": {
"brand": {"type": "string"},
"category": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
"ratings": {
"type": "object",
"properties": {
"average": {"type": "number"},
"count": {"type": "integer"}
}
}
}
},
"sample_count": 150,
"last_updated": "2025-01-15T10:35:00Z"
}
}
}
Export as JSON Schema
curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas/public.products/json-schema
Response:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "public.products.metadata",
"type": "object",
"properties": {
"brand": {"type": "string"},
"category": {"type": "string"},
"tags": {
"type": "array",
"items": {"type": "string"}
},
"ratings": {
"type": "object",
"properties": {
"average": {"type": "number"},
"count": {"type": "integer"}
}
}
}
}
Check Drift Detection
curl http://localhost:8080/pipelines/products-with-sensing/drift
Response:
{
"drift_detected": true,
"tables": {
"public.products": {
"metadata": {
"new_fields": ["promotion", "seasonal"],
"removed_fields": [],
"type_changes": [],
"drift_percentage": 0.15,
"first_seen": "2025-01-15T11:00:00Z"
}
}
}
}
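The alerting condition implied by drift_threshold can be sketched against this response shape. This is an illustrative consumer of the API payload, not DeltaForge's internal check:

```javascript
// Given a /drift response (shape follows the sample above), list the
// table.column pairs whose drift_percentage exceeds the configured threshold.
function columnsOverThreshold(driftReport, threshold) {
  const flagged = [];
  for (const [table, columns] of Object.entries(driftReport.tables)) {
    for (const [column, info] of Object.entries(columns)) {
      if (info.drift_percentage > threshold) {
        flagged.push(`${table}.${column}`);
      }
    }
  }
  return flagged;
}
```

With the sample response and the pipeline's drift_threshold of 0.1, the metadata column of public.products is flagged.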
Get Sensing Statistics
curl http://localhost:8080/pipelines/products-with-sensing/sensing/stats
Response:
{
"total_events_analyzed": 1500,
"total_events_sampled": 250,
"cache_hits": 1250,
"cache_misses": 250,
"tables_tracked": 2,
"json_columns_tracked": 4
}
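Two useful ratios can be derived from this payload, assuming the field semantics suggested by the names (sampled events are a subset of analyzed events; hits plus misses is the total cache lookups):

```javascript
// Derive sampling and structure-cache ratios from a /sensing/stats payload.
function sensingRatios(stats) {
  return {
    sampleRatio: stats.total_events_sampled / stats.total_events_analyzed,
    cacheHitRate: stats.cache_hits / (stats.cache_hits + stats.cache_misses),
  };
}
```

For the sample above: roughly 1 in 6 events sampled and an ~83% structure-cache hit rate, which suggests the cache size is adequate.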
Performance Tuning
Performance tip: Schema sensing can be CPU-intensive. Tune based on your throughput needs.
High-Throughput Configuration
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 3 # Limit depth for faster processing
max_sample_size: 200 # Fewer samples
sampling:
warmup_events: 50 # Shorter warmup
sample_rate: 100 # Analyze 1 in 100 events
structure_cache: true
structure_cache_size: 200 # Larger cache
Development/Debugging Configuration
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 10 # Full depth
max_sample_size: 1000 # More samples
sampling:
warmup_events: 500 # Longer warmup
sample_rate: 1 # Analyze every event
Key Concepts Demonstrated
- Automatic Schema Inference: Discover JSON structure without manual definition
- Deep JSON Inspection: Traverse nested objects and arrays
- Drift Detection: Alert when schemas change unexpectedly
- JSON Schema Export: Generate standard schemas for validation
- Sampling Strategy: Balance accuracy vs. performance
Related Documentation
- Schema Sensing - Detailed schema sensing documentation
- PostgreSQL Source - Replication setup
- Configuration Reference - Full sensing options
- API Reference - All sensing endpoints
Production Kafka Configuration
This example demonstrates a production-ready Kafka sink configuration with authentication, high-availability settings, and performance tuning.
Overview
| Component | Configuration |
|---|---|
| Source | PostgreSQL logical replication |
| Processor | None |
| Sink | Kafka with SASL/SSL authentication |
| Pattern | Production-grade reliability |
Use Case
You’re deploying DeltaForge to production and need:
- Secure authentication (SASL/SCRAM or mTLS)
- High availability with proper acknowledgment settings
- Optimal batching and compression for throughput
- Exactly-once semantics for critical data
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-to-kafka-prod
tenant: acme
spec:
source:
type: postgres
config:
id: orders-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_orders
publication: orders_pub
tables:
- public.orders
- public.order_items
- public.payments
start_position: earliest
sinks:
- type: kafka
config:
id: orders-kafka
brokers: ${KAFKA_BROKERS}
topic: orders.cdc.events
envelope:
type: debezium
encoding: json
required: true
# enable exactly-once semantics
exactly_once: true
# timeout for individual sends
send_timeout_secs: 60
# librdkafka client configuration
client_conf:
# security - SASL/SCRAM authentication
security.protocol: "SASL_SSL"
sasl.mechanism: "SCRAM-SHA-512"
sasl.username: "${KAFKA_USERNAME}"
sasl.password: "${KAFKA_PASSWORD}"
# SSL/TLS configuration
ssl.ca.location: "/etc/ssl/certs/kafka-ca.pem"
ssl.endpoint.identification.algorithm: "https"
# reliability - wait for all replicas
acks: "all"
# idempotence (required for exactly-once)
enable.idempotence: "true"
# retries and timeouts
retries: "2147483647"
retry.backoff.ms: "100"
delivery.timeout.ms: "300000"
request.timeout.ms: "30000"
# kafka batching for throughput
batch.size: "65536"
linger.ms: "10"
# compression
compression.type: "lz4"
# buffer management
queue.buffering.max.messages: "100000"
queue.buffering.max.kbytes: "1048576"
batch:
max_events: 1000
max_bytes: 1048576
max_ms: 100
respect_source_tx: true
max_inflight: 4
commit_policy:
mode: required
Security Configurations
SASL/SCRAM (Username/Password)
client_conf:
security.protocol: "SASL_SSL"
sasl.mechanism: "SCRAM-SHA-512"
sasl.username: "${KAFKA_USERNAME}"
sasl.password: "${KAFKA_PASSWORD}"
ssl.ca.location: "/etc/ssl/certs/ca.pem"
mTLS (Mutual TLS)
client_conf:
security.protocol: "SSL"
ssl.ca.location: "/etc/ssl/certs/kafka-ca.pem"
ssl.certificate.location: "/etc/ssl/certs/client.pem"
ssl.key.location: "/etc/ssl/private/client.key"
ssl.key.password: "${SSL_KEY_PASSWORD}"
AWS MSK with IAM
client_conf:
security.protocol: "SASL_SSL"
sasl.mechanism: "AWS_MSK_IAM"
sasl.jaas.config: "software.amazon.msk.auth.iam.IAMLoginModule required;"
sasl.client.callback.handler.class: "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
Confluent Cloud
client_conf:
security.protocol: "SASL_SSL"
sasl.mechanism: "PLAIN"
sasl.username: "${CONFLUENT_API_KEY}"
sasl.password: "${CONFLUENT_API_SECRET}"
Performance Tuning
High Throughput
Optimize for maximum events per second:
client_conf:
acks: "1" # Leader-only ack (faster, less safe)
batch.size: "131072" # 128KB batches
linger.ms: "50" # Wait longer to fill batches
compression.type: "lz4" # Fast compression
batch:
max_events: 5000 # Larger batches
max_ms: 200 # More time to accumulate
max_inflight: 8 # More concurrent requests
Low Latency
Optimize for minimal delay:
client_conf:
acks: "1" # Don't wait for replicas
batch.size: "16384" # Smaller batches
linger.ms: "0" # Send immediately
compression.type: "none" # Skip compression
batch:
max_events: 100 # Smaller batches
max_ms: 10 # Flush quickly
max_inflight: 2 # Limit in-flight
Maximum Durability
Optimize for zero data loss:
client_conf:
acks: "all" # All replicas must ack
enable.idempotence: "true" # Prevent duplicates
max.in.flight.requests.per.connection: "5" # Must be ≤5 when idempotence is enabled
retries: "2147483647" # Infinite retries
exactly_once: true # Transactional producer
batch:
respect_source_tx: true # Preserve source transactions
max_inflight: 1 # Strict ordering
Running the Example
1. Set Environment Variables
export POSTGRES_DSN="postgres://user:password@localhost:5432/orders"
export KAFKA_BROKERS="kafka1:9093,kafka2:9093,kafka3:9093"
export KAFKA_USERNAME="deltaforge"
export KAFKA_PASSWORD="secret"
2. Create Kafka Topic
kafka-topics.sh --create \
--topic orders.cdc.events \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--bootstrap-server ${KAFKA_BROKERS}
3. Start DeltaForge
cargo run -p runner --release -- --config kafka-prod.yaml
Monitoring
Key Metrics to Watch
- deltaforge_sink_events_sent_total: Events delivered
- deltaforge_sink_send_latency_seconds: Delivery latency
- deltaforge_sink_errors_total: Delivery failures
- deltaforge_checkpoint_lag_events: Events pending checkpoint
Health Check
curl http://localhost:8080/health
Pipeline Status
curl http://localhost:8080/pipelines/orders-to-kafka-prod
Key Concepts Demonstrated
- SASL/SSL Authentication: Secure broker connections
- Exactly-Once Semantics: Transactional producer for no duplicates
- Acknowledgment Modes: Trade-off between durability and latency
- Batching & Compression: Optimize throughput
- Production Tuning: Real-world configuration patterns
Related Documentation
- Kafka Sink - Full configuration reference
- Envelopes - Output format options
- Observability - Metrics and monitoring
- Configuration Reference - Full spec documentation
Redis Cache Invalidation
This example demonstrates streaming database changes to a Redis stream where a worker can consume them to invalidate cache entries, ensuring cache consistency without polling.
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript cache key generator |
| Sink | Redis Streams |
| Pattern | CDC-driven cache invalidation |
Use Case
You have a MySQL database with Redis caching and want to:
- Stream change events that trigger cache invalidation
- Avoid stale cache issues without TTL-based expiration
- Generate cache keys matching your application’s key format
Architecture
┌─────────┐ ┌─────────────┐ ┌────────────────┐ ┌─────────┐
│ MySQL │────>│ DeltaForge │────>│ Redis Stream │────>│ Worker │
│ │ │ │ │ (invalidations)│ │ │
└─────────┘ └─────────────┘ └────────────────┘ └────┬────┘
│
┌───────────────┐ │
│ Redis Cache │<──────────┘
│ (DEL keys) │
└───────────────┘
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: cache-invalidation
tenant: acme
spec:
source:
type: mysql
config:
id: app-mysql
dsn: ${MYSQL_DSN}
tables:
- app.users
- app.products
- app.orders
- app.inventory
processors:
- type: javascript
id: generate-cache-keys
inline: |
function processBatch(events) {
return events.map(event => {
const keys = generateCacheKeys(event);
const strategy = getStrategy(event);
// Store cache keys in event.after metadata
// (we can modify event.after since it's a JSON Value)
if (event.after) {
event.after._cache_keys = keys;
event.after._invalidation_strategy = strategy;
} else if (event.before) {
// For deletes, add to before
event.before._cache_keys = keys;
event.before._invalidation_strategy = strategy;
}
// Add tags for routing/filtering
event.tags = (event.tags || []).concat([
'cache:invalidate',
`strategy:${strategy}`,
`keys:${keys.length}`
]);
return event;
});
}
function generateCacheKeys(event) {
const table = event.table.split('.')[1];
const keys = [];
const record = event.after || event.before;
if (!record || !record.id) return keys;
const id = record.id;
switch (table) {
case 'users':
keys.push(`user:${id}`);
if (record.email) {
keys.push(`user:email:${record.email}`);
}
// If email changed, invalidate old email key
if (event.before && event.before.email &&
event.before.email !== record.email) {
keys.push(`user:email:${event.before.email}`);
}
keys.push(`user:${id}:orders`);
keys.push(`user:${id}:profile`);
break;
case 'products':
keys.push(`product:${id}`);
keys.push(`product:${id}:details`);
if (record.category_id) {
keys.push(`category:${record.category_id}:products`);
}
break;
case 'orders':
keys.push(`order:${id}`);
if (record.user_id) {
keys.push(`user:${record.user_id}:orders`);
}
break;
case 'inventory':
keys.push(`inventory:${record.product_id}`);
keys.push(`product:${record.product_id}:stock`);
break;
}
return keys;
}
function getStrategy(event) {
const table = event.table.split('.')[1];
if (table === 'inventory') return 'immediate';
if (event.op === 'd') return 'immediate';
return 'batched';
}
limits:
timeout_ms: 500
mem_mb: 128
sinks:
- type: redis
config:
id: invalidation-stream
uri: ${REDIS_URI}
stream: cache:invalidations
envelope:
type: native
encoding: json
required: true
batch:
max_events: 100
max_ms: 100
respect_source_tx: true
commit_policy:
mode: required
JavaScript Processor Constraints
Important: The processor stores cache keys in event.after._cache_keys (or event.before._cache_keys for deletes) because processors can only modify existing Event fields. Arbitrary top-level fields like event.cache_keys would be lost.
Cache Worker (Consumer)
// cache-invalidation-worker.js
const Redis = require('ioredis');
const redis = new Redis(process.env.REDIS_URI);
const streamKey = 'cache:invalidations';
const consumerGroup = 'cache-workers';
const consumerId = `worker-${process.pid}`;
async function processInvalidations() {
try {
await redis.xgroup('CREATE', streamKey, consumerGroup, '0', 'MKSTREAM');
} catch (e) {
if (!e.message.includes('BUSYGROUP')) throw e;
}
console.log(`Starting cache invalidation worker: ${consumerId}`);
while (true) {
try {
const results = await redis.xreadgroup(
'GROUP', consumerGroup, consumerId,
'COUNT', 100, 'BLOCK', 5000,
'STREAMS', streamKey, '>'
);
if (!results) continue;
for (const [stream, messages] of results) {
for (const [id, fields] of messages) {
const event = JSON.parse(fields[1]);
await invalidateKeys(event);
await redis.xack(streamKey, consumerGroup, id);
}
}
} catch (error) {
console.error('Worker error:', error);
await new Promise(r => setTimeout(r, 1000));
}
}
}
async function invalidateKeys(event) {
// Get cache keys from the event payload
const record = event.after || event.before;
const cacheKeys = record?._cache_keys || [];
if (cacheKeys.length === 0) return;
for (const key of cacheKeys) {
if (key.includes('*')) {
await scanAndDelete(key);
} else {
await redis.del(key);
}
}
console.log(`Invalidated ${cacheKeys.length} keys for ${event.table}`);
}
async function scanAndDelete(pattern) {
let cursor = '0';
do {
const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100);
cursor = nextCursor;
if (keys.length > 0) {
await redis.del(...keys);
}
} while (cursor !== '0');
}
processInvalidations().catch(console.error);
Running the Example
1. Set Environment Variables
export MYSQL_DSN="mysql://user:password@localhost:3306/app"
export REDIS_URI="redis://localhost:6379"
2. Start DeltaForge
cargo run -p runner -- --config cache-invalidation.yaml
3. Start Worker(s)
node cache-invalidation-worker.js
4. Test Invalidation
UPDATE app.users SET email = 'alice.new@example.com' WHERE id = 1;
Worker output:
Invalidated 5 keys for app.users
Key Concepts Demonstrated
- CDC to Stream: DeltaForge captures changes and writes to Redis Streams
- Custom Key Generation: Processor computes cache keys for downstream worker
- Consumer Groups: Scalable worker processing with acknowledgments
- Before/After Diffing: Compute invalidation keys for both old and new values
Note: DeltaForge streams events to Redis; a separate worker (shown above) consumes them and performs the actual cache invalidation.
Related Documentation
- MySQL Source - Binlog configuration
- Redis Sink - Stream configuration
- Processors - JavaScript processor options
Audit Trail and Compliance Logging
This example demonstrates building an immutable audit trail for compliance requirements (SOC2, HIPAA, GDPR) by capturing all database changes with full context.
Overview
| Component | Configuration |
|---|---|
| Source | PostgreSQL logical replication |
| Processor | JavaScript audit enrichment |
| Sink | Kafka (durable audit log) |
| Pattern | Compliance-grade change tracking |
Use Case
You need to meet compliance requirements and want to:
- Capture every change to sensitive tables with before/after values
- Add audit metadata (classification, retention, regulations)
- Redact sensitive fields while preserving change records
- Store immutable audit logs for required retention periods
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: compliance-audit-trail
tenant: acme
spec:
source:
type: postgres
config:
id: app-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_audit
publication: audit_pub
tables:
- public.users
- public.user_profiles
- public.payment_methods
- public.transactions
- public.roles
- public.permissions
- public.user_roles
start_position: earliest
processors:
- type: javascript
id: audit-enrichment
inline: |
function processBatch(events) {
return events.map(event => {
const table = event.table.split('.')[1];
// Build audit tags (event.tags is a valid Event field)
const auditTags = [
'audited',
`sensitivity:${classifySensitivity(table)}`,
`classification:${getDataClassification(table)}`,
`retention:${getRetentionPeriod(table)}d`
];
// Add regulation tags
for (const reg of getApplicableRegulations(table)) {
auditTags.push(`regulation:${reg}`);
}
// Track changed fields for updates
if (event.before && event.after) {
const changed = detectChangedFields(event.before, event.after);
for (const field of changed) {
auditTags.push(`changed:${field}`);
}
}
event.tags = (event.tags || []).concat(auditTags);
// Redact sensitive fields in before/after
if (event.before) {
event.before = sanitizeRecord(event.before, table);
}
if (event.after) {
event.after = sanitizeRecord(event.after, table);
}
return event;
});
}
function classifySensitivity(table) {
const highSensitivity = ['users', 'payment_methods', 'transactions'];
const mediumSensitivity = ['user_profiles', 'user_roles'];
if (highSensitivity.includes(table)) return 'HIGH';
if (mediumSensitivity.includes(table)) return 'MEDIUM';
return 'LOW';
}
function sanitizeRecord(record, table) {
if (!record) return null;
const sanitized = { ...record };
const sensitiveFields = {
'users': ['password_hash', 'ssn', 'tax_id'],
'payment_methods': ['card_number', 'cvv', 'account_number'],
'user_profiles': ['date_of_birth']
};
const fieldsToMask = sensitiveFields[table] || [];
for (const field of fieldsToMask) {
if (sanitized[field] !== undefined) {
sanitized[`_${field}_redacted`] = true;
sanitized[field] = '[REDACTED]';
}
}
return sanitized;
}
function detectChangedFields(before, after) {
const changed = [];
const allKeys = new Set([...Object.keys(before), ...Object.keys(after)]);
for (const key of allKeys) {
if (JSON.stringify(before[key]) !== JSON.stringify(after[key])) {
changed.push(key);
}
}
return changed;
}
function getRetentionPeriod(table) {
if (['transactions', 'payment_methods'].includes(table)) return 2555;
if (['users', 'user_profiles'].includes(table)) return 2190;
return 1095;
}
function getDataClassification(table) {
if (['payment_methods', 'transactions'].includes(table)) return 'PCI';
if (['users', 'user_profiles'].includes(table)) return 'PII';
return 'INTERNAL';
}
function getApplicableRegulations(table) {
const regs = [];
if (['users', 'user_profiles'].includes(table)) regs.push('GDPR', 'CCPA');
if (['payment_methods', 'transactions'].includes(table)) regs.push('PCI-DSS', 'SOX');
return regs;
}
limits:
timeout_ms: 1000
mem_mb: 256
sinks:
- type: kafka
config:
id: audit-kafka
brokers: ${KAFKA_BROKERS}
topic: audit.trail.events
envelope:
type: debezium
encoding: json
required: true
exactly_once: true
client_conf:
acks: "all"
enable.idempotence: "true"
compression.type: "gzip"
batch:
max_events: 500
max_bytes: 1048576
max_ms: 1000
respect_source_tx: true
commit_policy:
mode: required
JavaScript Processor Constraints
Important: The JavaScript processor can only modify fields that exist on DeltaForge’s `Event` struct. You can:
- Modify `event.before` and `event.after` values (JSON objects)
- Set `event.tags` (array of strings)
- Filter out events (return an empty array)
You cannot add arbitrary top-level fields like `event.audit` or `event.metadata`; they will be lost during serialization. This limitation will be addressed soon.
This example uses `event.tags` to store audit metadata as `key:value` strings that downstream systems can parse.
PostgreSQL Setup
-- Create publication for audited tables
CREATE PUBLICATION audit_pub FOR TABLE
public.users,
public.user_profiles,
public.payment_methods,
public.transactions,
public.roles,
public.permissions,
public.user_roles
WITH (publish = 'insert, update, delete');
-- Enable REPLICA IDENTITY FULL to capture before values on updates
ALTER TABLE public.users REPLICA IDENTITY FULL;
ALTER TABLE public.user_profiles REPLICA IDENTITY FULL;
ALTER TABLE public.payment_methods REPLICA IDENTITY FULL;
ALTER TABLE public.transactions REPLICA IDENTITY FULL;
Sample Audit Event Output
With Debezium envelope:
{
"payload": {
"before": {
"id": 42,
"email": "alice@old-domain.com",
"name": "Alice Smith",
"password_hash": "[REDACTED]",
"_password_hash_redacted": true
},
"after": {
"id": 42,
"email": "alice@new-domain.com",
"name": "Alice Smith",
"password_hash": "[REDACTED]",
"_password_hash_redacted": true
},
"source": {
"version": "0.1.0",
"connector": "postgres",
"name": "app-postgres",
"ts_ms": 1705312199123,
"db": "app",
"schema": "public",
"table": "users"
},
"op": "u",
"ts_ms": 1705312200000,
"tags": [
"audited",
"sensitivity:HIGH",
"classification:PII",
"retention:2190d",
"regulation:GDPR",
"regulation:CCPA",
"changed:email"
]
}
}
Parsing Audit Tags
Downstream consumers can parse the structured tags:
// Parse audit tags into an object
function parseAuditTags(tags) {
const audit = {};
for (const tag of tags || []) {
if (tag.includes(':')) {
const idx = tag.indexOf(':');
const key = tag.slice(0, idx);
const value = tag.slice(idx + 1); // keep any colons inside the value intact
if (audit[key]) {
if (!Array.isArray(audit[key])) {
audit[key] = [audit[key]];
}
audit[key].push(value);
} else {
audit[key] = value;
}
} else {
audit[tag] = true;
}
}
return audit;
}
// Example: parseAuditTags(event.tags)
// Returns: {
// audited: true,
// sensitivity: "HIGH",
// classification: "PII",
// retention: "2190d",
// regulation: ["GDPR", "CCPA"],
// changed: "email"
// }
Kafka Topic Configuration
kafka-topics.sh --create \
--topic audit.trail.events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=220752000000 \
--config cleanup.policy=delete \
--config min.insync.replicas=2 \
--config compression.type=gzip \
--bootstrap-server ${KAFKA_BROKERS}
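As a sanity check, the topic's `retention.ms` should cover the longest retention period the processor can tag (2555 days for `transactions` and `payment_methods`):

```javascript
// Longest retention emitted by getRetentionPeriod() is 2555 days (~7 years).
const longestRetentionDays = 2555;
const requiredRetentionMs = longestRetentionDays * 24 * 60 * 60 * 1000;
console.log(requiredRetentionMs); // 220752000000
```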
Running the Example
1. Set Environment Variables
export POSTGRES_DSN="postgres://user:password@localhost:5432/app"
export KAFKA_BROKERS="kafka1:9092,kafka2:9092,kafka3:9092"
2. Start DeltaForge
cargo run -p runner -- --config audit-trail.yaml
3. Verify Audit Events
./dev.sh k-consume audit.trail.events --from-beginning
Querying Audit Data
-- Find high-sensitivity changes
SELECT * FROM audit_events
WHERE ARRAY_CONTAINS(payload.tags, 'sensitivity:HIGH');
-- Find all email changes
SELECT * FROM audit_events
WHERE ARRAY_CONTAINS(payload.tags, 'changed:email');
-- Find PCI-regulated changes
SELECT * FROM audit_events
WHERE ARRAY_CONTAINS(payload.tags, 'regulation:PCI-DSS');
Key Concepts Demonstrated
- Full Change Capture: Before and after values with REPLICA IDENTITY FULL
- PII Redaction: Sensitive fields masked, presence tracked via `_<field>_redacted`
- Tag-Based Metadata: Audit info stored in `event.tags` as parseable strings
- Immutable Storage: Exactly-once delivery to append-only Kafka log
- Compliance Tagging: Retention periods, classifications, regulations as tags
Related Documentation
- PostgreSQL Source - Logical replication setup
- Kafka Sink - Exactly-once and durability settings
- Processors - JavaScript processor constraints
- Envelopes - Output format options
Real-Time Analytics Preprocessing Pipeline
This example demonstrates preparing CDC events for real-time analytics by enriching events with dimensions, metrics, and routing tags (pre-processing).
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | JavaScript analytics enrichment |
| Sinks | Kafka (stream processing) + Redis (real-time counters) |
| Pattern | Analytics-ready event preparation |
Use Case
You have an e-commerce MySQL database and want to:
- Stream order events to real-time dashboards
- Feed a worker that maintains live counters in Redis
- Prepare events for stream processing (Flink, Spark Streaming)
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: ecommerce-analytics
tenant: acme
spec:
source:
type: mysql
config:
id: ecommerce-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
- shop.payments
- shop.cart_events
processors:
- type: javascript
id: analytics-enrichment
inline: |
function processBatch(events) {
return events.map(event => {
const table = event.table.split('.')[1];
const record = event.after || event.before;
// Add analytics metadata to event.after
// (we can modify event.after since it's a JSON Value)
if (event.after) {
event.after._analytics = {
event_type: `${table}.${mapOperation(event.op)}`,
dimensions: extractDimensions(table, record, event),
metrics: extractMetrics(table, record)
};
}
// Add routing tags
event.tags = generateTags(table, record, event);
return event;
});
}
function mapOperation(op) {
return { 'c': 'created', 'u': 'updated', 'd': 'deleted', 'r': 'snapshot' }[op] || op;
}
function extractDimensions(table, record, event) {
if (!record) return {};
const dims = {
hour_of_day: new Date(event.ts_ms).getUTCHours(),
day_of_week: new Date(event.ts_ms).getUTCDay()
};
switch (table) {
case 'orders':
dims.customer_id = record.customer_id;
dims.status = record.status;
dims.channel = record.channel || 'web';
break;
case 'order_items':
dims.order_id = record.order_id;
dims.product_id = record.product_id;
break;
case 'payments':
dims.order_id = record.order_id;
dims.payment_method = record.method;
break;
}
return dims;
}
function extractMetrics(table, record) {
if (!record) return {};
const metrics = { event_count: 1 };
switch (table) {
case 'orders':
metrics.order_total = parseFloat(record.total) || 0;
metrics.item_count = parseInt(record.item_count) || 0;
break;
case 'order_items':
metrics.quantity = parseInt(record.quantity) || 1;
metrics.line_total = parseFloat(record.line_total) || 0;
break;
case 'payments':
metrics.payment_amount = parseFloat(record.amount) || 0;
break;
}
return metrics;
}
function generateTags(table, record, event) {
const tags = [table, event.op];
if (!record) return tags;
if (table === 'orders' && record.total > 500) {
tags.push('high_value');
}
if (record.status) {
tags.push(`status:${record.status}`);
}
return tags;
}
limits:
timeout_ms: 500
mem_mb: 256
sinks:
- type: kafka
config:
id: analytics-kafka
brokers: ${KAFKA_BROKERS}
topic: analytics.events
envelope:
type: native
encoding: json
required: true
client_conf:
compression.type: "lz4"
- type: redis
config:
id: realtime-redis
uri: ${REDIS_URI}
stream: analytics:realtime
envelope:
type: native
encoding: json
required: false
batch:
max_events: 500
max_bytes: 1048576
max_ms: 100
respect_source_tx: false
commit_policy:
mode: required
JavaScript Processor Constraints
Important: Analytics metadata is stored in `event.after._analytics` because the processor can only modify existing Event fields (`before`, `after`, `tags`). Arbitrary top-level fields would be lost during serialization.
Sample Event Output
{
"before": null,
"after": {
"id": 98765,
"customer_id": 12345,
"total": 299.99,
"status": "pending",
"channel": "mobile",
"_analytics": {
"event_type": "orders.created",
"dimensions": {
"hour_of_day": 10,
"day_of_week": 3,
"customer_id": 12345,
"status": "pending",
"channel": "mobile"
},
"metrics": {
"event_count": 1,
"order_total": 299.99,
"item_count": 3
}
}
},
"source": {
"connector": "mysql",
"db": "shop",
"table": "orders"
},
"op": "c",
"ts_ms": 1705312200000,
"tags": ["orders", "c", "status:pending"]
}
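Consumers that only want the original row (for example, before loading it into a warehouse) can split the embedded metadata back out; a small sketch with a hypothetical helper:

```javascript
// Separate the _analytics metadata from the original column values.
function splitAnalytics(after) {
  const { _analytics, ...row } = after ?? {};
  return { row, analytics: _analytics ?? null };
}
```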
Redis Counter Worker
const Redis = require('ioredis');
const redis = new Redis(process.env.REDIS_URI);
async function processAnalyticsEvents() {
let lastId = '0';
while (true) {
const results = await redis.xread(
'COUNT', 100, 'BLOCK', 1000,
'STREAMS', 'analytics:realtime', lastId
);
if (!results) continue;
for (const [stream, messages] of results) {
for (const [id, fields] of messages) {
const event = JSON.parse(fields[1]);
await updateCounters(event);
lastId = id;
}
}
}
}
async function updateCounters(event) {
const pipe = redis.pipeline();
const now = new Date();
const hourKey = `${now.getUTCFullYear()}:${now.getUTCMonth()+1}:${now.getUTCDate()}:${now.getUTCHours()}`;
const dayKey = `${now.getUTCFullYear()}:${now.getUTCMonth()+1}:${now.getUTCDate()}`;
// Get analytics from event.after
const analytics = event.after?._analytics || {};
const eventType = analytics.event_type || `${event.table}.${event.op}`;
const metrics = analytics.metrics || {};
const dimensions = analytics.dimensions || {};
// Event counts
pipe.hincrby(`stats:events:${hourKey}`, eventType, 1);
// Order-specific counters
if (eventType === 'orders.created') {
pipe.incr(`stats:orders:count:${hourKey}`);
pipe.incrbyfloat(`stats:orders:revenue:${hourKey}`, metrics.order_total || 0);
if (dimensions.channel) {
pipe.hincrby(`stats:orders:channel:${dayKey}`, dimensions.channel, 1);
}
}
// High-value alerts
if (event.tags?.includes('high_value')) {
pipe.lpush('alerts:high_value_orders', JSON.stringify({
order_id: event.after?.id,
total: metrics.order_total
}));
pipe.ltrim('alerts:high_value_orders', 0, 99);
}
pipe.expire(`stats:events:${hourKey}`, 90000);
await pipe.exec();
}
processAnalyticsEvents().catch(console.error);
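A dashboard can read those counters back by reconstructing the same key scheme the worker writes; a sketch (the helper name is illustrative):

```javascript
// Rebuild the hourly stats key for a given UTC timestamp, matching the
// `stats:events:${year}:${month}:${day}:${hour}` scheme used by updateCounters().
function hourlyStatsKey(tsMs) {
  const d = new Date(tsMs);
  return `stats:events:${d.getUTCFullYear()}:${d.getUTCMonth() + 1}:${d.getUTCDate()}:${d.getUTCHours()}`;
}
// e.g. const counts = await redis.hgetall(hourlyStatsKey(Date.now()));
```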
Running the Example
1. Set Environment Variables
export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
export REDIS_URI="redis://localhost:6379"
2. Create Kafka Topic
./dev.sh k-create analytics.events 12
3. Start DeltaForge
cargo run -p runner -- --config analytics-pipeline.yaml
4. Start Counter Worker
node realtime-counter-worker.js
Key Concepts Demonstrated
- Event Enrichment: Add dimensions/metrics in
event.after._analytics - Tag-Based Filtering: Use
event.tagsfor high-value order detection - Multi-Sink Fan-Out: Kafka for stream processing + Redis for worker consumption
- Worker Pattern: Separate worker consumes Redis stream to update counters
Related Documentation
- MySQL Source - Binlog configuration
- Kafka Sink - Producer settings
- Redis Sink - Stream configuration
Outbox Pattern
This example demonstrates the transactional outbox pattern - writing business data and a domain event in the same database transaction, then streaming the event to Kafka via DeltaForge.
Overview
| Component | Configuration |
|---|---|
| Source | MySQL binlog CDC |
| Processor | Outbox (extract + route) |
| Sink | Kafka (per-aggregate topics) |
| Pattern | Transactional outbox with raw payload delivery |
Use Case
Your application writes orders and needs to publish OrderCreated, OrderShipped, etc. events to Kafka. You want:
- Atomicity: event published if and only if the transaction commits
- Clean payloads: consumers receive the application’s JSON, not CDC envelopes
- Per-aggregate routing: events land on `Order.OrderCreated`, `Payment.PaymentReceived`, etc.
- Zero polling: DeltaForge tails the binlog, no application-side outbox relay
Database Setup
-- Business table
CREATE TABLE orders (
id INT AUTO_INCREMENT PRIMARY KEY,
customer VARCHAR(64),
total DECIMAL(10,2),
status VARCHAR(32),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Outbox table (BLACKHOLE = binlog only, no disk storage)
CREATE TABLE outbox (
id INT AUTO_INCREMENT PRIMARY KEY,
aggregate_type VARCHAR(64),
aggregate_id VARCHAR(64),
event_type VARCHAR(64),
payload JSON
) ENGINE=BLACKHOLE;
Application Code
Write both in the same transaction:
BEGIN;
INSERT INTO orders (customer, total, status)
VALUES ('alice', 149.99, 'pending');
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES (
'Order',
LAST_INSERT_ID(),
'OrderCreated',
JSON_OBJECT('customer', 'alice', 'total', 149.99, 'status', 'pending')
);
COMMIT;
If the transaction rolls back, neither the order nor the event exist. If it commits, both do.
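The same write from application code, sketched with an injected `mysql2/promise`-style pool (the `createOrder` helper is illustrative, not part of DeltaForge):

```javascript
// Write the order and its outbox event in one transaction: both commit or neither.
async function createOrder(pool, customer, total) {
  const conn = await pool.getConnection();
  try {
    await conn.beginTransaction();
    const [res] = await conn.execute(
      'INSERT INTO orders (customer, total, status) VALUES (?, ?, ?)',
      [customer, total, 'pending']
    );
    await conn.execute(
      'INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload) VALUES (?, ?, ?, ?)',
      ['Order', String(res.insertId), 'OrderCreated',
        JSON.stringify({ customer, total, status: 'pending' })]
    );
    await conn.commit();
    return res.insertId;
  } catch (err) {
    await conn.rollback(); // neither row survives
    throw err;
  } finally {
    conn.release();
  }
}
```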
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-outbox
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.outbox
outbox:
tables: ["shop.outbox"]
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
default_topic: events.unrouted
raw_payload: true
sinks:
- type: kafka
config:
id: events-kafka
brokers: ${KAFKA_BROKERS}
topic: "cdc.${source.db}.${source.table}"
envelope:
type: debezium
encoding: json
required: true
batch:
max_events: 500
max_ms: 500
respect_source_tx: true
commit_policy:
mode: required
What lands on Kafka
Outbox event –> topic Order.OrderCreated:
{"customer": "alice", "total": 149.99, "status": "pending"}
Raw payload, no envelope. Kafka headers carry metadata: df-aggregate-type: Order, df-aggregate-id: 1, df-event-type: OrderCreated.
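On the consumer side, that metadata can be recovered from the headers; a sketch (header values arrive as `Buffer`s in most Kafka clients, e.g. kafkajs):

```javascript
// Decode the df-* outbox headers into a plain object.
function outboxMeta(headers) {
  const get = (k) => (headers && headers[k] != null ? headers[k].toString() : undefined);
  return {
    aggregateType: get('df-aggregate-type'),
    aggregateId: get('df-aggregate-id'),
    eventType: get('df-event-type'),
  };
}
```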
CDC event –> topic cdc.shop.orders (from sink’s topic template):
{
"payload": {
"before": null,
"after": {"id": 1, "customer": "alice", "total": 149.99, "status": "pending"},
"source": {"connector": "mysql", "db": "shop", "table": "orders"},
"op": "c",
"ts_ms": 1700000000000
}
}
Full Debezium envelope. Both flow through the same pipeline - the outbox processor only touches events tagged as outbox.
Running the Example
1. Start Infrastructure
./dev.sh up
./dev.sh k-create Order.OrderCreated 3
./dev.sh k-create Order.OrderShipped 3
./dev.sh k-create cdc.shop.orders 3
2. Set Environment Variables
export MYSQL_DSN="mysql://deltaforge:dfpw@localhost:3306/shop"
export KAFKA_BROKERS="localhost:9092"
3. Start DeltaForge
cargo run -p runner -- --config outbox.yaml
4. Insert Test Data
BEGIN;
INSERT INTO shop.orders (customer, total, status) VALUES ('alice', 149.99, 'pending');
INSERT INTO shop.outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', LAST_INSERT_ID(), 'OrderCreated',
'{"customer":"alice","total":149.99,"status":"pending"}');
COMMIT;
5. Verify
# Outbox event (raw payload, per-aggregate topic)
./dev.sh k-inspect Order.OrderCreated
# CDC event (Debezium envelope, per-table topic)
./dev.sh k-inspect cdc.shop.orders
Variations
PostgreSQL with WAL Messages
PostgreSQL doesn’t need an outbox table - write directly to the WAL:
BEGIN;
INSERT INTO orders (customer, total, status) VALUES ('alice', 149.99, 'pending');
SELECT pg_logical_emit_message(
true, 'outbox',
'{"aggregate_type":"Order","aggregate_id":"1","event_type":"OrderCreated","payload":{"customer":"alice","total":149.99}}'
);
COMMIT;
source:
type: postgres
config:
id: orders-pg
dsn: ${POSTGRES_DSN}
slot: deltaforge_orders
publication: orders_pub
tables: [public.orders]
outbox:
prefixes: [outbox]
No table, no index, no vacuum - just a WAL entry.
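Building that message payload in application code is worth centralizing; a sketch (the `outboxMessage` helper is illustrative):

```javascript
// Serialize an outbox message for pg_logical_emit_message.
function outboxMessage(aggregateType, aggregateId, eventType, payload) {
  return JSON.stringify({
    aggregate_type: aggregateType,
    aggregate_id: String(aggregateId),
    event_type: eventType,
    payload,
  });
}
// Pass the result as the third argument to pg_logical_emit_message.
```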
Multiple Outbox Channels
Route order events and payment events to different topic hierarchies:
processors:
- type: outbox
tables: [orders_outbox]
topic: "orders.${event_type}"
raw_payload: true
- type: outbox
tables: [payments_outbox]
topic: "payments.${event_type}"
raw_payload: true
columns:
payload: data
Additional Headers for Tracing
Forward trace and correlation IDs as Kafka headers:
processors:
- type: outbox
topic: "${aggregate_type}.${event_type}"
raw_payload: true
additional_headers:
x-trace-id: trace_id
x-correlation-id: correlation_id
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload, trace_id, correlation_id)
VALUES ('Order', '42', 'OrderCreated', '{"total":99.99}', 'abc-123', 'req-456');
Migrating from Debezium
If your outbox table uses Debezium’s column names (aggregatetype, aggregateid, type):
processors:
- type: outbox
topic: "${aggregatetype}.${type}"
raw_payload: true
columns:
aggregate_type: aggregatetype
aggregate_id: aggregateid
event_type: type
additional_headers:
x-trace-id: traceid
x-tenant: tenant
Column mappings control header extraction. The topic template uses raw column names directly.
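The `${column}` topic template resolves against the raw outbox row; a toy resolver to illustrate the behavior (not DeltaForge's actual implementation):

```javascript
// Substitute ${column} placeholders with values from the outbox row.
function resolveTopic(template, row) {
  return template.replace(/\$\{(\w+)\}/g, (_, col) => String(row[col] ?? 'unrouted'));
}
```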
Key Concepts Demonstrated
- Transactional outbox: atomicity without distributed transactions
- BLACKHOLE engine: binlog-only storage for zero-cost outbox tables on MySQL
- Raw payload delivery: `raw_payload: true` bypasses envelope wrapping; consumers get exactly what the application wrote
- Mixed pipeline: outbox events and CDC events coexist in the same pipeline with different serialization
- Per-aggregate routing: topic template routes events by `aggregate_type` and `event_type`
Related Documentation
- Outbox Pattern - Full outbox reference with configuration details
- MySQL Source - Binlog prerequisites
- PostgreSQL Source - WAL message setup
- Dynamic Routing - Template syntax for topic/key resolution
- Envelopes - Native, Debezium, CloudEvents formats
Troubleshooting
Common issues and quick checks when running DeltaForge.
- 🩺 Health-first: start with `/health` and `/ready` to pinpoint failing components.
Runner fails to start
- Confirm the config path passed to `--config` exists and is readable.
- Validate YAML syntax and that required fields like `metadata.name` and `spec.source` are present.
- Ensure environment variables referenced in the spec are set (`dsn`, `brokers`, `uri`, etc.).
Pipelines remain unready
- Check the `/ready` endpoint for per-pipeline status and error messages.
- Verify upstream credentials allow replication (MySQL binlog). Other engines are experimental unless explicitly documented.
- Inspect sink connectivity; a required sink that cannot connect will block checkpoints.
Slow throughput
- Increase `batch.max_events` or `batch.max_bytes` to reduce flush frequency.
- Adjust `max_inflight` to allow more concurrent batches if sinks can handle parallelism.
- Reduce processor work or add guardrails (`limits`) to prevent slow JavaScript from stalling the pipeline.
Checkpoints not advancing
- Review the commit policy: with `mode: all` or `required`, sinks that are unavailable will block progress.
- Look for sink-specific errors (for example, an unreachable Kafka broker or Redis backpressure).
- Pause and resume the pipeline to force a clean restart after addressing the underlying issue.
/health returns 503
A 503 means at least one pipeline has entered a permanently failed state — it will not recover on its own. Common causes:
| Cause | Log message | Resolution |
|---|---|---|
| Failover to a server with no GTID overlap | position lost after failover | Re-snapshot from the new primary |
| `RESET BINARY LOGS AND GTIDS` run on the same server | checkpoint GTID set no longer reachable | Clear the checkpoint DB and re-snapshot |
| Unrecoverable source error | run task ended with error | Check source logs; fix the root cause and restart |
Use GET /pipelines to see which pipeline has "status": "failed" and check its logs for the specific error. After fixing the root cause, restart the DeltaForge process (or the container) to reset pipeline state.
Avro encoding issues
Schema Registry connection failed
WARN Schema Registry unavailable — using cached schema
DeltaForge is encoding with a previously cached schema ID. Events are still flowing, but new schema registrations (e.g., after DDL changes) will fail until the SR recovers. Check:
- Schema Registry connectivity: `curl http://<sr-url>/subjects`
- Network/firewall between DeltaForge and the SR
- Monitor `deltaforge_avro_sr_cache_fallback_total`; if it is increasing, the SR is unreachable
Schema registration rejected (compatibility failure)
WARN DDL change for {table} produced incompatible Avro schema — encoding with previous version
The source table DDL changed and the new Avro schema was rejected by the Schema Registry’s compatibility rules. DeltaForge attempts to encode with the previous schema version:
- If encoding succeeds: events continue flowing (the old schema still covers the new data)
- If encoding fails: events are routed to the DLQ with a `schema_mismatch` error
Resolution: Either relax the SR subject compatibility mode (e.g., NONE or FORWARD) or handle the DLQ entries after updating consumer schemas.
Events routed to DLQ with schema_mismatch
A DDL change produced events that can’t be encoded under the cached schema. This happens when a non-backward-compatible change is made (e.g., column type change, NOT NULL added to existing column).
Resolution:
- Check `deltaforge_avro_encode_failure_total{reason="schema_mismatch"}`
- Update the SR subject compatibility if needed
- Restart the pipeline to clear the schema cache and re-register
BIGINT UNSIGNED overflow warning
WARN BIGINT UNSIGNED column {col} mapped to long — values >= 2^63 will fail encoding
Only appears when unsigned_bigint_mode: long is configured. If a row contains a value ≥ 2^63, encoding will fail and the event is routed to DLQ. The default unsigned_bigint_mode: string avoids this entirely.
Using inferred schema instead of DDL
DEBUG no DDL schema available — falling back to JSON inference (Path C)
DeltaForge couldn’t look up the source table schema and is using a less precise JSON-inferred schema. This happens when:
- The schema loader hasn’t cached the table schema yet (first events at startup)
- The table doesn’t match the configured table patterns
- The source type doesn’t support schema loading
Check deltaforge_avro_encode_total{path="inferred"} — if this counter is growing while path="ddl" is not, investigate why DDL lookup is failing.
Development Guide
Use this guide to build, test, and extend DeltaForge. It covers local workflows, optional dependency containers, and how to work with Docker images.
All contributions are welcome and highly appreciated.
Local prerequisites
- Rust toolchain 1.89+ (install via `rustup`).
- Optional: Docker or Podman for running the dev dependency stack and the container image.
Workspace layout
- `crates/deltaforge-core`: shared event model, pipeline engine, and checkpointing primitives.
- `crates/deltaforge-config`: YAML config parsing, environment variable expansion, and pipeline spec types.
- `crates/sources`: database CDC readers (MySQL binlog, Postgres logical replication) implemented as pluggable sources.
- `crates/processors`: JavaScript-based processors and support code for transforming batches.
- `crates/sinks`: sink implementations (Kafka producer, Redis streams, NATS JetStream) plus sink utilities.
- `crates/rest-api`: HTTP control plane with health/readiness and pipeline lifecycle endpoints.
- `crates/runner`: CLI entrypoint that wires the runtime, metrics, and control plane together.
- `crates/chaos`: end-to-end chaos scenario runner, benchmarks, and interactive playground UI.
Use these crate boundaries as reference points when adding new sources, sinks, or pipeline behaviors.
Start dev dependencies
Bring up the optional backing services (Postgres, MySQL, Kafka, Redis) with Docker Compose:
docker compose -f docker-compose.dev.yml up -d
Each service is exposed on localhost for local runs (5432, 3306, 9092, 6379). The MySQL container seeds demo data from ./init-scripts and configures binlog settings required for CDC.
Prefer the convenience dev.sh wrapper to keep common tasks consistent:
./dev.sh up # start the dependency stack
./dev.sh down # stop and remove it
./dev.sh ps # see container status
Build and test locally
Run the usual Rust workflow from the repo root:
cargo fmt --all
cargo clippy --workspace --all-targets --all-features
cargo test --workspace
Or use the helper script for a single command that mirrors CI expectations:
./dev.sh build # build project (debug)
./dev.sh build-release # build project (release)
./dev.sh run # run with examples/dev.yaml
./dev.sh fmt # format code
./dev.sh lint # clippy with warnings as errors
./dev.sh test # full test suite
./dev.sh check # fmt --check + clippy + tests (mirrors CI)
./dev.sh cov # generate coverage report
Docker images
Use pre-built images
Multi-arch images (amd64/arm64) are published to GHCR and Docker Hub:
# Minimal (~57MB, scratch-based, no shell)
docker pull ghcr.io/vnvo/deltaforge:latest
docker pull vnvohub/deltaforge:latest
# Debug (~140MB, includes shell for troubleshooting)
docker pull ghcr.io/vnvo/deltaforge:latest-debug
docker pull vnvohub/deltaforge:latest-debug
| Variant | Size | Base | Use case |
|---|---|---|---|
| `latest` | ~57MB | scratch | Production |
| `latest-debug` | ~140MB | debian-slim | Troubleshooting, has shell |
Build locally
Two Dockerfiles are provided:
# Minimal image (~57MB)
docker build -t deltaforge:local .
# Debug image (~140MB, includes shell)
docker build -t deltaforge:local-debug -f Dockerfile.debug .
Or use the dev helper:
./dev.sh docker # build minimal image
./dev.sh docker-debug # build debug image
./dev.sh docker-test # test minimal image runs
./dev.sh docker-test-debug # test debug image runs
./dev.sh docker-all # build and test all variants
./dev.sh docker-shell # open shell in debug container
Build multi-arch locally
To build for both amd64 and arm64:
./dev.sh docker-multi-setup # create buildx builder (once)
./dev.sh docker-multi # build both architectures
Note: Multi-arch builds use QEMU emulation and take ~30-35 minutes. The images are not loaded locally - use --push to push to a registry.
Run the image
Run the container by mounting your pipeline specs and exposing the API and metrics ports:
docker run --rm \
-p 8080:8080 -p 9000:9000 \
-v $(pwd)/examples/dev.yaml:/etc/deltaforge/pipeline.yaml:ro \
-v deltaforge-checkpoints:/app/data \
ghcr.io/vnvo/deltaforge:latest \
--config /etc/deltaforge/pipeline.yaml
Notes:
- The container listens on
0.0.0.0:8080for the control plane API with metrics on:9000. - Checkpoints are written to
/app/data/df_checkpoints.json; mount a volume to persist them across restarts. - Environment variables inside the YAML are expanded before parsing.
- Pass any other runner flags as needed (e.g.,
--api-addror--metrics-addr).
Debug a running container
Use the debug image to troubleshoot:
```bash
# Run with shell access
docker run --rm -it --entrypoint /bin/bash ghcr.io/vnvo/deltaforge:latest-debug

# Exec into a running container
docker exec -it <container_id> /bin/bash
```
Dev helper commands
The `dev.sh` script provides shortcuts for common tasks:
```bash
./dev.sh help                 # show all commands

# Infrastructure
./dev.sh up                   # start MySQL, Kafka, Redis
./dev.sh down                 # stop and remove containers
./dev.sh ps                   # list running services

# Kafka
./dev.sh k-list               # list topics
./dev.sh k-create <topic>     # create topic
./dev.sh k-consume <topic> --from-beginning
./dev.sh k-produce <topic>    # interactive producer

# Redis
./dev.sh redis-cli            # open redis-cli
./dev.sh redis-read <stream>  # read from stream

# Database shells
./dev.sh pg-sh                # psql into Postgres
./dev.sh mysql-sh             # mysql into MySQL

# Documentation
./dev.sh docs                 # serve docs locally (opens browser)
./dev.sh docs-build           # build docs

# Pre-release checks
./dev.sh release-check        # run all checks + build all Docker variants
```
Chaos testing
End-to-end resilience tests and benchmarks run against a live Docker Compose stack with fault injection via Toxiproxy. Scenarios cover network partitions, sink outages, crash recovery, server failover, schema drift, binlog purge, long-running endurance runs, and binlog backlog drain benchmarks.
Prerequisites
Build the debug image first (includes a shell, needed for some scenarios):
```bash
docker build -t deltaforge:dev-debug -f Dockerfile.debug .
```
Stack profiles
The `df` compose profile starts 3 DeltaForge instances — one per build variant:
| Instance | Port | Image | Use case |
|---|---|---|---|
| `deltaforge-release` | 8080 | `deltaforge:latest` | Production behavior, regression testing |
| `deltaforge-debug` | 8081 | `deltaforge:dev-debug` | Verbose logging, assertions, chaos scenarios |
| `deltaforge-profile` | 8082 | `deltaforge:dev-profile` | Flamegraphs, CPU profiling, benchmarks |
Pipeline configs are selected dynamically — either via the chaos UI config dropdown or the CLI `--port` flag. All 3 instances start with a default config (`mysql-to-kafka.yaml`) and can be swapped at runtime.
Start the chaos environment
```bash
docker compose -f docker-compose.chaos.yml \
  --profile base --profile mysql-infra --profile kafka-infra --profile df up -d
```
Add `--profile pg-infra` if testing PostgreSQL scenarios.
Run resilience scenarios
```bash
# Default target is --port 8080 (release); pass --port to hit another instance
cargo run -p chaos -- --scenario all --source mysql
cargo run -p chaos -- --scenario network-partition --port 8081
cargo run -p chaos -- --scenario all --source postgres --port 8080
```
Exit code is 0 on full pass, 1 on any failure — suitable for CI.
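Because of that exit-code contract, a CI job can run the suite directly. The fragment below is an illustrative GitHub Actions sketch (the step names and workflow structure are assumptions; only the commands come from this section):

```yaml
# Illustrative GitHub Actions steps; not a complete workflow file.
- name: Build debug image and start the chaos stack
  run: |
    docker build -t deltaforge:dev-debug -f Dockerfile.debug .
    docker compose -f docker-compose.chaos.yml \
      --profile base --profile mysql-infra --profile kafka-infra --profile df up -d
- name: Run all resilience scenarios
  # A non-zero exit (any scenario failure) fails this step automatically.
  run: cargo run -p chaos -- --scenario all --source mysql --port 8081
```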
Run endurance and benchmark scenarios
Before running soak/drain, apply the appropriate config via the UI or REST API:
```bash
# Apply soak config to the profile instance (for flamegraphs)
curl -X POST http://localhost:7474/api/apply-config \
  -H 'Content-Type: application/json' \
  -d '{"port": 8082, "config": "mysql-soak.yaml"}'

# Soak — long-running with random fault injection
cargo run -p chaos -- --scenario soak --port 8082 --topic chaos.soak

# Soak-stable — same workload, no faults (baseline)
cargo run -p chaos -- --scenario soak-stable --port 8082 --topic chaos.soak --duration-mins 30

# Backlog-drain — measures catch-up throughput (1M row replay)
cargo run -p chaos -- --scenario backlog-drain --port 8082 --topic chaos.soak --no-proxy

# Backlog-drain with custom tuning
cargo run -p chaos -- --scenario backlog-drain --port 8082 --topic chaos.soak --no-proxy \
  --drain-max-events 4000 --drain-max-ms 100 \
  --drain-kafka-conf linger.ms=0

# TPC-C — apply tpcc config first, then run
curl -X POST http://localhost:7474/api/apply-config \
  -H 'Content-Type: application/json' \
  -d '{"port": 8081, "config": "mysql-tpcc.yaml"}'
cargo run -p chaos -- --scenario tpcc --port 8081 --duration-mins 30
```
Avro encoding tests
```bash
# Unit + mock tests (no Docker needed)
cargo test -p sinks --test avro_encoding_tests

# Real Schema Registry integration tests (Docker, needs kafka-infra for SR)
cargo test -p sinks --test avro_encoding_tests -- --include-ignored --nocapture --test-threads=1

# Avro chaos scenario: apply Avro config, then run SR outage
curl -X POST http://localhost:7474/api/apply-config \
  -H 'Content-Type: application/json' \
  -d '{"port": 8081, "config": "mysql-to-kafka-avro.yaml"}'
cargo run -p chaos -- --scenario sr-outage --port 8081
```
```bash
# JSON vs Avro throughput comparison:
# Instance 1 (debug): JSON soak
curl -X POST http://localhost:7474/api/apply-config \
  -H 'Content-Type: application/json' \
  -d '{"port": 8081, "config": "mysql-soak.yaml"}'
cargo run -p chaos -- --scenario soak-stable --port 8081 --topic chaos.soak --duration-mins 30

# Instance 2 (profile): Avro soak
curl -X POST http://localhost:7474/api/apply-config \
  -H 'Content-Type: application/json' \
  -d '{"port": 8082, "config": "mysql-soak-avro.yaml"}'
cargo run -p chaos -- --scenario soak-stable --port 8082 --topic chaos.soak.avro --duration-mins 30
```
Compare in Grafana with `rate(deltaforge_sink_events_total[1m])`, broken down by instance port.
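A concrete panel query for that comparison could look like the following; the `port` label name is an assumption inferred from the per-instance setup above, so check the actual label set exposed on `:9000` first.

```promql
# Sink throughput (events/sec) per DeltaForge instance, split by port label
sum by (port) (rate(deltaforge_sink_events_total[1m]))
```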
See Performance Tuning for detailed throughput optimization guidance and profiling instructions.
Playground UI
The chaos binary also ships an interactive web UI for manual exploration:
```bash
cargo run -p chaos -- --scenario ui
# Open http://localhost:7474
```
The UI provides:
- Live service status with health dots, port badges, and Docker image selector
- Stale image detection — warns when a container is running an older image after a rebuild
- Activity bar — shows current operation with per-button loading state and task history
- Console log — unified output for all actions (infra, faults, scenarios) with smart auto-scroll
- One-click fault injection via Toxiproxy (partitions, latency, bandwidth throttle)
- Scenario runner with proxy bypass toggle, drain settings, and live log streaming
- Pipeline API browser for any DeltaForge instance
- Config Lab for A/B config comparison with presets
- CPU profiler — captures flamegraphs from running containers with pipeline context in the subtitle
Data management
The UI includes a Data Management card for resetting persistent state between test runs:
- Reset Checkpoints — stops all DeltaForge instances and deletes their SQLite checkpoint databases (GTID positions, replication offsets). Source databases and Kafka are untouched. Use this when switching branches or after a binlog purge leaves stale checkpoint state.
- Reset All Volumes — runs `docker compose down -v` across all profiles, removing every named volume (MySQL data, Kafka state, Postgres data, checkpoints, Grafana). Full clean slate that requires re-initialization of all services.
Teardown
```bash
docker compose -f docker-compose.chaos.yml --profile app down -v
docker compose -f docker-compose.chaos.yml down -v
```
See `crates/chaos/README.md` for the full scenario catalogue, network topology, all CLI flags, and instructions for adding new scenarios.
Contributing
1. Fork the repository
2. Create a branch from `main` (e.g., `feature/new-sink`, `fix/checkpoint-bug`)
3. Make your changes
4. Run `./dev.sh check` to ensure CI will pass
5. Submit a PR against `main`
Things to Remember
Tests
There are a few `#[ignore]` tests; run them (`cargo test -- --ignored`) when making deep changes to the sources, pipeline coordination, or anything with impact on core functionality.
Logging hygiene
- Include `pipeline`, `tenant`, `source_id`/`sink_id`, and `batch_id` fields on all warnings/errors to make traces joinable in log aggregation tools.
- Normalize retry/backoff logs so they include the attempt count and sleep duration; consider a structured `reason` field alongside error details for dashboards.
- Add info-level summaries on an interval (e.g., every N batches) reporting batches processed, average batch size, lag, and sink latency percentiles pulled from the metrics registry to create human-friendly breadcrumbs.
- Add metrics in a backward-compatible way: prefer new metric names over redefining existing ones to avoid breaking dashboards. Validate cardinality (bounded label sets) before merging.
- Gate noisy logs behind levels (`debug` for per-event traces, `info` for batch summaries, `warn`/`error` for retries and failures).
- Exercise new metrics in integration tests by asserting that counters change when synthetic events are sent through pipelines.
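The conventions above can be sketched in plain Rust. This is not DeltaForge's actual logging API; the function names, field values, and key=value rendering below are illustrative, showing the joinable retry fields and the every-N-batches summary gate.

```rust
// Sketch only (not DeltaForge's real API): structured key=value fields on
// every retry warning, plus a gate that emits one summary every N batches.

/// Render a retry warning carrying the joinable fields the guidelines require,
/// including attempt count, backoff duration, and a structured reason.
fn retry_warning(
    pipeline: &str,
    tenant: &str,
    sink_id: &str,
    batch_id: u64,
    attempt: u32,
    backoff_ms: u64,
    reason: &str,
) -> String {
    format!(
        "level=warn pipeline={pipeline} tenant={tenant} sink_id={sink_id} \
         batch_id={batch_id} attempt={attempt} backoff_ms={backoff_ms} \
         reason={reason} msg=\"sink delivery retry\""
    )
}

/// Interval gate: emit an info-level breadcrumb once every `every_n` batches.
fn should_emit_summary(batch_seq: u64, every_n: u64) -> bool {
    every_n > 0 && batch_seq % every_n == 0
}

fn main() {
    eprintln!(
        "{}",
        retry_warning("orders", "acme", "kafka-main", 4217, 3, 800, "broker_unavailable")
    );
    if should_emit_summary(1000, 100) {
        println!("level=info pipeline=orders batches=1000 msg=\"periodic batch summary\"");
    }
}
```

In the real codebase these fields would go through the structured logger (so they land as JSON for aggregation) rather than hand-formatted strings.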
Roadmap
Completed
- Avro encoding with Confluent Schema Registry — DDL-derived Avro schemas, Confluent wire format, all sinks supported, type conversion policies, Schema Registry failure handling with cached fallback
- HTTP/Webhook sink — POST/PUT to any URL, URL templates, batch mode, retry with backoff
- Dead Letter Queue — per-event failure routing, overflow policies, REST API for inspection
- Per-sink independent checkpoints — each sink advances independently, source replays from minimum
- Exactly-once delivery — Kafka transactional producer with producer fencing detection
- Helm chart — StatefulSet, ConfigMap, PVC, ServiceMonitor, PDB
- Schema sensing — automatic schema inference from payloads, high-cardinality key detection
In Progress
- Avro Schema Registry Phase 2 — publish sensed schemas to an external Schema Registry as catalog metadata (under the `sensed.` prefix). See RFC.
Planned
- MongoDB source — change streams CDC
- S3/Parquet sink — data lake integration
- Event replay — replay DLQ entries or historical events
- Kubernetes operator — PipelineTemplate + PipelinePool for fleet management
- OpenAPI spec generation — auto-generated REST API documentation