Introduction
DeltaForge is a modular, config-driven Change Data Capture (CDC) micro-framework built in Rust. It streams database changes into downstream systems like Kafka, Redis, and NATS while giving you full control over how each event is processed.
Pipelines are defined declaratively in YAML, making it straightforward to onboard new use cases without custom code.
| Built with | Sources | Processors | Sinks |
|---|---|---|---|
| Rust | MySQL · PostgreSQL | JavaScript | Kafka · Redis · NATS |
Why DeltaForge?
Core Strengths
- ⚡ Powered by Rust : Predictable performance, memory safety, and minimal resource footprint.
- 🔌 Pluggable architecture : Sources, processors, and sinks are modular and independently extensible.
- 🧩 Declarative pipelines : Define sources, transforms, sinks, and commit policies in version-controlled YAML with environment variable expansion for secrets.
- 📦 Reliable checkpointing : Resume safely after restarts with at-least-once delivery guarantees.
- 🛠️ Cloud-native ready : Single binary, Docker images, JSON logs, Prometheus metrics, and liveness/readiness probes for Kubernetes.
Schema Intelligence
- 🔍 Schema sensing : Automatically infer and track schema from event payloads, including deep inspection of nested JSON structures.
- 🏷️ Schema fingerprinting : SHA-256 based change detection with schema-to-checkpoint correlation for reliable replay.
- 🗃️ Source-owned semantics : Preserves native database types (PostgreSQL arrays, MySQL JSON, etc.) instead of normalizing to a universal type system.
Operational Features
- 🔄 Automatic reconnection : Exponential backoff with jitter for resilient connections.
- 🎯 Flexible table selection : Wildcard patterns (`db.*`, `schema.prefix%`) for easy onboarding.
- 📀 Transaction boundaries : Optionally keep source transactions intact across batches.
- ⚙️ Commit policies : Control checkpoint behavior with `all`, `required`, or `quorum` modes across multiple sinks.
Use Cases
DeltaForge is designed for:
- Real-time data synchronization : Keep caches, search indexes, and analytics systems in sync.
- Event-driven architectures : Stream database changes to Kafka or NATS for downstream consumers.
- Lightweight ETL : Transform and route data without heavyweight infrastructure.
DeltaForge is not a DAG-based stream processor. It is a focused CDC engine meant to replace tools like Debezium when you need a lighter, cloud-native, and more customizable runtime.
Getting Started
| Guide | Description |
|---|---|
| Quickstart | Get DeltaForge running in minutes |
| CDC Overview | Understand Change Data Capture concepts |
| Configuration | Pipeline spec reference |
| Development | Build from source, contribute |
Quick Example
metadata:
name: orders-to-kafka
tenant: acme
spec:
source:
type: mysql
config:
dsn: ${MYSQL_DSN}
tables: [shop.orders]
processors:
- type: javascript
inline: |
(event) => {
event.processed_at = Date.now();
return [event];
}
sinks:
- type: kafka
config:
brokers: ${KAFKA_BROKERS}
topic: order-events
Installation
Docker (recommended):
docker pull ghcr.io/vnvo/deltaforge:latest
From source:
git clone https://github.com/vnvo/deltaforge.git
cd deltaforge
cargo build --release
See the Development Guide for detailed build instructions and available Docker image variants.
License
DeltaForge is dual-licensed under MIT and Apache 2.0.
Change Data Capture (CDC)
Change Data Capture (CDC) is the practice of streaming database mutations as ordered events so downstream systems can stay in sync without periodic full loads. Rather than asking “what does my data look like now?”, CDC answers “what changed, and when?”
DeltaForge is a CDC engine built to make log-based change streams reliable, observable, and operationally simple in modern, containerized environments. It focuses on row-level CDC from MySQL binlog and Postgres logical replication to keep consumers accurate and latency-aware while minimizing impact on source databases.
In short: DeltaForge tails committed transactions from MySQL and Postgres logs, preserves ordering and transaction boundaries, and delivers events to Kafka, Redis, and NATS with checkpointed delivery, configurable batching, and Prometheus metrics - without requiring a JVM or distributed coordinator.
Why CDC matters
Traditional data integration relies on periodic batch jobs that query source systems, compare snapshots, and push differences downstream. This approach worked for decades, but modern architectures demand something better.
The batch ETL problem: A nightly sync means your analytics are always a day stale. An hourly sync still leaves gaps and hammers your production database with expensive SELECT * queries during business hours. As data volumes grow, these jobs take longer, fail more often, and compete for the same resources your customers need.
CDC flips the model. Instead of pulling data on a schedule, you subscribe to changes as they happen.
| Aspect | Batch ETL | CDC |
|---|---|---|
| Latency | Minutes to hours | Seconds to milliseconds |
| Source load | High (repeated scans) | Minimal (log tailing) |
| Data freshness | Stale between runs | Near real-time |
| Failure recovery | Re-run entire job | Resume from checkpoint |
| Change detection | Diff comparison | Native from source |
These benefits compound as systems scale and teams decentralize:
- Deterministic replay: Ordered events allow consumers to reconstruct state or power exactly-once delivery with checkpointing.
- Polyglot delivery: The same change feed can serve caches, queues, warehouses, and search indexes simultaneously without additional source queries.
How CDC works
All CDC implementations share a common goal: detect changes and emit them as events. The approaches differ in how they detect those changes.
Log-based CDC
Databases maintain transaction logs (MySQL binlog, Postgres WAL) that record every committed change for durability and replication. Log-based CDC reads these logs directly, capturing changes without touching application tables.
┌─────────────┐ commits ┌─────────────┐ tails ┌─────────────┐
│ Application │──────────────▶│ Database │─────────────▶│ CDC Engine │
└─────────────┘ │ + WAL │ └──────┬──────┘
└─────────────┘ │
▼
┌─────────────┐
│ Kafka / │
│ Redis │
└─────────────┘
Advantages:
- Zero impact on source table performance
- Captures all changes including those from triggers and stored procedures
- Preserves transaction boundaries and ordering
- Can capture deletes without soft-delete columns
Trade-offs:
- Requires database configuration (replication slots, binlog retention)
- Schema changes need careful handling
- Log retention limits how far back you can replay
DeltaForge uses log-based CDC exclusively. This allows DeltaForge to provide stronger ordering guarantees, lower source impact, and simpler operational semantics than hybrid approaches that mix log tailing with polling or triggers.
Trigger-based CDC
Database triggers fire on INSERT, UPDATE, and DELETE operations, writing change records to a shadow table that a separate process polls.
Advantages:
- Works on databases without accessible transaction logs
- Can capture application-level context unavailable in logs
Trade-offs:
- Adds write overhead to every transaction
- Triggers can be disabled or forgotten during schema migrations
- Shadow tables require maintenance and can grow unbounded
Polling-based CDC
A process periodically queries tables for rows modified since the last check, typically using an updated_at timestamp or incrementing ID.
Advantages:
- Simple to implement
- No special database configuration required
Trade-offs:
- Cannot reliably detect deletes
- Requires `updated_at` columns on every table
- Polling frequency trades off latency against database load
- Clock skew and transaction visibility can cause missed or duplicate events
Anatomy of a CDC event
A well-designed CDC event contains everything downstream consumers need to process the change correctly.
{
"id": "evt_01J7K9X2M3N4P5Q6R7S8T9U0V1",
"source": {
"database": "shop",
"table": "orders",
"server_id": "mysql-prod-1"
},
"operation": "update",
"timestamp": "2025-01-15T14:32:01.847Z",
"transaction": {
"id": "gtid:3E11FA47-71CA-11E1-9E33-C80AA9429562:42",
"position": 15847293
},
"before": {
"id": 12345,
"status": "pending",
"total": 99.99,
"updated_at": "2025-01-15T14:30:00.000Z"
},
"after": {
"id": 12345,
"status": "shipped",
"total": 99.99,
"updated_at": "2025-01-15T14:32:01.000Z"
},
"schema_version": "v3"
}
Key components:
| Field | Purpose |
|---|---|
| `operation` | INSERT, UPDATE, DELETE, or DDL |
| `before` / `after` | Row state before and after the change (enables diff logic) |
| `transaction` | Groups changes from the same database transaction |
| `timestamp` | When the change was committed at the source |
| `schema_version` | Helps consumers handle schema evolution |
Not all fields are present for every operation: `before` is omitted for INSERTs and `after` for DELETEs, but the event envelope and metadata fields are consistent across MySQL and Postgres sources.
Real-world use cases
Cache invalidation and population
Problem: Your Redis cache serves product catalog data, but cache invalidation logic is scattered across dozens of services. Some forget to invalidate, others invalidate too aggressively, and debugging staleness issues takes hours.
CDC solution: Stream changes from the products table to a message queue. A dedicated consumer reads the stream and updates or deletes cache keys based on the operation type. This centralizes invalidation logic, provides replay capability for cache rebuilds, and removes cache concerns from application code entirely.
┌──────────┐ CDC ┌─────────────┐ consumer ┌─────────────┐
│ MySQL │─────────────▶│ Stream │─────────────▶│ Cache Keys │
│ products │ │ (Kafka/ │ │ product:123 │
└──────────┘ │ Redis) │ └─────────────┘
└─────────────┘
Event-driven microservices
Problem: Your order service needs to notify inventory, shipping, billing, and analytics whenever an order changes state. Direct service-to-service calls create tight coupling and cascade failures.
CDC solution: Publish order changes to a durable message queue. Each downstream service subscribes independently, processes at its own pace, and can replay events after failures or during onboarding. The order service doesn’t need to know about its consumers.
┌─────────────┐ CDC ┌─────────────┐
│ Orders │─────────────▶│ Message │
│ Database │ │ Queue │
└─────────────┘ └──────┬──────┘
┌───────────────┼───────────────┐
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Inventory │ │ Shipping │ │ Analytics │
└───────────┘ └───────────┘ └───────────┘
Search index synchronization
Problem: Your Elasticsearch index drifts from the source of truth. Full reindexing takes hours and blocks search updates during the process.
CDC solution: Stream changes continuously to keep the index synchronized. Use the before image to remove old documents and the after image to index new content.
Data warehouse incremental loads
Problem: Nightly full loads into your warehouse take 6 hours and block analysts until noon. Business users complain that dashboards show yesterday’s numbers.
CDC solution: Stream changes to a staging topic, then micro-batch into your warehouse every few minutes. Analysts get near real-time data without impacting source systems.
Audit logging and compliance
Problem: Regulations require you to maintain a complete history of changes to sensitive data. Application-level audit logging is inconsistent and can be bypassed.
CDC solution: The transaction log captures every committed change regardless of how it was made - application code, admin scripts, or direct SQL. Stream these events to immutable storage for compliance.
Cross-region replication
Problem: You need to replicate data to a secondary region for disaster recovery, but built-in replication doesn’t support the transformations you need.
CDC solution: Stream changes through a CDC pipeline that filters, transforms, and routes events to the target region’s databases or message queues.
Architecture patterns
The outbox pattern
When you need to update a database and publish an event atomically, the outbox pattern provides exactly-once semantics without distributed transactions.
┌─────────────────────────────────────────┐
│ Single Transaction │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ UPDATE │ │ INSERT INTO │ │
│ │ orders SET │ + │ outbox (event) │ │
│ │ status=... │ │ │ │
│ └─────────────┘ └─────────────────┘ │
└─────────────────────────────────────────┘
│
▼ CDC
┌─────────────┐
│ Kafka │
└─────────────┘
- The application writes business data and an event record in the same transaction.
- CDC tails the outbox table and publishes events to Kafka.
- A cleanup process (or CDC itself) removes processed outbox rows.
This guarantees that events are published if and only if the transaction commits.
When to skip the outbox: If your only requirement is to react to committed database state (not application intent), direct CDC from business tables is often simpler than introducing an outbox. The outbox pattern adds value when you need custom event payloads, explicit event versioning, or when the event schema differs significantly from table structure.
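A minimal pipeline sketch for tailing an outbox table, assuming a hypothetical `shop.outbox` table and `outbox-events` topic; adjust the names and source type to your environment:

```yaml
metadata:
  name: outbox-to-kafka       # hypothetical pipeline name
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: outbox-mysql
      dsn: ${MYSQL_DSN}
      tables: [shop.outbox]   # hypothetical outbox table
  sinks:
    - type: kafka
      config:
        id: outbox-kafka
        brokers: ${KAFKA_BROKERS}
        topic: outbox-events  # hypothetical topic name
        required: true
```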
Event sourcing integration
CDC complements event sourcing by bridging legacy systems that weren’t built event-first. Stream changes from existing tables into event stores, then gradually migrate to native event sourcing.
CQRS (Command Query Responsibility Segregation)
CDC naturally supports CQRS by populating read-optimized projections from the write model. Changes flow through the CDC pipeline to update denormalized views, search indexes, or cache layers.
Challenges and solutions
Schema evolution
Databases change. Columns get added, renamed, or removed. Types change. CDC pipelines need to handle this gracefully.
Strategies:
- Schema registry: Store and version schemas centrally (e.g., Confluent Schema Registry with Avro/Protobuf).
- Forward compatibility: Add columns as nullable; avoid removing columns that consumers depend on.
- Consumer tolerance: Design consumers to ignore unknown fields and handle missing optional fields.
- Processor transforms: Use DeltaForge’s JavaScript processors to normalize schemas before sinks.
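As a sketch of the processor-transform strategy, an inline JavaScript processor that maps a hypothetical renamed column back to the name consumers expect (the field names are illustrative, not part of any real schema):

```yaml
processors:
  - type: javascript
    id: normalize-schema
    inline: |
      function processBatch(events) {
        return events.map(event => {
          // Hypothetical rename: expose legacy `total_amount` as `total`
          if (event.after && event.after.total_amount !== undefined) {
            event.after.total = event.after.total_amount;
            delete event.after.total_amount;
          }
          return event;
        });
      }
```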
Ordering guarantees
Events must arrive in the correct order for consumers to reconstruct state accurately. A DELETE arriving before its corresponding INSERT would be catastrophic.
DeltaForge guarantees:
- Source-order preservation per table partition (always enabled)
- Transaction boundary preservation when `respect_source_tx: true` is configured in batch settings
The Kafka sink partitions consistently by primary key, so consumers see events for the same key in order within a partition.
Exactly-once delivery
Network failures, process crashes, and consumer restarts can cause duplicates or gaps. True exactly-once semantics require coordination between source, pipeline, and sink.
DeltaForge approach:
- Checkpoints track the last committed position in the source log.
- Configurable commit policies (`all`, `required`, `quorum`) control when checkpoints advance.
- The Kafka sink supports idempotent producers; transactional writes are available via `exactly_once: true`.
Default behavior: DeltaForge provides at-least-once delivery out of the box. Exactly-once semantics require sink support and explicit configuration.
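For example, a hedged sketch of the strictest setup, combining transactional Kafka writes with an `all` commit policy (field names follow the Configuration reference):

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      required: true
      exactly_once: true   # transactional producer; requires sink/broker support
commit_policy:
  mode: all                # every sink must acknowledge before the checkpoint advances
```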
High availability
Production CDC pipelines need to handle failures without data loss or extended downtime.
Best practices:
- Run multiple pipeline instances with leader election.
- Store checkpoints in durable storage (DeltaForge persists to local files, mountable volumes in containers).
- Monitor lag between source position and checkpoint position.
- Set up alerts for pipeline failures and excessive lag.
Expectations: DeltaForge's checkpoints prevent data loss on restart, but it does not currently include built-in leader election. For HA deployments, use external coordination (Kubernetes leader election, etcd locks) or run active-passive with health-check-based failover.
Backpressure
When sinks can’t keep up with the change rate, pipelines need to slow down gracefully rather than dropping events or exhausting memory.
DeltaForge handles backpressure through:
- Configurable batch sizes (`max_events`, `max_bytes`, `max_ms`).
- In-flight limits (`max_inflight`) that bound concurrent sink writes.
- Blocking reads from the source when batches queue up.
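These knobs live in the pipeline's `batch` block; a sketch with illustrative values (not tuning recommendations):

```yaml
batch:
  max_events: 500       # flush after 500 events...
  max_bytes: 1048576    # ...or 1 MiB of serialized data...
  max_ms: 1000          # ...or one second, whichever comes first
  max_inflight: 2       # at most two batches in flight to sinks at once
```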
Performance considerations
Batching trade-offs
| Setting | Low value | High value |
|---|---|---|
| `max_events` | Lower latency, more overhead | Higher throughput, more latency |
| `max_ms` | Faster flush, smaller batches | Larger batches, delayed flush |
| `max_bytes` | Memory-safe, frequent commits | Efficient for large rows |
Start with DeltaForge defaults and tune based on observed latency and throughput.
Source database impact
Log-based CDC has minimal impact, but consider:
- Replication slot retention: Paused pipelines cause WAL/binlog accumulation.
- Connection limits: Each pipeline holds a replication connection.
- Network bandwidth: High-volume tables generate significant log traffic.
Sink throughput
- Kafka: Tune `batch.size`, `linger.ms`, and compression in `client_conf`.
- Redis: Use pipelining and connection pooling for high-volume streams.
Monitoring and observability
CDC pipelines are long-running, stateful processes; without metrics and alerts, failures are silent by default. A healthy pipeline requires visibility into lag, throughput, and errors.
Key metrics to track
| Metric | Description | Alert threshold |
|---|---|---|
| `cdc_lag_seconds` | Time between event timestamp and processing | > 60s |
| `events_processed_total` | Throughput counter | Sudden drops |
| `checkpoint_lag_events` | Events since last checkpoint | > 10,000 |
| `sink_errors_total` | Failed sink writes | Any sustained errors |
| `batch_size_avg` | Events per batch | Outside expected range |
DeltaForge exposes Prometheus metrics on the configurable metrics endpoint (default :9000).
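If the metric names listed above are exposed verbatim, a Prometheus alerting rule for lag might look like the following sketch (an assumption for illustration, not shipped configuration):

```yaml
groups:
  - name: deltaforge
    rules:
      - alert: DeltaForgeHighLag
        # assumes the cdc_lag_seconds metric from the table above
        expr: cdc_lag_seconds > 60
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: CDC lag has exceeded 60 seconds
```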
Health checks
- `GET /healthz`: Liveness probe - is the process running?
- `GET /readyz`: Readiness probe - are pipelines connected and processing?
- `GET /pipelines`: Detailed status of each pipeline including configuration.
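A minimal Kubernetes probe sketch, assuming the default API port 8080 shown in the Quickstart:

```yaml
livenessProbe:
  httpGet:
    path: /healthz
    port: 8080
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /readyz
    port: 8080
  periodSeconds: 10
```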
Choosing a CDC solution
When evaluating CDC tools, consider:
| Factor | Questions to ask |
|---|---|
| Source support | Does it support your databases? MySQL binlog? Postgres logical replication? |
| Sink flexibility | Can it write to your target systems? Kafka, Redis, HTTP, custom? |
| Transformation | Can you filter, enrich, or reshape events in-flight? |
| Operational overhead | How much infrastructure does it require? JVM? Distributed coordinator? |
| Resource efficiency | What’s the memory/CPU footprint per pipeline? |
| Cloud-native | Does it containerize cleanly? Support health checks? Emit metrics? |
Where DeltaForge fits
DeltaForge intentionally avoids the operational complexity of JVM-based CDC stacks (Kafka Connect-style deployments with Zookeeper, Connect workers, and converter configurations) while remaining compatible with Kafka-centric architectures.
DeltaForge is designed for teams that want:
- Lightweight runtime: Single binary, minimal memory footprint, no JVM warmup.
- Config-driven pipelines: YAML specs instead of code for common patterns.
- Inline transformation: JavaScript processors for custom logic without recompilation.
- Container-native operations: Built for Kubernetes with health endpoints and Prometheus metrics.
DeltaForge is not designed for:
- Complex DAG-based stream processing with windowed aggregations
- Stateful joins across multiple streams
- Sources beyond MySQL and Postgres (currently)
- Built-in schema registry integration (use external registries)
If you need those capabilities, consider dedicated stream processors or the broader Kafka ecosystem. DeltaForge excels at getting data out of databases and into your event infrastructure reliably and efficiently.
Getting started
Ready to try CDC with DeltaForge? Head to the Quickstart guide to run your first pipeline in minutes.
For production deployments, review the Development guide for container builds and operational best practices.
Quickstart
Get DeltaForge running in minutes.
1. Prepare a pipeline spec
Create a YAML file that defines your CDC pipeline. Environment variables are expanded at runtime, so secrets stay out of version control.
metadata:
name: orders-mysql-to-kafka
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
processors:
- type: javascript
id: transform
inline: |
(event) => {
event.tags = ["processed"];
return [event];
}
sinks:
- type: kafka
config:
id: orders-kafka
brokers: ${KAFKA_BROKERS}
topic: orders
required: true
batch:
max_events: 500
max_bytes: 1048576
max_ms: 1000
2. Start DeltaForge
Using Docker (recommended):
docker run --rm \
-p 8080:8080 -p 9000:9000 \
-e MYSQL_DSN="mysql://user:pass@host:3306/db" \
-e KAFKA_BROKERS="kafka:9092" \
-v $(pwd)/pipeline.yaml:/etc/deltaforge/pipeline.yaml:ro \
-v deltaforge-checkpoints:/app/data \
ghcr.io/vnvo/deltaforge:latest \
--config /etc/deltaforge/pipeline.yaml
From source:
cargo run -p runner -- --config ./pipeline.yaml
Runner options
| Flag | Default | Description |
|---|---|---|
| `--config` | (required) | Path to pipeline spec file or directory |
| `--api-addr` | 0.0.0.0:8080 | REST API address |
| `--metrics-addr` | 0.0.0.0:9095 | Prometheus metrics address |
3. Verify it’s running
Check health and pipeline status:
# Liveness probe
curl http://localhost:8080/healthz
# Readiness with pipeline status
curl http://localhost:8080/readyz
# List all pipelines
curl http://localhost:8080/pipelines
4. Manage pipelines
Control pipelines via the REST API:
# Pause a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/pause
# Resume a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/resume
# Stop a pipeline
curl -X POST http://localhost:8080/pipelines/orders-mysql-to-kafka/stop
Next steps
- CDC Overview : Understand how Change Data Capture works
- Configuration : Full pipeline spec reference
- Sources : MySQL and Postgres setup
- Sinks : Kafka and Redis configuration
- Development : Build from source, run locally
Configuration
Pipelines are defined as YAML documents that map directly to the PipelineSpec type. Environment variables are expanded before parsing using ${VAR} syntax, so secrets and connection strings can be injected at runtime.
Document structure
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: <pipeline-name>
tenant: <tenant-id>
spec:
source: { ... }
processors: [ ... ]
sinks: [ ... ]
batch: { ... }
commit_policy: { ... }
schema_sensing: { ... }
Metadata
| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Unique pipeline identifier. Used in API routes and metrics. |
| `tenant` | string | Yes | Business-oriented tenant label for multi-tenancy. |
Spec fields
| Field | Type | Required | Description |
|---|---|---|---|
| `source` | object | Yes | Database source configuration. See Sources. |
| `processors` | array | No | Ordered list of processors. See Processors. |
| `sinks` | array | Yes (at least one) | One or more sinks that receive each batch. See Sinks. |
| `sharding` | object | No | Optional hint for downstream distribution. |
| `connection_policy` | object | No | How the runtime establishes upstream connections. |
| `batch` | object | No | Commit unit thresholds. See Batching. |
| `commit_policy` | object | No | How sink acknowledgements gate checkpoints. See Commit policy. |
| `schema_sensing` | object | No | Automatic schema inference from event payloads. See Schema sensing. |
Sources
MySQL
Captures row-level changes via binlog replication. See MySQL source documentation for prerequisites and detailed configuration.
Table patterns support SQL LIKE syntax:
- `db.table` - exact match
- `db.prefix%` - tables matching prefix
- `db.*` - all tables in database
- `%.table` - table in any database
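A typical MySQL source block (see the MySQL source page for the full field reference):

```yaml
source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.order_%
```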
Turso
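The Turso source is experimental (see the Turso source page); an illustrative configuration mirroring that page:

```yaml
source:
  type: turso
  config:
    id: turso-main
    url: "libsql://your-db.turso.io"
    auth_token: ${TURSO_AUTH_TOKEN}
    tables: ["users", "orders"]
    cdc_mode: auto
    poll_interval_ms: 1000
```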
PostgreSQL
Captures row-level changes via logical replication using the pgoutput plugin. See PostgreSQL source documentation for prerequisites and detailed configuration.
Table patterns support SQL LIKE syntax (same as MySQL):
- `schema.table` - exact match
- `schema.prefix%` - tables matching prefix
- `schema.*` - all tables in schema
- `%.table` - table in any schema
- `table` - defaults to `public.table`
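A typical PostgreSQL source block (see the PostgreSQL source page for the full field reference):

```yaml
source:
  type: postgres
  config:
    id: users-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_users
    publication: users_pub
    tables:
      - public.users
    start_position: earliest
```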
Processors
Processors transform batches of events before delivery to sinks.
JavaScript
The processBatch(events) function receives an array of events and can return:
- An array of events (modified, filtered, or expanded)
- A single event object (wrapped in array automatically)
- `null` or `undefined` to use the mutated input array
- An empty array `[]` to drop all events
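For example, a processor that drops delete events and tags the rest, with optional resource limits (a sketch that assumes the `op` and `tags` fields described in the source and example sections):

```yaml
processors:
  - type: javascript
    id: transform
    inline: |
      function processBatch(events) {
        return events
          .filter(event => event.op !== "delete")   // drop deletes
          .map(event => {
            event.tags = (event.tags || []).concat(["normalized"]);
            return event;
          });
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500
```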
Sinks
Kafka
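A representative Kafka sink configuration, using the fields shown in the complete example below:

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      required: true
      exactly_once: false
      client_conf:
        message.timeout.ms: "5000"
```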
Redis
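A representative Redis sink configuration (events are published to a Redis Stream):

```yaml
sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      required: false
```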
NATS
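A representative NATS sink configuration, mirroring the MySQL-to-NATS example below:

```yaml
sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      required: true
```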
Batching
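Batch thresholds are soft limits; whichever is reached first triggers a flush. For example:

```yaml
batch:
  max_events: 500
  max_bytes: 1048576
  max_ms: 1000
  respect_source_tx: true
  max_inflight: 2
```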
Commit policy
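The commit policy controls how sink acknowledgements gate checkpoints (see Commit policy under Pipelines). For example:

```yaml
commit_policy:
  mode: required   # or: all, quorum
  # quorum: 2      # only used with mode: quorum
```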
Schema sensing
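Schema sensing infers schema from event payloads; the fields below mirror the complete example that follows:

```yaml
schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3
  sampling:
    warmup_events: 50
    sample_rate: 5
```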
Complete example
MySQL to Kafka
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-mysql-to-kafka
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
processors:
- type: javascript
id: transform
inline: |
function processBatch(events) {
return events.map(event => {
event.tags = (event.tags || []).concat(["normalized"]);
return event;
});
}
limits:
cpu_ms: 50
mem_mb: 128
timeout_ms: 500
sinks:
- type: kafka
config:
id: orders-kafka
brokers: ${KAFKA_BROKERS}
topic: orders
required: true
exactly_once: false
client_conf:
message.timeout.ms: "5000"
- type: redis
config:
id: orders-redis
uri: ${REDIS_URI}
stream: orders
required: false
batch:
max_events: 500
max_bytes: 1048576
max_ms: 1000
respect_source_tx: true
max_inflight: 2
commit_policy:
mode: required
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 3
sampling:
warmup_events: 50
sample_rate: 5
PostgreSQL to Kafka
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: users-postgres-to-kafka
tenant: acme
spec:
source:
type: postgres
config:
id: users-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_users
publication: users_pub
tables:
- public.users
- public.user_sessions
start_position: earliest
sinks:
- type: kafka
config:
id: users-kafka
brokers: ${KAFKA_BROKERS}
topic: user-events
required: true
batch:
max_events: 500
max_ms: 1000
respect_source_tx: true
commit_policy:
mode: required
MySQL to NATS
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: orders-mysql-to-nats
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
sinks:
- type: nats
config:
id: orders-nats
url: ${NATS_URL}
subject: orders.events
stream: ORDERS
required: true
batch:
max_events: 500
max_ms: 1000
respect_source_tx: true
commit_policy:
mode: required
REST API Reference
DeltaForge exposes a REST API for health checks, pipeline management, schema inspection, and drift detection. All endpoints return JSON.
Base URL
Default: http://localhost:8080
Configure with --api-addr:
deltaforge --config pipelines.yaml --api-addr 0.0.0.0:9090
Health Endpoints
Liveness Probe
GET /healthz
Returns ok if the process is running. Use for Kubernetes liveness probes.
Response: 200 OK
ok
Readiness Probe
GET /readyz
Returns pipeline states. Use for Kubernetes readiness probes.
Response: 200 OK
{
"status": "ready",
"pipelines": [
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
]
}
Pipeline Management
List Pipelines
GET /pipelines
Returns all pipelines with current status.
Response: 200 OK
[
{
"name": "orders-cdc",
"status": "running",
"spec": {
"metadata": { "name": "orders-cdc", "tenant": "acme" },
"spec": { ... }
}
}
]
Get Pipeline
GET /pipelines/{name}
Returns a single pipeline by name.
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
Errors:
- `404 Not Found` - Pipeline doesn’t exist
Create Pipeline
POST /pipelines
Content-Type: application/json
Creates a new pipeline from a full spec.
Request:
{
"metadata": {
"name": "orders-cdc",
"tenant": "acme"
},
"spec": {
"source": {
"type": "mysql",
"config": {
"id": "mysql-1",
"dsn": "mysql://user:pass@host/db",
"tables": ["shop.orders"]
}
},
"processors": [],
"sinks": [
{
"type": "kafka",
"config": {
"id": "kafka-1",
"brokers": "localhost:9092",
"topic": "orders"
}
}
]
}
}
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
Errors:
- `409 Conflict` - Pipeline already exists
Update Pipeline
PATCH /pipelines/{name}
Content-Type: application/json
Applies a partial update to an existing pipeline. The pipeline is stopped, updated, and restarted.
Request:
{
"spec": {
"batch": {
"max_events": 1000,
"max_ms": 500
}
}
}
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
Errors:
- `404 Not Found` - Pipeline doesn’t exist
- `400 Bad Request` - Name mismatch in patch
Delete Pipeline
DELETE /pipelines/{name}
Permanently deletes a pipeline. This removes the pipeline from the runtime and cannot be undone.
Response: 204 No Content
Errors:
- `404 Not Found` - Pipeline doesn’t exist
Pause Pipeline
POST /pipelines/{name}/pause
Pauses ingestion. Events in the buffer are not processed until resumed.
Response: 200 OK
{
"name": "orders-cdc",
"status": "paused",
"spec": { ... }
}
Resume Pipeline
POST /pipelines/{name}/resume
Resumes a paused pipeline.
Response: 200 OK
{
"name": "orders-cdc",
"status": "running",
"spec": { ... }
}
Stop Pipeline
POST /pipelines/{name}/stop
Stops a pipeline. Final checkpoint is saved.
Response: 200 OK
{
"name": "orders-cdc",
"status": "stopped",
"spec": { ... }
}
Schema Management
List Database Schemas
GET /pipelines/{name}/schemas
Returns all tracked database schemas for a pipeline. These are the schemas loaded directly from the source database.
Response: 200 OK
[
{
"database": "shop",
"table": "orders",
"column_count": 5,
"primary_key": ["id"],
"fingerprint": "sha256:a1b2c3d4e5f6...",
"registry_version": 2
},
{
"database": "shop",
"table": "customers",
"column_count": 8,
"primary_key": ["id"],
"fingerprint": "sha256:f6e5d4c3b2a1...",
"registry_version": 1
}
]
Get Schema Details
GET /pipelines/{name}/schemas/{db}/{table}
Returns detailed schema information including all columns.
Response: 200 OK
{
"database": "shop",
"table": "orders",
"columns": [
{
"name": "id",
"type": "bigint(20) unsigned",
"nullable": false,
"default": null,
"extra": "auto_increment"
},
{
"name": "customer_id",
"type": "bigint(20)",
"nullable": false,
"default": null
}
],
"primary_key": ["id"],
"fingerprint": "sha256:a1b2c3d4..."
}
Schema Sensing
Schema sensing automatically infers schema structure from JSON event payloads. This is useful for sources that don’t provide schema metadata or for detecting schema evolution in JSON columns.
List Inferred Schemas
GET /pipelines/{name}/sensing/schemas
Returns all schemas inferred via sensing for a pipeline.
Response: 200 OK
[
{
"table": "orders",
"fingerprint": "sha256:abc123...",
"sequence": 3,
"event_count": 1500,
"stabilized": true,
"first_seen": "2025-01-15T10:30:00Z",
"last_seen": "2025-01-15T14:22:00Z"
}
]
| Field | Description |
|---|---|
| `table` | Table name (or `table:column` for JSON column sensing) |
| `fingerprint` | SHA-256 content hash of current schema |
| `sequence` | Monotonic version number (increments on evolution) |
| `event_count` | Total events observed |
| `stabilized` | Whether schema has stopped sampling (structure stable) |
| `first_seen` | First observation timestamp |
| `last_seen` | Most recent observation timestamp |
Get Inferred Schema Details
GET /pipelines/{name}/sensing/schemas/{table}
Returns detailed inferred schema including all fields.
Response: 200 OK
{
"table": "orders",
"fingerprint": "sha256:abc123...",
"sequence": 3,
"event_count": 1500,
"stabilized": true,
"fields": [
{
"name": "id",
"types": ["integer"],
"nullable": false,
"optional": false
},
{
"name": "metadata",
"types": ["object"],
"nullable": true,
"optional": false,
"nested_field_count": 5
},
{
"name": "tags",
"types": ["array"],
"nullable": false,
"optional": true,
"array_element_types": ["string"]
}
],
"first_seen": "2025-01-15T10:30:00Z",
"last_seen": "2025-01-15T14:22:00Z"
}
Export JSON Schema
GET /pipelines/{name}/sensing/schemas/{table}/json-schema
Exports the inferred schema as a standard JSON Schema document.
Response: 200 OK
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "orders",
"type": "object",
"properties": {
"id": { "type": "integer" },
"metadata": { "type": ["object", "null"] },
"tags": {
"type": "array",
"items": { "type": "string" }
}
},
"required": ["id", "metadata"]
}
Get Sensing Cache Statistics
GET /pipelines/{name}/sensing/stats
Returns cache performance statistics for schema sensing.
Response: 200 OK
{
"tables": [
{
"table": "orders",
"cached_structures": 3,
"max_cache_size": 100,
"cache_hits": 1450,
"cache_misses": 50
}
],
"total_cache_hits": 1450,
"total_cache_misses": 50,
"hit_rate": 0.9667
}
Drift Detection
Drift detection compares expected database schema against observed data patterns to detect mismatches, unexpected nulls, and type drift.
Get Drift Results
GET /pipelines/{name}/drift
Returns drift detection results for all tables in a pipeline.
Response: 200 OK
[
{
"table": "orders",
"has_drift": true,
"columns": [
{
"column": "amount",
"expected_type": "decimal(10,2)",
"observed_types": ["string"],
"mismatch_count": 42,
"examples": ["\"99.99\""]
}
],
"events_analyzed": 1500,
"events_with_drift": 42
}
]
Get Table Drift
GET /pipelines/{name}/drift/{table}
Returns drift detection results for a specific table.
Response: 200 OK
{
"table": "orders",
"has_drift": false,
"columns": [],
"events_analyzed": 1000,
"events_with_drift": 0
}
Errors:
- `404 Not Found` - Table not found or no drift data available
Error Responses
All error responses follow this format:
{
"error": "Description of the error"
}
| Status Code | Meaning |
|---|---|
| `400 Bad Request` | Invalid request body or parameters |
| `404 Not Found` | Resource doesn’t exist |
| `409 Conflict` | Resource already exists |
| `500 Internal Server Error` | Unexpected server error |
Pipelines
Each pipeline is created from a single PipelineSpec. The runtime spawns the source, processors, and sinks defined in the spec and coordinates them with batching and checkpointing.
- 🔄 Live control: pause, resume, or stop pipelines through the REST API without redeploying.
- 📦 Coordinated delivery: batching and commit policy keep sinks consistent even when multiple outputs are configured.
Lifecycle controls
The REST API addresses pipelines by metadata.name and returns PipeInfo records containing the live spec and status.
- `GET /healthz` - liveness probe.
- `GET /readyz` - readiness with pipeline states.
- `GET /pipelines` - list pipelines.
- `POST /pipelines` - create from a full spec.
- `PATCH /pipelines/{name}` - merge a partial spec (for example, adjust batch thresholds) and restart the pipeline.
- `POST /pipelines/{name}/pause` - pause ingestion and coordination.
- `POST /pipelines/{name}/resume` - resume a paused pipeline.
- `POST /pipelines/{name}/stop` - stop a running pipeline.
Pausing halts both source ingestion and the coordinator. Resuming re-enables both ends so buffered events can drain cleanly.
Processors
Processors run in the declared order for each batch. The built-in processor type is JavaScript, powered by deno_core.
- `type: javascript`
- `id`: processor label.
- `inline`: JS source. Export a `processBatch(events)` function that returns the transformed batch.
- `limits` (optional): resource guardrails (`cpu_ms`, `mem_mb`, `timeout_ms`).
Batching
The coordinator builds batches using soft thresholds:
- `max_events`: flush after this many events.
- `max_bytes`: flush after the serialized size reaches this limit.
- `max_ms`: flush after this much time has elapsed since the batch started.
- `respect_source_tx`: when true, never split a single source transaction across batches.
- `max_inflight`: cap the number of batches being processed concurrently.
Commit policy
When multiple sinks are configured, checkpoints can wait for different acknowledgement rules:
- `all`: every sink must acknowledge.
- `required` (default): only sinks marked `required: true` must acknowledge; others are best-effort.
- `quorum`: checkpoint after at least `quorum` sinks acknowledge.
Sources
DeltaForge captures database changes through pluggable source connectors. Each source is configured under spec.source with a type field and a config object. Environment variables are expanded before parsing using ${VAR} syntax.
Supported Sources
| Source | Status | Description |
|---|---|---|
| `mysql` | ✅ Production | MySQL binlog CDC with GTID support |
| `postgres` | ✅ Production | PostgreSQL logical replication via pgoutput |
| `turso` | 🔧 Experimental | Turso/libSQL CDC with multiple modes (not production-ready) |
Common Behavior
All sources share these characteristics:
- Checkpointing: Progress is automatically saved and resumed on restart
- Schema tracking: Table schemas are loaded and fingerprinted for change detection
- At-least-once delivery: Events may be redelivered after failures; sinks should be idempotent
- Batching: Events are batched according to the pipeline’s `batch` configuration
- Transaction boundaries: `respect_source_tx: true` (default) keeps source transactions intact
Source Interface
Sources implement a common trait that provides:
trait Source {
    fn checkpoint_key(&self) -> &str;
    async fn run(&self, tx: Sender<Event>, checkpoint_store: Arc<dyn CheckpointStore>) -> SourceHandle;
}
The returned SourceHandle supports pause/resume and graceful cancellation.
Adding Custom Sources
The source interface is pluggable. To add a new source:
- Implement the `Source` trait
- Add configuration parsing in `deltaforge-config`
See existing sources for implementation patterns.
MySQL source
DeltaForge tails the MySQL binlog to capture row-level changes with automatic checkpointing and schema tracking.
Prerequisites
MySQL Server Configuration
Ensure your MySQL server has binary logging enabled with row-based format:
-- Required server settings (my.cnf or SET GLOBAL)
log_bin = ON
binlog_format = ROW
binlog_row_image = FULL -- Recommended for complete before-images
If binlog_row_image is not FULL, DeltaForge will warn at startup and before-images on UPDATE/DELETE events may be incomplete.
User Privileges
Create a dedicated replication user with the required grants:
-- Create user with mysql_native_password (required by binlog connector)
CREATE USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'your_password';
-- Replication privileges (required)
GRANT REPLICATION REPLICA, REPLICATION CLIENT ON *.* TO 'deltaforge'@'%';
-- Schema introspection (required for table discovery)
GRANT SELECT, SHOW VIEW ON your_database.* TO 'deltaforge'@'%';
FLUSH PRIVILEGES;
For capturing all databases, grant SELECT on *.* instead.
Configuration
Set spec.source.type to mysql and provide a config object:
| Field | Type | Required | Description |
|---|---|---|---|
| `id` | string | Yes | Unique identifier used for checkpoints, server_id derivation, and metrics |
| `dsn` | string | Yes | MySQL connection string with replication privileges |
| `tables` | array | No | Table patterns to capture; omit or leave empty to capture all user tables |
Table Patterns
The tables field supports flexible pattern matching using SQL LIKE syntax:
tables:
- shop.orders # exact match: database "shop", table "orders"
- shop.order_% # LIKE pattern: tables starting with "order_" in "shop"
- analytics.* # wildcard: all tables in "analytics" database
- %.audit_log # cross-database: "audit_log" table in any database
# omit entirely to capture all user tables (excludes system schemas)
System schemas (mysql, information_schema, performance_schema, sys) are always excluded.
Example
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
- shop.order_items
Resume Behavior
DeltaForge automatically checkpoints progress and resumes from the last position on restart. The resume strategy follows this priority:
- GTID - Preferred if the MySQL server has GTID enabled. Provides the most reliable resume across binlog rotations and failovers.
- File:position - Used when GTID is not available. Resumes from the exact binlog file and byte offset.
- Binlog tail - On first run with no checkpoint, starts from the current end of the binlog (no historical replay).
Checkpoints are stored using the id field as the key.
Server ID Handling
MySQL replication requires each replica to have a unique server_id. DeltaForge derives this automatically from the source id using a CRC32 hash:
server_id = 1 + (CRC32(id) % 4,000,000,000)
When running multiple DeltaForge instances against the same MySQL server, ensure each has a unique id to avoid server_id conflicts.
Schema Tracking
DeltaForge has a built-in schema registry to track table schemas, per source. For MySQL source:
- Schemas are preloaded at startup by querying `INFORMATION_SCHEMA`
- Each schema is fingerprinted using SHA-256 for change detection
- Events carry `schema_version` (fingerprint) and `schema_sequence` (monotonic counter)
Schema changes (DDL) trigger automatic reload of affected table schemas.
Timeouts and Heartbeats
| Behavior | Value | Description |
|---|---|---|
| Heartbeat interval | 15s | Server sends heartbeat if no events |
| Read timeout | 90s | Maximum wait for next binlog event |
| Inactivity timeout | 60s | Triggers reconnect if no data received |
| Connect timeout | 30s | Maximum time to establish connection |
These values are currently fixed. Reconnection uses exponential backoff with jitter.
Event Format
Each captured row change produces an event with:
- `op`: `insert`, `update`, or `delete`
- `before`: Previous row state (updates and deletes only, requires `binlog_row_image = FULL`)
- `after`: New row state (inserts and updates only)
- `table`: Fully qualified table name (`database.table`)
- `tx_id`: GTID if available
- `checkpoint`: Binlog position for resume
- `schema_version`: Schema fingerprint
- `schema_sequence`: Monotonic sequence for schema correlation
Troubleshooting
Connection Issues
If you see authentication errors mentioning mysql_native_password:
ALTER USER 'deltaforge'@'%' IDENTIFIED WITH mysql_native_password BY 'password';
Missing Before-Images
If UPDATE/DELETE events have incomplete before data:
SET GLOBAL binlog_row_image = 'FULL';
Binlog Not Enabled
Check binary logging status:
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW BINARY LOG STATUS; -- or SHOW MASTER STATUS on older versions
Privilege Issues
Verify grants for your user:
SHOW GRANTS FOR 'deltaforge'@'%';
Required grants include REPLICATION REPLICA and REPLICATION CLIENT.
PostgreSQL source
DeltaForge captures row-level changes from PostgreSQL using logical replication with the pgoutput plugin.
Prerequisites
PostgreSQL Server Configuration
Enable logical replication in postgresql.conf:
# Required settings
wal_level = logical
max_replication_slots = 10 # At least 1 per DeltaForge pipeline
max_wal_senders = 10 # At least 1 per DeltaForge pipeline
Restart PostgreSQL after changing these settings.
User Privileges
Create a replication user with the required privileges:
-- Create user with replication capability
CREATE ROLE deltaforge WITH LOGIN REPLICATION PASSWORD 'your_password';
-- Grant connect access
GRANT CONNECT ON DATABASE your_database TO deltaforge;
-- Grant schema usage and table access for schema introspection
GRANT USAGE ON SCHEMA public TO deltaforge;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO deltaforge;
-- For automatic publication/slot creation (optional)
-- If you prefer manual setup, skip this and create them yourself
ALTER ROLE deltaforge SUPERUSER; -- Or use manual setup below
pg_hba.conf
Ensure your pg_hba.conf allows replication connections:
# TYPE DATABASE USER ADDRESS METHOD
host replication deltaforge 0.0.0.0/0 scram-sha-256
host your_database deltaforge 0.0.0.0/0 scram-sha-256
Replication Slot and Publication
DeltaForge can automatically create the replication slot and publication on first run. Alternatively, create them manually:
-- Create publication for specific tables
CREATE PUBLICATION my_pub FOR TABLE public.orders, public.order_items;
-- Or for all tables
CREATE PUBLICATION my_pub FOR ALL TABLES;
-- Create replication slot
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');
Replica Identity
For complete before-images on UPDATE and DELETE operations, set tables to REPLICA IDENTITY FULL:
ALTER TABLE public.orders REPLICA IDENTITY FULL;
ALTER TABLE public.order_items REPLICA IDENTITY FULL;
The replica identity setting determines what appears in before-images:
- FULL: Complete row data in before-images
- DEFAULT (primary key): Only primary key columns in before-images
- NOTHING: No before-images at all
DeltaForge warns at startup if tables don’t have REPLICA IDENTITY FULL.
Configuration
Set spec.source.type to postgres and provide a config object:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| `id` | string | Yes | — | Unique identifier for checkpoints and metrics |
| `dsn` | string | Yes | — | PostgreSQL connection string |
| `slot` | string | Yes | — | Replication slot name |
| `publication` | string | Yes | — | Publication name |
| `tables` | array | Yes | — | Table patterns to capture |
| `start_position` | string/object | No | earliest | Where to start when no checkpoint exists |
DSN Formats
DeltaForge accepts both URL-style and key=value DSN formats:
# URL style
dsn: "postgres://user:pass@localhost:5432/mydb"
# Key=value style
dsn: "host=localhost port=5432 user=deltaforge password=pass dbname=mydb"
Table Patterns
The tables field supports flexible pattern matching:
tables:
- public.orders # exact match: schema "public", table "orders"
- public.order_% # LIKE pattern: tables starting with "order_"
- myschema.* # wildcard: all tables in "myschema"
- %.audit_log # cross-schema: "audit_log" table in any schema
- orders # defaults to public schema: "public.orders"
System schemas (pg_catalog, information_schema, pg_toast) are always excluded.
Start Position
Controls where replication begins when no checkpoint exists:
# Start from the earliest available position (slot's restart_lsn)
start_position: earliest
# Start from current WAL position (skip existing data)
start_position: latest
# Start from a specific LSN
start_position:
lsn: "0/16B6C50"
Example
source:
type: postgres
config:
id: orders-postgres
dsn: ${POSTGRES_DSN}
slot: deltaforge_orders
publication: orders_pub
tables:
- public.orders
- public.order_items
start_position: earliest
Resume Behavior
DeltaForge checkpoints progress using PostgreSQL’s LSN (Log Sequence Number):
- With checkpoint: Resumes from the stored LSN
- Without checkpoint: Uses the slot’s `confirmed_flush_lsn` or `restart_lsn`
- New slot: Starts from `pg_current_wal_lsn()` or the configured `start_position`
Checkpoints are stored using the id field as the key.
Type Handling
DeltaForge preserves PostgreSQL’s native type semantics:
| PostgreSQL Type | JSON Representation |
|---|---|
| `boolean` | true / false |
| `integer`, `bigint` | JSON number |
| `real`, `double precision` | JSON number |
| `numeric` | JSON string (preserves precision) |
| `text`, `varchar` | JSON string |
| `json`, `jsonb` | Parsed JSON object/array |
| `bytea` | {"_base64": "..."} |
| `uuid` | JSON string |
| `timestamp`, `date`, `time` | ISO 8601 string |
| Arrays (int[], text[], etc.) | JSON array |
| TOAST unchanged | {"_unchanged": true} |
Event Format
Each captured row change produces an event with:
- `op`: `insert`, `update`, `delete`, or `truncate`
- `before`: Previous row state (updates and deletes, requires appropriate replica identity)
- `after`: New row state (inserts and updates)
- `table`: Fully qualified table name (`schema.table`)
- `tx_id`: PostgreSQL transaction ID (xid)
- `checkpoint`: LSN position for resume
- `schema_version`: Schema fingerprint
- `schema_sequence`: Monotonic sequence for schema correlation
WAL Management
Logical replication slots prevent WAL segments from being recycled until the consumer confirms receipt. To avoid disk space issues:
- Monitor slot lag: Check `pg_replication_slots.restart_lsn` vs `pg_current_wal_lsn()`
- Set retention limits: Configure `max_slot_wal_keep_size` (PostgreSQL 13+)
- Handle stale slots: Drop unused slots with `pg_drop_replication_slot('slot_name')`
-- Check slot status and lag
SELECT slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;
Troubleshooting
Connection Issues
If you see authentication errors:
-- Verify user has replication privilege
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'deltaforge';
-- Check pg_hba.conf allows replication connections
-- Ensure the line type includes "replication" database
Missing Before-Images
If UPDATE/DELETE events have incomplete before data:
-- Check current replica identity
SELECT relname, relreplident
FROM pg_class
WHERE relname = 'your_table';
-- d = default, n = nothing, f = full, i = index
-- Set to FULL for complete before-images
ALTER TABLE your_table REPLICA IDENTITY FULL;
Slot/Publication Not Found
-- List existing publications
SELECT * FROM pg_publication;
-- List existing slots
SELECT * FROM pg_replication_slots;
-- Create if missing
CREATE PUBLICATION my_pub FOR TABLE public.orders;
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');
WAL Disk Usage Growing
-- Check slot lag
SELECT slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;
-- If slot is inactive and not needed, drop it
SELECT pg_drop_replication_slot('unused_slot');
Logical Replication Not Enabled
-- Check wal_level
SHOW wal_level; -- Should be 'logical'
-- If not, update postgresql.conf and restart PostgreSQL
-- wal_level = logical
Turso Source
⚠️ STATUS: EXPERIMENTAL / PAUSED
The Turso source is not yet ready for production use. Native CDC in Turso/libSQL is still evolving and has limitations:
- CDC is per-connection (only changes from the enabling connection are captured)
- File locking prevents concurrent access
- sqld Docker image doesn’t have CDC support yet
This documentation is retained for reference. The code exists but is not officially supported.
The Turso source captures changes from Turso and libSQL databases. It supports multiple CDC modes to work with different database configurations and Turso versions.
CDC Modes
DeltaForge supports four CDC modes for Turso/libSQL:
| Mode | Description | Requirements |
|---|---|---|
| `native` | Uses Turso’s built-in CDC via the turso_cdc table | Turso v0.1.2+ with CDC enabled |
| `triggers` | Shadow tables populated by database triggers | Standard SQLite/libSQL |
| `polling` | Tracks changes via rowid comparison | Any SQLite/libSQL (inserts only) |
| `auto` | Automatic fallback: native → triggers → polling | Any |
Native Mode
Native mode uses Turso’s built-in CDC capabilities. This is the most efficient mode when available.
Requirements:
- Turso database with CDC enabled
- Turso server v0.1.2 or later
How it works:
- Queries the `turso_cdc` system table for changes
- Uses `bin_record_json_object()` to extract row data as JSON
- Tracks position via change ID in checkpoints
Triggers Mode
Triggers mode uses shadow tables and database triggers to capture changes. This works with standard SQLite and libSQL without requiring native CDC support.
How it works:
- Creates shadow tables (`_df_cdc_{table}`) for each tracked table
- Installs INSERT/UPDATE/DELETE triggers that write to shadow tables
- Polls shadow tables for new change records
- Cleans up processed records periodically
Polling Mode
Polling mode uses rowid tracking to detect new rows. This is the simplest mode but only captures inserts (not updates or deletes).
How it works:
- Tracks the maximum rowid seen per table
- Queries for rows with rowid greater than last seen
- Emits insert events for new rows
Limitations:
- Only captures INSERT operations
- Cannot detect UPDATE or DELETE
- Requires tables to have accessible rowid (not WITHOUT ROWID tables)
Auto Mode
Auto mode tries each CDC mode in order and uses the first one that works:
- Try native mode (check for the `turso_cdc` table)
- Try triggers mode (check for existing CDC triggers)
- Fall back to polling mode
This is useful for deployments where the database capabilities may vary.
Configuration
source:
type: turso
config:
id: turso-main
url: "libsql://your-db.turso.io"
auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["users", "orders", "order_items"]
cdc_mode: auto
poll_interval_ms: 1000
native_cdc:
level: data
Configuration Options
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| `id` | string | Yes | — | Logical identifier for metrics and logging |
| `url` | string | Yes | — | Database URL (libsql://, http://, or file path) |
| `auth_token` | string | No | — | Authentication token for Turso cloud |
| `tables` | array | Yes | — | Tables to track (supports wildcards) |
| `cdc_mode` | string | No | auto | CDC mode: native, triggers, polling, auto |
| `poll_interval_ms` | integer | No | 1000 | Polling interval in milliseconds |
| `native_cdc.level` | string | No | data | Native CDC level: binlog or data |
Table Patterns
The tables field supports patterns:
tables:
- users # Exact match
- order% # Prefix match (order, orders, order_items)
- "*" # All tables (excluding system tables)
System tables and DeltaForge infrastructure tables are automatically excluded:
- `sqlite_*` - SQLite system tables
- `_df_*` - DeltaForge CDC shadow tables
- `_litestream*` - Litestream replication tables
- `_turso*` - Turso internal tables
- `turso_cdc` - Turso CDC system table
Native CDC Levels
When using native mode, you can choose the CDC level:
| Level | Description |
|---|---|
| `data` | Only row data changes (default, more efficient) |
| `binlog` | Full binlog-style events with additional metadata |
Examples
Local Development
source:
type: turso
config:
id: local-dev
url: "http://127.0.0.1:8080"
tables: ["users", "orders"]
cdc_mode: auto
poll_interval_ms: 500
Turso Cloud
source:
type: turso
config:
id: turso-prod
url: "libsql://mydb-myorg.turso.io"
auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["*"]
cdc_mode: native
poll_interval_ms: 1000
SQLite File (Polling Only)
source:
type: turso
config:
id: sqlite-file
url: "file:./data/myapp.db"
tables: ["events", "audit_log"]
cdc_mode: polling
poll_interval_ms: 2000
Checkpoints
Turso checkpoints track position differently depending on the CDC mode:
{
"mode": "native",
"last_change_id": 12345,
"table_positions": {}
}
For polling mode, positions are tracked per-table:
{
"mode": "polling",
"last_change_id": null,
"table_positions": {
"users": 1000,
"orders": 2500
}
}
Schema Loading
The Turso source includes a schema loader that:
- Queries `PRAGMA table_info()` for column metadata
- Identifies primary keys and autoincrement columns
- Handles WITHOUT ROWID tables
- Checks for existing CDC triggers
Schema information is available via the REST API at /pipelines/{name}/schemas.
Notes
- WITHOUT ROWID tables: Polling mode cannot track WITHOUT ROWID tables. Use triggers or native mode instead.
- Type affinity: SQLite uses type affinity rather than strict types. The schema loader maps declared types to SQLite affinities.
- Trigger cleanup: In triggers mode, processed change records are cleaned up automatically based on checkpoint position.
- Connection handling: The source maintains a single connection and reconnects automatically on failure.
Sinks
Sinks receive batches from the coordinator after processors run. Each sink lives under spec.sinks and can be marked as required or best-effort via the required flag. Checkpoint behavior is governed by the pipeline’s commit policy.
Current built-in sinks:
Multiple sinks in one pipeline
You can combine multiple sinks in one pipeline to fan out events to different destinations. However, multi-sink pipelines introduce complexity that requires careful consideration.
Why multiple sinks are challenging
Different performance characteristics: Kafka might handle 100K events/sec while a downstream HTTP webhook processes 100/sec. The slowest sink becomes the bottleneck for the entire pipeline.
Independent failure modes: Each sink can fail independently. Redis might be healthy while Kafka experiences broker failures. Without proper handling, a single sink failure could block the entire pipeline or cause data loss.
No distributed transactions: DeltaForge cannot atomically commit across heterogeneous systems. If Kafka succeeds but Redis fails mid-batch, you face a choice: retry Redis (risking duplicates in Kafka) or skip Redis (losing data there).
Checkpoint semantics: The checkpoint represents “how far we’ve processed from the source.” With multiple sinks, when is it safe to advance? After one sink succeeds? All of them? A majority?
The required flag
The required flag on each sink determines whether that sink must acknowledge successful delivery before the checkpoint advances:
sinks:
- type: kafka
config:
id: primary-kafka
required: true # Must succeed for checkpoint to advance
- type: redis
config:
id: cache-redis
required: false # Best-effort; failures don't block checkpoint
When required: true (default): The sink must acknowledge the batch before the checkpoint can advance. If this sink fails, the pipeline blocks and retries until it succeeds or the operator intervenes.
When required: false: The sink is best-effort. Failures are logged but don’t prevent the checkpoint from advancing. Use this for non-critical destinations where some data loss is acceptable.
Commit policy
The commit_policy works with the required flag to determine checkpoint behavior:
| Policy | Behavior |
|---|---|
all | Every sink (regardless of required flag) must acknowledge |
required | Only sinks with required: true must acknowledge (default) |
quorum | At least N sinks must acknowledge |
commit_policy:
mode: required # Only wait for required sinks
sinks:
- type: kafka
config:
required: true # Checkpoint waits for this
- type: redis
config:
required: false # Checkpoint doesn't wait for this
- type: nats
config:
required: true # Checkpoint waits for this
Practical patterns
Primary + secondary: One critical sink (Kafka for durability) marked required: true, with secondary sinks (Redis for caching) marked required: false.
Quorum for redundancy: Three sinks with commit_policy.mode: quorum and quorum: 2. Checkpoint advances when any two succeed, providing fault tolerance.
All-or-nothing: Use commit_policy.mode: all when every destination is critical and you need the strongest consistency guarantee (at the cost of availability or rate of delivery).
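For example, the quorum pattern above could be expressed as the following sketch. It follows the batch-level commit_policy form shown in the Checkpoints chapter; sink-specific keys are omitted and covered in each sink's section.

```yaml
spec:
  batch:
    commit_policy: quorum
    quorum: 2              # checkpoint advances once any two sinks acknowledge
  sinks:
    - type: kafka
      config: { ... }
    - type: redis
      config: { ... }
    - type: nats
      config: { ... }
```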
Redis sink
The Redis sink publishes events to a Redis Stream for real-time consumption with consumer groups.
When to use Redis
Redis Streams shine when you need low-latency event delivery with simple operational requirements and built-in consumer group support.
Real-world applications
| Use Case | Description |
|---|---|
| Real-time notifications | Push database changes instantly to WebSocket servers for live UI updates |
| Cache invalidation | Trigger cache eviction when source records change; keep Redis cache consistent |
| Session synchronization | Replicate user session changes across application instances in real-time |
| Rate limiting state | Stream counter updates for distributed rate limiting decisions |
| Live dashboards | Feed real-time metrics and KPIs to dashboard backends |
| Job queuing | Use CDC events to trigger background job processing with consumer groups |
| Feature flags | Propagate feature flag changes instantly across all application instances |
Pros and cons
| Pros | Cons |
|---|---|
| ✅ Ultra-low latency - Sub-millisecond publish; ideal for real-time apps | ❌ Memory-bound - All data in RAM; expensive for high-volume retention |
| ✅ Simple operations - Single binary, minimal configuration | ❌ Limited retention - Not designed for long-term event storage |
| ✅ Consumer groups - Built-in competing consumers with acknowledgements | ❌ Durability trade-offs - AOF/RDB persistence has limitations |
| ✅ Familiar tooling - redis-cli, widespread client library support | ❌ Single-threaded - CPU-bound for very high throughput |
| ✅ Versatile - Combine with caching, pub/sub, and data structures | ❌ No native replay - XRANGE exists but no offset management |
| ✅ Atomic operations - MULTI/EXEC for transactional guarantees | ❌ Cluster complexity - Sharding requires careful key design |
Configuration
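The full option reference is not reproduced here. As a minimal sketch, a Redis sink entry mirrors the MySQL-to-Redis example later in this guide; keys beyond id, uri, stream, and required are not shown elsewhere and should not be assumed:

```yaml
sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}      # redis:// or rediss:// connection URI
      stream: orders.events  # Redis Stream key the sink XADDs into
    required: true           # checkpoint waits for this sink (default)
```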
Consuming events
Consumer groups (recommended)
# Create consumer group
redis-cli XGROUP CREATE orders.events mygroup $ MKSTREAM
# Read as consumer (blocking)
redis-cli XREADGROUP GROUP mygroup consumer1 BLOCK 5000 COUNT 10 STREAMS orders.events '>'
# Acknowledge processing
redis-cli XACK orders.events mygroup 1234567890123-0
# Check pending (unacknowledged) messages
redis-cli XPENDING orders.events mygroup
Simple subscription
# Read latest entries
redis-cli XREAD COUNT 10 STREAMS orders.events 0-0
# Block for new entries
redis-cli XREAD BLOCK 5000 STREAMS orders.events $
Go consumer example
import "github.com/redis/go-redis/v9"
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
// Create consumer group (once)
rdb.XGroupCreateMkStream(ctx, "orders.events", "mygroup", "0")
for {
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "mygroup",
Consumer: "worker1",
Streams: []string{"orders.events", ">"},
Count: 10,
Block: 5 * time.Second,
}).Result()
if err != nil {
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
var event Event
json.Unmarshal([]byte(msg.Values["event"].(string)), &event)
process(event)
rdb.XAck(ctx, "orders.events", "mygroup", msg.ID)
}
}
}
Rust consumer example
#![allow(unused)]
fn main() {
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::AsyncCommands;

let client = redis::Client::open("redis://localhost:6379")?;
let mut con = client.get_async_connection().await?;

// Create consumer group (ignore the error if it already exists)
let _: () = redis::cmd("XGROUP")
    .arg("CREATE").arg("orders.events").arg("mygroup").arg("0").arg("MKSTREAM")
    .query_async(&mut con).await.unwrap_or(());

loop {
    // XREADGROUP as consumer "worker1" in group "mygroup"
    let opts = StreamReadOptions::default()
        .group("mygroup", "worker1")
        .count(10)
        .block(5000);
    let reply: StreamReadReply = con
        .xread_options(&["orders.events"], &[">"], &opts)
        .await?;

    for stream in reply.keys {
        for msg in stream.ids {
            // Each stream entry carries the serialized event under the "event" field
            if let Some(payload) = msg.get::<String>("event") {
                let event: Event = serde_json::from_str(&payload)?;
                process(event);
            }
            let _: i64 = con.xack("orders.events", "mygroup", &[&msg.id]).await?;
        }
    }
}
}
Python consumer example
import redis
import json
r = redis.Redis.from_url("redis://localhost:6379")
# Create consumer group (once)
try:
r.xgroup_create("orders.events", "mygroup", id="0", mkstream=True)
except redis.ResponseError:
pass # Group already exists
# Consume events
while True:
events = r.xreadgroup("mygroup", "worker1", {"orders.events": ">"}, count=10, block=5000)
for stream, messages in events:
for msg_id, data in messages:
event = json.loads(data[b"event"])
process(event)
r.xack("orders.events", "mygroup", msg_id)
Failure modes
| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore Redis; check network |
| Authentication failure | NOAUTH / WRONGPASS | Fails fast, no retry | Fix auth details in URI |
| OOM (Out of Memory) | OOM command not allowed | Fails batch; retries | Increase maxmemory; enable eviction or trim streams |
| Stream doesn’t exist | Auto-created by XADD | No failure | N/A (XADD creates stream) |
| Connection timeout | Command hangs | Timeout after configured duration | Check network; increase timeout |
| Cluster MOVED/ASK | Redirect errors | Automatic redirect (if cluster mode) | Ensure cluster client configured |
| Replication lag | Writes to replica fail | Fails with READONLY | Write to master only |
| Max stream length | If MAXLEN enforced | Oldest entries trimmed | Expected behavior; not a failure |
| Network partition | Intermittent timeouts | Retries; may have gaps | Restore network |
Failure scenarios and data guarantees
Redis OOM during batch delivery
- DeltaForge sends batch of 100 events via pipeline
- 50 events written, Redis hits maxmemory
- Pipeline fails atomically (all or nothing per pipeline)
- DeltaForge retries entire batch
- If OOM persists: batch blocked until memory available
- Checkpoint only saved after ALL events acknowledged
DeltaForge crash after XADD, before checkpoint
- Batch written to Redis stream successfully
- DeltaForge crashes before saving checkpoint
- On restart: replays from last checkpoint
- Result: Duplicate events in stream (at-least-once)
- Consumer must handle idempotently (check event.id)
Redis failover (Sentinel/Cluster)
- Master fails, Sentinel promotes replica
- In-flight XADD may fail with connection error
- DeltaForge reconnects to new master
- Retries failed batch
- Possible duplicates if original write succeeded
Handling duplicates in consumers
# Idempotent consumer using event ID
processed_ids = set() # Or use Redis SET for distributed dedup
for msg_id, data in messages:
event = json.loads(data[b"event"])
event_id = event["id"]
if event_id in processed_ids:
r.xack("orders.events", "mygroup", msg_id)
continue # Skip duplicate
process(event)
processed_ids.add(event_id)
r.xack("orders.events", "mygroup", msg_id)
Monitoring
DeltaForge exposes these metrics for Redis sink monitoring:
# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink} # Events delivered
deltaforge_sink_batch_total{pipeline,sink} # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink} # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"} # Stage timing
For Redis server visibility, use Redis’s built-in monitoring:
# Monitor commands in real-time
redis-cli MONITOR
# Get server stats
redis-cli INFO stats
# Check memory usage
redis-cli INFO memory
# Stream-specific info
redis-cli XINFO STREAM orders.events
redis-cli XINFO GROUPS orders.events
Stream management
# Check stream length
redis-cli XLEN orders.events
# Trim to last 10000 entries (approximate)
redis-cli XTRIM orders.events MAXLEN ~ 10000
# Trim to exact length
redis-cli XTRIM orders.events MAXLEN 10000
# View consumer group info
redis-cli XINFO GROUPS orders.events
# Check pending messages
redis-cli XPENDING orders.events mygroup
# Claim stuck messages (after 60 seconds)
redis-cli XCLAIM orders.events mygroup worker2 60000 <message-id>
# Delete processed messages (careful!)
redis-cli XDEL orders.events <message-id>
Notes
- Redis Streams provide at-least-once delivery with consumer group acknowledgements
- Use `MAXLEN ~` trimming to prevent unbounded memory growth (approximate is faster)
- Consider Redis Cluster for horizontal scaling with multiple streams
- Combine with Redis pub/sub for fan-out to ephemeral subscribers
- For durability, enable AOF persistence with `appendfsync everysec` or `always`
- Monitor memory usage closely; Redis will reject writes when `maxmemory` is reached
Kafka sink
The Kafka sink publishes batches to a Kafka topic using rdkafka.
When to use Kafka
Kafka excels as the backbone for event-driven architectures where durability, ordering, and replay capabilities are critical.
Real-world applications
| Use Case | Description |
|---|---|
| Event sourcing | Store all state changes as an immutable log; rebuild application state by replaying events |
| Microservices integration | Decouple services with async messaging; each service consumes relevant topics |
| Real-time analytics pipelines | Feed CDC events to Spark, Flink, or ksqlDB for streaming transformations |
| Data lake ingestion | Stream database changes to S3/HDFS via Kafka Connect for analytics and ML |
| Audit logging | Capture every database mutation for compliance, debugging, and forensics |
| Cross-datacenter replication | Use MirrorMaker 2 to replicate topics across regions for DR |
Pros and cons
| Pros | Cons |
|---|---|
| ✅ Durability - Configurable replication ensures no data loss | ❌ Operational complexity - Requires ZooKeeper/KRaft, careful tuning |
| ✅ Ordering guarantees - Per-partition ordering with consumer groups | ❌ Latency - Batching and replication add milliseconds of delay |
| ✅ Replay capability - Configurable retention allows reprocessing | ❌ Resource intensive - High disk I/O and memory requirements |
| ✅ Ecosystem - Connect, Streams, Schema Registry, ksqlDB | ❌ Learning curve - Partitioning, offsets, consumer groups to master |
| ✅ Throughput - Handles millions of messages per second | ❌ Cold start - Cluster setup and topic configuration overhead |
| ✅ Exactly-once semantics - Transactions for critical workloads | ❌ Cost - Managed services can be expensive at scale |
Configuration
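The full option reference is not reproduced here. As a rough sketch, a Kafka sink entry combines broker and topic settings with pass-through client_conf options; key names other than client_conf and required are illustrative, and the exact nesting may differ:

```yaml
sinks:
  - type: kafka
    config:
      id: primary-kafka
      brokers: ${KAFKA_BROKERS}    # bootstrap broker list
      topic: order-events          # destination topic
      client_conf:                 # forwarded to rdkafka (see recommendations below)
        acks: all
        enable.idempotence: "true"
        compression.type: lz4
    required: true
```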
Recommended client_conf settings
| Setting | Recommended | Description |
|---|---|---|
acks | all | Wait for all replicas for durability |
message.timeout.ms | 30000 | Total time to deliver a message |
retries | 2147483647 | Retry indefinitely (with backoff) |
enable.idempotence | true | Prevent duplicates on retry |
compression.type | lz4 | Balance between CPU and bandwidth |
Consuming events
Kafka CLI
# Consume from beginning
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning
# Consume with consumer group
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--group deltaforge-consumers
Go consumer example
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
group, _ := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
for {
err := group.Consume(ctx, []string{"orders"}, &handler{})
if err != nil {
log.Printf("Consumer error: %v", err)
}
}
type handler struct{}

// Setup and Cleanup are required to satisfy sarama.ConsumerGroupHandler
func (h *handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
var event Event
json.Unmarshal(msg.Value, &event)
process(event)
session.MarkMessage(msg, "")
}
return nil
}
Rust consumer example
#![allow(unused)]
fn main() {
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::Message;

let consumer: StreamConsumer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("group.id", "my-group")
    .set("auto.offset.reset", "earliest")
    .create()?;
consumer.subscribe(&["orders"])?;

loop {
    match consumer.recv().await {
        Ok(msg) => {
            // Payload is the JSON-serialized CDC event
            if let Some(Ok(payload)) = msg.payload_view::<str>() {
                let event: Event = serde_json::from_str(payload)?;
                process(event);
            }
            consumer.commit_message(&msg, CommitMode::Async)?;
        }
        Err(e) => eprintln!("Kafka error: {}", e),
    }
}
}
Python consumer example
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['localhost:9092'],
group_id='my-group',
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
event = message.value
process(event)
consumer.commit()
Failure modes
| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Broker unavailable | Connection refused, timeout | Retries with backoff; blocks checkpoint | Restore broker; check network |
| Topic not found | UnknownTopicOrPartition | Fails batch; retries | Create topic or enable auto-create |
| Authentication failure | SaslAuthenticationFailed | Fails fast, no retry | Fix credentials in config |
| Authorization failure | TopicAuthorizationFailed | Fails fast, no retry | Grant ACLs for producer |
| Message too large | MessageSizeTooLarge | Fails message permanently | Increase message.max.bytes or filter large events |
| Leader election | NotLeaderForPartition | Automatic retry after metadata refresh | Wait for election; usually transient |
| Disk full | KafkaStorageException | Retries indefinitely | Add disk space; purge old segments |
| Network partition | Timeouts, partial failures | Retries; may produce duplicates | Restore network; idempotence prevents dups |
| SSL/TLS errors | Handshake failures | Fails fast | Fix certificates, verify truststore |
Failure scenarios and data guarantees
Broker failure during batch delivery
- DeltaForge sends batch of 100 events
- 50 events delivered, broker crashes
- rdkafka detects failure, retries remaining 50
- If idempotence enabled: no duplicates
- If not: possible duplicates of events near failure point
- Checkpoint only saved after ALL events acknowledged
DeltaForge crash after Kafka ack, before checkpoint
- Batch delivered to Kafka successfully
- DeltaForge crashes before saving checkpoint
- On restart: replays from last checkpoint
- Result: Duplicate events in Kafka (at-least-once)
- Consumer must handle idempotently
Monitoring recommendations
DeltaForge exposes these metrics for Kafka sink monitoring:
# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink} # Events delivered
deltaforge_sink_batch_total{pipeline,sink} # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink} # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"} # Stage timing
For deeper Kafka broker visibility, monitor your Kafka cluster directly:
- Broker metrics via JMX or Kafka’s built-in metrics
- Consumer lag via `kafka-consumer-groups.sh`
- Topic throughput via broker dashboards

Note: Internal `rdkafka` producer statistics (message queues, broker RTT, etc.) are not currently exposed by DeltaForge. This is a potential future enhancement.
Notes
- Combine Kafka with other sinks to fan out data; use commit policy to control checkpoint behavior
- For exactly-once semantics, ensure your Kafka cluster supports transactions (2.5+)
- Adjust `client_conf` for durability (acks=all) or performance based on your requirements
- Consider partitioning strategy for ordering guarantees within partitions
- Enable `enable.idempotence=true` to prevent duplicates during retries
NATS sink
The NATS sink publishes events to a NATS JetStream stream for durable, at-least-once delivery.
When to use NATS
NATS JetStream is ideal when you need a lightweight, high-performance messaging system with persistence, without the operational overhead of Kafka.
Real-world applications
| Use Case | Description |
|---|---|
| Edge computing | Lightweight footprint perfect for IoT gateways and edge nodes syncing to cloud |
| Microservices mesh | Request-reply and pub/sub patterns with automatic load balancing |
| Multi-cloud sync | Leaf nodes and superclusters for seamless cross-cloud data replication |
| Kubernetes-native events | NATS Operator for cloud-native deployment; sidecar-friendly architecture |
| Real-time gaming | Low-latency state synchronization for multiplayer game servers |
| Financial data feeds | Stream market data with subject-based routing and wildcards |
| Command and control | Distribute configuration changes and commands to distributed systems |
Pros and cons
| Pros | Cons |
|---|---|
| ✅ Lightweight - Single binary ~20MB; minimal resource footprint | ❌ Smaller ecosystem - Fewer connectors and integrations than Kafka |
| ✅ Simple operations - Zero external dependencies; easy clustering | ❌ Younger persistence - JetStream newer than Kafka’s battle-tested log |
| ✅ Low latency - Sub-millisecond message delivery | ❌ Community size - Smaller community than Kafka or Redis |
| ✅ Flexible patterns - Pub/sub, queues, request-reply, streams | ❌ Tooling maturity - Fewer monitoring and management tools |
| ✅ Subject hierarchy - Powerful wildcard routing (orders.>, *.events) | ❌ Learning curve - JetStream concepts differ from traditional queues |
| ✅ Multi-tenancy - Built-in accounts and security isolation | ❌ Less enterprise adoption - Fewer case studies at massive scale |
| ✅ Cloud-native - Designed for Kubernetes and distributed systems | |
Configuration
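The full option reference is not reproduced here. As a rough sketch based on the authentication options and notes on this page, a NATS sink entry might look like the following; the url and subject key names are assumptions, while stream and credentials_file are referenced elsewhere on this page:

```yaml
sinks:
  - type: nats
    config:
      id: orders-nats
      url: nats://localhost:4222                   # server URL (assumed key name)
      subject: orders.events                       # publish subject (assumed key name)
      stream: ORDERS                               # optional: verify this JetStream stream exists
      credentials_file: /etc/nats/creds/user.creds
    required: true
```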
Authentication options
# Credentials file
credentials_file: /etc/nats/creds/user.creds
# Username/password
username: ${NATS_USER}
password: ${NATS_PASSWORD}
# Token
token: ${NATS_TOKEN}
JetStream setup
Before using the NATS sink with JetStream, create a stream that captures your subject:
# Using NATS CLI
nats stream add ORDERS \
--subjects "orders.>" \
--retention limits \
--storage file \
--replicas 3 \
--max-age 7d
# Verify stream
nats stream info ORDERS
Consuming events
NATS CLI
# Subscribe to subject (ephemeral)
nats sub "orders.>"
# Create durable consumer
nats consumer add ORDERS orders-processor \
--pull \
--ack explicit \
--deliver all \
--max-deliver 3 \
--filter "orders.events"
# Consume messages
nats consumer next ORDERS orders-processor --count 10
Go consumer example
nc, _ := nats.Connect("nats://localhost:4222")
js, _ := nc.JetStream()
// Create or bind to consumer
sub, _ := js.PullSubscribe("orders.events", "orders-processor",
nats.Durable("orders-processor"),
nats.AckExplicit(),
)
for {
msgs, _ := sub.Fetch(10, nats.MaxWait(5*time.Second))
for _, msg := range msgs {
var event Event
json.Unmarshal(msg.Data, &event)
process(event)
msg.Ack()
}
}
Rust consumer example
#![allow(unused)]
fn main() {
use async_nats::jetstream;
use futures::StreamExt;

let client = async_nats::connect("nats://localhost:4222").await?;
let js = jetstream::new(client);

let stream = js.get_stream("ORDERS").await?;
// Bind to the durable pull consumer created earlier
let consumer: jetstream::consumer::PullConsumer =
    stream.get_consumer("orders-processor").await?;

let mut messages = consumer.messages().await?;
while let Some(msg) = messages.next().await {
    let msg = msg?;
    let event: Event = serde_json::from_slice(&msg.payload)?;
    process(event);
    msg.ack().await?;
}
}
Monitoring
DeltaForge exposes these metrics for NATS sink monitoring:
# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink} # Events delivered
deltaforge_sink_batch_total{pipeline,sink} # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink} # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"} # Stage timing
For NATS server visibility, use the NATS CLI or monitoring endpoint:
# Server info
nats server info
# JetStream account info
nats account info
# Stream statistics
nats stream info ORDERS
# Consumer statistics
nats consumer info ORDERS orders-processor
# Real-time event monitoring
nats events
NATS also exposes a monitoring endpoint (default :8222) with JSON stats:
- `http://localhost:8222/varz` - General server stats
- `http://localhost:8222/jsz` - JetStream stats
- `http://localhost:8222/connz` - Connection stats
Subject design patterns
| Pattern | Example | Use Case |
|---|---|---|
| Hierarchical | orders.us.created | Regional routing |
| Wildcard single | orders.*.created | Any region, specific event |
| Wildcard multi | orders.> | All order events |
| Versioned | v1.orders.events | API versioning |
Failure modes
| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Server unavailable | Connection refused | Retries with backoff; blocks checkpoint | Restore NATS; check network |
| Stream not found | stream not found error | Fails batch; no retry | Create stream or remove stream config |
| Authentication failure | authorization violation | Fails fast, no retry | Fix credentials |
| Subject mismatch | no responders (core NATS) | Fails if no subscribers | Add subscribers or use JetStream |
| JetStream disabled | jetstream not enabled | Fails fast | Enable JetStream on server |
| Storage full | insufficient resources | Retries; eventually fails | Add storage; adjust retention |
| Message too large | message size exceeds maximum | Fails message permanently | Increase max_payload or filter large events |
| Cluster partition | Intermittent failures | Retries with backoff | Restore network; wait for quorum |
| Slow consumer | Publish backpressure | Slows down; may timeout | Scale consumers; increase buffer |
| TLS errors | Handshake failures | Fails fast | Fix certificates |
Failure scenarios and data guarantees
NATS server restart during batch delivery
- DeltaForge sends batch of 100 events
- 50 events published, server restarts
- async_nats detects disconnect, starts reconnecting
- After reconnect, DeltaForge retries remaining 50
- JetStream deduplication prevents duplicates (if enabled)
- Checkpoint only saved after ALL events acknowledged
DeltaForge crash after JetStream ack, before checkpoint
- Batch published to JetStream successfully
- DeltaForge crashes before saving checkpoint
- On restart: replays from last checkpoint
- Result: Duplicate events in stream (at-least-once)
- Consumer must handle idempotently (check event.id)
Stream storage exhausted
- JetStream stream hits max_bytes or max_msgs limit
- With `discard: old` → oldest messages removed, publish succeeds
- With `discard: new` → publish rejected
- DeltaForge retries on rejection
- Resolution: increase limits or enable `discard: old`
JetStream acknowledgement levels
# Stream configuration affects durability
nats stream add ORDERS \
--replicas 3 \ # R=3 for production
--retention limits \ # or 'workqueue' for single consumer
--discard old \ # Remove oldest when full
--max-age 7d \ # Auto-expire after 7 days
--storage file # Persistent (vs memory)
| Replicas | Guarantee | Use Case |
|---|---|---|
| R=1 | Single node; lost if node fails | Development, non-critical |
| R=3 | Survives 1 node failure | Production default |
| R=5 | Survives 2 node failures | Critical data |
Handling duplicates in consumers
// Use event ID for idempotency
processedIDs := make(map[string]bool) // Or use Redis/DB
for _, msg := range msgs {
var event Event
json.Unmarshal(msg.Data, &event)
if processedIDs[event.ID] {
msg.Ack() // Already processed
continue
}
if err := process(event); err == nil {
processedIDs[event.ID] = true
}
msg.Ack()
}
Notes
- When `stream` is specified, the sink verifies the stream exists at connection time
- Without `stream`, events are published to core NATS (no persistence guarantees)
- Connection pooling ensures efficient reuse across batches
- Use replicated streams (`--replicas 3`) for production durability
- Combine with other sinks to fan out data; use commit policy to control checkpoint behavior
- JetStream provides exactly-once semantics when combined with message deduplication
Architecture
This document describes DeltaForge’s internal architecture, design decisions, and how the major components interact.
Design Principles
Source-Owned Semantics
DeltaForge avoids imposing a universal data model on all sources. Instead, each database source defines and owns its schema semantics:
- MySQL captures MySQL-specific types, collations, and engine information
- Future sources (PostgreSQL, MongoDB, ClickHouse, Turso) will capture their native semantics
This approach means downstream consumers receive schemas that accurately reflect the source database rather than a lowest-common-denominator normalization.
Delivery Guarantees First
The checkpoint system is designed around a single invariant:
Checkpoints are only saved after events have been successfully delivered.
This ordering guarantees at-least-once delivery. A crash between checkpoint and delivery would lose events; DeltaForge prevents this by always checkpointing after sink acknowledgment.
Configuration Over Code
Pipelines are defined declaratively in YAML. This enables:
- Version-controlled pipeline definitions
- Environment-specific configuration via variable expansion
- Rapid iteration without recompilation
Component Overview
┌─────────────────────────────────────────────────────────────────┐
│ DeltaForge Runtime │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│ Sources │ Schema │ Coordinator │ Sinks │ Control │
│ │ Registry │ + Batch │ │ Plane │
├─────────────┼─────────────┼─────────────┼─────────────┼─────────┤
│ MySQL │ InMemory │ Batching │ Kafka │ REST API│
│ PostgreSQL │ Registry │ Commit │ Redis │ Metrics │
│ (future) │ │ Policy │ (future) │ Health │
└─────────────┴─────────────┴─────────────┴─────────────┴─────────┘
│
┌──────────┴──────────┐
│ Checkpoint Store │
│ (File/SQLite/Mem) │
└─────────────────────┘
Data Flow
Event Lifecycle
1. Source reads from database log (binlog/WAL)
│
▼
2. Schema loader maps table_id to schema
│
▼
3. Event constructed with before/after images
│
▼
4. Event sent to coordinator via channel
│
▼
5. Coordinator batches events
│
▼
6. Processors transform batch (JavaScript)
│
▼
7. Sinks deliver batch concurrently
│
▼
8. Commit policy evaluated
│
▼
9. Checkpoint saved (if policy satisfied)
Event Structure
Every CDC event shares a common structure:
#![allow(unused)]
fn main() {
pub struct Event {
pub source_id: String, // Source identifier
pub database: String, // Database name
pub table: String, // Table name
pub op: Op, // Insert, Update, Delete, Ddl
pub tx_id: Option<u64>, // Source transaction ID
pub before: Option<Value>, // Previous row state
pub after: Option<Value>, // New row state
pub schema_version: Option<String>, // Schema fingerprint
pub schema_sequence: Option<u64>, // For replay lookups
pub ddl: Option<Value>, // DDL payload if op == Ddl
pub timestamp: DateTime<Utc>, // Event timestamp
pub checkpoint: Option<CheckpointMeta>, // Position info
pub size_bytes: usize, // For batching
}
}
Schema Registry
Role
The schema registry serves three purposes:
- Map table IDs to schemas: Binlog events reference tables by ID; the registry resolves these to full schema metadata
- Detect schema changes: Fingerprint comparison identifies when DDL has modified a table
- Enable replay: Sequence numbers correlate events with the schema active when they were produced
Schema Registration Flow
1. Schema loader fetches from INFORMATION_SCHEMA
│
▼
2. Compute fingerprint (SHA-256 of structure)
│
▼
3. Check registry for existing schema with same fingerprint
│
├── Found: Return existing version (idempotent)
│
└── Not found: Allocate new version number
│
▼
4. Store with: version, fingerprint, JSON, timestamp, sequence, checkpoint
Sequence Numbers
The registry maintains a global monotonic counter. Each schema version receives a sequence number at registration. Events carry this sequence, enabling accurate schema lookup during replay:
Timeline:
─────────────────────────────────────────────────────────────►
│ │ │
Schema v1 Schema v2 Schema v3
(seq=1) (seq=15) (seq=42)
│ │ │
└──events 1-14─┘──events 15-41─────┘──events 42+──►
Replay at seq=20: Use schema v2 (registered at seq=15, before seq=42)
Checkpoint Store
Timing Guarantee
The checkpoint is saved only after sinks acknowledge delivery:
┌────────┐ events ┌────────┐ ack ┌────────────┐
│ Source │ ─────────▶ │ Sink │ ───────▶ │ Checkpoint │
└────────┘ └────────┘ │ Store │
└────────────┘
If the process crashes after sending to sink but before checkpoint, events will be replayed. This is the “at-least-once” guarantee - duplicates are possible, but loss is not.
Storage Backends
| Backend | Versioning | Persistence | Use Case |
|---|---|---|---|
FileCheckpointStore | No | Yes | Production (simple) |
SqliteCheckpointStore | Yes | Yes | Development, debugging |
MemCheckpointStore | No | No | Testing |
Checkpoint-Schema Correlation
When registering schemas, the current checkpoint can be attached:
#![allow(unused)]
fn main() {
registry.register_with_checkpoint(
tenant, db, table,
&fingerprint,
&schema_json,
Some(&checkpoint_bytes), // Current binlog position
).await?;
}
This creates a link between schema versions and source positions, enabling coordinated rollback and point-in-time schema queries.
Coordinator
The coordinator orchestrates event flow between source and sinks:
Batching
Events are accumulated until a threshold triggers flush:
- `max_events`: Event count limit
- `max_bytes`: Total serialized size limit
- `max_ms`: Time since batch started
- `respect_source_tx`: Never split source transactions
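In pipeline YAML these thresholds live under the batch block, mirroring the configuration shown in the Checkpoints chapter:

```yaml
spec:
  batch:
    max_events: 1000         # flush after 1000 events
    max_bytes: 8388608       # flush after 8 MB of serialized events
    max_ms: 200              # flush after 200 ms
    respect_source_tx: true  # never split a source transaction across batches
```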
Commit Policy
When multiple sinks are configured, the commit policy determines when the checkpoint advances:
#![allow(unused)]
fn main() {
match policy {
All => required_acks == total_sinks,
Required => required_acks == sinks.filter(|s| s.required).count(),
Quorum(n) => required_acks >= n,
}
}
Processor Pipeline
Processors run in declared order, transforming batches:
events ──▶ Processor 1 ──▶ Processor 2 ──▶ ... ──▶ transformed events
Each processor can filter, transform, or enrich events. The JavaScript processor uses deno_core for sandboxed execution.
Hot Paths
Critical performance paths have been optimized:
- Event construction - Minimal allocations, reuse buffers
- Checkpoint serialization - Opaque bytes avoid repeated JSON encoding
- Sink delivery - Batch operations reduce round trips
- Schema lookup - In-memory cache with stable fingerprints
Benchmarking
Performance is tracked via:
- Micro-benchmarks for specific operations
- End-to-end benchmarks using the Coordinator component
- Regression detection in CI
Future Architecture
Planned enhancements:
- Persistent schema registry: SQLite backend initially, mirroring the checkpoint storage pattern
- Production storage backends: PostgreSQL, S3/GCS for cloud-native and HA deployments
- Event store: Time-based replay and schema evolution
- Distributed coordination: Leader election for HA deployments
- Additional sources: Turso/SQLite, ClickHouse, MongoDB
Checkpoints
Checkpoints record pipeline progress so ingestion can resume from the last successfully delivered position. DeltaForge’s checkpoint system is designed to guarantee at-least-once delivery by carefully coordinating when checkpoints are saved relative to event delivery.
Core Guarantee: At-Least-Once Delivery
The fundamental rule of DeltaForge checkpointing:
Checkpoints are only saved after events have been successfully delivered to sinks.
This ordering is critical. If a checkpoint were saved before events were delivered, a crash between checkpoint save and delivery would cause those events to be lost - the pipeline would resume from a position past events that were never delivered.
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Source │────▶│ Processor │────▶│ Sink │────▶│ Checkpoint │
│ (read) │ │ (transform) │ │ (deliver) │ │ (save) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│
▼
Sink acknowledges
successful delivery
│
▼
Then checkpoint
is saved
What This Means in Practice
- On clean shutdown: All buffered events are flushed and checkpointed
- On crash: Events since the last checkpoint are replayed (hence “at-least-once”)
- Duplicate handling: Consumers should be idempotent or use deduplication
Checkpoint Storage
Storage Backends
DeltaForge supports pluggable checkpoint storage:
| Backend | Description | Use Case |
|---|---|---|
FileCheckpointStore | JSON file on disk | Development, simple deployments |
MemCheckpointStore | In-memory (ephemeral) | Testing |
SqliteCheckpointStore | SQLite with versioning | Single-instance production |
By default, checkpoints are stored in ./data/df_checkpoints.json.
For HA deployments requiring shared state across instances, additional backends (PostgreSQL, S3/GCS) are planned but not yet implemented.
Storage Interface
All backends implement the CheckpointStore trait:
#![allow(unused)]
fn main() {
#[async_trait]
pub trait CheckpointStore: Send + Sync {
/// Get raw checkpoint bytes
async fn get_raw(&self, source_id: &str) -> CheckpointResult<Option<Vec<u8>>>;
/// Store raw checkpoint bytes
async fn put_raw(&self, source_id: &str, bytes: &[u8]) -> CheckpointResult<()>;
/// Delete checkpoint
async fn delete(&self, source_id: &str) -> CheckpointResult<bool>;
/// List all checkpoint keys
async fn list(&self) -> CheckpointResult<Vec<String>>;
/// Whether this backend supports versioning
fn supports_versioning(&self) -> bool;
}
}
Typed Access
The CheckpointStoreExt trait provides convenient typed access:
#![allow(unused)]
fn main() {
// Store typed checkpoint (automatically serialized to JSON)
store.put("pipeline-1", MySqlCheckpoint {
file: "binlog.000042".into(),
pos: 12345,
gtid_set: None,
}).await?;
// Retrieve typed checkpoint
let cp: Option<MySqlCheckpoint> = store.get("pipeline-1").await?;
}
Checkpoint Contents
MySQL Checkpoints
MySQL checkpoints track binlog position:
#![allow(unused)]
fn main() {
pub struct MySqlCheckpoint {
pub file: String, // e.g., "binlog.000042"
pub pos: u64, // Byte position in binlog file
pub gtid_set: Option<String>, // GTID set if enabled
}
}
The checkpoint is taken from the last event in a successfully delivered batch, ensuring resumption starts exactly where delivery left off.
Checkpoint in Events
Events carry checkpoint metadata for end-to-end tracking:
#![allow(unused)]
fn main() {
pub struct Event {
// ... other fields ...
/// Checkpoint info from source
pub checkpoint: Option<CheckpointMeta>,
}
pub enum CheckpointMeta {
Opaque(Arc<[u8]>), // Serialized source-specific checkpoint
}
}
Using Arc<[u8]> allows zero-copy sharing of checkpoint data across the pipeline without repeated allocations.
Commit Policy
When multiple sinks are configured, the commit policy determines when checkpoints advance:
| Policy | Behavior |
|---|---|
all | Every sink must acknowledge |
required | Only required: true sinks must acknowledge |
quorum | At least N sinks must acknowledge |
Configuration
spec:
batch:
commit_policy: required # or: all, quorum
quorum: 2 # for quorum policy
sinks:
- type: kafka
required: true # Must succeed for checkpoint
config: { ... }
- type: redis
required: false # Best-effort, doesn't block checkpoint
config: { ... }
Commit Logic
The coordinator tracks acknowledgments from each sink and only advances the checkpoint when the policy is satisfied:
#![allow(unused)]
fn main() {
// Simplified commit logic
let required_acks = sinks.iter().filter(|s| s.required).count();
let actual_acks = batch.acknowledgments.iter().filter(|a| a.success).count();
if actual_acks >= required_acks {
checkpoint_store.put(&key, batch.last_checkpoint).await?;
} else {
warn!("commit policy not satisfied; checkpoint NOT advanced");
}
}
Batching and Checkpoints
Checkpoints are saved at batch boundaries, not per-event. This provides:
- Efficiency: Fewer checkpoint writes
- Atomicity: Batch success or failure is all-or-nothing
- Transaction preservation: `respect_source_tx: true` keeps source transactions in single batches
Batch Configuration
spec:
batch:
max_events: 1000 # Flush after N events
max_bytes: 8388608 # Flush after 8MB
max_ms: 200 # Flush after 200ms
respect_source_tx: true # Never split source transactions
max_inflight: 1 # Concurrent batches in flight
Checkpoint Timing in Batches
Within a batch:
- Events are collected until a threshold is reached
- Processors transform the batch
- Sinks receive and deliver events
- Sinks acknowledge success/failure
- Commit policy is evaluated
- If satisfied, checkpoint advances to the last event’s position
Versioned Checkpoints
The SQLite backend supports checkpoint versioning for:
- Rollback: Return to a previous checkpoint position
- Audit: Track checkpoint progression over time
- Debugging: Understand checkpoint history during incident analysis
Version Operations
#![allow(unused)]
fn main() {
// Store with versioning
let version = store.put_raw_versioned("pipeline-1", bytes).await?;
// Get specific version
let old_bytes = store.get_version_raw("pipeline-1", version - 1).await?;
// List all versions
let versions = store.list_versions("pipeline-1").await?;
// Rollback to previous version
store.rollback("pipeline-1", target_version).await?;
}
Version Metadata
#![allow(unused)]
fn main() {
pub struct VersionInfo {
pub version: u64,
pub created_at: DateTime<Utc>,
pub size_bytes: usize,
}
}
Schema-Checkpoint Correlation
For replay scenarios, DeltaForge correlates schemas with checkpoints. When a schema is registered, it can optionally include the current checkpoint position:
#![allow(unused)]
fn main() {
registry.register_with_checkpoint(
tenant, db, table,
&fingerprint,
&schema_json,
Some(checkpoint_bytes), // Binlog position when schema was observed
).await?;
}
This enables:
- Accurate replay: Events are interpreted with the schema active at their checkpoint position
- Schema time-travel: Find what schema was active at any checkpoint
- Coordinated rollback: Roll back both checkpoint and schema state together
Operational Considerations
Clean Shutdown
Before maintenance, cleanly stop pipelines to flush checkpoints:
# Pause ingestion
curl -X POST http://localhost:8080/pipelines/{name}/pause
# Wait for in-flight batches to complete
sleep 5
# Stop pipeline
curl -X POST http://localhost:8080/pipelines/{name}/stop
Checkpoint Inspection
View current checkpoint state:
# List all checkpoints
curl http://localhost:8080/checkpoints
# Get specific pipeline checkpoint
curl http://localhost:8080/checkpoints/{pipeline-name}
Monitoring
Key metrics to monitor:
- `deltaforge_checkpoint_lag_seconds`: Time since last checkpoint
- `deltaforge_checkpoint_bytes`: Size of last checkpoint
- `deltaforge_batch_commit_total`: Successful batch commits
- `deltaforge_batch_commit_failed_total`: Failed commits (policy not satisfied)
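As a sketch, the lag metric can drive a Prometheus alerting rule; the 5-minute threshold, severity label, and the assumption that the metric carries a pipeline label are illustrative:

```yaml
groups:
  - name: deltaforge-checkpoints
    rules:
      - alert: DeltaForgeCheckpointStalled
        expr: deltaforge_checkpoint_lag_seconds > 300   # no checkpoint for 5+ minutes
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "DeltaForge pipeline has not checkpointed recently"
```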
Recovery Scenarios
| Scenario | Behavior |
|---|---|
| Process crash | Resume from last checkpoint, replay events |
| Network partition (sink unreachable) | Retry delivery, checkpoint doesn’t advance |
| Corrupt checkpoint file | Manual intervention required |
| Source unavailable at checkpoint | Retry connection with backoff |
Best Practices
- Use durable storage for production checkpoint backends (not in-memory)
- Monitor checkpoint lag to detect stuck pipelines
- Configure appropriate batch sizes — smaller batches mean more frequent checkpoints but more overhead
- Set `required: true` only on sinks that must succeed for correctness
- Test recovery by killing pipelines and verifying no events are lost
- Back up checkpoint files if using file-based storage
Future Enhancements
Planned checkpoint improvements:
- PostgreSQL backend for HA deployments with shared state
- S3/GCS backends for cloud-native deployments
- Distributed coordination for multi-instance leader election
- Checkpoint compression for large state
- Point-in-time recovery with event store integration
Schema Registry
DeltaForge’s schema registry tracks table schemas across time, enabling accurate event interpretation during replay and providing change detection for DDL operations.
Design Philosophy: Source-Owned Schemas
DeltaForge takes a fundamentally different approach to schema handling than many CDC tools. Rather than normalizing all database schemas into a universal type system, each source defines and owns its schema semantics.
This means:
- MySQL schemas capture MySQL semantics - column types like `bigint(20) unsigned`, `varchar(255)`, and `json` are preserved exactly as MySQL defines them
- PostgreSQL schemas capture PostgreSQL semantics - arrays, custom types, and pg-specific attributes remain intact
- No lossy normalization - you don’t lose precision or database-specific information by forcing everything into a common format
This design avoids the maintenance burden of keeping a universal type system synchronized across all databases, and it ensures that downstream consumers receive schemas that accurately reflect the source database’s capabilities and constraints.
The SourceSchema Trait
Every CDC source implements the SourceSchema trait, which provides a common interface for fingerprinting and column access while allowing source-specific schema representations:
#![allow(unused)]
fn main() {
pub trait SourceSchema: Serialize + DeserializeOwned + Send + Sync {
/// Source type identifier (e.g., "mysql", "postgres", "mongodb").
fn source_kind(&self) -> &'static str;
/// Content-addressable fingerprint for change detection.
/// Two schemas with the same fingerprint are identical.
fn fingerprint(&self) -> String;
/// Column/field names in ordinal order.
fn column_names(&self) -> Vec<&str>;
/// Primary key column names.
fn primary_key(&self) -> Vec<&str>;
/// Human-readable description.
fn describe(&self) -> String;
}
}
Fingerprinting
Schema fingerprints use SHA-256 hashing over JSON-serialized content to provide:
- Stability - the same schema always produces the same fingerprint
- Change detection - any structural change produces a different fingerprint
- Content-addressability - fingerprints can be used as cache keys or deduplication identifiers
#![allow(unused)]
fn main() {
pub fn compute_fingerprint<T: Serialize>(value: &T) -> String {
let json = serde_json::to_vec(value).unwrap_or_default();
let hash = Sha256::digest(&json);
format!("sha256:{}", hex::encode(hash))
}
}
The fingerprint only includes structurally significant fields. For MySQL, this means columns and primary key are included, but engine and charset are excluded since they don’t affect how CDC events should be interpreted.
MySQL Schema Implementation
The MySqlTableSchema struct captures comprehensive MySQL table metadata:
#![allow(unused)]
fn main() {
pub struct MySqlTableSchema {
/// Columns in ordinal order
pub columns: Vec<MySqlColumn>,
/// Primary key column names
pub primary_key: Vec<String>,
/// Storage engine (InnoDB, MyISAM, etc.)
pub engine: Option<String>,
/// Default charset
pub charset: Option<String>,
/// Default collation
pub collation: Option<String>,
}
pub struct MySqlColumn {
pub name: String,
pub column_type: String, // e.g., "bigint(20) unsigned"
pub data_type: String, // e.g., "bigint"
pub nullable: bool,
pub ordinal_position: u32,
pub default_value: Option<String>,
pub extra: Option<String>, // e.g., "auto_increment"
pub comment: Option<String>,
pub char_max_length: Option<i64>,
pub numeric_precision: Option<i64>,
pub numeric_scale: Option<i64>,
}
}
Schema information is fetched from INFORMATION_SCHEMA at startup and cached for the pipeline’s lifetime.
Schema Registry Architecture
The schema registry serves three core functions:
- Version tracking - maintains a history of schema versions per table
- Change detection - compares fingerprints to detect DDL changes
- Replay correlation - associates schemas with checkpoint positions for accurate replay
Schema Versions
Each registered schema version includes:
| Field | Description |
|---|---|
version | Per-table version number (starts at 1) |
hash | Content fingerprint for deduplication |
schema_json | Full schema as JSON |
registered_at | Registration timestamp |
sequence | Global monotonic sequence number |
checkpoint | Source checkpoint at registration time |
Sequence Numbers for Replay
The registry maintains a global monotonic sequence counter. When a schema is registered, it receives the next sequence number. Events carry this sequence number, enabling the replay engine to look up the correct schema version:
#![allow(unused)]
fn main() {
// During replay: find schema active at event's sequence
let schema = registry.get_at_sequence(tenant, db, table, event.schema_sequence);
}
This ensures events are always interpreted with the schema that was active when they were produced, even if the table structure has since changed.
Checkpoint Correlation
Schemas can be registered with an associated checkpoint, creating a correlation between schema versions and source positions:
#![allow(unused)]
fn main() {
registry.register_with_checkpoint(
tenant, db, table,
&fingerprint,
&schema_json,
Some(checkpoint_bytes), // Optional: binlog position when schema was observed
).await?;
}
This correlation supports scenarios like:
- Replaying events from a specific checkpoint with the correct schema
- Determining which schema was active at a particular binlog position
- Rolling back schema state along with checkpoint rollback
Schema Loader
The MySqlSchemaLoader handles schema discovery and caching:
Pattern Expansion
Tables are specified using patterns that support wildcards:
| Pattern | Description |
|---|---|
db.table | Exact match |
db.* | All tables in database |
db.prefix% | Tables starting with prefix |
%.table | Table in any database |
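In a pipeline spec, these patterns go directly into the source's tables list. A short example, reusing the keys from the pipeline examples in this guide:

```yaml
source:
  type: mysql
  config:
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders      # exact match
      - shop.order_%     # prefix match
      - analytics.*      # every table in the analytics database
```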
Preloading
At startup, the loader expands patterns and preloads all matching schemas:
#![allow(unused)]
fn main() {
let schema_loader = MySqlSchemaLoader::new(dsn, registry, tenant);
let tracked_tables = schema_loader.preload(&["shop.orders", "shop.order_%"]).await?;
}
This ensures schemas are available before the first CDC event arrives.
Caching
Loaded schemas are cached to avoid repeated INFORMATION_SCHEMA queries:
#![allow(unused)]
fn main() {
// Fast path: return cached schema
if let Some(cached) = cache.get(&(db, table)) {
return Ok(cached.clone());
}
// Slow path: fetch from database, register, cache
let schema = fetch_schema(db, table).await?;
let version = registry.register(...).await?;
cache.insert((db, table), loaded_schema);
}
DDL Handling
When the binlog contains DDL events, the schema loader responds:
| DDL Type | Action |
|---|---|
CREATE TABLE | Schema loaded on first row event |
ALTER TABLE | Cache invalidated, reloaded on next row |
DROP TABLE | Cache entry removed |
TRUNCATE | No schema change |
RENAME TABLE | Old removed, new loaded on first row |
DDL detection uses the QueryEvent type in the binlog. On DDL, the entire database’s schema cache is invalidated since MySQL doesn’t always specify the exact table in DDL events.
API Endpoints
Reload Schemas
Force reload schemas from the database:
curl -X POST http://localhost:8080/pipelines/{name}/schemas/reload
This clears the cache and re-fetches schemas for all tracked tables.
List Cached Schemas
View currently cached schemas:
curl http://localhost:8080/pipelines/{name}/schemas
Limitations
- In-memory registry - Schema versions are lost on restart. Persistent backends (SQLite, then PostgreSQL for HA) are planned.
- No cross-pipeline sharing - Each pipeline maintains its own registry instance
- Pattern expansion at startup - New tables matching patterns require pipeline restart or reload
Best Practices
- Use explicit table patterns in production to avoid accidentally capturing unwanted tables
- Monitor schema reload times - slow reloads may indicate overly broad patterns
- Trigger schema reload after DDL if your deployment process modifies schemas
- Include schema version in downstream events for consumers that need schema evolution awareness
Troubleshooting
Unknown table_id Errors
WARN write_rows for unknown table_id, table_id=42
The binlog contains row events for a table not in the table_map. This happens when:
- A table was created after the CDC stream started
- Table patterns don’t match the table
Solution: Trigger a schema reload via the REST API.
Schema Fetch Returned 0 Columns
WARN schema fetch returned 0 columns, db=shop, table=orders
Usually indicates:
- Table doesn’t exist
- MySQL user lacks `SELECT` privilege on `INFORMATION_SCHEMA`
- Table was dropped between detection and schema load
Slow Schema Loading
WARN slow schema fetch, db=shop, table=orders, ms=350
Consider:
- Narrowing table patterns to reduce the number of tables
- Using exact table names instead of wildcards
- Verifying network latency to the MySQL server
Schema Sensing
Schema sensing automatically infers and tracks schema structure from JSON event payloads. This complements the schema registry by discovering schema from data rather than database metadata.
When to Use Schema Sensing
Schema sensing is useful when:
- Source doesn’t provide schema: Some sources emit JSON without metadata
- JSON columns: Database JSON/JSONB columns have dynamic structure
- Schema evolution tracking: Detect when payload structure changes over time
- Downstream integration: Generate JSON Schema for consumers
How It Works
┌──────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ Event │────▶│ Schema Sensor │────▶│ Inferred Schema │
│ Payload │ │ (sampling) │ │ + Fingerprint │
└──────────────┘ └─────────────────┘ └──────────────────┘
│
▼
┌─────────────────┐
│ Structure Cache │
└─────────────────┘
- Observation: Events flow through the sensor during batch processing
- Sampling: Not every event is fully analyzed (configurable rate)
- Deep inspection: Nested JSON structures are recursively analyzed
- Fingerprinting: Schema changes are detected via SHA-256 fingerprints
- Caching: Repeated structures skip full analysis for performance
Configuration
Enable schema sensing in your pipeline spec:
spec:
schema_sensing:
enabled: true
# Deep inspection for nested JSON
deep_inspect:
enabled: true
max_depth: 3
max_sample_size: 500
# Sampling configuration
sampling:
warmup_events: 50
sample_rate: 5
structure_cache: true
structure_cache_size: 50
# Output configuration
output:
include_stats: true
Configuration Options
Top Level
| Field | Type | Default | Description |
|---|---|---|---|
enabled | bool | false | Enable schema sensing |
Deep Inspection (deep_inspect)
| Field | Type | Default | Description |
|---|---|---|---|
enabled | bool | false | Enable deep inspection of nested objects |
max_depth | integer | 3 | Maximum nesting depth to analyze |
max_sample_size | integer | 500 | Max events to sample for deep analysis |
Sampling (sampling)
| Field | Type | Default | Description |
|---|---|---|---|
warmup_events | integer | 50 | Events to analyze before sampling kicks in |
sample_rate | integer | 5 | Analyze 1 in N events after warmup |
structure_cache | bool | true | Cache structure hashes for performance |
structure_cache_size | integer | 50 | Max cached structures per table |
Inferred Types
Schema sensing infers these JSON types:
| Type | Description |
|---|---|
null | JSON null value |
boolean | true/false |
integer | Whole numbers |
number | Floating point numbers |
string | Text values |
array | JSON arrays (element types tracked) |
object | Nested objects (fields recursively analyzed) |
For fields with varying types across events, all observed types are recorded.
Schema Evolution
When schema structure changes, the sensor:
- Detects change: Fingerprint differs from previous version
- Increments sequence: Monotonic version number increases
- Logs evolution: Emits structured log with old/new fingerprints
- Updates cache: New structure becomes current
Evolution events are available via the REST API and can trigger alerts.
Stabilization
After observing enough events, a schema “stabilizes”:
- Warmup phase completes
- Structure stops changing
- Sampling rate takes effect
- Cache hit rate increases
Stabilized schemas have stabilized: true in API responses.
API Access
List Inferred Schemas
curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas
Get Schema Details
curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders
Export as JSON Schema
curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/json-schema
Cache Statistics
curl http://localhost:8080/pipelines/my-pipeline/sensing/stats
Drift Detection
Schema sensing integrates with drift detection to compare:
- Expected schema: From database metadata (schema registry)
- Observed schema: From event payloads (schema sensing)
When mismatches occur, drift events are recorded:
| Drift Type | Description |
|---|---|
unexpected_null | Non-nullable column has null values |
type_mismatch | Observed type differs from declared type |
undeclared_column | Field in data not in schema |
missing_column | Schema field never seen in data |
json_structure_change | JSON column structure changed |
Access drift data via:
curl http://localhost:8080/pipelines/my-pipeline/drift
Performance Considerations
Sampling Tradeoffs
| Setting | Effect |
|---|---|
Higher warmup_events | Better initial accuracy, slower stabilization |
Higher sample_rate | Lower CPU usage, slower evolution detection |
Larger structure_cache_size | More memory, better hit rate |
Recommended Settings
High-throughput pipelines (>10k events/sec):
sampling:
warmup_events: 100
sample_rate: 10
structure_cache: true
structure_cache_size: 100
Schema evolution monitoring:
sampling:
warmup_events: 25
sample_rate: 2
structure_cache: true
Development/debugging:
sampling:
warmup_events: 10
sample_rate: 1 # Analyze every event
Example: JSON Column Sensing
For tables with JSON columns, sensing reveals the internal structure:
# Database schema shows: metadata JSON
# Sensing reveals:
fields:
- name: "metadata.user_agent"
types: ["string"]
nullable: false
- name: "metadata.ip_address"
types: ["string"]
nullable: true
- name: "metadata.tags"
types: ["array"]
array_element_types: ["string"]
This enables downstream consumers to understand JSON column structure without manual documentation.
Metrics
Schema sensing emits these metrics:
| Metric | Type | Description |
|---|---|---|
deltaforge_schema_evolutions_total{pipeline} | Counter | Schema evolution events detected |
deltaforge_schema_drift_detected{pipeline} | Counter | Incremented when drift is detected in a batch |
deltaforge_stage_latency_seconds{pipeline,stage="schema_sensing"} | Histogram | Time spent in schema sensing per batch |
Cache statistics (hits, misses, hit rate) are available via the REST API at /pipelines/{name}/sensing/stats but are not currently exposed as Prometheus metrics.
Observability playbook
DeltaForge already ships a Prometheus exporter, structured logging, and a panic hook. The runtime now emits source ingress counters, batching/processor histograms, and sink latency/throughput so operators can build production dashboards immediately. The tables below capture what is wired today and the remaining gaps to make the platform production ready for data and infra engineers.
What exists today
- Prometheus endpoint served at `/metrics` (default `0.0.0.0:9000`) with descriptors for pipeline counts, source/sink counters, and a stage latency histogram. The recorder is installed automatically when metrics are enabled.
- Structured logging via `tracing_subscriber` with JSON output by default, optional targets, and support for `RUST_LOG` overrides.
- Panic hook increments a `deltaforge_panics_total` counter and logs captured panics before delegating to the default hook.
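To pull these metrics into Prometheus, a minimal scrape job targeting the default endpoint could look like the following sketch; the deltaforge host name is a placeholder:

```yaml
scrape_configs:
  - job_name: deltaforge
    metrics_path: /metrics
    static_configs:
      - targets: ["deltaforge:9000"]
```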
Instrumentation gaps and recommendations
The sections below call out concrete metrics and log events to add per component. All metrics should include pipeline, tenant, and component identifiers where applicable so users can aggregate across fleets.
Sources (MySQL/Postgres)
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_source_events_total{pipeline,source,table} counter increments when MySQL events are handed to the coordinator. | Surfaces ingress per table and pipeline. |
| ✅ Implemented | deltaforge_source_reconnects_total{pipeline,source} counter when binlog reads reconnect. | Makes retry storms visible. |
| 🚧 Gap | deltaforge_source_lag_seconds{pipeline,source} gauge based on binlog/WAL position vs. server time. | Alert when sources fall behind. |
| 🚧 Gap | deltaforge_source_idle_seconds{pipeline,source} gauge updated when no events arrive within the inactivity window. | Catch stuck readers before downstream backlogs form. |
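As a sketch of how the proposed lag gauge could be recorded, assuming the runtime uses the metrics facade crate (0.22+ API) behind its Prometheus recorder; the helper below is illustrative, not existing DeltaForge code:
use metrics::gauge;
use std::time::{SystemTime, UNIX_EPOCH};

// Illustrative helper: record how far a source trails the server clock.
// `event_commit_ts_secs` would come from binlog/WAL event metadata.
fn record_source_lag(pipeline: &str, source: &str, event_commit_ts_secs: f64) {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs_f64())
        .unwrap_or(0.0);
    let lag_seconds = (now - event_commit_ts_secs).max(0.0);
    gauge!(
        "deltaforge_source_lag_seconds",
        "pipeline" => pipeline.to_string(),
        "source" => source.to_string()
    )
    .set(lag_seconds);
}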
Coordinator and batching
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_batch_events{pipeline} and deltaforge_batch_bytes{pipeline} histograms in Coordinator::process_deliver_and_maybe_commit. | Tune batching policies with data. |
| ✅ Implemented | deltaforge_stage_latency_seconds{pipeline,stage,trigger} histogram for processor stage. | Provides batch timing per trigger (timer/limits/shutdown). |
| ✅ Implemented | deltaforge_processor_latency_seconds{pipeline,processor} histogram around every processor invocation. | Identify slow user functions. |
| 🚧 Gap | deltaforge_pipeline_channel_depth{pipeline} gauge from mpsc::Sender::capacity()/len(). | Detect backpressure between sources and coordinator. |
| 🚧 Gap | Checkpoint outcome counters/logs (deltaforge_checkpoint_success_total / _failure_total). | Alert on persistence regressions and correlate to data loss risk. |
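Similarly, the channel-depth gap could be approximated from a bounded tokio mpsc sender (depth = max capacity minus remaining permits); again a sketch under the same metrics-facade assumption:
use metrics::gauge;
use tokio::sync::mpsc::Sender;

// Illustrative: approximate queue depth of the source -> coordinator channel.
fn record_channel_depth<T>(pipeline: &str, tx: &Sender<T>) {
    let depth = tx.max_capacity().saturating_sub(tx.capacity());
    gauge!(
        "deltaforge_pipeline_channel_depth",
        "pipeline" => pipeline.to_string()
    )
    .set(depth as f64);
}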
Sinks (Kafka/Redis/custom)
| Status | Metric/log | Rationale |
|---|---|---|
| ✅ Implemented | deltaforge_sink_events_total{pipeline,sink} counter and deltaforge_sink_latency_seconds{pipeline,sink} histogram around each send. | Throughput and responsiveness per sink. |
| ✅ Implemented | deltaforge_sink_batch_total{pipeline,sink} counter incremented once per batch send. | Number of batches sent per sink. |
| 🚧 Gap | Error taxonomy in deltaforge_sink_failures_total (add kind/details). | Easier alerting on specific failure classes (auth, timeout, schema). |
| 🚧 Gap | Backpressure gauge for client buffers (rdkafka queue, Redis pipeline depth). | Early signal before errors occur. |
| 🚧 Gap | Drop/skip counters from processors/sinks. | Auditing and reconciliation. |
Control plane and health endpoints
| Need | Suggested metric/log | Rationale |
|---|---|---|
| API request accounting | deltaforge_api_requests_total{route,method,status} counter and latency histogram using Axum middleware. | Production-grade visibility of operator actions. |
| Ready/Liveness transitions | Logs with pipeline counts and per-pipeline status when readiness changes. | Explain probe failures in log aggregation. |
| Pipeline lifecycle | Counters for create/patch/stop actions with success/error labels; include tenant and caller metadata in logs. | Auditable control-plane operations. |
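For API request accounting, one possible shape is an Axum middleware like the sketch below, assuming axum 0.7 and the metrics facade crate; the metric and label names mirror the table above but are not part of the current runtime:
use axum::{
    extract::{MatchedPath, Request},
    middleware::Next,
    response::Response,
};
use metrics::{counter, histogram};
use std::time::Instant;

// Illustrative middleware: count requests and record latency per route/method/status.
async fn track_api_metrics(req: Request, next: Next) -> Response {
    let route = req
        .extensions()
        .get::<MatchedPath>()
        .map(|p| p.as_str().to_owned())
        .unwrap_or_else(|| req.uri().path().to_owned());
    let method = req.method().as_str().to_owned();
    let start = Instant::now();

    let response = next.run(req).await;

    let status = response.status().as_u16().to_string();
    counter!(
        "deltaforge_api_requests_total",
        "route" => route.clone(), "method" => method.clone(), "status" => status.clone()
    )
    .increment(1);
    histogram!(
        "deltaforge_api_request_duration_seconds",
        "route" => route, "method" => method, "status" => status
    )
    .record(start.elapsed().as_secs_f64());
    response
}
Attach it with axum::middleware::from_fn(track_api_metrics) on the router so MatchedPath is available in request extensions.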
Examples
MySQL to Redis
This example streams MySQL binlog events into a Redis stream with an inline JavaScript transformation.
metadata:
name: orders-mysql-to-redis
tenant: acme
spec:
source:
type: mysql
config:
id: orders-mysql
dsn: ${MYSQL_DSN}
tables:
- shop.orders
processors:
- type: javascript
id: redact-email
inline: |
function processBatch(events) {
return events.map((event) => {
if (event.after && event.after.email) {
event.after.email = "[redacted]";
}
return event;
});
}
limits:
timeout_ms: 500
sinks:
- type: redis
config:
id: orders-redis
uri: ${REDIS_URI}
stream: orders
required: true
batch:
max_events: 500
max_bytes: 1048576
max_ms: 1000
commit_policy:
mode: required
Feel free to add a Kafka sink alongside Redis. Mark only the critical sink as required if you want checkpoints to proceed when optional sinks are unavailable.
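For example, here is a sketch of the sinks section with an optional Kafka sink next to the required Redis sink; the topic name is a placeholder and the placement of the required flag follows the other examples in this guide:
sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      required: true        # critical sink: checkpoints wait for it
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: order-events
      required: false       # best-effort sink: does not block checkpoints
commit_policy:
  mode: required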
Example: Turso to Kafka
This example demonstrates streaming changes from a Turso database to Kafka with schema sensing enabled.
Use Case
You have a Turso database (or local libSQL) and want to:
- Stream table changes to a Kafka topic
- Automatically detect schema structure from JSON payloads
- Transform events with JavaScript before publishing
Pipeline Configuration
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
name: turso2kafka
tenant: acme
spec:
source:
type: turso
config:
id: turso-main
# Local libSQL for development
url: "http://127.0.0.1:8080"
# For Turso cloud:
# url: "libsql://your-db.turso.io"
# auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["users", "orders", "order_items"]
poll_interval_ms: 1000
cdc_mode: auto
processors:
- type: javascript
id: enrich
inline: |
function processBatch(events) {
return events.map(event => {
// Add custom metadata to events
event.source_type = "turso";
event.processed_at = new Date().toISOString();
return event;
});
}
sinks:
- type: kafka
config:
id: kafka-main
brokers: "${KAFKA_BROKERS}"
topic: turso.changes
required: true
exactly_once: false
client_conf:
message.timeout.ms: "5000"
acks: "all"
batch:
max_events: 100
max_bytes: 1048576
max_ms: 500
respect_source_tx: false
max_inflight: 2
commit_policy:
mode: required
schema_sensing:
enabled: true
deep_inspect:
enabled: true
max_depth: 3
sampling:
warmup_events: 50
sample_rate: 5
structure_cache: true
Running the Example
1. Start Infrastructure
# Start Kafka and other services
./dev.sh up
# Create the target topic
./dev.sh k-create turso.changes 6
2. Start Local libSQL (Optional)
For local development without Turso cloud:
# Using sqld (libSQL server)
sqld --http-listen-addr 127.0.0.1:8080
# Or with Docker
docker run -p 8080:8080 ghcr.io/libsql/sqld:latest
3. Create Test Tables
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE,
metadata TEXT -- JSON column
);
CREATE TABLE orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
total REAL NOT NULL,
status TEXT DEFAULT 'pending',
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
4. Run DeltaForge
# Save config as turso-kafka.yaml
cargo run -p runner -- --config turso-kafka.yaml
5. Insert Test Data
INSERT INTO users (name, email, metadata)
VALUES ('Alice', 'alice@example.com', '{"role": "admin", "tags": ["vip"]}');
INSERT INTO orders (user_id, total, status)
VALUES (1, 99.99, 'completed');
6. Verify Events in Kafka
./dev.sh k-consume turso.changes --from-beginning
You should see events like:
{
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"tenant_id": "acme",
"table": "users",
"op": "insert",
"after": {
"id": 1,
"name": "Alice",
"email": "alice@example.com",
"metadata": "{\"role\": \"admin\", \"tags\": [\"vip\"]}"
},
"source_type": "turso",
"processed_at": "2025-01-15T10:30:00.000Z",
"timestamp": "2025-01-15T10:30:00.123Z"
}
Monitoring
Check Pipeline Status
curl http://localhost:8080/pipelines/turso2kafka
View Inferred Schemas
# List all inferred schemas
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas
# Get details for users table
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users
# Export as JSON Schema
curl http://localhost:8080/pipelines/turso2kafka/sensing/schemas/users/json-schema
Check Drift Detection
curl http://localhost:8080/pipelines/turso2kafka/drift
Turso Cloud Configuration
For production with Turso cloud:
source:
type: turso
config:
id: turso-prod
url: "libsql://mydb-myorg.turso.io"
auth_token: "${TURSO_AUTH_TOKEN}"
tables: ["*"]
cdc_mode: native
poll_interval_ms: 1000
native_cdc:
level: data
Set the auth token via environment variable:
export TURSO_AUTH_TOKEN="your-token-here"
Notes
- CDC Mode: auto tries native CDC first, then falls back to triggers or polling.
- Poll Interval: lower values reduce latency but increase database load.
- Schema Sensing: automatically discovers JSON structure in text columns.
- Exactly Once: set to false for higher throughput; use true if the Kafka cluster supports EOS.
Troubleshooting
Common issues and quick checks when running DeltaForge.
- 🩺 Health-first: start with /healthz and /readyz to pinpoint failing components.
Runner fails to start
- Confirm the config path passed to --config exists and is readable.
- Validate YAML syntax and that required fields like metadata.name and spec.source are present.
- Ensure environment variables referenced in the spec are set (dsn, brokers, uri, etc.).
Pipelines remain unready
- Check the /readyz endpoint for per-pipeline status and error messages.
- Verify upstream credentials allow replication (MySQL binlog). Other engines are experimental unless explicitly documented.
- Inspect sink connectivity; a required sink that cannot connect will block checkpoints.
Slow throughput
- Increase batch.max_events or batch.max_bytes to reduce flush frequency (see the sketch after this list).
- Adjust max_inflight to allow more concurrent batches if sinks can handle parallelism.
- Reduce processor work or add guardrails (limits) to prevent slow JavaScript from stalling the pipeline.
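For instance, a throughput-oriented batch section might look like the sketch below; the numbers are starting points to tune against your workload, not universal recommendations:
batch:
  max_events: 2000      # larger batches, fewer flushes
  max_bytes: 4194304    # 4 MiB
  max_ms: 500
max_inflight: 4         # more concurrent batches, if sinks can keep up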
Checkpoints not advancing
- Review the commit policy: with mode: all, or when a required sink is unavailable, checkpoints will not advance (see the sketch below).
- Look for sink-specific errors (for example, Kafka broker unreachability or Redis backpressure).
- Pause and resume the pipeline to force a clean restart after addressing the underlying issue.
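If only one sink is truly critical, one option (a sketch based on the documented commit modes) is to keep mode: required and mark best-effort sinks as required: false, so their outages no longer hold back checkpoints:
commit_policy:
  mode: required     # only sinks flagged as required gate checkpoint progress
# other documented modes: all (every sink must acknowledge) and quorum
# (a subset of sinks is sufficient); see the Configuration reference for details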
Development Guide
Use this guide to build, test, and extend DeltaForge. It covers local workflows, optional dependency containers, and how to work with Docker images.
All contributions are welcome and highly appreciated.
Local prerequisites
- Rust toolchain 1.89+ (install via rustup).
- Optional: Docker or Podman for running the dev dependency stack and the container image.
Workspace layout
- crates/deltaforge-core: shared event model, pipeline engine, and checkpointing primitives.
- crates/sources: database CDC readers (MySQL binlog, Postgres logical replication) implemented as pluggable sources.
- crates/processors: JavaScript-based processors and support code for transforming batches.
- crates/sinks: sink implementations (Kafka producer, Redis streams) plus sink utilities.
- crates/rest-api: HTTP control plane with health/readiness and pipeline lifecycle endpoints.
- crates/runner: CLI entrypoint that wires the runtime, metrics, and control plane together.
Use these crate boundaries as reference points when adding new sources, sinks, or pipeline behaviors.
Start dev dependencies
Bring up the optional backing services (MySQL, Kafka, Redis) with Docker Compose:
docker compose -f docker-compose.dev.yml up -d
Each service is exposed on localhost for local runs (Postgres on 5432, MySQL on 3306, Kafka on 9092, Redis on 6379). The MySQL container seeds demo data from ./init-scripts and configures the binlog settings required for CDC.
Prefer the convenience dev.sh wrapper to keep common tasks consistent:
./dev.sh up # start the dependency stack
./dev.sh down # stop and remove it
./dev.sh ps # see container status
Build and test locally
Run the usual Rust workflow from the repo root:
cargo fmt --all
cargo clippy --workspace --all-targets --all-features
cargo test --workspace
Or use the helper script; ./dev.sh check is a single command that mirrors CI expectations:
./dev.sh build # build project (debug)
./dev.sh build-release # build project (release)
./dev.sh run # run with examples/dev.yaml
./dev.sh fmt # format code
./dev.sh lint # clippy with warnings as errors
./dev.sh test # full test suite
./dev.sh check # fmt --check + clippy + tests (mirrors CI)
./dev.sh cov # generate coverage report
Docker images
Use pre-built images
Multi-arch images (amd64/arm64) are published to GHCR and Docker Hub:
# Minimal (~57MB, scratch-based, no shell)
docker pull ghcr.io/vnvo/deltaforge:latest
docker pull vnvohub/deltaforge:latest
# Debug (~140MB, includes shell for troubleshooting)
docker pull ghcr.io/vnvo/deltaforge:latest-debug
docker pull vnvohub/deltaforge:latest-debug
| Variant | Size | Base | Use case |
|---|---|---|---|
| latest | ~57MB | scratch | Production |
| latest-debug | ~140MB | debian-slim | Troubleshooting, has shell |
Build locally
Two Dockerfiles are provided:
# Minimal image (~57MB)
docker build -t deltaforge:local .
# Debug image (~140MB, includes shell)
docker build -t deltaforge:local-debug -f Dockerfile.debug .
Or use the dev helper:
./dev.sh docker # build minimal image
./dev.sh docker-debug # build debug image
./dev.sh docker-test # test minimal image runs
./dev.sh docker-test-debug # test debug image runs
./dev.sh docker-all # build and test all variants
./dev.sh docker-shell # open shell in debug container
Build multi-arch locally
To build for both amd64 and arm64:
./dev.sh docker-multi-setup # create buildx builder (once)
./dev.sh docker-multi # build both architectures
Note: Multi-arch builds use QEMU emulation and take ~30-35 minutes. The images are not loaded locally - use --push to push to a registry.
Run the image
Run the container by mounting your pipeline specs and exposing the API and metrics ports:
docker run --rm \
-p 8080:8080 -p 9000:9000 \
-v $(pwd)/examples/dev.yaml:/etc/deltaforge/pipeline.yaml:ro \
-v deltaforge-checkpoints:/app/data \
ghcr.io/vnvo/deltaforge:latest \
--config /etc/deltaforge/pipeline.yaml
Notes:
- The container listens on 0.0.0.0:8080 for the control plane API, with metrics on :9000.
- Checkpoints are written to /app/data/df_checkpoints.json; mount a volume to persist them across restarts.
- Environment variables inside the YAML are expanded before parsing.
- Pass any other runner flags as needed (e.g., --api-addr or --metrics-addr).
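If you run the same image on Kubernetes, a minimal probe configuration might look like the sketch below, assuming the control plane stays on port 8080 and using the health endpoints mentioned in the Troubleshooting section:
containers:
  - name: deltaforge
    image: ghcr.io/vnvo/deltaforge:latest
    args: ["--config", "/etc/deltaforge/pipeline.yaml"]
    ports:
      - containerPort: 8080   # control plane API
      - containerPort: 9000   # Prometheus metrics
    livenessProbe:
      httpGet:
        path: /healthz
        port: 8080
    readinessProbe:
      httpGet:
        path: /readyz
        port: 8080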
Debug a running container
Use the debug image to troubleshoot:
# Run with shell access
docker run --rm -it --entrypoint /bin/bash ghcr.io/vnvo/deltaforge:latest-debug
# Exec into a running container
docker exec -it <container_id> /bin/bash
Dev helper commands
The dev.sh script provides shortcuts for common tasks:
./dev.sh help # show all commands
Infrastructure
./dev.sh up # start MySQL, Kafka, Redis
./dev.sh down # stop and remove containers
./dev.sh ps # list running services
Kafka
./dev.sh k-list # list topics
./dev.sh k-create <topic> # create topic
./dev.sh k-consume <topic> --from-beginning
./dev.sh k-produce <topic> # interactive producer
Redis
./dev.sh redis-cli # open redis-cli
./dev.sh redis-read <stream> # read from stream
Database shells
./dev.sh pg-sh # psql into Postgres
./dev.sh mysql-sh # mysql into MySQL
Documentation
./dev.sh docs # serve docs locally (opens browser)
./dev.sh docs-build # build docs
Pre-release checks
./dev.sh release-check # run all checks + build all Docker variants
Contributing
- Fork the repository
- Create a branch from main (e.g., feature/new-sink, fix/checkpoint-bug)
- Make your changes
- Run ./dev.sh check to ensure CI will pass
- Submit a PR against main
Things to Remember
Tests
There are a few #[ignore] tests; run them when making deep changes to the sources, pipeline coordination, or anything else that affects core functionality.
Logging hygiene
- Include pipeline, tenant, source_id/sink_id, and batch_id fields on all warnings/errors to make traces joinable in log aggregation tools (see the sketch after this list).
- Normalize retry/backoff logs so they include the attempt count and sleep duration; consider a structured reason field alongside error details for dashboards.
- Add info-level summaries on an interval (e.g., every N batches) reporting batches processed, average batch size, lag, and sink latency percentiles pulled from the metrics registry to create human-friendly breadcrumbs.
- Add metrics in a backward-compatible way: prefer new metric names over redefining existing ones to avoid breaking dashboards. Validate cardinality (bounded label sets) before merging.
- Gate noisy logs behind levels (debug for per-event traces, info for batch summaries, warn/error for retries and failures).
- Exercise the new metrics in integration tests by asserting counters change when sending synthetic events through pipelines.
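A sketch of the structured-field convention above using the tracing crate; the helper and its arguments are illustrative, not existing DeltaForge code:
use tracing::warn;

// Illustrative: a sink retry warning carrying the recommended structured fields.
fn log_sink_retry(
    pipeline: &str,
    tenant: &str,
    sink_id: &str,
    batch_id: &str,
    attempt: u32,
    sleep_ms: u64,
    err: &dyn std::error::Error,
) {
    warn!(
        pipeline,
        tenant,
        sink_id,
        batch_id,
        attempt,
        sleep_ms,
        reason = "sink_send_failed",
        error = %err,
        "retrying sink send after failure"
    );
}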