Schema Sensing

Schema sensing automatically infers and tracks schema structure from JSON event payloads. It complements the schema registry by deriving schemas from the data itself rather than from database metadata.

When to Use Schema Sensing

Schema sensing is useful when:

  • Source doesn’t provide schema: Some sources emit JSON without metadata
  • JSON columns: Database JSON/JSONB columns have dynamic structure
  • Schema evolution tracking: Detect when payload structure changes over time
  • Downstream integration: Generate JSON Schema for consumers

How It Works

┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
│    Event     │────▶│  Schema Sensor  │────▶│  Inferred Schema │
│   Payload    │     │   (sampling)    │     │   + Fingerprint  │
└──────────────┘     └─────────────────┘     └──────────────────┘
                              │
                              ▼
                     ┌─────────────────┐
                     │ Structure Cache │
                     └─────────────────┘

  1. Observation: Events flow through the sensor during batch processing
  2. Sampling: Not every event is fully analyzed (configurable rate)
  3. Deep inspection: Nested JSON structures are recursively analyzed
  4. Fingerprinting: Schema changes are detected via SHA-256 fingerprints
  5. Caching: Repeated structures skip full analysis for performance
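
The fingerprinting step can be sketched as follows. This is a minimal illustration, not DeltaForge's actual implementation: the function names `structure_of` and `fingerprint` are hypothetical, and the canonical form (type names only, sorted keys) is an assumption about how a structure might be reduced before hashing.

```python
import hashlib
import json


def structure_of(value):
    """Reduce a JSON value to its shape: type names only, no data."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # must precede int: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "number"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        # Keep the distinct element shapes, independent of order and count.
        shapes = {json.dumps(structure_of(v), sort_keys=True) for v in value}
        return {"array": sorted(shapes)}
    if isinstance(value, dict):
        return {"object": {key: structure_of(v) for key, v in value.items()}}
    raise TypeError(f"not a JSON value: {type(value).__name__}")


def fingerprint(payload):
    """SHA-256 hex digest of the payload's canonicalized structure."""
    canonical = json.dumps(
        structure_of(payload), sort_keys=True, separators=(",", ":")
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Because only the shape is hashed, two payloads with the same fields and types produce the same fingerprint regardless of key order or values, while a type change in any field produces a different one.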

Configuration

Enable schema sensing in your pipeline spec:

spec:
  schema_sensing:
    enabled: true
    
    # Deep inspection for nested JSON
    deep_inspect:
      enabled: true
      max_depth: 3
      max_sample_size: 500
    
    # Sampling configuration
    sampling:
      warmup_events: 50
      sample_rate: 5
      structure_cache: true
      structure_cache_size: 50
    
    # Output configuration
    output:
      include_stats: true

Configuration Options

Top Level

Field    Type  Default  Description
enabled  bool  false    Enable schema sensing

Deep Inspection (deep_inspect)

Field            Type     Default  Description
enabled          bool     false    Enable deep inspection of nested objects
max_depth        integer  3        Maximum nesting depth to analyze
max_sample_size  integer  500      Max events to sample for deep analysis

Sampling (sampling)

Field                 Type     Default  Description
warmup_events         integer  50       Events to analyze before sampling kicks in
sample_rate           integer  5        Analyze 1 in N events after warmup
structure_cache       bool     true     Cache structure hashes for performance
structure_cache_size  integer  50       Max cached structures per table
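
To make the interaction between warmup_events and sample_rate concrete, here is a small sketch of the sampling decision. The function name `should_analyze` is hypothetical, and the deterministic "every Nth event" rule is an assumption; the real sensor may pick sampled events differently (for example, randomly).

```python
def should_analyze(event_index, warmup_events=50, sample_rate=5):
    """Decide whether the event at this 0-based position gets full analysis.

    Every event is analyzed during warmup; afterwards, 1 in `sample_rate`.
    """
    if sample_rate < 1:
        raise ValueError("sample_rate must be at least 1")
    if event_index < warmup_events:
        return True
    return (event_index - warmup_events) % sample_rate == 0


# With the defaults, the first 100 events yield 50 warmup analyses
# plus 10 sampled ones (positions 50, 55, ..., 95).
analyzed = sum(should_analyze(i) for i in range(100))
```

Setting sample_rate to 1 makes the second branch always true, so every event is analyzed.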

Inferred Types

Schema sensing infers these JSON types:

Type     Description
null     JSON null value
boolean  true/false
integer  Whole numbers
number   Floating point numbers
string   Text values
array    JSON arrays (element types tracked)
object   Nested objects (fields recursively analyzed)

For fields with varying types across events, all observed types are recorded.
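
As an illustration of how varying types accumulate, the sketch below merges the types observed for each top-level field across several events. The names `json_type` and `observe`, and the use of a set per field, are assumptions made for this example rather than the sensor's internal representation.

```python
# Exact-type lookup, so True maps to "boolean" and never to "integer".
_JSON_TYPES = {
    type(None): "null",
    bool: "boolean",
    int: "integer",
    float: "number",
    str: "string",
    list: "array",
    dict: "object",
}


def json_type(value):
    """Return the JSON type name for a decoded JSON value."""
    return _JSON_TYPES[type(value)]


def observe(field_types, event):
    """Merge the types seen in one event into the running per-field record."""
    for name, value in event.items():
        field_types.setdefault(name, set()).add(json_type(value))
    return field_types


field_types = {}
for event in [{"amount": 10}, {"amount": 10.5}, {"amount": None, "paid": True}]:
    observe(field_types, event)
```

After these three events, `amount` has been seen as integer, number, and null, and `paid` as boolean.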

Schema Evolution

When schema structure changes, the sensor:

  1. Detects change: Fingerprint differs from previous version
  2. Increments sequence: Monotonic version number increases
  3. Logs evolution: Emits structured log with old/new fingerprints
  4. Updates cache: New structure becomes current

Evolution events are available via the REST API and can trigger alerts.
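
The four steps above can be sketched as a small per-table tracker. This is an assumption-laden illustration: the class names `SchemaVersion` and `EvolutionTracker`, the starting sequence of 1, and the log message format are all hypothetical.

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger("schema_sensing_sketch")


@dataclass
class SchemaVersion:
    fingerprint: str
    sequence: int


class EvolutionTracker:
    """Tracks the current fingerprint and version number for each table."""

    def __init__(self):
        self._current = {}

    def current(self, table):
        return self._current.get(table)

    def observe(self, table, fingerprint):
        """Record a fingerprint; return True if it represents an evolution."""
        previous = self._current.get(table)
        if previous is None:
            # First sighting establishes the baseline, not an evolution.
            self._current[table] = SchemaVersion(fingerprint, 1)
            return False
        if previous.fingerprint == fingerprint:
            return False
        sequence = previous.sequence + 1
        logger.info(
            "schema evolved table=%s old=%s new=%s sequence=%d",
            table, previous.fingerprint, fingerprint, sequence,
        )
        self._current[table] = SchemaVersion(fingerprint, sequence)
        return True
```

Repeated identical fingerprints leave the sequence untouched, so the version number only moves when the structure actually changes.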

Stabilization

After observing enough events, a schema “stabilizes”:

  • Warmup phase completes
  • Structure stops changing
  • Sampling rate takes effect
  • Cache hit rate increases

Stabilized schemas have stabilized: true in API responses.

API Access

List Inferred Schemas

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas

Get Schema Details

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders

Export as JSON Schema

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/json-schema

Cache Statistics

curl http://localhost:8080/pipelines/my-pipeline/sensing/stats
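
For scripted access, the same endpoints can be called from Python's standard library. The helper names `sensing_url` and `fetch_json` are hypothetical, and the sketch assumes the endpoints return JSON bodies, which the curl examples above do not show.

```python
import json
import urllib.parse
import urllib.request


def sensing_url(base_url, pipeline, *parts):
    """Build a sensing endpoint URL, percent-encoding each path segment."""
    segments = ["pipelines", pipeline, "sensing", *parts]
    path = "/".join(urllib.parse.quote(segment, safe="") for segment in segments)
    return f"{base_url.rstrip('/')}/{path}"


def fetch_json(url, timeout=5.0):
    """GET a URL and decode the response body as JSON (assumed format)."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)
```

For example, `fetch_json(sensing_url("http://localhost:8080", "my-pipeline", "schemas", "orders"))` requests the same resource as the "Get Schema Details" command.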

Drift Detection

Schema sensing integrates with drift detection to compare:

  • Expected schema: From database metadata (schema registry)
  • Observed schema: From event payloads (schema sensing)

When mismatches occur, drift events are recorded:

Drift Type             Description
unexpected_null        Non-nullable column has null values
type_mismatch          Observed type differs from declared type
undeclared_column      Field in data not in schema
missing_column         Schema field never seen in data
json_structure_change  JSON column structure changed

Access drift data via:

curl http://localhost:8080/pipelines/my-pipeline/drift
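
The comparison between expected and observed schemas can be sketched as below. This is a simplified illustration under several assumptions: the function name `detect_drift` and both input shapes are hypothetical, declared database types are assumed to be already mapped to JSON type names, and json_structure_change is left out because it depends on fingerprint history rather than on a single comparison.

```python
def detect_drift(expected, observed):
    """Compare declared columns against observed types.

    expected: {column: {"type": str, "nullable": bool}}
    observed: {column: set of observed JSON type names}
    Returns a list of (drift_type, column) tuples.
    """
    events = []
    for column, declared in expected.items():
        seen = observed.get(column)
        if not seen:
            events.append(("missing_column", column))
            continue
        if "null" in seen and not declared["nullable"]:
            events.append(("unexpected_null", column))
        # Any non-null type other than the declared one counts as a mismatch.
        if seen - {"null", declared["type"]}:
            events.append(("type_mismatch", column))
    for column in observed:
        if column not in expected:
            events.append(("undeclared_column", column))
    return events


expected = {
    "id": {"type": "integer", "nullable": False},
    "email": {"type": "string", "nullable": False},
    "age": {"type": "integer", "nullable": True},
    "legacy": {"type": "string", "nullable": True},
}
observed = {
    "id": {"integer"},
    "email": {"string", "null"},
    "age": {"string"},
    "extra": {"boolean"},
}
drift = detect_drift(expected, observed)
```

Note that this sketch compares type names strictly, so an integer observed in a column declared as number would be reported as a mismatch; a real comparison would likely need a compatibility rule for such cases.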

Performance Considerations

Sampling Tradeoffs

Setting                      Effect
Higher warmup_events         Better initial accuracy, slower stabilization
Higher sample_rate           Lower CPU usage, slower evolution detection
Larger structure_cache_size  More memory, better hit rate
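
The memory versus hit-rate tradeoff for structure_cache_size can be illustrated with a bounded cache of structure hashes. The class name `StructureCache` is hypothetical, and least-recently-used eviction is an assumption; the source does not state which eviction policy the sensor uses.

```python
from collections import OrderedDict


class StructureCache:
    """Bounded set of structure hashes with hit/miss counters (LRU eviction)."""

    def __init__(self, max_size=50):
        self.max_size = max_size
        self._seen = OrderedDict()
        self.hits = 0
        self.misses = 0

    def check(self, structure_hash):
        """Return True on a hit (analysis can be skipped), False on a miss."""
        if structure_hash in self._seen:
            self._seen.move_to_end(structure_hash)  # mark as recently used
            self.hits += 1
            return True
        self.misses += 1
        self._seen[structure_hash] = True
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)  # evict the least recently used
        return False

    @property
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total else 0.0
```

If a table produces more distinct structures than the cache can hold, entries are evicted and later re-analyzed, which is why a larger cache improves the hit rate at the cost of memory.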

High-throughput pipelines (>10k events/sec):

sampling:
  warmup_events: 100
  sample_rate: 10
  structure_cache: true
  structure_cache_size: 100

Schema evolution monitoring:

sampling:
  warmup_events: 25
  sample_rate: 2
  structure_cache: true

Development/debugging:

sampling:
  warmup_events: 10
  sample_rate: 1  # Analyze every event

Example: JSON Column Sensing

For tables with JSON columns, sensing reveals the internal structure:

# Database schema shows: metadata JSON
# Sensing reveals:
fields:
  - name: "metadata.user_agent"
    types: ["string"]
    nullable: false
  - name: "metadata.ip_address"
    types: ["string"]
    nullable: true
  - name: "metadata.tags"
    types: ["array"]
    array_element_types: ["string"]

This enables downstream consumers to understand JSON column structure without manual documentation.
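
The dotted field names in the example can be produced by recursively flattening the JSON value. The sketch below is illustrative only: the function name `flatten` is hypothetical, and the way depth is counted against max_depth is an assumption about how the limit might apply.

```python
def flatten(value, prefix, max_depth=3, depth=0):
    """Yield (dotted_path, value) pairs, descending at most max_depth levels.

    Objects deeper than max_depth (and empty objects) are yielded whole.
    """
    if isinstance(value, dict) and value and depth < max_depth:
        for key, child in value.items():
            yield from flatten(child, f"{prefix}.{key}", max_depth, depth + 1)
    else:
        yield prefix, value


metadata = {"user_agent": "curl", "geo": {"city": "Oslo"}}
full = dict(flatten(metadata, "metadata"))
shallow = dict(flatten(metadata, "metadata", max_depth=1))
```

With the default depth, `full` contains `metadata.user_agent` and `metadata.geo.city`; with `max_depth=1`, the nested `geo` object is kept as a single `metadata.geo` entry.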

Metrics

Schema sensing emits these metrics:

Metric                                                             Type       Description
deltaforge_schema_evolutions_total{pipeline}                       Counter    Schema evolution events detected
deltaforge_schema_drift_detected{pipeline}                         Counter    Incremented when drift is detected in a batch
deltaforge_stage_latency_seconds{pipeline,stage="schema_sensing"}  Histogram  Time spent in schema sensing per batch

Cache statistics (hits, misses, hit rate) are available via the REST API at /pipelines/{name}/sensing/stats but are not currently exposed as Prometheus metrics.