Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Schema Sensing and Drift Detection

This example demonstrates DeltaForge’s automatic schema inference for JSON columns and drift detection capabilities.

Overview

ComponentConfiguration
SourcePostgreSQL logical replication
ProcessorNone
SinkKafka
FeatureSchema sensing with deep JSON inspection

Use Case

You have a PostgreSQL database with JSON/JSONB columns and want to:

  • Automatically discover the structure of JSON payloads
  • Detect when JSON schemas change over time (drift)
  • Export inferred schemas as JSON Schema for downstream validation
  • Monitor schema evolution without manual tracking

Pipeline Configuration

apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: products-with-sensing
  tenant: acme

spec:
  source:
    type: postgres
    config:
      id: products-postgres
      dsn: ${POSTGRES_DSN}
      slot: deltaforge_products
      publication: products_pub
      tables:
        - public.products
        - public.product_variants
      start_position: earliest

  sinks:
    - type: kafka
      config:
        id: products-kafka
        brokers: ${KAFKA_BROKERS}
        topic: products.changes
        envelope:
          type: debezium
        encoding: json
        required: true

  batch:
    max_events: 500
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: required

  # Schema sensing configuration
  schema_sensing:
    enabled: true
    
    deep_inspect:
      enabled: true
      max_depth: 5           # How deep to traverse nested JSON
      max_sample_size: 500   # Events to analyze for deep inspection
    
    sampling:
      warmup_events: 100     # Analyze every event during warmup
      sample_rate: 20        # After warmup, analyze 1 in 20 events
      structure_cache: true  # Cache seen structures to avoid re-analysis
      structure_cache_size: 100
    
    tracking:
      detect_drift: true     # Enable drift detection
      drift_threshold: 0.1   # Alert if >10% of events have new fields
    
    output:
      emit_schemas: true     # Include schema info in API responses
      json_schema_format: draft-07

Table Structure

CREATE TABLE products (
  id SERIAL PRIMARY KEY,
  name TEXT NOT NULL,
  sku TEXT UNIQUE,
  price DECIMAL(10,2),
  
  -- JSON columns that schema sensing will analyze
  metadata JSONB,          -- Product attributes, tags, etc.
  specifications JSONB,    -- Technical specs
  
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE product_variants (
  id SERIAL PRIMARY KEY,
  product_id INTEGER REFERENCES products(id),
  variant_name TEXT,
  
  -- Nested JSON with variable structure
  attributes JSONB,        -- Color, size, material, etc.
  pricing JSONB,           -- Regional pricing, discounts
  
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Create publication
CREATE PUBLICATION products_pub FOR TABLE products, product_variants;

Sample Data

-- Insert products with JSON metadata
INSERT INTO products (name, sku, price, metadata, specifications) VALUES
(
  'Wireless Headphones',
  'WH-1000',
  299.99,
  '{
    "brand": "AudioTech",
    "category": "Electronics",
    "tags": ["wireless", "bluetooth", "noise-canceling"],
    "ratings": {"average": 4.5, "count": 1250}
  }',
  '{
    "battery_life_hours": 30,
    "driver_size_mm": 40,
    "frequency_response": {"min_hz": 20, "max_hz": 20000},
    "connectivity": ["bluetooth", "aux"]
  }'
);

-- Insert variant with nested attributes
INSERT INTO product_variants (product_id, variant_name, attributes, pricing) VALUES
(
  1,
  'Midnight Black',
  '{
    "color": {"name": "Midnight Black", "hex": "#1a1a2e"},
    "material": "Premium Plastic",
    "weight_grams": 250
  }',
  '{
    "base_price": 299.99,
    "regional": {
      "US": {"price": 299.99, "currency": "USD"},
      "EU": {"price": 279.99, "currency": "EUR"}
    },
    "discounts": [
      {"code": "SAVE20", "percent": 20, "expires": "2025-12-31"}
    ]
  }'
);

Running the Example

1. Set Environment Variables

export POSTGRES_DSN="postgres://user:password@localhost:5432/shop"
export KAFKA_BROKERS="localhost:9092"

2. Start DeltaForge

cargo run -p runner -- --config products-sensing.yaml

3. Insert Data and Let Sensing Analyze

-- Insert several products to build schema profile
INSERT INTO products (name, sku, price, metadata, specifications) VALUES
('Smart Watch', 'SW-200', 399.99, 
 '{"brand": "TechWear", "tags": ["fitness", "smart"]}',
 '{"battery_days": 7, "water_resistant": true}');

Using the Schema Sensing API

List Inferred Schemas

curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas

Response:

{
  "schemas": [
    {
      "table": "public.products",
      "columns": {
        "metadata": {
          "type": "object",
          "inferred_at": "2025-01-15T10:30:00Z",
          "sample_count": 150
        },
        "specifications": {
          "type": "object",
          "inferred_at": "2025-01-15T10:30:00Z",
          "sample_count": 150
        }
      }
    }
  ]
}

Get Detailed Schema for a Table

curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas/public.products

Response:

{
  "table": "public.products",
  "json_columns": {
    "metadata": {
      "inferred_schema": {
        "type": "object",
        "properties": {
          "brand": {"type": "string"},
          "category": {"type": "string"},
          "tags": {"type": "array", "items": {"type": "string"}},
          "ratings": {
            "type": "object",
            "properties": {
              "average": {"type": "number"},
              "count": {"type": "integer"}
            }
          }
        }
      },
      "sample_count": 150,
      "last_updated": "2025-01-15T10:35:00Z"
    }
  }
}

Export as JSON Schema

curl http://localhost:8080/pipelines/products-with-sensing/sensing/schemas/public.products/json-schema

Response:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "public.products.metadata",
  "type": "object",
  "properties": {
    "brand": {"type": "string"},
    "category": {"type": "string"},
    "tags": {
      "type": "array",
      "items": {"type": "string"}
    },
    "ratings": {
      "type": "object",
      "properties": {
        "average": {"type": "number"},
        "count": {"type": "integer"}
      }
    }
  }
}

Check Drift Detection

curl http://localhost:8080/pipelines/products-with-sensing/drift

Response:

{
  "drift_detected": true,
  "tables": {
    "public.products": {
      "metadata": {
        "new_fields": ["promotion", "seasonal"],
        "removed_fields": [],
        "type_changes": [],
        "drift_percentage": 0.15,
        "first_seen": "2025-01-15T11:00:00Z"
      }
    }
  }
}

Get Sensing Statistics

curl http://localhost:8080/pipelines/products-with-sensing/sensing/stats

Response:

{
  "total_events_analyzed": 1500,
  "total_events_sampled": 250,
  "cache_hits": 1250,
  "cache_misses": 250,
  "tables_tracked": 2,
  "json_columns_tracked": 4
}

Performance Tuning

Performance tip: Schema sensing can be CPU-intensive. Tune based on your throughput needs.

High-Throughput Configuration

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 3           # Limit depth for faster processing
    max_sample_size: 200   # Fewer samples
  sampling:
    warmup_events: 50      # Shorter warmup
    sample_rate: 100       # Analyze 1 in 100 events
    structure_cache: true
    structure_cache_size: 200  # Larger cache

Development/Debugging Configuration

schema_sensing:
  enabled: true
  deep_inspect:
    enabled: true
    max_depth: 10          # Full depth
    max_sample_size: 1000  # More samples
  sampling:
    warmup_events: 500     # Longer warmup
    sample_rate: 1         # Analyze every event

Key Concepts Demonstrated

  • Automatic Schema Inference: Discover JSON structure without manual definition
  • Deep JSON Inspection: Traverse nested objects and arrays
  • Drift Detection: Alert when schemas change unexpectedly
  • JSON Schema Export: Generate standard schemas for validation
  • Sampling Strategy: Balance accuracy vs. performance