Processors

Processors transform, filter, or otherwise act on each event batch between the source and sinks. They run in the order listed in spec.processors, and each receives the output of the previous one. A processor can modify events, filter them out, or add routing metadata.

Available Processors

| Processor | Description |
| --- | --- |
| javascript | Custom transformations using V8-powered JavaScript |
| outbox | Transactional outbox pattern - extracts payload, resolves topic, sets routing headers |
| flatten | Flatten nested JSON objects into combined keys |
| filter | Drop events by op type, table pattern, or field value |

JavaScript

Run arbitrary JavaScript against each event batch. Uses the V8 engine via deno_core for near-native speed with configurable resource limits.

processors:
  - type: javascript
    id: enrich
    inline: |
      function processBatch(events) {
        return events.map(e => {
          e.tags = ["processed"];
          return e;
        });
      }
    limits:
      cpu_ms: 50
      mem_mb: 128
      timeout_ms: 500

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | (required) | Processor identifier |
| inline | string | (required) | JavaScript source code |
| limits.cpu_ms | int | 50 | CPU time limit per batch |
| limits.mem_mb | int | 128 | V8 heap memory limit |
| limits.timeout_ms | int | 500 | Wall-clock timeout per batch |

Performance

JavaScript processing adds a fixed overhead per batch plus linear scaling per event. Benchmarks show ~70K+ events/sec throughput with typical transforms — roughly 61× slower than native Rust processors, but sufficient for most workloads. If you need higher throughput, keep transform logic simple or move heavy processing downstream.

Tips

  • Return an empty array to drop all events in a batch.
  • Return multiple copies of an event to fan out.
  • JavaScript numbers are f64 — integer columns wider than 53 bits may lose precision. Use string representation for such values.
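
The tips above can be combined in a single processBatch. This is an illustrative sketch only: the deleted, big_id, and audit fields are hypothetical columns invented for the example, not part of the event model.

```javascript
// Illustrative sketch: `deleted`, `big_id`, and `audit` are hypothetical
// columns used only to demonstrate filtering, transforming, and fan-out.
function processBatch(events) {
  const out = [];
  for (const e of events) {
    // Filter: drop soft-deleted rows by simply not returning them.
    if (e.after && e.after.deleted) continue;

    // Transform: send wide integers as strings to avoid f64 precision loss.
    if (e.after && typeof e.after.big_id === "number") {
      e.after.big_id = String(e.after.big_id);
    }
    out.push(e);

    // Fan-out: emit an extra tagged copy of audited events.
    if (e.after && e.after.audit) {
      out.push({ ...e, tags: ["audit-copy"] });
    }
  }
  return out; // returning [] would drop the entire batch
}
```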

Outbox

The outbox processor transforms events captured by the outbox pattern into routed, sink-ready events. It extracts business fields from the raw outbox payload, resolves the destination topic, and passes through all non-outbox events unchanged.

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    default_topic: "events.unrouted"

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | "outbox" | Processor identifier |
| tables | array | [] | Only process outbox events matching these patterns. Empty = all. |
| topic | string | | Topic template with ${field} placeholders (resolved against raw payload columns) |
| default_topic | string | | Fallback when template resolution fails |
| columns | object | (defaults below) | Field name mappings |
| additional_headers | map | {} | Forward extra payload fields as routing headers. Key = header name, value = column name. |
| raw_payload | bool | false | Deliver payload as-is, bypassing envelope wrapping |

Column Defaults

| Key | Default | Description |
| --- | --- | --- |
| payload | "payload" | Field containing the event body |
| aggregate_type | "aggregate_type" | Domain aggregate type |
| aggregate_id | "aggregate_id" | Aggregate identifier |
| event_type | "event_type" | Domain event type |
| topic | "topic" | Optional explicit topic override in payload |

See the Outbox Pattern guide for source configuration, complete examples, and multi-outbox routing.
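
Topic template resolution can be pictured as placeholder substitution against the raw payload row. The sketch below is illustrative only (resolveTopic is a hypothetical helper, not the processor's actual API); it mimics the documented behavior of topic and default_topic.

```javascript
// Illustrative sketch of ${field} template resolution. Falls back to
// defaultTopic when any placeholder has no matching payload column.
function resolveTopic(template, row, defaultTopic) {
  let missing = false;
  const topic = template.replace(/\$\{(\w+)\}/g, (_, field) => {
    if (row[field] == null) { missing = true; return ""; }
    return String(row[field]);
  });
  return missing ? defaultTopic : topic;
}

resolveTopic("${aggregate_type}.${event_type}",
             { aggregate_type: "order", event_type: "created" },
             "events.unrouted"); // → "order.created"
resolveTopic("${aggregate_type}.${event_type}",
             { aggregate_type: "order" },
             "events.unrouted"); // → "events.unrouted"
```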

Flatten

Flattens nested JSON objects in event payloads into top-level keys joined by a configurable separator. Works on every object-valued field present on the event without assuming any particular envelope structure - CDC before/after, outbox business payloads, or any custom fields introduced by upstream processors are all handled uniformly.

processors:
  - type: flatten
    id: flat
    separator: "__"
    max_depth: 3
    on_collision: last
    empty_object: preserve
    lists: preserve
    empty_list: drop

Input:

{
  "after": {
    "order_id": "abc",
    "customer": {
      "id": 1,
      "address": { "city": "Berlin", "zip": "10115" }
    },
    "tags": ["vip"],
    "meta": {}
  }
}

Output (with defaults — separator: "__", empty_object: preserve, lists: preserve):

{
  "after": {
    "order_id": "abc",
    "customer__id": 1,
    "customer__address__city": "Berlin",
    "customer__address__zip": "10115",
    "tags": ["vip"],
    "meta": {}
  }
}

Configuration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | "flatten" | Processor identifier |
| separator | string | "__" | Separator inserted between path segments |
| max_depth | int | unlimited | Stop recursing at this depth; objects at the boundary are kept as opaque leaves |
| on_collision | string | last | What to do when two paths produce the same key: last, first, or error |
| empty_object | string | preserve | How to handle {} values: preserve, drop, or null |
| lists | string | preserve | How to handle array values: preserve (keep as-is) or index (expand to field__0, field__1, …) |
| empty_list | string | preserve | How to handle [] values: preserve, drop, or null |

max_depth in practice

max_depth counts nesting levels from the top of the payload object. Objects at the boundary are treated as opaque leaves rather than expanded further, still subject to empty_object policy.

# max_depth: 2

depth 0: customer               -> object, recurse
depth 1: customer__address      -> object, recurse
depth 2: customer__address__geo -> STOP, kept as leaf {"lat": 52.5, "lng": 13.4}

Without max_depth, a deeply nested or recursive payload could produce a large number of keys. Setting a limit is recommended for payloads with variable or unknown nesting depth.

Collision policy

A collision occurs when two input paths produce the same flattened key - typically when a payload already contains a pre-flattened key (e.g. "a__b": 1) alongside a nested object ("a": {"b": 2}).

  • last - the last path to write a key wins (default, never fails)
  • first - the first path to write a key wins, subsequent writes are ignored
  • error - the batch fails immediately, useful in strict pipelines where collisions indicate a schema problem
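
The collision policies can be illustrated with a minimal flatten sketch. This is illustrative JavaScript, not the native Rust implementation, and it covers only the default preserve policies for lists and empty objects.

```javascript
// Minimal flatten sketch: "last" overwrites on collision, "first" keeps
// the existing value, "error" throws. Arrays and {} are kept as leaves.
function flatten(obj, sep = "__", onCollision = "last") {
  const out = {};
  const write = (key, value) => {
    if (key in out) {
      if (onCollision === "error") throw new Error(`collision on ${key}`);
      if (onCollision === "first") return; // keep the first value written
    }
    out[key] = value; // "last": the latest write wins
  };
  const walk = (value, prefix) => {
    const isObj = value !== null && typeof value === "object"
      && !Array.isArray(value) && Object.keys(value).length > 0;
    if (isObj) {
      for (const [k, v] of Object.entries(value)) {
        walk(v, prefix ? prefix + sep + k : k);
      }
    } else {
      write(prefix, value); // scalar, array, or empty object leaf
    }
  };
  walk(obj, "");
  return out;
}

// A pre-flattened key colliding with a nested path:
flatten({ "a__b": 1, "a": { "b": 2 } });                // → { "a__b": 2 }
flatten({ "a__b": 1, "a": { "b": 2 } }, "__", "first"); // → { "a__b": 1 }
```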

Working with outbox payloads

After the outbox processor runs, event.after holds the extracted business payload - there is no before. The flatten processor handles this naturally since it operates on whatever fields are present:

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
  - type: flatten
    id: flat
    separator: "."
    empty_list: drop

Envelope interaction

The flatten processor runs on the raw Event struct before sink delivery. Envelope wrapping happens inside the sink, after all processors have run. This means the envelope always wraps already-flattened data, no special configuration needed.

Source → [flatten processor] → Sink (envelope → bytes)

All envelope formats work as expected:

// Native
{ "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" }, "op": "c" }

// Debezium
{ "payload": { "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" }, "op": "c" } }

// CloudEvents
{ "specversion": "1.0", ..., "data": { "before": null, "after": { "customer__id": 1, "customer__address__city": "Berlin" } } }

Outbox + raw_payload: true

When the outbox processor is configured with raw_payload: true, the sink delivers event.after directly, bypassing the envelope entirely. If the flatten processor runs after outbox, the raw payload delivered to the sink is the flattened object — which is the intended behavior for analytics sinks that can’t handle nested JSON.

processors:
  - type: outbox
    topic: "${aggregate_type}.${event_type}"
    raw_payload: true       # sink delivers event.after directly, no envelope
  - type: flatten
    id: flat
    separator: "__"
    empty_list: drop

The flatten processor runs second, so by the time the sink delivers the raw payload it is already flat.

Analytics sink example

When sending to column-oriented sinks (ClickHouse, BigQuery, S3 Parquet) that don’t handle nested JSON:

processors:
  - type: flatten
    id: flat
    separator: "__"
    lists: index          # expand arrays to indexed columns
    empty_object: drop    # remove sparse marker objects
    empty_list: drop      # remove empty arrays

Filter

Drops events that do not pass configured criteria. Each criterion is independent — omit any of them to skip that check entirely. An event must pass all configured checks to be forwarded.

processors:
  - type: filter
    id: only-active-orders
    ops: [create, update]
    tables:
      include: ["shop.orders"]
      exclude: ["*.tmp"]
    fields:
      - path: status
        op: eq
        value: "active"
      - path: total
        op: gte
        value: 100
    match: all

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | "filter" | Processor identifier |
| ops | list | [] (all) | Operation types to keep: create, update, delete, read, truncate |
| tables.include | list | [] (all) | Table patterns to include. Uses AllowList glob syntax: db.table, shop.*, *.orders |
| tables.exclude | list | [] (none) | Table patterns to exclude. Applied after include; takes priority |
| fields | list | [] (skip) | Predicates evaluated against event.after |
| match | string | all | How to combine multiple field predicates: all (every predicate must pass) or any (at least one must pass) |

Field operators

The path field is a dot-separated path into event.after, e.g. "status" or "order.total".

| Op | value | Description |
| --- | --- | --- |
| eq | scalar | Field equals value |
| ne | scalar | Field does not equal value |
| exists | | Field is present and non-null |
| not_exists | | Field is absent or null |
| gt / gte | number or string | Greater than / greater than or equal |
| lt / lte | number or string | Less than / less than or equal |
| in | array | Field value is one of the items in the array |
| not_in | array | Field value is not in the array |
| contains | scalar | String field contains the substring, or array field contains the element |
| changed | | Field value differs between event.before and event.after. Creates and deletes always pass (no pair to compare) |
| regex | string | String field matches the regex pattern. Compiled once at startup; invalid patterns fail pipeline initialization |

Notes

Numeric equality — eq and in compare integers and floats by value, so 42 and 42.0 are considered equal. This matters when events have passed through the JavaScript processor, which converts all numbers to f64.

not_in with a missing field — if the field is absent from the event, the event passes. Absence is not membership in any set.

regex on non-string fields — silently does not match. Use exists first if the field may be absent or non-string.

changed and the before image — the predicate reads event.before, which is only populated for update operations from sources configured with full row images. Verify your source has REPLICA IDENTITY FULL (PostgreSQL) or binlog_row_image = FULL (MySQL) before using changed in production.
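
The dot-path and not_in semantics above can be sketched as a tiny evaluator. This is illustrative JavaScript; getPath and evalPredicate are hypothetical names, not the native Rust implementation, and only a subset of operators is shown.

```javascript
// Illustrative sketch of field predicate evaluation against event.after.
function getPath(obj, path) {
  return path.split(".").reduce(
    (o, k) => (o != null && typeof o === "object" ? o[k] : undefined),
    obj);
}

function evalPredicate(after, { path, op, value }) {
  const v = getPath(after, path);
  switch (op) {
    case "eq":         return v === value;
    case "exists":     return v !== null && v !== undefined;
    case "not_exists": return v === null || v === undefined;
    case "in":         return value.includes(v);
    case "not_in":     return !value.includes(v); // absent field passes
    default:           throw new Error(`op not covered in this sketch: ${op}`);
  }
}

// Dot-separated paths descend into event.after:
evalPredicate({ order: { total: 120 } },
              { path: "order.total", op: "eq", value: 120 }); // → true
// not_in passes when the field is absent (absence is not membership):
evalPredicate({}, { path: "status", op: "not_in", value: ["failed"] }); // → true
```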

Performance

The filter processor is pure Rust with no serialization overhead. Op and table checks are O(1) to O(patterns). Field predicate evaluation reads event.after directly. Regex patterns are compiled once at construction time. Put a filter early in the processor chain to reduce the batch size before heavier processors (JavaScript, flatten) run.

Examples

Drop deletes and snapshot reads:

- type: filter
  id: no-deletes
  ops: [create, update]

Alert stream - pass if status is failed OR retry count exhausted:

- type: filter
  id: alert-worthy
  match: any
  fields:
    - path: status
      op: eq
      value: "failed"
    - path: retry_count
      op: gte
      value: 3

Filter before enrichment to avoid paying JavaScript overhead on the full stream:

processors:
  - type: filter
    id: low-stock
    ops: [create, update]
    tables:
      include: ["inventory.products"]
    fields:
      - path: stock_qty
        op: lt
        value: 10
  - type: javascript
    id: enrich
    inline: |
      function processBatch(events) {
        return events.map(e => {
          e.after.alert_level = e.after.stock_qty === 0 ? "critical" : "warning";
          return e;
        });
      }

Only forward rows where a status column actually changed value (suppresses no-op updates):

- type: filter
  id: real-status-changes
  ops: [update]
  fields:
    - path: status
      op: changed

Processor Chain

Processors execute in order. Events flow through each processor sequentially:

Source → [Processor 1] → [Processor 2] → ... → Sinks

Each processor receives a Vec<Event> and returns a Vec<Event>. This means processors can:

  • Transform: Modify event fields in place
  • Filter: Return a subset of events (drop unwanted ones)
  • Fan-out: Return more events than received
  • Route: Set event.routing.topic to override sink defaults
  • Enrich: Add headers via event.routing.headers
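
Conceptually, the chain is a fold: each processor consumes the previous processor's output. A sketch in JavaScript (the real pipeline is async Rust, but the data flow is the same):

```javascript
// Conceptual sketch: each processor maps a batch of events to a new
// batch, and the chain threads one batch through all of them in order.
function runChain(processors, batch) {
  return processors.reduce((events, p) => p(events), batch);
}

// Two toy processors: one filters, one transforms.
const filterCreates = events => events.filter(e => e.op === "create");
const tag = events => events.map(e => ({ ...e, tags: ["processed"] }));

runChain([filterCreates, tag], [
  { op: "create", after: { id: 1 } },
  { op: "delete", before: { id: 2 } },
]);
// → [{ op: "create", after: { id: 1 }, tags: ["processed"] }]
```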

Non-outbox events always pass through the outbox processor untouched, so it is safe to combine it with JavaScript processors in any order.

Adding Custom Processors

The processor interface is a simple trait:

#[async_trait]
pub trait Processor: Send + Sync {
    fn id(&self) -> &str;
    async fn process(&self, events: Vec<Event>) -> Result<Vec<Event>>;
}

To add a new processor:

  1. Implement the Processor trait in crates/processors
  2. Add a config variant to ProcessorCfg in deltaforge-config
  3. Register the build step in build_processors()