
Configuration

Pipelines are defined as YAML documents that map directly to the PipelineSpec type. Environment variables are expanded before parsing using ${VAR} syntax, so secrets and connection strings can be injected at runtime. Unknown variables (e.g. ${source.table}) pass through for use as routing templates.
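
The expansion behavior described above can be sketched in Python. This is illustrative only, not DeltaForge's actual implementation; the function name is hypothetical. The key property is that known variables are substituted while unknown placeholders survive verbatim:

```python
import os
import re

def expand_env(text: str) -> str:
    """Replace ${VAR} with the value from the environment when it is set;
    leave unknown placeholders (e.g. ${source.table}) untouched so they
    can serve as routing templates later."""
    def sub(match: re.Match) -> str:
        value = os.environ.get(match.group(1))
        return value if value is not None else match.group(0)
    return re.sub(r"\$\{([^}]+)\}", sub, text)

os.environ["MYSQL_DSN"] = "mysql://user:pass@db:3306"
print(expand_env("dsn: ${MYSQL_DSN}"))       # known variable is expanded
print(expand_env("topic: ${source.table}"))  # unknown placeholder passes through
```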

Document structure

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: <pipeline-name>
  tenant: <tenant-id>
spec:
  source: { ... }
  processors: [ ... ]
  sinks: [ ... ]
  batch: { ... }
  commit_policy: { ... }
  schema_sensing: { ... }

Metadata

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | Yes | Unique pipeline identifier. Used in API routes and metrics. |
| tenant | string | Yes | Business-oriented tenant label for multi-tenancy. |

Spec fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| source | object | Yes | Database source configuration. See Sources. |
| processors | array | No | Ordered list of processors. See Processors. |
| sinks | array | Yes (at least one) | One or more sinks that receive each batch. See Sinks. |
| sharding | object | No | Optional hint for downstream distribution. |
| connection_policy | object | No | How the runtime establishes upstream connections. |
| batch | object | No | Commit unit thresholds. See Batching. |
| commit_policy | object | No | How sink acknowledgements gate checkpoints. See Commit policy. |
| schema_sensing | object | No | Automatic schema inference from event payloads. See Schema sensing. |

Sources

MySQL

Captures row-level changes via binlog replication. See MySQL source documentation for prerequisites and detailed configuration.

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.order_items
    outbox:
      tables: ["shop.outbox"]
    snapshot:
      mode: initial

| Field | Type | Description |
| --- | --- | --- |
| id | string | Unique identifier for checkpoints and metrics |
| dsn | string | MySQL connection string with replication privileges |
| tables | array | Table patterns to capture; omit for all tables |
| outbox.tables | array | Table patterns to tag as outbox events. Must also appear in tables. Supports globs: shop.outbox, *.outbox, shop.outbox_%. |
| snapshot.mode | string | never (default); initial: snapshot once if no checkpoint exists; always: re-snapshot on every restart |
| snapshot.max_parallel_tables | int | Tables snapshotted concurrently (default: 8) |
| snapshot.chunk_size | int | Rows per range chunk for integer-PK tables (default: 10000) |

Table patterns support SQL LIKE syntax:

  • db.table - exact match
  • db.prefix% - tables matching prefix
  • db.% - all tables in database
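
The LIKE-style matching above can be sketched in Python (illustrative only; the helper name is hypothetical and this is not DeltaForge's implementation). Each % matches any run of characters, everything else is literal:

```python
import re

def like_to_regex(pattern: str) -> re.Pattern:
    """Translate a SQL LIKE-style table pattern (% = any run of
    characters) into an anchored regular expression."""
    parts = [re.escape(p) for p in pattern.split("%")]
    return re.compile("^" + ".*".join(parts) + "$")

print(bool(like_to_regex("shop.order%").match("shop.orders")))  # prefix match
print(bool(like_to_regex("db.%").match("db.anything")))         # whole database
print(bool(like_to_regex("db.table").match("db.table2")))       # exact match fails
```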

PostgreSQL

Captures row-level changes via logical replication. See PostgreSQL source documentation for prerequisites and detailed configuration.

source:
  type: postgres
  config:
    id: users-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_users
    publication: users_pub
    tables:
      - public.users
      - public.sessions
    start_position: earliest
    outbox:
      prefixes: ["outbox", "order_outbox_%"]
    snapshot:
      mode: initial

| Field | Type | Description |
| --- | --- | --- |
| id | string | Unique identifier |
| dsn | string | PostgreSQL connection string |
| slot | string | Replication slot name |
| publication | string | Publication name |
| tables | array | Table patterns to capture |
| start_position | string | earliest, latest, or lsn |
| outbox.prefixes | array | pg_logical_emit_message prefixes to tag as outbox events. Supports globs: outbox, outbox_%, *. |
| snapshot.mode | string | never (default); initial: snapshot once if no checkpoint exists; always: re-snapshot on every restart |
| snapshot.max_parallel_tables | int | Tables snapshotted concurrently (default: 8) |
| snapshot.chunk_size | int | Rows per range chunk (default: 10000) |

Note: The source-level outbox field only tags matching events with the __outbox sentinel. Routing and transformation are handled by the outbox processor.


Processors

Processors transform events between source and sinks. They run in order and can filter, enrich, or modify events.

JavaScript

processors:
  - type: javascript
    id: transform
    inline: |
      function processBatch(events) {
        return events.map(e => {
          e.tags = ["processed"];
          return e;
        });
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500

| Field | Type | Description |
| --- | --- | --- |
| id | string | Processor identifier |
| inline | string | JavaScript code |
| limits.cpu_ms | int | CPU time limit |
| limits.mem_mb | int | Memory limit |
| limits.timeout_ms | int | Execution timeout |

Flatten

processors:
  - type: flatten
    id: flat
    separator: "__"
    max_depth: 3
    on_collision: last
    empty_object: preserve
    lists: preserve
    empty_list: drop

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | "flatten" | Processor identifier |
| separator | string | "__" | Separator between path segments |
| max_depth | int | unlimited | Stop recursing at this depth; objects at the boundary are kept as-is |
| on_collision | string | last | Key collision policy: last, first, or error |
| empty_object | string | preserve | Empty object policy: preserve, drop, or null |
| lists | string | preserve | Array policy: preserve or index |
| empty_list | string | preserve | Empty array policy: preserve, drop, or null |
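
The default behavior can be sketched in Python (an illustrative sketch of the defaults only: separator "__", lists and empty objects preserved; the collision, depth, and index policies are not modeled here):

```python
def flatten(obj, separator="__", prefix=""):
    """Collapse nested objects into a single level, joining path
    segments with `separator`. Lists and empty objects are kept
    as-is, mirroring the processor's defaults."""
    out = {}
    for key, value in obj.items():
        path = f"{prefix}{separator}{key}" if prefix else key
        if isinstance(value, dict) and value:
            out.update(flatten(value, separator, path))
        else:
            out[path] = value
    return out

event = {"order": {"id": 7, "customer": {"name": "Ada"}}, "tags": ["new"]}
print(flatten(event))
# {'order__id': 7, 'order__customer__name': 'Ada', 'tags': ['new']}
```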

Outbox

Transforms raw outbox events into routed, sink-ready events. Requires the source to have outbox configured so events are tagged before reaching this processor. See Outbox pattern documentation for full details.

processors:
  - type: outbox
    id: outbox
    topic: "${aggregate_type}.${event_type}"
    default_topic: "events.unrouted"
    raw_payload: true
    columns:
      payload: data
    additional_headers:
      x-trace-id: trace_id

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | "outbox" | Processor identifier |
| tables | array | [] | Filter: only process outbox events matching these patterns. Empty = all outbox events. |
| topic | string | - | Topic template resolved against the raw payload using ${field} placeholders |
| default_topic | string | - | Fallback topic when template resolution fails and no topic column exists |
| key | string | - | Key template resolved against the raw payload. Defaults to the aggregate_id value. |
| raw_payload | bool | false | Deliver the extracted payload as-is to sinks, bypassing envelope wrapping |
| strict | bool | false | Fail the batch if required fields are missing rather than silently falling back |
| columns.payload | string | payload | Column containing the event payload |
| columns.aggregate_type | string | aggregate_type | Column for aggregate type |
| columns.aggregate_id | string | aggregate_id | Column for aggregate ID |
| columns.event_type | string | event_type | Column for event type |
| columns.topic | string | topic | Column for pre-computed topic override |
| columns.event_id | string | id | Column extracted as df-event-id header |
| additional_headers | map | {} | Forward extra payload fields as routing headers: header-name: column-name |
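
The non-strict template resolution can be sketched in Python (illustrative only; the function name is hypothetical and the real processor also consults the topic column before falling back):

```python
import re

def resolve_topic(template, payload, default_topic):
    """Resolve ${field} placeholders against the raw outbox payload.
    If any referenced field is missing, fall back to default_topic
    (the non-strict behavior described above)."""
    try:
        return re.sub(r"\$\{([^}]+)\}",
                      lambda m: str(payload[m.group(1)]), template)
    except KeyError:
        return default_topic

row = {"aggregate_type": "order", "event_type": "created", "aggregate_id": "42"}
print(resolve_topic("${aggregate_type}.${event_type}", row, "events.unrouted"))
# order.created
print(resolve_topic("${missing_field}", row, "events.unrouted"))
# events.unrouted
```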

Sinks

Sinks deliver events to downstream systems. Each sink supports configurable envelope formats and wire encodings to match consumer expectations. See the Sinks documentation for detailed information on multi-sink patterns, commit policies, and failure handling.

Envelope and encoding

All sinks support these serialization options:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| envelope | object | native | Output structure format. See Envelopes. |
| encoding | string | json | Wire encoding format |

Envelope types:

  • native - Direct Debezium payload structure (default, most efficient)
  • debezium - Full {"payload": ...} wrapper
  • cloudevents - CloudEvents 1.0 specification (requires type_prefix)

# Native (default)
envelope:
  type: native

# Debezium wrapper
envelope:
  type: debezium

# CloudEvents
envelope:
  type: cloudevents
  type_prefix: "com.example.cdc"
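
The shapes the three envelope types produce can be sketched as follows. This is illustrative Python, not DeltaForge's serializer: the exact CloudEvents attributes emitted beyond the required ones are assumptions here, and the source value is hypothetical.

```python
import uuid

def wrap(payload, envelope_type, type_prefix=None, event_type=None):
    """Sketch of the three envelope shapes. Only the required
    CloudEvents 1.0 context attributes are shown."""
    if envelope_type == "native":
        return payload                      # direct payload, no wrapper
    if envelope_type == "debezium":
        return {"payload": payload}         # full wrapper
    if envelope_type == "cloudevents":
        return {
            "specversion": "1.0",
            "id": str(uuid.uuid4()),
            "source": "deltaforge",         # hypothetical source URI
            "type": f"{type_prefix}.{event_type}",
            "data": payload,
        }
    raise ValueError(envelope_type)

print(wrap({"op": "c"}, "debezium"))  # {'payload': {'op': 'c'}}
```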

Kafka

See Kafka sink documentation for detailed configuration options and best practices.

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      envelope:
        type: debezium
      encoding: json
      required: true
      exactly_once: false
      send_timeout_secs: 30
      client_conf:
        security.protocol: SASL_SSL

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | - | Sink identifier |
| brokers | string | - | Kafka broker addresses |
| topic | string | - | Target topic or template |
| key | string | - | Message key template |
| envelope | object | native | Output format |
| encoding | string | json | Wire encoding |
| required | bool | true | Gates checkpoints |
| exactly_once | bool | false | Transactional mode |
| send_timeout_secs | int | 30 | Send timeout |
| client_conf | map | - | librdkafka overrides |

CloudEvents example:

sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: ${KAFKA_BROKERS}
      topic: events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.cdc"
      encoding: json

Redis

See Redis sink documentation for detailed configuration options and best practices.

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: native
      encoding: json
      required: true

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | - | Sink identifier |
| uri | string | - | Redis connection URI |
| stream | string | - | Redis stream key or template |
| key | string | - | Entry key template |
| envelope | object | native | Output format |
| encoding | string | json | Wire encoding |
| required | bool | true | Gates checkpoints |
| send_timeout_secs | int | 5 | XADD timeout |
| batch_timeout_secs | int | 30 | Pipeline timeout |
| connect_timeout_secs | int | 10 | Connection timeout |

NATS

See NATS sink documentation for detailed configuration options and best practices.

sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      envelope:
        type: native
      encoding: json
      required: true
      send_timeout_secs: 5
      batch_timeout_secs: 30

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | - | Sink identifier |
| url | string | - | NATS server URL |
| subject | string | - | Subject or template |
| key | string | - | Message key template |
| stream | string | - | JetStream stream name |
| envelope | object | native | Output format |
| encoding | string | json | Wire encoding |
| required | bool | true | Gates checkpoints |
| send_timeout_secs | int | 5 | Publish timeout |
| batch_timeout_secs | int | 30 | Batch timeout |
| connect_timeout_secs | int | 10 | Connection timeout |
| credentials_file | string | - | NATS credentials file |
| username | string | - | Auth username |
| password | string | - | Auth password |
| token | string | - | Auth token |

Batching

batch:
  max_events: 500
  max_bytes: 1048576
  max_ms: 1000
  respect_source_tx: true
  max_inflight: 2

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| max_events | int | 500 | Flush after this many events |
| max_bytes | int | 1048576 | Flush after size reaches limit |
| max_ms | int | 1000 | Flush after time (ms) |
| respect_source_tx | bool | true | Never split source transactions |
| max_inflight | int | 2 | Max concurrent batches |
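
The three flush thresholds combine as a simple "first one wins" rule, sketched below (illustrative only; respect_source_tx, which can hold a batch open past these limits, is not modeled):

```python
def should_flush(event_count, byte_size, elapsed_ms,
                 max_events=500, max_bytes=1_048_576, max_ms=1000):
    """A batch is flushed as soon as any one threshold is reached."""
    return (event_count >= max_events
            or byte_size >= max_bytes
            or elapsed_ms >= max_ms)

print(should_flush(500, 10_000, 120))  # event threshold hit
print(should_flush(10, 10_000, 120))   # no threshold hit yet
```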

Commit policy

commit_policy:
  mode: required

# For quorum mode:
commit_policy:
  mode: quorum
  quorum: 2

| Mode | Description |
| --- | --- |
| all | Every sink must acknowledge before checkpoint |
| required | Only required: true sinks must acknowledge (default) |
| quorum | Checkpoint after quorum sinks acknowledge |
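
The three modes can be sketched as a predicate over sink acknowledgements (illustrative only; the function name and data shapes are hypothetical):

```python
def may_checkpoint(acks, sinks, mode, quorum=None):
    """Decide whether a checkpoint may be taken. `acks` is the set of
    sink ids that acknowledged the batch; `sinks` maps sink id to its
    required flag."""
    if mode == "all":
        return set(sinks) <= acks
    if mode == "required":
        return all(sid in acks for sid, required in sinks.items() if required)
    if mode == "quorum":
        return len(acks) >= quorum
    raise ValueError(f"unknown mode: {mode}")

sinks = {"kafka": True, "redis": False}
print(may_checkpoint({"kafka"}, sinks, "required"))  # required sink acked
print(may_checkpoint({"kafka"}, sinks, "all"))       # redis still pending
```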

Schema sensing

Schema sensing automatically infers and tracks schema from event payloads. See the Schema Sensing documentation for detailed information on how it works, drift detection, and API endpoints.

Performance tip: Schema sensing can be CPU-intensive, especially with deep JSON inspection. Consider your throughput requirements when configuring:

  • Set enabled: false if you don’t need runtime schema inference
  • Limit deep_inspect.max_depth to avoid traversing deeply nested structures
  • Increase sampling.sample_rate to analyze fewer events (e.g., 1 in 100 instead of 1 in 10)
  • Reduce sampling.warmup_events if you’re confident in schema stability

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3
    max_sample_size: 500
  sampling:
    warmup_events: 50
    sample_rate: 5
    structure_cache: true
    structure_cache_size: 50
  high_cardinality:
    enabled: true
    min_events: 100
    stable_threshold: 0.5
    min_dynamic_fields: 5

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | false | Enable schema sensing |
| deep_inspect.enabled | bool | true | Inspect nested JSON |
| deep_inspect.max_depth | int | 10 | Max nesting depth |
| deep_inspect.max_sample_size | int | 1000 | Max events for deep analysis |
| sampling.warmup_events | int | 1000 | Events to fully analyze first |
| sampling.sample_rate | int | 10 | After warmup, analyze 1 in N |
| sampling.structure_cache | bool | true | Cache structure fingerprints |
| sampling.structure_cache_size | int | 100 | Max cached structures |
| high_cardinality.enabled | bool | true | Detect dynamic map keys |
| high_cardinality.min_events | int | 100 | Events before classification |
| high_cardinality.stable_threshold | float | 0.5 | Frequency for stable fields |
| high_cardinality.min_dynamic_fields | int | 5 | Min unique fields for map |
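
The warmup-then-sample policy can be sketched as follows (an illustrative model of the documented behavior, assuming deterministic 1-in-N sampling; the actual selection strategy may differ):

```python
def should_analyze(event_index, warmup_events=1000, sample_rate=10):
    """The first `warmup_events` events are fully analyzed; after the
    warmup, only 1 in `sample_rate` events is."""
    if event_index < warmup_events:
        return True
    return (event_index - warmup_events) % sample_rate == 0

# With the example config above (warmup 50, rate 5), out of 1050 events:
analyzed = sum(should_analyze(i, warmup_events=50, sample_rate=5)
               for i in range(1050))
print(analyzed)  # 50 warmup + 1000/5 sampled = 250
```

Raising sample_rate shrinks only the second term, which is why it is the main lever for reducing steady-state CPU cost.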

Complete examples

MySQL to Kafka with Debezium envelope

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-kafka
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  processors:
    - type: javascript
      id: transform
      inline: |
        function processBatch(events) {
          return events.map(event => {
            event.tags = (event.tags || []).concat(["normalized"]);
            return event;
          });
        }
      limits:
        cpu_ms: 50
        mem_mb: 128
        timeout_ms: 500

  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
        envelope:
          type: debezium
        encoding: json
        required: true
        exactly_once: false
        client_conf:
          message.timeout.ms: "5000"

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
    respect_source_tx: true
    max_inflight: 2

  commit_policy:
    mode: required

PostgreSQL to Kafka with CloudEvents

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: users-postgres-to-kafka
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: users-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_users
      publication: users_pub
      tables:
        - public.users
        - public.user_sessions
      start_position: earliest

  sinks:
    - type: kafka
      config:
        id: users-kafka
        brokers: ${KAFKA_BROKERS}
        topic: user-events
        envelope:
          type: cloudevents
          type_prefix: "com.acme.users"
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

Multi-sink with different formats

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-multi-sink
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  sinks:
    # Kafka Connect expects Debezium format
    - type: kafka
      config:
        id: connect-sink
        brokers: ${KAFKA_BROKERS}
        topic: connect-events
        envelope:
          type: debezium
        required: true

    # Lambda expects CloudEvents
    - type: kafka
      config:
        id: lambda-sink
        brokers: ${KAFKA_BROKERS}
        topic: lambda-events
        envelope:
          type: cloudevents
          type_prefix: "com.acme.cdc"
        required: false

    # Redis cache uses native format
    - type: redis
      config:
        id: cache-redis
        uri: ${REDIS_URI}
        stream: orders-cache
        envelope:
          type: native
        required: false

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

MySQL to NATS

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-nats
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items

  sinks:
    - type: nats
      config:
        id: orders-nats
        url: ${NATS_URL}
        subject: orders.events
        stream: ORDERS
        envelope:
          type: native
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required