Pipelines are defined as YAML documents that map directly to the PipelineSpec type. Environment variables are expanded before parsing using `${VAR}` syntax, so secrets and connection strings can be injected at runtime. Unknown variables (e.g. `${source.table}`) pass through unchanged for later use as routing templates.
```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: <pipeline-name>
  tenant: <tenant-id>
spec:
  source: { ... }
  processors: [ ... ]
  sinks: [ ... ]
  batch: { ... }
  commit_policy: { ... }
  schema_sensing: { ... }
```
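The expansion semantics described above can be sketched as follows. This is a hypothetical helper (`expandVars` is not part of the runtime's API), illustrating only the behavior stated here: known variables are substituted, unknown placeholders pass through.

```javascript
// Sketch of ${VAR} expansion: variables present in `env` are substituted;
// unknown placeholders (e.g. ${source.table}) are left intact so they can
// act as routing templates after parsing.
function expandVars(text, env) {
  return text.replace(/\$\{([^}]+)\}/g, (match, name) =>
    name in env ? env[name] : match
  );
}
```

For example, `expandVars("dsn: ${MYSQL_DSN} topic: ${source.table}", { MYSQL_DSN: "mysql://app@db:3306/shop" })` substitutes the DSN but leaves `${source.table}` untouched.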
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `name` | string | Yes | Unique pipeline identifier. Used in API routes and metrics. |
| `tenant` | string | Yes | Business-oriented tenant label for multi-tenancy. |
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `source` | object | Yes | Database source configuration. See Sources. |
| `processors` | array | No | Ordered list of processors. See Processors. |
| `sinks` | array | Yes (at least one) | One or more sinks that receive each batch. See Sinks. |
| `sharding` | object | No | Optional hint for downstream distribution. |
| `connection_policy` | object | No | How the runtime establishes upstream connections. |
| `batch` | object | No | Commit unit thresholds. See Batching. |
| `commit_policy` | object | No | How sink acknowledgements gate checkpoints. See Commit policy. |
| `schema_sensing` | object | No | Automatic schema inference from event payloads. See Schema sensing. |
The MySQL source captures row-level changes via binlog replication. See the MySQL source documentation for prerequisites and detailed configuration.
```yaml
source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.order_items
    outbox:
      tables: ["shop.outbox"]
    snapshot:
      mode: initial
```
| Field | Type | Description |
| --- | --- | --- |
| `id` | string | Unique identifier for checkpoints and metrics |
| `dsn` | string | MySQL connection string with replication privileges |
| `tables` | array | Table patterns to capture; omit for all tables |
| `outbox.tables` | array | Table patterns to tag as outbox events. Must also appear in `tables`. Supports globs: `shop.outbox`, `*.outbox`, `shop.outbox_%`. |
| `snapshot.mode` | string | `never` (default); `initial` runs once if no checkpoint exists; `always` re-snapshots on every restart |
| `snapshot.max_parallel_tables` | int | Tables snapshotted concurrently (default: 8) |
| `snapshot.chunk_size` | int | Rows per range chunk for integer-PK tables (default: 10000) |
Table patterns support SQL `LIKE` syntax:

- `db.table` - exact match
- `db.prefix%` - tables matching prefix
- `db.%` - all tables in database
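The pattern semantics can be sketched by translating each pattern to a regular expression. This is an illustrative helper under the stated assumption that `%` matches any run of characters and everything else is literal; it is not the runtime's matcher.

```javascript
// Sketch of SQL-LIKE table pattern matching: `%` becomes `.*`,
// all other characters are matched literally.
function matchesPattern(table, pattern) {
  const regex = new RegExp(
    "^" +
      pattern
        .split("%")
        .map(part => part.replace(/[.*+?^${}()|[\]\\]/g, "\\$&")) // escape regex metacharacters
        .join(".*") +
      "$"
  );
  return regex.test(table);
}
```

So `matchesPattern("shop.orders_v2", "shop.orders%")` and `matchesPattern("shop.items", "shop.%")` both match, while `matchesPattern("crm.leads", "shop.%")` does not.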
The PostgreSQL source captures row-level changes via logical replication. See the PostgreSQL source documentation for prerequisites and detailed configuration.
```yaml
source:
  type: postgres
  config:
    id: users-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_users
    publication: users_pub
    tables:
      - public.users
      - public.sessions
    start_position: earliest
    outbox:
      prefixes: ["outbox", "order_outbox_%"]
    snapshot:
      mode: initial
```
| Field | Type | Description |
| --- | --- | --- |
| `id` | string | Unique identifier |
| `dsn` | string | PostgreSQL connection string |
| `slot` | string | Replication slot name |
| `publication` | string | Publication name |
| `tables` | array | Table patterns to capture |
| `start_position` | string | `earliest`, `latest`, or `lsn` |
| `outbox.prefixes` | array | `pg_logical_emit_message` prefixes to tag as outbox events. Supports globs: `outbox`, `outbox_%`, `*`. |
| `snapshot.mode` | string | `never` (default); `initial` runs once if no checkpoint exists; `always` re-snapshots on every restart |
| `snapshot.max_parallel_tables` | int | Tables snapshotted concurrently (default: 8) |
| `snapshot.chunk_size` | int | Rows per range chunk (default: 10000) |
Note: The source-level `outbox` field only tags matching events with the `__outbox` sentinel. Routing and transformation are handled by the outbox processor.
Processors transform events between source and sinks. They run in order and can filter, enrich, or modify events.
```yaml
processors:
  - type: javascript
    id: transform
    inline: |
      function processBatch(events) {
        return events.map(e => {
          e.tags = ["processed"];
          return e;
        });
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500
```
| Field | Type | Description |
| --- | --- | --- |
| `id` | string | Processor identifier |
| `inline` | string | JavaScript code |
| `limits.cpu_ms` | int | CPU time limit |
| `limits.mem_mb` | int | Memory limit |
| `limits.timeout_ms` | int | Execution timeout |
```yaml
processors:
  - type: flatten
    id: flat
    separator: "__"
    max_depth: 3
    on_collision: last
    empty_object: preserve
    lists: preserve
    empty_list: drop
```
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `id` | string | `"flatten"` | Processor identifier |
| `separator` | string | `"__"` | Separator between path segments |
| `max_depth` | int | unlimited | Stop recursing at this depth; objects at the boundary are kept as-is |
| `on_collision` | string | `last` | Key collision policy: `last`, `first`, or `error` |
| `empty_object` | string | `preserve` | Empty object policy: `preserve`, `drop`, or `null` |
| `lists` | string | `preserve` | Array policy: `preserve` or `index` |
| `empty_list` | string | `preserve` | Empty array policy: `preserve`, `drop`, or `null` |
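The default behavior can be sketched as a recursive key-join. This is a simplified illustration (it covers `separator`, `lists: preserve`, and `on_collision: last` only; the real processor also applies `max_depth` and the empty-value policies):

```javascript
// Minimal sketch of flattening with separator "__": nested object keys are
// joined into a single path; arrays and scalars are copied as-is.
function flatten(obj, separator = "__", prefix = "") {
  const out = {};
  for (const [key, value] of Object.entries(obj)) {
    const path = prefix ? prefix + separator + key : key;
    if (value !== null && typeof value === "object" && !Array.isArray(value)) {
      Object.assign(out, flatten(value, separator, path)); // later keys overwrite (on_collision: last)
    } else {
      out[path] = value; // scalars and arrays kept unchanged (lists: preserve)
    }
  }
  return out;
}
```

For example, `flatten({ order: { id: 7, items: [1, 2] } })` yields `{ order__id: 7, order__items: [1, 2] }`.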
Transforms raw outbox events into routed, sink-ready events. Requires the source to have outbox configured so events are tagged before reaching this processor. See Outbox pattern documentation for full details.
```yaml
processors:
  - type: outbox
    id: outbox
    topic: "${aggregate_type}.${event_type}"
    default_topic: "events.unrouted"
    raw_payload: true
    columns:
      payload: data
    additional_headers:
      x-trace-id: trace_id
```
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `id` | string | `"outbox"` | Processor identifier |
| `tables` | array | `[]` | Filter: only process outbox events matching these patterns. Empty = all outbox events. |
| `topic` | string | - | Topic template resolved against the raw payload using `${field}` placeholders |
| `default_topic` | string | - | Fallback topic when template resolution fails and no topic column exists |
| `key` | string | - | Key template resolved against the raw payload. Default: the `aggregate_id` value. |
| `raw_payload` | bool | `false` | Deliver the extracted payload as-is to sinks, bypassing envelope wrapping |
| `strict` | bool | `false` | Fail the batch if required fields are missing rather than silently falling back |
| `columns.payload` | string | `payload` | Column containing the event payload |
| `columns.aggregate_type` | string | `aggregate_type` | Column for aggregate type |
| `columns.aggregate_id` | string | `aggregate_id` | Column for aggregate ID |
| `columns.event_type` | string | `event_type` | Column for event type |
| `columns.topic` | string | `topic` | Column for pre-computed topic override |
| `columns.event_id` | string | `id` | Column extracted as `df-event-id` header |
| `additional_headers` | map | `{}` | Forward extra payload fields as routing headers: `header-name: column-name` |
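The template-plus-fallback behavior can be sketched as below. This is a hypothetical helper showing only the documented semantics (placeholders resolved against the payload, `default_topic` on failure); the real processor additionally applies column mapping, the pre-computed topic column, and `strict` mode.

```javascript
// Sketch of topic template resolution: each ${field} placeholder is looked
// up in the outbox payload; if any field is missing, fall back to the
// configured default topic.
function resolveTopic(template, payload, defaultTopic) {
  let missing = false;
  const topic = template.replace(/\$\{([^}]+)\}/g, (_, field) => {
    if (payload[field] == null) {
      missing = true;
      return "";
    }
    return String(payload[field]);
  });
  return missing ? defaultTopic : topic;
}
```

With the example config above, a payload `{ aggregate_type: "order", event_type: "created" }` routes to `order.created`; a payload missing `event_type` falls back to `events.unrouted`.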
Sinks deliver events to downstream systems. Each sink supports configurable envelope formats and wire encodings to match consumer expectations. See the Sinks documentation for detailed information on multi-sink patterns, commit policies, and failure handling.
All sinks support these serialization options:
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `envelope` | object | `native` | Output structure format. See Envelopes. |
| `encoding` | string | `json` | Wire encoding format |
Envelope types:

- `native` - Direct Debezium payload structure (default, most efficient)
- `debezium` - Full `{"payload": ...}` wrapper
- `cloudevents` - CloudEvents 1.0 specification (requires `type_prefix`)
```yaml
# Native (default)
envelope:
  type: native

# Debezium wrapper
envelope:
  type: debezium

# CloudEvents
envelope:
  type: cloudevents
  type_prefix: "com.example.cdc"
```
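To make the three shapes concrete, here is an illustrative sketch of how one change event might be wrapped by each envelope. The `cloudevents` attribute names follow the CloudEvents 1.0 specification, but the exact attribute values deltaforge emits (e.g. the `source` URI) are assumptions, not documented behavior.

```javascript
// Illustrative envelope wrapping for a single change event.
function wrap(event, envelope) {
  switch (envelope.type) {
    case "native": // payload as-is
      return event;
    case "debezium": // full {"payload": ...} wrapper
      return { payload: event };
    case "cloudevents": // CloudEvents 1.0 structured form (attribute values assumed)
      return {
        specversion: "1.0",
        id: event.id,
        source: "/deltaforge/" + event.table,
        type: envelope.type_prefix + "." + event.op,
        data: event,
      };
    default:
      throw new Error("unknown envelope type: " + envelope.type);
  }
}
```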
See Kafka sink documentation for detailed configuration options and best practices.
```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      envelope:
        type: debezium
      encoding: json
      required: true
      exactly_once: false
      send_timeout_secs: 30
      client_conf:
        security.protocol: SASL_SSL
```
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `id` | string | - | Sink identifier |
| `brokers` | string | - | Kafka broker addresses |
| `topic` | string | - | Target topic or template |
| `key` | string | - | Message key template |
| `envelope` | object | `native` | Output format |
| `encoding` | string | `json` | Wire encoding |
| `required` | bool | `true` | Gates checkpoints |
| `exactly_once` | bool | `false` | Transactional mode |
| `send_timeout_secs` | int | `30` | Send timeout |
| `client_conf` | map | - | librdkafka overrides |
CloudEvents example:
```yaml
sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: ${KAFKA_BROKERS}
      topic: events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.cdc"
      encoding: json
```
See Redis sink documentation for detailed configuration options and best practices.
```yaml
sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: native
      encoding: json
      required: true
```
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `id` | string | - | Sink identifier |
| `uri` | string | - | Redis connection URI |
| `stream` | string | - | Redis stream key or template |
| `key` | string | - | Entry key template |
| `envelope` | object | `native` | Output format |
| `encoding` | string | `json` | Wire encoding |
| `required` | bool | `true` | Gates checkpoints |
| `send_timeout_secs` | int | `5` | XADD timeout |
| `batch_timeout_secs` | int | `30` | Pipeline timeout |
| `connect_timeout_secs` | int | `10` | Connection timeout |
See NATS sink documentation for detailed configuration options and best practices.
```yaml
sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      envelope:
        type: native
      encoding: json
      required: true
      send_timeout_secs: 5
      batch_timeout_secs: 30
```
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `id` | string | - | Sink identifier |
| `url` | string | - | NATS server URL |
| `subject` | string | - | Subject or template |
| `key` | string | - | Message key template |
| `stream` | string | - | JetStream stream name |
| `envelope` | object | `native` | Output format |
| `encoding` | string | `json` | Wire encoding |
| `required` | bool | `true` | Gates checkpoints |
| `send_timeout_secs` | int | `5` | Publish timeout |
| `batch_timeout_secs` | int | `30` | Batch timeout |
| `connect_timeout_secs` | int | `10` | Connection timeout |
| `credentials_file` | string | - | NATS credentials file |
| `username` | string | - | Auth username |
| `password` | string | - | Auth password |
| `token` | string | - | Auth token |
```yaml
batch:
  max_events: 500
  max_bytes: 1048576
  max_ms: 1000
  respect_source_tx: true
  max_inflight: 2
```
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `max_events` | int | `500` | Flush after this many events |
| `max_bytes` | int | `1048576` | Flush after size reaches limit |
| `max_ms` | int | `1000` | Flush after time (ms) |
| `respect_source_tx` | bool | `true` | Never split source transactions |
| `max_inflight` | int | `2` | Max concurrent batches |
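The thresholds combine as "flush on whichever limit is hit first". A minimal sketch of that decision, assuming a hypothetical batch accumulator tracking event count, byte size, and age (note it ignores `respect_source_tx`, which additionally defers the flush to a source transaction boundary):

```javascript
// Sketch of the flush decision: a batch is flushed as soon as any one
// of the configured thresholds is reached.
function shouldFlush(batch, cfg) {
  return (
    batch.events >= cfg.max_events || // count threshold
    batch.bytes >= cfg.max_bytes ||   // size threshold
    batch.ageMs >= cfg.max_ms         // time threshold
  );
}
```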
```yaml
commit_policy:
  mode: required
```

```yaml
# For quorum mode:
commit_policy:
  mode: quorum
  quorum: 2
```
| Mode | Description |
| --- | --- |
| `all` | Every sink must acknowledge before checkpoint |
| `required` | Only `required: true` sinks must acknowledge (default) |
| `quorum` | Checkpoint after `quorum` sinks acknowledge |
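The three modes can be sketched as a single gating function. This is an illustrative helper (not the runtime's code), taking the configured sinks and the set of sink IDs that acknowledged a batch:

```javascript
// Sketch of checkpoint gating for the three commit modes.
function canCheckpoint(sinks, acked, policy) {
  switch (policy.mode) {
    case "all": // every sink must acknowledge
      return sinks.every(s => acked.has(s.id));
    case "required": // only required sinks gate the checkpoint
      return sinks.filter(s => s.required).every(s => acked.has(s.id));
    case "quorum": // any `quorum` acknowledgements suffice
      return sinks.filter(s => acked.has(s.id)).length >= policy.quorum;
    default:
      throw new Error("unknown commit mode: " + policy.mode);
  }
}
```

With one required and one optional sink where only the required one has acknowledged, `required` passes while `all` still waits.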
Schema sensing automatically infers and tracks schema from event payloads. See the Schema Sensing documentation for detailed information on how it works, drift detection, and API endpoints.
Performance tip: Schema sensing can be CPU-intensive, especially with deep JSON inspection. Consider your throughput requirements when configuring:

- Set `enabled: false` if you don’t need runtime schema inference
- Limit `deep_inspect.max_depth` to avoid traversing deeply nested structures
- Increase `sampling.sample_rate` to analyze fewer events (e.g., 1 in 100 instead of 1 in 10)
- Reduce `sampling.warmup_events` if you’re confident in schema stability
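The warmup-then-sample behavior those two knobs imply can be sketched as follows (a hypothetical helper; the exact phase alignment of the runtime's sampler is an assumption):

```javascript
// Sketch of sampling: the first `warmup_events` events are always analyzed,
// after which only 1 in `sample_rate` events is.
function shouldAnalyze(eventIndex, cfg) {
  if (eventIndex < cfg.sampling.warmup_events) return true;
  return (eventIndex - cfg.sampling.warmup_events) % cfg.sampling.sample_rate === 0;
}
```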
```yaml
schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3
    max_sample_size: 500
  sampling:
    warmup_events: 50
    sample_rate: 5
    structure_cache: true
    structure_cache_size: 50
  high_cardinality:
    enabled: true
    min_events: 100
    stable_threshold: 0.5
    min_dynamic_fields: 5
```
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `enabled` | bool | `false` | Enable schema sensing |
| `deep_inspect.enabled` | bool | `true` | Inspect nested JSON |
| `deep_inspect.max_depth` | int | `10` | Max nesting depth |
| `deep_inspect.max_sample_size` | int | `1000` | Max events for deep analysis |
| `sampling.warmup_events` | int | `1000` | Events to fully analyze first |
| `sampling.sample_rate` | int | `10` | After warmup, analyze 1 in N |
| `sampling.structure_cache` | bool | `true` | Cache structure fingerprints |
| `sampling.structure_cache_size` | int | `100` | Max cached structures |
| `high_cardinality.enabled` | bool | `true` | Detect dynamic map keys |
| `high_cardinality.min_events` | int | `100` | Events before classification |
| `high_cardinality.stable_threshold` | float | `0.5` | Frequency threshold for stable fields |
| `high_cardinality.min_dynamic_fields` | int | `5` | Min unique fields to classify as a map |
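One plausible reading of the high-cardinality knobs is the heuristic below: after `min_events` events, a field whose observed keys each appear in fewer than `stable_threshold` of events, with at least `min_dynamic_fields` distinct keys, is treated as a dynamic map rather than a fixed struct. This is an assumed sketch, not the runtime's actual algorithm.

```javascript
// Sketch of dynamic-map detection: `keyCounts` maps each observed key of a
// candidate field to the number of events it appeared in.
function isDynamicMap(keyCounts, totalEvents, cfg) {
  if (totalEvents < cfg.min_events) return false; // not enough data yet
  const keys = Object.keys(keyCounts);
  if (keys.length < cfg.min_dynamic_fields) return false; // too few unique keys
  // every key is "unstable": seen in less than stable_threshold of events
  return keys.every(k => keyCounts[k] / totalEvents < cfg.stable_threshold);
}
```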
```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-kafka
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
  processors:
    - type: javascript
      id: transform
      inline: |
        function processBatch(events) {
          return events.map(event => {
            event.tags = (event.tags || []).concat(["normalized"]);
            return event;
          });
        }
      limits:
        cpu_ms: 50
        mem_mb: 128
        timeout_ms: 500
  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
        envelope:
          type: debezium
        encoding: json
        required: true
        exactly_once: false
        client_conf:
          message.timeout.ms: "5000"
  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
    respect_source_tx: true
    max_inflight: 2
  commit_policy:
    mode: required
```
```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: users-postgres-to-kafka
  tenant: acme
spec:
  source:
    type: postgres
    config:
      id: users-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_users
      publication: users_pub
      tables:
        - public.users
        - public.user_sessions
      start_position: earliest
  sinks:
    - type: kafka
      config:
        id: users-kafka
        brokers: ${KAFKA_BROKERS}
        topic: user-events
        envelope:
          type: cloudevents
          type_prefix: "com.acme.users"
        encoding: json
        required: true
  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true
  commit_policy:
    mode: required
```
```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-multi-sink
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
  sinks:
    # Kafka Connect expects Debezium format
    - type: kafka
      config:
        id: connect-sink
        brokers: ${KAFKA_BROKERS}
        topic: connect-events
        envelope:
          type: debezium
        required: true
    # Lambda expects CloudEvents
    - type: kafka
      config:
        id: lambda-sink
        brokers: ${KAFKA_BROKERS}
        topic: lambda-events
        envelope:
          type: cloudevents
          type_prefix: "com.acme.cdc"
        required: false
    # Redis cache uses native format
    - type: redis
      config:
        id: cache-redis
        uri: ${REDIS_URI}
        stream: orders-cache
        envelope:
          type: native
        required: false
  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true
  commit_policy:
    mode: required
```
```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-nats
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items
  sinks:
    - type: nats
      config:
        id: orders-nats
        url: ${NATS_URL}
        subject: orders.events
        stream: ORDERS
        envelope:
          type: native
        encoding: json
        required: true
  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true
  commit_policy:
    mode: required
```