Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

MySQL to Redis

This example streams MySQL binlog events into a Redis stream with an inline JavaScript transformation for PII redaction.

Overview

ComponentConfiguration
SourceMySQL binlog CDC
ProcessorJavaScript email redaction
SinkRedis Streams
EnvelopeNative (configurable)

Pipeline Configuration

metadata:
  name: orders-mysql-to-redis
  tenant: acme

spec:
  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders

  processors:
    - type: javascript
      id: redact-email
      inline: |
        function processBatch(events) {
          return events.map((event) => {
            if (event.after && event.after.email) {
              event.after.email = "[redacted]";
            }
            return event;
          });
        }
      limits:
        timeout_ms: 500

  sinks:
    - type: redis
      config:
        id: orders-redis
        uri: ${REDIS_URI}
        stream: orders
        envelope:
          type: native
        encoding: json
        required: true

  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000

  commit_policy:
    mode: required

Running the Example

1. Set Environment Variables

export MYSQL_DSN="mysql://user:password@localhost:3306/shop"
export REDIS_URI="redis://localhost:6379"

2. Start DeltaForge

# Save config as mysql-redis.yaml
cargo run -p runner -- --config mysql-redis.yaml

3. Insert Test Data

INSERT INTO shop.orders (email, total, status)
VALUES ('alice@example.com', 99.99, 'pending');

4. Verify in Redis

./dev.sh redis-read orders 10

You should see the event with the email redacted:

{
  "before": null,
  "after": {
    "id": 1,
    "email": "[redacted]",
    "total": 99.99,
    "status": "pending"
  },
  "op": "c",
  "ts_ms": 1700000000000
}

Variations

With Debezium Envelope

For Kafka Connect compatibility downstream:

sinks:
  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: debezium

Multi-Sink Fan-Out

Add Kafka alongside Redis for durability:

sinks:
  - type: kafka
    config:
      id: orders-kafka
      brokers: ${KAFKA_BROKERS}
      topic: orders
      envelope:
        type: debezium
      required: true    # Critical path

  - type: redis
    config:
      id: orders-redis
      uri: ${REDIS_URI}
      stream: orders
      envelope:
        type: native
      required: false   # Best-effort

With this configuration, checkpoints only wait for Kafka. Redis failures won’t block the pipeline.

With Schema Sensing

Automatically track schema changes:

spec:
  # ... source and sinks config ...

  schema_sensing:
    enabled: true
    deep_inspect:
      enabled: true
      max_depth: 3
    sampling:
      warmup_events: 100
      sample_rate: 10

Key Concepts Demonstrated

  • JavaScript Processors: Transform events in-flight with custom logic
  • PII Redaction: Mask sensitive data before it reaches downstream systems
  • Envelope Configuration: Choose output format based on consumer needs
  • Commit Policy: Control checkpoint behavior with required flag