
# Sinks

Sinks receive batches from the coordinator after processors run. Each sink lives under `spec.sinks` and can be marked as required or best-effort via the `required` flag. Checkpoint behavior is governed by the pipeline's commit policy.
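As a point of reference, a single-sink pipeline might look like the sketch below. Only `spec.sinks` and the sink fields used elsewhere on this page are taken from this documentation; the rest of the spec layout is illustrative.

```yaml
# Minimal sketch: one required sink under spec.sinks.
spec:
  sinks:
    - type: kafka
      config:
        id: events-kafka
        brokers: localhost:9092
        topic: events
        required: true   # default; the checkpoint waits for this sink
```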

## Envelope and Encoding

All sinks support configurable envelope formats and wire encodings. See the Envelopes and Encodings page for detailed documentation.

| Option | Values | Default | Description |
|---|---|---|---|
| `envelope` | `native`, `debezium`, `cloudevents` | `native` | Output JSON structure |
| `encoding` | `json` | `json` | Wire format |

Quick example:

```yaml
sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events
      envelope:
        type: cloudevents
        type_prefix: "com.example.cdc"
      encoding: json
```

## Available Sinks

| Sink | Description |
|---|---|
| `kafka` | Kafka producer sink |
| `nats` | NATS JetStream sink |
| `redis` | Redis stream sink |
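For quick orientation, minimal configurations for the three sink types might look like the sketch below. The `kafka` and `redis` fields are drawn from examples on this page; the `nats` fields (`url`, `stream`) are assumptions for illustration, so consult the NATS sink's reference page for the actual keys.

```yaml
sinks:
  - type: kafka
    config:
      id: events-kafka
      brokers: localhost:9092
      topic: events

  - type: nats
    config:
      id: events-nats
      url: nats://localhost:4222   # assumed field names; check the NATS sink docs
      stream: EVENTS

  - type: redis
    config:
      id: events-redis
      uri: redis://localhost:6379
      stream: events
```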

## Multiple sinks in one pipeline

You can combine multiple sinks in one pipeline to fan out events to different destinations. However, multi-sink pipelines introduce complexity that requires careful consideration.

### Why multiple sinks are challenging

**Different performance characteristics:** Kafka might handle 100K events/sec while a downstream HTTP webhook processes 100/sec. The slowest sink becomes the bottleneck for the entire pipeline.

**Independent failure modes:** Each sink can fail independently. Redis might be healthy while Kafka experiences broker failures. Without proper handling, a single sink failure could block the entire pipeline or cause data loss.

**No distributed transactions:** DeltaForge cannot atomically commit across heterogeneous systems. If Kafka succeeds but Redis fails mid-batch, you face a choice: retry Redis (risking duplicates in Kafka) or skip Redis (losing data there).

**Checkpoint semantics:** The checkpoint represents "how far we've processed from the source." With multiple sinks, when is it safe to advance? After one sink succeeds? All of them? A majority?

See the `required` flag and commit policy sections below for ways to manage these challenges.

### The `required` flag

The required flag on each sink determines whether that sink must acknowledge successful delivery before the checkpoint advances:

```yaml
sinks:
  - type: kafka
    config:
      id: primary-kafka
      required: true    # Must succeed for checkpoint to advance

  - type: redis
    config:
      id: cache-redis
      required: false   # Best-effort; failures don't block checkpoint
```

**When `required: true` (default):** The sink must acknowledge the batch before the checkpoint can advance. If this sink fails, the pipeline blocks and retries until it succeeds or the operator intervenes.

**When `required: false`:** The sink is best-effort. Failures are logged but don't prevent the checkpoint from advancing. Use this for non-critical destinations where some data loss is acceptable.

### Commit policy

The `commit_policy` setting works with the `required` flag to determine checkpoint behavior:

| Policy | Behavior |
|---|---|
| `all` | Every sink (regardless of `required` flag) must acknowledge |
| `required` | Only sinks with `required: true` must acknowledge (default) |
| `quorum` | At least N sinks must acknowledge |
```yaml
commit_policy:
  mode: required   # Only wait for required sinks

sinks:
  - type: kafka
    config:
      required: true   # Checkpoint waits for this
  - type: redis
    config:
      required: false  # Checkpoint doesn't wait for this
  - type: nats
    config:
      required: true   # Checkpoint waits for this
```

### Practical patterns

**Primary + secondary:** One critical sink (Kafka for durability) marked `required: true`, with secondary sinks (Redis for caching, testing, or experimentation) marked `required: false`.

**Quorum for redundancy:** Three sinks with `commit_policy.mode: quorum` and `quorum: 2`. The checkpoint advances when any two succeed, providing fault tolerance.
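Expressed as configuration, a quorum setup might look like this sketch (the sink `id` values are illustrative):

```yaml
commit_policy:
  mode: quorum
  quorum: 2        # checkpoint advances once any two sinks acknowledge

sinks:
  - type: kafka
    config:
      id: kafka-primary
  - type: nats
    config:
      id: nats-mirror
  - type: redis
    config:
      id: redis-replica
```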

**All-or-nothing:** Use `commit_policy.mode: all` when every destination is critical and you need the strongest consistency guarantee, at the cost of delivery throughput (the slowest sink gates every checkpoint).
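As configuration, following the same `commit_policy` shape as above:

```yaml
commit_policy:
  mode: all   # every sink, required or not, must acknowledge before the checkpoint advances
```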

## Multi-format fan-out

For sending the same events to different consumers that expect different formats:

```yaml
sinks:
  # Kafka Connect expects Debezium format
  - type: kafka
    config:
      id: connect-sink
      brokers: ${KAFKA_BROKERS}
      topic: connect-events
      envelope:
        type: debezium
      required: true

  # Lambda expects CloudEvents
  - type: kafka
    config:
      id: lambda-sink
      brokers: ${KAFKA_BROKERS}
      topic: lambda-events
      envelope:
        type: cloudevents
        type_prefix: "com.acme.cdc"
      required: false

  # Analytics wants raw events
  - type: redis
    config:
      id: analytics-redis
      uri: ${REDIS_URI}
      stream: analytics
      envelope:
        type: native
      required: false
```

This lets each consumer receive events in its preferred format without post-processing.