Kafka sink

The Kafka sink publishes batches to a Kafka topic using rdkafka.

When to use Kafka

Kafka excels as the backbone for event-driven architectures where durability, ordering, and replay capabilities are critical.

Real-world applications

| Use Case | Description |
|---|---|
| Event sourcing | Store all state changes as an immutable log; rebuild application state by replaying events |
| Microservices integration | Decouple services with async messaging; each service consumes relevant topics |
| Real-time analytics pipelines | Feed CDC events to Spark, Flink, or ksqlDB for streaming transformations |
| Data lake ingestion | Stream database changes to S3/HDFS via Kafka Connect for analytics and ML |
| Audit logging | Capture every database mutation for compliance, debugging, and forensics |
| Cross-datacenter replication | Use MirrorMaker 2 to replicate topics across regions for DR |

Pros and cons

| Pros | Cons |
|---|---|
| Durability - Configurable replication ensures no data loss | Operational complexity - Requires ZooKeeper/KRaft, careful tuning |
| Ordering guarantees - Per-partition ordering with consumer groups | Latency - Batching and replication add milliseconds of delay |
| Replay capability - Configurable retention allows reprocessing | Resource intensive - High disk I/O and memory requirements |
| Ecosystem - Connect, Streams, Schema Registry, ksqlDB | Learning curve - Partitioning, offsets, consumer groups to master |
| Throughput - Handles millions of messages per second | Cold start - Cluster setup and topic configuration overhead |
| Exactly-once semantics - Transactions for critical workloads | Cost - Managed services can be expensive at scale |

Configuration

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      required: true
      exactly_once: false
      client_conf:
        message.timeout.ms: "5000"
        acks: "all"

| Field | Type | Default | Description |
|---|---|---|---|
| id | string | | Sink identifier |
| brokers | string | | Comma-separated broker list |
| topic | string | | Destination topic |
| required | bool | true | Gates checkpoints |
| exactly_once | bool | false | Enable EOS semantics |
| client_conf | map | {} | librdkafka overrides |

Exactly-once semantics

When exactly_once: true, the Kafka sink uses a transactional producer. Each batch is wrapped in a Kafka transaction (begin_transaction / commit_transaction), so either all events in the batch are committed atomically or none are.

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      exactly_once: true

How it works

  1. DeltaForge assigns a stable transactional.id per pipeline-sink pair (deltaforge-{pipeline}-{sink_id})
  2. On startup, init_transactions() registers with the broker and fences any zombie producer from a previous instance
  3. Each batch: begin_transaction() → produce messages → commit_transaction() (sketched after this list)
  4. If commit fails, the transaction is aborted and the batch retried
  5. Consumers using isolation.level=read_committed only see committed batches
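
A minimal sketch of this cycle using rdkafka's transactional producer API (the batch representation and error handling here are simplified assumptions, not DeltaForge's actual internals):

use std::time::Duration;
use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};

// Startup: register the transactional.id and fence any zombie producer.
fn make_producer(brokers: &str, txn_id: &str) -> Result<FutureProducer, KafkaError> {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("transactional.id", txn_id)
        .create()?;
    producer.init_transactions(Duration::from_secs(30))?;
    Ok(producer)
}

// Per batch: begin → produce → commit. On any failure the transaction is
// aborted so the whole batch can be retried atomically.
async fn deliver_batch(
    producer: &FutureProducer,
    topic: &str,
    batch: &[Vec<u8>],
) -> Result<(), KafkaError> {
    producer.begin_transaction()?;
    for payload in batch {
        let record = FutureRecord::<(), _>::to(topic).payload(payload);
        if let Err((e, _msg)) = producer.send(record, Duration::from_secs(5)).await {
            producer.abort_transaction(Duration::from_secs(30))?;
            return Err(e);
        }
    }
    producer.commit_transaction(Duration::from_secs(30))?;
    Ok(())
}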

Requirements

  • Kafka 2.5+ (transaction support)
  • isolation.level=read_committed on consumers (otherwise they see uncommitted messages); see the snippet after this list
  • Only one DeltaForge instance per pipeline at a time (the broker will fence duplicates)
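
For example, a read_committed consumer in Rust (a sketch using rdkafka; aborted and in-flight batches are filtered out by the broker):

use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;

// Only messages from committed transactions are delivered to this consumer.
let consumer: StreamConsumer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("group.id", "my-group")
    .set("isolation.level", "read_committed")
    .create()
    .expect("consumer creation failed");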

Transactional overrides

When exactly_once: true, DeltaForge automatically configures:

| Setting | Value | Why |
|---|---|---|
| transactional.id | deltaforge-{pipeline}-{sink_id} | Broker fencing |
| transaction.timeout.ms | 60000 | Max transaction duration |
| enable.idempotence | true | Required for transactions |
| acks | all | Required for transactions |
| message.timeout.ms | 30000 | Must be ≤ transaction.timeout.ms |
| delivery.timeout.ms | 30000 | Must be ≤ transaction.timeout.ms |

You do not need to set these in client_conf — they are applied automatically and cannot be overridden.

Fatal errors

If the broker fences the producer (another instance started with the same transactional.id), DeltaForge treats this as a fatal error and stops the pipeline. This is not retryable — resolve the duplicate instance before restarting.

Performance impact

Transactions add ~1-3ms overhead per batch for the two-phase commit. With properly sized batches (max_events=16000, max_bytes=16MB), throughput impact is ~7-11%. See the Performance guide for tuning details and benchmark results.

Recommended producer settings

| Setting | Recommended | Description |
|---|---|---|
| acks | all | Wait for all replicas for durability |
| message.timeout.ms | 30000 | Total time to deliver a message |
| retries | 2147483647 | Retry indefinitely (with backoff) |
| enable.idempotence | true | Prevent duplicates on retry |
| compression.type | lz4 | Balance between CPU and bandwidth |
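
Applied through client_conf, these recommendations look like the following (librdkafka expects string values, as in the configuration example above):

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      client_conf:
        acks: "all"
        message.timeout.ms: "30000"
        retries: "2147483647"
        enable.idempotence: "true"
        compression.type: "lz4"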

Consuming events

Kafka CLI

# Consume from beginning
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning

# Consume with consumer group
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --group deltaforge-consumers
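
# Consume only committed transactions (relevant when the sink runs
# with exactly_once: true)
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --isolation-level read_committed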

Go consumer example

import (
    "context"
    "encoding/json"
    "log"

    "github.com/IBM/sarama"
)

config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest

group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
if err != nil {
    log.Fatalf("failed to create consumer group: %v", err)
}
defer group.Close()

ctx := context.Background()
for {
    // Consume blocks until the session ends (rebalance or error); loop to rejoin.
    if err := group.Consume(ctx, []string{"orders"}, &handler{}); err != nil {
        log.Printf("consumer error: %v", err)
    }
}

// handler implements sarama.ConsumerGroupHandler.
type handler struct{}

func (h *handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        var event Event
        if err := json.Unmarshal(msg.Value, &event); err != nil {
            log.Printf("skipping malformed event: %v", err)
            continue
        }
        process(event)
        session.MarkMessage(msg, "") // mark offset only after successful processing
    }
    return nil
}

Rust consumer example

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::Message;

async fn consume() -> Result<(), Box<dyn std::error::Error>> {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "my-group")
        .set("auto.offset.reset", "earliest")
        .create()?;

    consumer.subscribe(&["orders"])?;

    loop {
        match consumer.recv().await {
            Ok(msg) => {
                // Empty payloads yield None; treat them as empty strings here.
                let payload = msg.payload_view::<str>().unwrap_or(Ok(""))?;
                let event: Event = serde_json::from_str(payload)?;
                process(event);
                consumer.commit_message(&msg, CommitMode::Async)?;
            }
            Err(e) => eprintln!("Kafka error: {}", e),
        }
    }
}

Python consumer example

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # commit manually after processing
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    event = message.value
    process(event)
    consumer.commit()

Failure modes

| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Broker unavailable | Connection refused, timeout | Retries with backoff; blocks checkpoint | Restore broker; check network |
| Topic not found | UnknownTopicOrPartition | Fails batch; retries | Create topic or enable auto-create |
| Authentication failure | SaslAuthenticationFailed | Fails fast, no retry | Fix credentials in config |
| Authorization failure | TopicAuthorizationFailed | Fails fast, no retry | Grant ACLs for producer |
| Message too large | MessageSizeTooLarge | Fails message permanently | Increase message.max.bytes or filter large events |
| Leader election | NotLeaderForPartition | Automatic retry after metadata refresh | Wait for election; usually transient |
| Disk full | KafkaStorageException | Retries indefinitely | Add disk space; purge old segments |
| Network partition | Timeouts, partial failures | Retries; may produce duplicates | Restore network; idempotence prevents dups |
| Producer fenced | ProducerFenced error | Fatal; pipeline stops immediately | Ensure only one instance per pipeline; restart after resolving |
| Transaction timeout | transaction.timeout.ms exceeded | Transaction aborted; batch retried | Increase timeout or reduce batch size |
| SSL/TLS errors | Handshake failures | Fails fast | Fix certificates, verify truststore |

Failure scenarios and data guarantees

Broker failure during batch delivery

  1. DeltaForge sends batch of 100 events
  2. 50 events delivered, broker crashes
  3. rdkafka detects failure, retries remaining 50
  4. If idempotence enabled: no duplicates
  5. If not: possible duplicates of events near failure point
  6. Checkpoint only saved after ALL events acknowledged

DeltaForge crash after Kafka ack, before checkpoint

  1. Batch delivered to Kafka successfully
  2. DeltaForge crashes before saving checkpoint
  3. On restart: replays from last checkpoint
  4. Without exactly_once: duplicate events in Kafka (at-least-once); the consumer must handle them idempotently (see the sketch after this list)
  5. With exactly_once: replayed batch gets a new transaction; consumers with read_committed see duplicates but each batch is atomic
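
Consumers can defend against such replays by deduplicating on a stable event identifier. A minimal sketch in Rust, assuming each event carries a unique id field (the actual DeltaForge event envelope may differ):

use std::collections::HashSet;

// Hypothetical event shape; the real envelope fields are an assumption.
#[derive(serde::Deserialize)]
struct Event {
    id: String, // stable per source event, so replays carry the same id
    #[serde(flatten)]
    body: serde_json::Value,
}

fn apply(event: &Event) {
    // Domain-specific side effect goes here.
    println!("applying event {}", event.id);
}

struct Deduper {
    seen: HashSet<String>, // production consumers back this with a persistent store
}

impl Deduper {
    fn process_once(&mut self, raw: &str) -> serde_json::Result<()> {
        let event: Event = serde_json::from_str(raw)?;
        // insert() returns false when the id was already seen: a replayed duplicate.
        if self.seen.insert(event.id.clone()) {
            apply(&event);
        }
        Ok(())
    }
}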

Monitoring recommendations

DeltaForge exposes these metrics for Kafka sink monitoring:

# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink}      # Events delivered
deltaforge_sink_batch_total{pipeline,sink}       # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink}   # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"}  # Stage timing
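
For example, latency and throughput can be derived with standard PromQL (the sink label value below is taken from the configuration example and is an assumption; the _bucket suffix follows Prometheus's histogram convention):

# p99 sink delivery latency over the last 5 minutes
histogram_quantile(0.99,
  sum by (le) (rate(deltaforge_sink_latency_seconds_bucket{sink="orders-kafka"}[5m])))

# events per second delivered by the Kafka sink
rate(deltaforge_sink_events_total{sink="orders-kafka"}[5m])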

For deeper Kafka broker visibility, monitor your Kafka cluster directly:

  • Broker metrics via JMX or Kafka’s built-in metrics
  • Consumer lag via kafka-consumer-groups.sh
  • Topic throughput via broker dashboards

Note: Internal rdkafka producer statistics (message queues, broker RTT, etc.) are not currently exposed by DeltaForge. This is a potential future enhancement.

Notes

  • Combine Kafka with other sinks to fan out data; use commit policy to control checkpoint behavior
  • For exactly-once semantics, set exactly_once: true and ensure your Kafka cluster supports transactions (2.5+)
  • With exactly_once: false (default), idempotent production is still enabled — duplicates are prevented during retries but not across DeltaForge restarts
  • Adjust client_conf for durability (acks=all) or performance based on your requirements
  • Consider partitioning strategy for ordering guarantees within partitions (illustrated below)
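
Kafka preserves order only within a partition, and records with the same key always land in the same partition. As a generic rdkafka illustration (how DeltaForge assigns keys is configuration-dependent and not shown here), keying by a row's primary key keeps all changes for that row in order:

use std::time::Duration;
use rdkafka::producer::{FutureProducer, FutureRecord};

// Records sharing a key hash to the same partition, so per-row ordering holds.
async fn send_keyed(producer: &FutureProducer, pk: &str, payload: &[u8]) {
    let record = FutureRecord::to("orders").key(pk).payload(payload);
    if let Err((e, _msg)) = producer.send(record, Duration::from_secs(5)).await {
        eprintln!("produce failed for key {pk}: {e}");
    }
}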