Configuration

Pipelines are defined as YAML documents that map directly to the PipelineSpec type. Environment variables are expanded before parsing using ${VAR} syntax, so secrets and connection strings can be injected at runtime.
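For example, a connection string can be kept out of the document and supplied at deploy time; the same pattern appears in the source examples later on this page:

source:
  type: mysql
  config:
    id: orders-mysql
    # The literal text ${MYSQL_DSN} is replaced with the value of the
    # MYSQL_DSN environment variable before the document is parsed,
    # so the secret never has to live in the pipeline file itself.
    dsn: ${MYSQL_DSN}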

Document structure

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: <pipeline-name>
  tenant: <tenant-id>
spec:
  source: { ... }
  processors: [ ... ]
  sinks: [ ... ]
  batch: { ... }
  commit_policy: { ... }
  schema_sensing: { ... }

Metadata

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| name | string | Yes | Unique pipeline identifier. Used in API routes and metrics. |
| tenant | string | Yes | Business-oriented tenant label for multi-tenancy. |

Spec fields

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| source | object | Yes | Database source configuration. See Sources. |
| processors | array | No | Ordered list of processors. See Processors. |
| sinks | array | Yes (at least one) | One or more sinks that receive each batch. See Sinks. |
| sharding | object | No | Optional hint for downstream distribution. |
| connection_policy | object | No | How the runtime establishes upstream connections. |
| batch | object | No | Commit unit thresholds. See Batching. |
| commit_policy | object | No | How sink acknowledgements gate checkpoints. See Commit policy. |
| schema_sensing | object | No | Automatic schema inference from event payloads. See Schema sensing. |

Sources

MySQL

Captures row-level changes via binlog replication. See MySQL source documentation for prerequisites and detailed configuration.

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.order_items

| Field | Type | Description |
|-------|------|-------------|
| id | string | Unique identifier for checkpoints and metrics |
| dsn | string | MySQL connection string with replication privileges |
| tables | array | Table patterns to capture; omit for all tables |

Table patterns support SQL LIKE syntax:

  • db.table - exact match
  • db.prefix% - tables matching prefix
  • db.* - all tables in database
  • %.table - table in any database
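
For example, a tables list can mix exact names and patterns; the database and table names below are illustrative:

source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders        # exact match
      - shop.invoice%      # any table in shop whose name starts with "invoice"
      - analytics.*        # every table in the analytics database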

Turso

source:
  type: turso
  config:
    id: turso-main
    url: "libsql://your-db.turso.io"
    auth_token: ${TURSO_AUTH_TOKEN}
    tables: ["users", "orders"]
    cdc_mode: auto
    poll_interval_ms: 1000
    native_cdc:
      level: data

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| id | string | | Logical identifier |
| url | string | | Database URL |
| auth_token | string | | Turso cloud token |
| tables | array | | Tables to track |
| cdc_mode | string | auto | native, triggers, polling, or auto |
| poll_interval_ms | int | 1000 | Polling interval in milliseconds |
| native_cdc.level | string | data | binlog or data |

PostgreSQL

Captures row-level changes via logical replication using the pgoutput plugin. See PostgreSQL source documentation for prerequisites and detailed configuration.

source:
  type: postgres
  config:
    id: orders-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables:
      - public.orders
      - public.order_items
    start_position: earliest

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| id | string | | Unique identifier |
| dsn | string | | PostgreSQL connection string |
| slot | string | | Replication slot name |
| publication | string | | Publication name |
| tables | array | | Table patterns to capture |
| start_position | string | earliest | earliest, latest, or {lsn: "X/Y"} |

Table patterns support SQL LIKE syntax (same as MySQL):

  • schema.table - exact match
  • schema.prefix% - tables matching prefix
  • schema.* - all tables in schema
  • %.table - table in any schema
  • table - defaults to public.table
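
For example, a bare table name can be combined with a pinned starting position; the schema names and the LSN value below are illustrative:

source:
  type: postgres
  config:
    id: users-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_users
    publication: users_pub
    tables:
      - users          # bare name, equivalent to public.users
      - audit.*        # every table in the audit schema
    start_position: { lsn: "0/16B3748" }   # resume from a specific LSN instead of earliest/latest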

Processors

Processors transform batches of events before delivery to sinks.

JavaScript

processors:
  - type: javascript
    id: transform
    inline: |
      function processBatch(events) {
        return events.map(event => {
          event.tags = ["processed"];
          return event;
        });
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500

| Field | Type | Description |
|-------|------|-------------|
| id | string | Processor identifier |
| inline | string | JS source defining processBatch(events) |
| limits.cpu_ms | int | CPU time limit per batch |
| limits.mem_mb | int | Memory limit |
| limits.timeout_ms | int | Execution timeout |

The processBatch(events) function receives an array of events and can return:

  • An array of events (modified, filtered, or expanded)
  • A single event object (wrapped in array automatically)
  • null or undefined to use the mutated input array
  • Empty array [] to drop all events
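
As a sketch of this return contract, a processor can filter instead of map: returning the filtered (possibly empty) array drops the excluded events. The tag value is taken from the earlier example:

processors:
  - type: javascript
    id: keep-processed
    inline: |
      function processBatch(events) {
        // Keep only events that already carry the "processed" tag;
        // an empty result drops the entire batch.
        return events.filter(event => (event.tags || []).includes("processed"));
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500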

Sinks

Kafka

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      required: true
      exactly_once: false
      client_conf:
        message.timeout.ms: "5000"

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| id | string | | Sink identifier |
| brokers | string | | Comma-separated broker list |
| topic | string | | Destination topic |
| required | bool | true | Whether this sink gates checkpoints |
| exactly_once | bool | false | Enable EOS semantics |
| client_conf | map | {} | Raw librdkafka config overrides |

Redis

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      required: true

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| id | string | | Sink identifier |
| uri | string | | Redis connection URI |
| stream | string | | Redis stream key |
| required | bool | true | Whether this sink gates checkpoints |

NATS

sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      required: true
      send_timeout_secs: 5
      batch_timeout_secs: 30

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| id | string | | Sink identifier |
| url | string | | NATS server URL |
| subject | string | | Subject to publish to |
| stream | string | | JetStream stream name |
| required | bool | true | Gates checkpoints |
| send_timeout_secs | int | 5 | Publish timeout |
| batch_timeout_secs | int | 30 | Batch timeout |
| connect_timeout_secs | int | 10 | Connection timeout |
| credentials_file | string | | NATS credentials file |
| username | string | | Auth username |
| password | string | | Auth password |
| token | string | | Auth token |
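
If the server requires authentication, one of the credential fields from the table can be added to the config; the NATS_TOKEN variable name below is illustrative:

sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      token: ${NATS_TOKEN}   # alternatively credentials_file, or username + password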

Batching

batch:
  max_events: 500
  max_bytes: 1048576
  max_ms: 1000
  respect_source_tx: true
  max_inflight: 2

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| max_events | int | 500 | Flush after this many events |
| max_bytes | int | 1048576 | Flush after the batch reaches this size in bytes |
| max_ms | int | 1000 | Flush after this much time (ms) |
| respect_source_tx | bool | true | Never split source transactions |
| max_inflight | int | 2 | Max concurrent batches |

Commit policy

commit_policy:
  mode: required

# For quorum mode:
commit_policy:
  mode: quorum
  quorum: 2

| Mode | Description |
|------|-------------|
| all | Every sink must acknowledge before checkpoint |
| required | Only required: true sinks must acknowledge (default) |
| quorum | Checkpoint after quorum sinks acknowledge |
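
As a sketch of how quorum interacts with multiple sinks: with the three sinks below and quorum: 2, the checkpoint advances once any two of them acknowledge the batch (sink settings are abbreviated from the earlier examples):

spec:
  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
    - type: redis
      config:
        id: orders-redis
        uri: ${REDIS_URI}
        stream: orders
    - type: nats
      config:
        id: orders-nats
        url: ${NATS_URL}
        subject: orders.events
        stream: ORDERS
  commit_policy:
    mode: quorum
    quorum: 2   # checkpoint once any two of the three sinks acknowledge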

Schema sensing

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3
    max_sample_size: 500
  sampling:
    warmup_events: 50
    sample_rate: 5
    structure_cache: true
    structure_cache_size: 50

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| enabled | bool | false | Enable schema sensing |
| deep_inspect.enabled | bool | true | Inspect nested JSON |
| deep_inspect.max_depth | int | 10 | Max nesting depth |
| deep_inspect.max_sample_size | int | 1000 | Max events for deep analysis |
| sampling.warmup_events | int | 1000 | Events to fully analyze first |
| sampling.sample_rate | int | 10 | After warmup, analyze 1 in N |
| sampling.structure_cache | bool | true | Cache structure fingerprints |
| sampling.structure_cache_size | int | 100 | Max cached structures |

Complete example

MySQL to Kafka

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-kafka
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  processors:
    - type: javascript
      id: transform
      inline: |
        function processBatch(events) {
          return events.map(event => {
            event.tags = (event.tags || []).concat(["normalized"]);
            return event;
          });
        }
      limits:
        cpu_ms: 50
        mem_mb: 128
        timeout_ms: 500

  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
        required: true
        exactly_once: false
        client_conf:
          message.timeout.ms: "5000"
    - type: redis
      config:
        id: orders-redis
        uri: ${REDIS_URI}
        stream: orders
        required: false

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
    respect_source_tx: true
    max_inflight: 2

  commit_policy:
    mode: required

  schema_sensing:
    enabled: true
    deep_inspect:
      enabled: true
      max_depth: 3
    sampling:
      warmup_events: 50
      sample_rate: 5

PostgreSQL to Kafka

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: users-postgres-to-kafka
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: users-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_users
      publication: users_pub
      tables:
        - public.users
        - public.user_sessions
      start_position: earliest

  sinks:
    - type: kafka
      config:
        id: users-kafka
        brokers: ${KAFKA_BROKERS}
        topic: user-events
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

MySQL to NATS

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-nats
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items

  sinks:
    - type: nats
      config:
        id: orders-nats
        url: ${NATS_URL}
        subject: orders.events
        stream: ORDERS
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required