Pipelines are defined as YAML documents that map directly to the PipelineSpec type. Environment variables are expanded before parsing using ${VAR} syntax, so secrets and connection strings can be injected at runtime.
```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: <pipeline-name>
  tenant: <tenant-id>
spec:
  source: { ... }
  processors: [ ... ]
  sinks: [ ... ]
  batch: { ... }
  commit_policy: { ... }
  schema_sensing: { ... }
```
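Because substitution happens before the document is parsed, a reference like `${MYSQL_DSN}` is replaced with the variable's value wherever it appears. A minimal sketch (the variable name and value are illustrative):

```yaml
# Assuming the runtime's environment contains, for example:
#   export MYSQL_DSN="mysql://replicator:secret@db.internal:3306"
source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}   # expanded to the full connection string before parsing
```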
`metadata` fields:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `name` | string | Yes | Unique pipeline identifier. Used in API routes and metrics. |
| `tenant` | string | Yes | Business-oriented tenant label for multi-tenancy. |
`spec` fields:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `source` | object | Yes | Database source configuration. See Sources. |
| `processors` | array | No | Ordered list of processors. See Processors. |
| `sinks` | array | Yes (at least one) | One or more sinks that receive each batch. See Sinks. |
| `sharding` | object | No | Optional hint for downstream distribution. |
| `connection_policy` | object | No | How the runtime establishes upstream connections. |
| `batch` | object | No | Commit unit thresholds. See Batching. |
| `commit_policy` | object | No | How sink acknowledgements gate checkpoints. See Commit policy. |
| `schema_sensing` | object | No | Automatic schema inference from event payloads. See Schema sensing. |
The MySQL source captures row-level changes via binlog replication. See the MySQL source documentation for prerequisites and detailed configuration.
```yaml
source:
  type: mysql
  config:
    id: orders-mysql
    dsn: ${MYSQL_DSN}
    tables:
      - shop.orders
      - shop.order_items
```
| Field | Type | Description |
|-------|------|-------------|
| `id` | string | Unique identifier for checkpoints and metrics |
| `dsn` | string | MySQL connection string with replication privileges |
| `tables` | array | Table patterns to capture; omit for all tables |
Table patterns support SQL LIKE syntax:

- `db.table` - exact match
- `db.prefix%` - tables matching prefix
- `db.*` - all tables in database
- `%.table` - table in any database
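For example, a `tables` list can mix exact names and patterns (the database and table names below are illustrative):

```yaml
tables:
  - shop.orders        # exact match
  - shop.archive_%     # every table whose name starts with archive_
  - analytics.*        # all tables in the analytics database
```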
The Turso source captures changes from a Turso (libSQL) database via native CDC, triggers, or polling, depending on `cdc_mode`.

```yaml
source:
  type: turso
  config:
    id: turso-main
    url: "libsql://your-db.turso.io"
    auth_token: ${TURSO_AUTH_TOKEN}
    tables: ["users", "orders"]
    cdc_mode: auto
    poll_interval_ms: 1000
    native_cdc:
      level: data
```
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `id` | string | — | Logical identifier |
| `url` | string | — | Database URL |
| `auth_token` | string | — | Turso cloud token |
| `tables` | array | — | Tables to track |
| `cdc_mode` | string | `auto` | `native`, `triggers`, `polling`, or `auto` |
| `poll_interval_ms` | int | 1000 | Polling interval (ms) |
| `native_cdc.level` | string | `data` | `binlog` or `data` |
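For instance, to rely on polling alone rather than letting `auto` choose a mechanism (a sketch; the interval and identifiers are illustrative):

```yaml
source:
  type: turso
  config:
    id: turso-polling
    url: "libsql://your-db.turso.io"
    auth_token: ${TURSO_AUTH_TOKEN}
    tables: ["users"]
    cdc_mode: polling        # always poll instead of using native CDC or triggers
    poll_interval_ms: 500    # poll twice per second
```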
The PostgreSQL source captures row-level changes via logical replication using the pgoutput plugin. See the PostgreSQL source documentation for prerequisites and detailed configuration.
```yaml
source:
  type: postgres
  config:
    id: orders-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_orders
    publication: orders_pub
    tables:
      - public.orders
      - public.order_items
    start_position: earliest
```
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `id` | string | — | Unique identifier |
| `dsn` | string | — | PostgreSQL connection string |
| `slot` | string | — | Replication slot name |
| `publication` | string | — | Publication name |
| `tables` | array | — | Table patterns to capture |
| `start_position` | string | `earliest` | `earliest`, `latest`, or `{lsn: "X/Y"}` |
Table patterns support SQL LIKE syntax (same as MySQL):

- `schema.table` - exact match
- `schema.prefix%` - tables matching prefix
- `schema.*` - all tables in schema
- `%.table` - table in any schema
- `table` - defaults to `public.table`
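As an example, a source that captures every table in the `public` schema and resumes from an explicit LSN (the identifiers and LSN value are illustrative):

```yaml
source:
  type: postgres
  config:
    id: billing-postgres
    dsn: ${POSTGRES_DSN}
    slot: deltaforge_billing
    publication: billing_pub
    tables:
      - public.*                            # all tables in the public schema
    start_position: { lsn: "0/16B3748" }    # resume from a specific WAL position
```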
Processors transform batches of events before delivery to sinks.
```yaml
processors:
  - type: javascript
    id: transform
    inline: |
      function processBatch(events) {
        return events.map(event => {
          event.tags = ["processed"];
          return event;
        });
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500
```
| Field | Type | Description |
|-------|------|-------------|
| `id` | string | Processor identifier |
| `inline` | string | JS source defining `processBatch(events)` |
| `limits.cpu_ms` | int | CPU time limit per batch (ms) |
| `limits.mem_mb` | int | Memory limit (MB) |
| `limits.timeout_ms` | int | Execution timeout (ms) |
The `processBatch(events)` function receives an array of events and can return:

- An array of events (modified, filtered, or expanded)
- A single event object (wrapped in an array automatically)
- `null` or `undefined` to use the mutated input array
- An empty array (`[]`) to drop all events
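For instance, a processor that keeps only part of the batch simply returns a smaller array. The field it filters on (`table` here) is an assumption about the event shape, so adapt it to your payloads:

```yaml
processors:
  - type: javascript
    id: orders-only
    inline: |
      // Returning a filtered array drops the other events from the batch;
      // returning [] would drop the whole batch.
      function processBatch(events) {
        return events.filter(event => event.table === "orders");
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500
```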
Each batch is delivered to one or more sinks. The Kafka sink publishes events to a Kafka topic:

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      required: true
      exactly_once: false
      client_conf:
        message.timeout.ms: "5000"
```
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `id` | string | — | Sink identifier |
| `brokers` | string | — | Comma-separated broker list |
| `topic` | string | — | Destination topic |
| `required` | bool | true | Whether this sink gates checkpoints |
| `exactly_once` | bool | false | Enable EOS (exactly-once) semantics |
| `client_conf` | map | `{}` | Raw librdkafka config overrides |
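Keys under `client_conf` are passed straight through to librdkafka, so producer behaviour can be tuned there; a sketch with illustrative values:

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      client_conf:
        linger.ms: "50"              # let the producer batch for up to 50 ms
        compression.type: "lz4"      # compress producer batches
        message.timeout.ms: "10000"  # fail deliveries after 10 s
```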
The Redis sink appends events to a Redis stream:

```yaml
sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      required: true
```
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `id` | string | — | Sink identifier |
| `uri` | string | — | Redis connection URI |
| `stream` | string | — | Redis stream key |
| `required` | bool | true | Whether this sink gates checkpoints |
The NATS sink publishes events to a NATS subject backed by a JetStream stream:

```yaml
sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      required: true
      send_timeout_secs: 5
      batch_timeout_secs: 30
```
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `id` | string | — | Sink identifier |
| `url` | string | — | NATS server URL |
| `subject` | string | — | Subject to publish to |
| `stream` | string | — | JetStream stream name |
| `required` | bool | true | Gates checkpoints |
| `send_timeout_secs` | int | 5 | Publish timeout |
| `batch_timeout_secs` | int | 30 | Batch timeout |
| `connect_timeout_secs` | int | 10 | Connection timeout |
| `credentials_file` | string | — | NATS credentials file |
| `username` | string | — | Auth username |
| `password` | string | — | Auth password |
| `token` | string | — | Auth token |
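Normally only one of the authentication options is needed; a token-based variant might look like this (a sketch, with the token supplied via an illustrative environment variable):

```yaml
sinks:
  - type: nats
    config:
      id: orders-nats
      url: ${NATS_URL}
      subject: orders.events
      stream: ORDERS
      token: ${NATS_TOKEN}   # token auth; alternatively use username/password or credentials_file
      required: true
```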
Batching controls how events are grouped into commit units before delivery:

```yaml
batch:
  max_events: 500
  max_bytes: 1048576
  max_ms: 1000
  respect_source_tx: true
  max_inflight: 2
```
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `max_events` | int | 500 | Flush after this many events |
| `max_bytes` | int | 1048576 | Flush after batch size (bytes) reaches this limit |
| `max_ms` | int | 1000 | Flush after this much time (ms) |
| `respect_source_tx` | bool | true | Never split source transactions |
| `max_inflight` | int | 2 | Max concurrent batches |
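Assuming the thresholds act as independent triggers (flush on whichever is reached first), a latency-sensitive pipeline can simply lower them; values below are illustrative:

```yaml
batch:
  max_events: 100      # smaller commit units
  max_ms: 250          # flush at least every 250 ms
  respect_source_tx: true
  max_inflight: 2
```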
The commit policy determines which sink acknowledgements are needed before a checkpoint is recorded:

```yaml
commit_policy:
  mode: required

# For quorum mode:
commit_policy:
  mode: quorum
  quorum: 2
```
| Mode | Description |
|------|-------------|
| `all` | Every sink must acknowledge before checkpoint |
| `required` | Only `required: true` sinks must acknowledge (default) |
| `quorum` | Checkpoint after `quorum` sinks acknowledge |
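The strictest setting waits for every configured sink, regardless of its `required` flag:

```yaml
commit_policy:
  mode: all   # checkpoint only after every sink acknowledges
```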
Schema sensing infers schemas automatically from event payloads:

```yaml
schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3
    max_sample_size: 500
  sampling:
    warmup_events: 50
    sample_rate: 5
    structure_cache: true
    structure_cache_size: 50
```
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `enabled` | bool | false | Enable schema sensing |
| `deep_inspect.enabled` | bool | true | Inspect nested JSON |
| `deep_inspect.max_depth` | int | 10 | Max nesting depth |
| `deep_inspect.max_sample_size` | int | 1000 | Max events for deep analysis |
| `sampling.warmup_events` | int | 1000 | Events to fully analyze first |
| `sampling.sample_rate` | int | 10 | After warmup, analyze 1 in N events |
| `sampling.structure_cache` | bool | true | Cache structure fingerprints |
| `sampling.structure_cache_size` | int | 100 | Max cached structures |
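Since every other field has a default, the feature can be switched on with a single flag:

```yaml
schema_sensing:
  enabled: true   # deep_inspect and sampling fall back to the defaults listed above
```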
A complete MySQL-to-Kafka pipeline with a JavaScript processor and a non-gating (`required: false`) Redis sink:

```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-kafka
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
  processors:
    - type: javascript
      id: transform
      inline: |
        function processBatch(events) {
          return events.map(event => {
            event.tags = (event.tags || []).concat(["normalized"]);
            return event;
          });
        }
      limits:
        cpu_ms: 50
        mem_mb: 128
        timeout_ms: 500
  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
        required: true
        exactly_once: false
        client_conf:
          message.timeout.ms: "5000"
    - type: redis
      config:
        id: orders-redis
        uri: ${REDIS_URI}
        stream: orders
        required: false
  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
    respect_source_tx: true
    max_inflight: 2
  commit_policy:
    mode: required
  schema_sensing:
    enabled: true
    deep_inspect:
      enabled: true
      max_depth: 3
    sampling:
      warmup_events: 50
      sample_rate: 5
```
A PostgreSQL-to-Kafka pipeline using logical replication:

```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: users-postgres-to-kafka
  tenant: acme
spec:
  source:
    type: postgres
    config:
      id: users-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_users
      publication: users_pub
      tables:
        - public.users
        - public.user_sessions
      start_position: earliest
  sinks:
    - type: kafka
      config:
        id: users-kafka
        brokers: ${KAFKA_BROKERS}
        topic: user-events
        required: true
  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true
  commit_policy:
    mode: required
```
A MySQL-to-NATS pipeline publishing to JetStream:

```yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: orders-mysql-to-nats
  tenant: acme
spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.order_items
  sinks:
    - type: nats
      config:
        id: orders-nats
        url: ${NATS_URL}
        subject: orders.events
        stream: ORDERS
        required: true
  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true
  commit_policy:
    mode: required
```