Dynamic Routing

Dynamic routing controls where each CDC event is delivered: which Kafka topic, Redis stream, or NATS subject receives it. By default, all events go to the single destination configured in the sink (static routing). With dynamic routing, events can be split across destinations based on their content or on other event and pipeline attributes.

Overview

There are two routing mechanisms, and they compose naturally:

  1. Template strings in sink config - resolve per-event from event fields
  2. JavaScript ev.route() - programmatic per-event routing in processors

When both are used, ev.route() overrides take highest priority, then template resolution, then the static config value.

Template Routing

Replace static topic/stream/subject strings with templates containing ${...} variables. Templates are compiled once at startup and resolved per-event with zero regex overhead.

Kafka

sinks:
  - type: kafka
    config:
      id: kafka-routed
      brokers: ${KAFKA_BROKERS}
      topic: "cdc.${source.db}.${source.table}"
      key: "${after.customer_id}"
      envelope:
        type: debezium

Events from shop.orders -> topic cdc.shop.orders, partitioned by customer_id.

Redis

sinks:
  - type: redis
    config:
      id: redis-routed
      uri: ${REDIS_URI}
      stream: "events:${source.table}"
      key: "${after.id}"

Events from orders -> stream events:orders. The key value appears as the df-key field in each stream entry.

NATS

sinks:
  - type: nats
    config:
      id: nats-routed
      url: ${NATS_URL}
      subject: "cdc.${source.db}.${source.table}"
      key: "${after.id}"
      stream: CDC

Events from shop.orders -> subject cdc.shop.orders. The key value appears as the df-key NATS header.

Available Variables

| Variable | Description | Example value |
|---|---|---|
| ${source.table} | Table name | orders |
| ${source.db} | Database name | shop |
| ${source.schema} | Schema name (PostgreSQL) | public |
| ${source.connector} | Source type | mysql |
| ${op} | Operation code | c, u, d, r, t |
| ${after.<field>} | Field from after image | 42, cust-abc |
| ${before.<field>} | Field from before image | old-value |
| ${tenant_id} | Pipeline tenant ID | acme |

Missing fields resolve to an empty string. A warning is logged once per unique template, not per event.

Static strings (no ${...}) are detected at parse time and have zero overhead on the hot path - no allocation, no resolution.
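The compile-once, resolve-per-event behavior described above can be sketched as follows. This is a minimal illustration, not the actual implementation: compileTemplate, the segment layout, and the event shape ({ source, after, ... }) are assumptions made for the example. It splits the template into literal and variable segments up front, flags templates with no variables as static, resolves missing fields to an empty string, and warns once per template.

```javascript
// Hypothetical sketch: compileTemplate and its return shape are illustrative
// names, not part of the documented API.
function compileTemplate(template) {
  const segments = [];
  let pos = 0;
  // One pass with indexOf at "startup"; no regex is involved.
  while (pos < template.length) {
    const start = template.indexOf("${", pos);
    const end = start === -1 ? -1 : template.indexOf("}", start + 2);
    if (start === -1 || end === -1) {
      // No further (or unterminated) variable: the rest is literal text.
      segments.push({ literal: template.slice(pos) });
      break;
    }
    if (start > pos) segments.push({ literal: template.slice(pos, start) });
    segments.push({ path: template.slice(start + 2, end).split(".") });
    pos = end + 1;
  }
  const isStatic = segments.every((s) => s.literal !== undefined);
  let warned = false; // one warning per template, not per event

  return {
    isStatic,
    resolve(event) {
      if (isStatic) return template; // static fast path: nothing to resolve
      let out = "";
      for (const seg of segments) {
        if (seg.literal !== undefined) {
          out += seg.literal;
          continue;
        }
        // Walk the dotted path, e.g. ["source", "table"].
        let value = event;
        for (const part of seg.path) {
          value = value == null ? undefined : value[part];
        }
        if (value == null) {
          if (!warned) {
            console.warn(`template "${template}": missing field ${seg.path.join(".")}`);
            warned = true;
          }
          value = ""; // missing fields resolve to an empty string
        }
        out += String(value);
      }
      return out;
    },
  };
}

const topicTemplate = compileTemplate("cdc.${source.db}.${source.table}");
const sampleEvent = { source: { db: "shop", table: "orders" }, after: { id: 42 } };
topicTemplate.resolve(sampleEvent); // "cdc.shop.orders"
```

Because parsing happens once, the per-event cost in this sketch is a walk over a short segment array, and a static string returns immediately.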

Env Vars vs Templates

Both use ${...} syntax. The config loader expands environment variables first. Unknown variables pass through as templates for runtime resolution:

brokers: ${KAFKA_BROKERS}      # env var - expanded at load time
topic: "cdc.${source.table}"   # template - passed through to runtime
key: "${after.customer_id}"    # template - resolved per-event
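The pass-through rule can be illustrated with a small sketch of load-time expansion. The function name expandEnv and the explicit env argument are assumptions for the example; the real config loader may work differently. Only names that exist in the environment are substituted, and everything else is left intact for per-event resolution.

```javascript
// Hypothetical sketch of load-time expansion; expandEnv is an illustrative name.
function expandEnv(value, env) {
  // Replace ${NAME} only when NAME is a defined environment variable;
  // any other ${...} is returned unchanged so it can act as a template later.
  return value.replace(/\$\{([^}]+)\}/g, (match, name) =>
    Object.prototype.hasOwnProperty.call(env, name) ? env[name] : match
  );
}

const loadEnv = { KAFKA_BROKERS: "broker-1:9092" };
expandEnv("${KAFKA_BROKERS}", loadEnv);    // "broker-1:9092"
expandEnv("cdc.${source.table}", loadEnv); // "cdc.${source.table}" (unchanged)
```

Under this assumption, an environment variable whose name matches a template variable (for example, one literally named op) would be expanded at load time and never reach the runtime, so such names are best avoided in the environment.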

JavaScript Routing

For routing logic that goes beyond field substitution, use ev.route() in a JavaScript processor. This lets you make conditional routing decisions based on event content.

processors:
  - type: javascript
    id: smart-router
    inline: |
      function processBatch(events) {
        for (const ev of events) {
          if (!ev.after) continue;

          if (ev.after.total_amount > 10000) {
            ev.route({
              topic: "orders.priority",
              key: String(ev.after.customer_id),
              headers: {
                "x-tier": "high-value",
                "x-amount": String(ev.after.total_amount)
              }
            });
          } else {
            ev.route({
              topic: "orders.standard",
              key: String(ev.after.customer_id)
            });
          }
        }
        return events;
      }

ev.route() fields

| Field | Type | Description |
|---|---|---|
| topic | string | Override destination (topic, stream, or subject) |
| key | string | Override message/partition key |
| headers | object | Key-value pairs added to the message |

All fields are optional. Only set fields override; omitted fields fall through to config templates or static values.

Calling ev.route() replaces any previous routing on that event - it does not merge.
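The replace-not-merge behavior is easy to trip over, so here is a small sketch using a stub event. The stub (makeStubEvent and its routing property) is hypothetical and only imitates the semantics described above; in a real processor the event object comes from the runtime.

```javascript
// Hypothetical stub of an event, for illustration only.
function makeStubEvent(after) {
  return {
    after,
    routing: null,
    route(overrides) {
      // Replace, do not merge: the previous routing object is discarded.
      this.routing = { ...overrides };
    },
  };
}

const stub = makeStubEvent({ customer_id: 7, total_amount: 25000 });

stub.route({ topic: "orders.priority", headers: { "x-tier": "high-value" } });
stub.route({ key: "7" });
// stub.routing is now { key: "7" }: the earlier topic and headers are gone.

// To set several fields, build one object and call route() once.
stub.route({
  topic: "orders.priority",
  key: String(stub.after.customer_id),
  headers: { "x-tier": "high-value" },
});
```

In a processor with several branches, collecting the overrides in a local object and calling ev.route() once at the end avoids losing earlier values by accident.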

How keys and headers are delivered

| Sink | Key delivery | Header delivery |
|---|---|---|
| Kafka | Kafka message key | Kafka message headers |
| Redis | df-key field in stream entry | df-headers field (JSON) |
| NATS | df-key NATS header | Individual NATS headers |
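The mapping in the table can be written out as a sketch that shapes one routed event for each sink. Everything here is illustrative: toSinkMessage, the returned object shapes, and the Redis payload field name data are assumptions, not the sinks' actual wire format. Only the df-key and df-headers names come from the table above.

```javascript
// Hypothetical sketch: function name and message shapes are illustrative.
function toSinkMessage(sinkType, routing, payload) {
  const key = routing.key;
  const headers = routing.headers || {};
  switch (sinkType) {
    case "kafka":
      // Key and headers map directly onto the Kafka record.
      return { key, headers, value: payload };
    case "redis": {
      // Stream entries are flat field maps, so headers travel as one JSON field.
      const fields = { data: payload }; // "data" is an assumed field name
      if (key !== undefined) fields["df-key"] = key;
      if (Object.keys(headers).length > 0) {
        fields["df-headers"] = JSON.stringify(headers);
      }
      return { fields };
    }
    case "nats": {
      // Each header becomes its own NATS header; the key rides along as df-key.
      const natsHeaders = { ...headers };
      if (key !== undefined) natsHeaders["df-key"] = key;
      return { headers: natsHeaders, data: payload };
    }
    default:
      throw new Error(`unknown sink type: ${sinkType}`);
  }
}

const sampleRouting = { key: "7", headers: { "x-tier": "high-value" } };
toSinkMessage("redis", sampleRouting, "{}").fields["df-headers"];
// '{"x-tier":"high-value"}'
```

A consumer reading from Redis would therefore need to JSON-parse df-headers, while Kafka and NATS consumers can read headers individually.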

Resolution Order

For each event, the destination is resolved in priority order:

ev.route() override  →  config template  →  static config value

Specifically:

  1. If the event has routing.topic set (via ev.route() or programmatically), use it
  2. If the sink config contains a template (has ${...}), resolve it from event fields
  3. Otherwise, use the static config string

The same order applies independently to key and headers.
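The three-step order, applied per field, can be sketched like this. The names resolveField and resolveTemplate are hypothetical, and the helper uses a regex purely for brevity (unlike the precompiled templates described earlier). The sketch also assumes the sink's destination is stored under topic; a Redis or NATS sink would use stream or subject instead.

```javascript
// Hypothetical helper: regex-based for brevity; missing fields become "".
function resolveTemplate(template, event) {
  return template.replace(/\$\{([^}]+)\}/g, (_, path) => {
    const value = path
      .split(".")
      .reduce((obj, part) => (obj == null ? undefined : obj[part]), event);
    return value == null ? "" : String(value);
  });
}

// Hypothetical sketch of the per-field resolution order.
function resolveField(field, event, sinkConfig) {
  const override = event.routing ? event.routing[field] : undefined;
  if (override !== undefined) return override; // 1. ev.route() override
  const configured = sinkConfig[field];
  if (typeof configured === "string" && configured.includes("${")) {
    return resolveTemplate(configured, event); // 2. config template
  }
  return configured; // 3. static config value
}

const sinkConfig = { topic: "cdc.${source.table}", key: "${after.id}" };
const routedEvent = {
  source: { table: "orders" },
  after: { id: 42 },
  routing: { topic: "orders.priority" }, // only topic was overridden
};

resolveField("topic", routedEvent, sinkConfig); // "orders.priority"
resolveField("key", routedEvent, sinkConfig);   // "42"
```

Here the topic comes from the override while the key still falls through to the config template, which is what independent resolution per field means in practice.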

Examples

See the complete example configurations: