# Kafka sink
The Kafka sink publishes batches to a Kafka topic using rdkafka.
## When to use Kafka
Kafka excels as the backbone for event-driven architectures where durability, ordering, and replay capabilities are critical.
### Real-world applications
| Use Case | Description |
|---|---|
| Event sourcing | Store all state changes as an immutable log; rebuild application state by replaying events |
| Microservices integration | Decouple services with async messaging; each service consumes relevant topics |
| Real-time analytics pipelines | Feed CDC events to Spark, Flink, or ksqlDB for streaming transformations |
| Data lake ingestion | Stream database changes to S3/HDFS via Kafka Connect for analytics and ML |
| Audit logging | Capture every database mutation for compliance, debugging, and forensics |
| Cross-datacenter replication | Use MirrorMaker 2 to replicate topics across regions for DR |
### Pros and cons
| Pros | Cons |
|---|---|
| ✅ Durability - Configurable replication ensures no data loss | ❌ Operational complexity - Requires ZooKeeper/KRaft, careful tuning |
| ✅ Ordering guarantees - Per-partition ordering with consumer groups | ❌ Latency - Batching and replication add milliseconds of delay |
| ✅ Replay capability - Configurable retention allows reprocessing | ❌ Resource intensive - High disk I/O and memory requirements |
| ✅ Ecosystem - Connect, Streams, Schema Registry, ksqlDB | ❌ Learning curve - Partitioning, offsets, consumer groups to master |
| ✅ Throughput - Handles millions of messages per second | ❌ Cold start - Cluster setup and topic configuration overhead |
| ✅ Exactly-once semantics - Transactions for critical workloads | ❌ Cost - Managed services can be expensive at scale |
## Configuration
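The configuration reference for this section did not survive rendering. As a hedged sketch of what a Kafka sink block looks like (field names are taken from the exactly-once example on this page; the `client_conf` keys are standard librdkafka properties):

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka           # unique sink id within the pipeline
      brokers: ${KAFKA_BROKERS}  # bootstrap servers
      topic: orders              # destination topic
      exactly_once: false        # enable transactional delivery
      client_conf:               # passed through to rdkafka
        acks: all
        compression.type: lz4
```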
## Exactly-once semantics
When `exactly_once: true`, the Kafka sink uses a transactional producer. Each batch is wrapped in a Kafka transaction (`begin_transaction` / `commit_transaction`), so either all events in the batch are committed atomically or none are.
```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      exactly_once: true
```
### How it works
- DeltaForge assigns a stable `transactional.id` per pipeline-sink pair (`deltaforge-{pipeline}-{sink_id}`)
- On startup, `init_transactions()` registers with the broker and fences any zombie producer from a previous instance
- Each batch: `begin_transaction()` → produce messages → `commit_transaction()`
- If commit fails, the transaction is aborted and the batch retried
- Consumers using `isolation.level=read_committed` only see committed batches
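The begin → produce → commit → abort-and-retry cycle above can be sketched as follows. This is an illustration, not DeltaForge's actual code: `producer` is any object exposing confluent-kafka-style transactional calls (`begin_transaction`, `produce`, `commit_transaction`, `abort_transaction`), and the retry limit is an assumption:

```python
def deliver_batch(producer, topic, events, max_attempts=3):
    """Deliver one batch atomically: all events commit, or none do."""
    for attempt in range(1, max_attempts + 1):
        producer.begin_transaction()
        try:
            for event in events:
                producer.produce(topic, value=event)
            producer.commit_transaction()
            return attempt  # committed on this attempt
        except Exception:
            # Abort on any failure so consumers running with
            # isolation.level=read_committed never see a partial batch.
            producer.abort_transaction()
    raise RuntimeError(f"batch failed after {max_attempts} attempts")
```

Because an aborted transaction is invisible to `read_committed` consumers, a retried batch never appears twice within a single producer session.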
### Requirements
- Kafka 2.5+ (transaction support)
- `isolation.level=read_committed` on consumers (otherwise they see uncommitted messages)
- Only one DeltaForge instance per pipeline at a time (the broker will fence duplicates)
### Transactional overrides
When `exactly_once: true`, DeltaForge automatically configures:
| Setting | Value | Why |
|---|---|---|
| `transactional.id` | `deltaforge-{pipeline}-{sink_id}` | Broker fencing |
| `transaction.timeout.ms` | `60000` | Max transaction duration |
| `enable.idempotence` | `true` | Required for transactions |
| `acks` | `all` | Required for transactions |
| `message.timeout.ms` | `30000` | Must be ≤ `transaction.timeout.ms` |
| `delivery.timeout.ms` | `30000` | Must be ≤ `transaction.timeout.ms` |
You do not need to set these in `client_conf` — they are applied automatically and cannot be overridden.
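The fencing id is derived mechanically from the pipeline and sink names; in Python terms:

```python
def transactional_id(pipeline: str, sink_id: str) -> str:
    # Stable per pipeline-sink pair, so a restarted instance reuses the
    # same id and the broker fences any zombie predecessor.
    return f"deltaforge-{pipeline}-{sink_id}"
```

Because the id is stable across restarts, a fresh instance for the same pipeline always supersedes the old one at the broker.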
### Fatal errors
If the broker fences the producer (another instance started with the same `transactional.id`), DeltaForge treats this as a fatal error and stops the pipeline. This is not retryable — resolve the duplicate instance before restarting.
### Performance impact
Transactions add ~1-3 ms of overhead per batch for the two-phase commit. With properly sized batches (`max_events=16000`, `max_bytes=16MB`), the throughput impact is ~7-11%. See the Performance guide for tuning details and benchmark results.
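A quick back-of-the-envelope check on the amortized cost, using the midpoint of the figures above (the baseline pipeline latency is not included; this shows only the added cost per event):

```python
overhead_ms = 2.0        # midpoint of the ~1-3 ms two-phase-commit cost
batch_events = 16_000    # the recommended max_events batch size

# Amortized transaction overhead per event, in microseconds
per_event_us = overhead_ms * 1000 / batch_events
print(f"{per_event_us:.3f} us/event")  # 0.125 us/event
```

At this batch size the commit cost per event is negligible, which is why keeping batches large matters: shrinking batches to 1,000 events multiplies the per-event overhead by 16.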
## Recommended `client_conf` settings
| Setting | Recommended | Description |
|---|---|---|
| `acks` | `all` | Wait for all replicas for durability |
| `message.timeout.ms` | `30000` | Total time to deliver a message |
| `retries` | `2147483647` | Retry indefinitely (with backoff) |
| `enable.idempotence` | `true` | Prevent duplicates on retry |
| `compression.type` | `lz4` | Balance between CPU and bandwidth |
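Expressed as a `client_conf` block (a sketch; the keys are the standard librdkafka property names from the table above):

```yaml
sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      client_conf:
        acks: all
        message.timeout.ms: 30000
        retries: 2147483647
        enable.idempotence: true
        compression.type: lz4
```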
## Consuming events
### Kafka CLI
```bash
# Consume from the beginning of the topic
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning

# Consume with a consumer group
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --group deltaforge-consumers
```
### Go consumer example
```go
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest

group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
if err != nil {
	log.Fatalf("consumer group: %v", err)
}
defer group.Close()

for {
	if err := group.Consume(ctx, []string{"orders"}, &handler{}); err != nil {
		log.Printf("Consumer error: %v", err)
	}
}

type handler struct{}

// Setup and Cleanup are required to satisfy sarama.ConsumerGroupHandler.
func (h *handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		var event Event
		if err := json.Unmarshal(msg.Value, &event); err != nil {
			log.Printf("decode error: %v", err)
			continue
		}
		process(event)
		session.MarkMessage(msg, "") // mark the offset only after processing
	}
	return nil
}
```
### Rust consumer example
```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::Message;

let consumer: StreamConsumer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("group.id", "my-group")
    .set("auto.offset.reset", "earliest")
    .create()?;
consumer.subscribe(&["orders"])?;

loop {
    match consumer.recv().await {
        Ok(msg) => {
            // payload_view returns Option<Result<&str, Utf8Error>>
            if let Some(Ok(payload)) = msg.payload_view::<str>() {
                let event: Event = serde_json::from_str(payload)?;
                process(event);
                // Commit the offset asynchronously after processing
                consumer.commit_message(&msg, CommitMode::Async)?;
            }
        }
        Err(e) => eprintln!("Kafka error: {}", e),
    }
}
```
### Python consumer example
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # commit manually after processing
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    event = message.value
    process(event)
    consumer.commit()
```
## Failure modes
| Failure | Symptoms | DeltaForge behavior | Resolution |
|---|---|---|---|
| Broker unavailable | Connection refused, timeout | Retries with backoff; blocks checkpoint | Restore broker; check network |
| Topic not found | UnknownTopicOrPartition | Fails batch; retries | Create topic or enable auto-create |
| Authentication failure | SaslAuthenticationFailed | Fails fast, no retry | Fix credentials in config |
| Authorization failure | TopicAuthorizationFailed | Fails fast, no retry | Grant ACLs for producer |
| Message too large | MessageSizeTooLarge | Fails message permanently | Increase message.max.bytes or filter large events |
| Leader election | NotLeaderForPartition | Automatic retry after metadata refresh | Wait for election; usually transient |
| Disk full | KafkaStorageException | Retries indefinitely | Add disk space; purge old segments |
| Network partition | Timeouts, partial failures | Retries; may produce duplicates | Restore network; idempotence prevents dups |
| Producer fenced | ProducerFenced error | Fatal — pipeline stops immediately | Ensure only one instance per pipeline; restart after resolving |
| Transaction timeout | transaction.timeout.ms exceeded | Transaction aborted; batch retried | Increase timeout or reduce batch size |
| SSL/TLS errors | Handshake failures | Fails fast | Fix certificates, verify truststore |
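The retry policy in the table reduces to three classes of reaction. A sketch of that decision (error names abbreviated; the classification mirrors the table above and is not DeltaForge's actual code):

```python
FATAL = {"SaslAuthenticationFailed", "TopicAuthorizationFailed",
         "ProducerFenced", "SslHandshakeFailed"}
PERMANENT = {"MessageSizeTooLarge"}

def classify(error: str) -> str:
    """Map a Kafka error to the sink's reaction: 'fatal' stops the
    pipeline, 'permanent' fails the message, 'retry' backs off."""
    if error in FATAL:
        return "fatal"
    if error in PERMANENT:
        return "permanent"
    return "retry"  # broker down, leader election, timeouts, ...
```

The rule of thumb: configuration and authorization problems fail fast, oversized messages fail alone, and everything infrastructure-related retries with backoff.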
## Failure scenarios and data guarantees
### Broker failure during batch delivery
- DeltaForge sends a batch of 100 events
- 50 events are delivered, then the broker crashes
- rdkafka detects the failure and retries the remaining 50
- If idempotence is enabled: no duplicates
- If not: possible duplicates of events near the failure point
- The checkpoint is saved only after ALL events are acknowledged
### DeltaForge crash after Kafka ack, before checkpoint
- Batch delivered to Kafka successfully
- DeltaForge crashes before saving checkpoint
- On restart: replays from last checkpoint
- Without `exactly_once`: duplicate events in Kafka (at-least-once); consumers must handle them idempotently
- With `exactly_once`: the replayed batch gets a new transaction; consumers with `read_committed` see duplicates, but each batch is atomic
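Since the default mode is at-least-once, consumers should deduplicate replayed events. A minimal sketch, assuming each event carries a unique `id` field (the field name is an assumption):

```python
import json

def dedup(messages, seen=None):
    """Yield each event exactly once, keyed on its 'id' field.

    In production, back `seen` with a persistent store (e.g. Redis or
    a compacted topic) rather than an in-memory set, so deduplication
    survives consumer restarts."""
    seen = set() if seen is None else seen
    for raw in messages:
        event = json.loads(raw)
        if event["id"] in seen:
            continue  # duplicate from a replayed batch
        seen.add(event["id"])
        yield event
```

Deduplication at the consumer turns at-least-once delivery into effectively-once processing, regardless of the sink's `exactly_once` setting.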
## Monitoring recommendations
DeltaForge exposes these metrics for Kafka sink monitoring:
```
# DeltaForge sink metrics (exposed at /metrics on port 9000)
deltaforge_sink_events_total{pipeline,sink}              # Events delivered
deltaforge_sink_batch_total{pipeline,sink}               # Batches delivered
deltaforge_sink_latency_seconds{pipeline,sink}           # Delivery latency histogram
deltaforge_stage_latency_seconds{pipeline,stage="sink"}  # Stage timing
```
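As an illustration, throughput and tail latency can be derived from these metrics with standard PromQL (the `sink` label value and the `_bucket` suffix on the histogram are assumptions based on the metric names above):

```promql
# Events per second for one sink
rate(deltaforge_sink_events_total{sink="orders-kafka"}[5m])

# p99 delivery latency over the last 5 minutes
histogram_quantile(0.99,
  rate(deltaforge_sink_latency_seconds_bucket{sink="orders-kafka"}[5m]))
```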
For deeper Kafka broker visibility, monitor your Kafka cluster directly:
- Broker metrics via JMX or Kafka’s built-in metrics
- Consumer lag via `kafka-consumer-groups.sh`
- Topic throughput via broker dashboards

Note: Internal `rdkafka` producer statistics (message queues, broker RTT, etc.) are not currently exposed by DeltaForge. This is a potential future enhancement.
## Notes
- Combine Kafka with other sinks to fan out data; use the commit policy to control checkpoint behavior
- For exactly-once semantics, set `exactly_once: true` and ensure your Kafka cluster supports transactions (2.5+)
- With `exactly_once: false` (the default), idempotent production is still enabled: duplicates are prevented during retries but not across DeltaForge restarts
- Adjust `client_conf` for durability (`acks=all`) or performance based on your requirements
- Consider your partitioning strategy for ordering guarantees within partitions