Schema Sensing

Schema sensing automatically infers and tracks schema structure from JSON event payloads. It complements the schema registry by discovering schema from the data itself rather than from database metadata.

When to Use Schema Sensing

Schema sensing is useful when:

  • Source doesn’t provide schema: Some sources emit JSON without metadata
  • JSON columns: Database JSON/JSONB columns have dynamic structure
  • Schema evolution tracking: Detect when payload structure changes over time
  • Downstream integration: Generate JSON Schema for consumers
  • Dynamic map keys: Session IDs, trace IDs, or other high-cardinality keys in JSON

How It Works

┌──────────────┐     ┌─────────────────┐     ┌──────────────────┐
│    Event     │────▶│  Schema Sensor  │────▶│  Inferred Schema │
│   Payload    │     │   (sampling)    │     │   + Fingerprint  │
└──────────────┘     └─────────────────┘     └──────────────────┘
                              │
                              ▼
                     ┌─────────────────┐
                     │ Structure Cache │
                     │ + HC Classifier │
                     └─────────────────┘
  1. Observation: Events flow through the sensor during batch processing
  2. Sampling: Not every event is fully analyzed (configurable rate)
  3. Deep inspection: Nested JSON structures are recursively analyzed
  4. High-cardinality detection: Dynamic map keys are classified and normalized
  5. Fingerprinting: Schema changes are detected via SHA-256 fingerprints
  6. Caching: Repeated structures skip full analysis for performance
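
The sampling and fingerprinting steps above can be sketched roughly as follows. This is a minimal illustration, not DeltaForge's implementation: the function names (`structure_of`, `fingerprint`, `should_analyze`) are hypothetical, and the exact canonical form that DeltaForge hashes is an assumption. Only the use of SHA-256, the warmup phase, and the 1-in-N sampling come from this page.

```python
import hashlib
import json


def structure_of(value, depth=0, max_depth=3):
    """Reduce a JSON value to its shape: field names and type names only."""
    if isinstance(value, dict):
        if depth >= max_depth:
            return "object"  # too deep: record the type, skip the fields
        return {k: structure_of(v, depth + 1, max_depth) for k, v in value.items()}
    if isinstance(value, list):
        return "array"
    if isinstance(value, bool):  # must precede int: bool is a subclass of int
        return "boolean"
    if value is None:
        return "null"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "number"
    return "string"


def fingerprint(payload, max_depth=3):
    """SHA-256 over a canonical (key-sorted) rendering of the payload shape."""
    shape = structure_of(payload, max_depth=max_depth)
    canonical = json.dumps(shape, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def should_analyze(event_index, warmup_events=50, sample_rate=5):
    """Analyze every event during warmup, then 1 in `sample_rate` (0-based index)."""
    if event_index < warmup_events:
        return True
    return (event_index - warmup_events) % sample_rate == 0
```

Because only field names and types feed the hash, two events with different values but the same shape produce the same fingerprint, which is what lets a structure cache skip repeated analysis.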

High-Cardinality Key Handling

JSON payloads often contain dynamic keys like session IDs, trace IDs, or user-generated identifiers:

{
  "id": 1,
  "sessions": {
    "sess_abc123": {"user_id": 42, "started_at": 1700000000},
    "sess_xyz789": {"user_id": 43, "started_at": 1700000001}
  }
}

Without special handling, each unique key (sess_abc123, sess_xyz789) triggers a “schema evolution” event, causing:

  • 0% cache hit rate
  • Constant false evolution alerts
  • Unbounded schema growth

How It Works

DeltaForge uses probabilistic data structures (HyperLogLog, SpaceSaving) to classify fields:

| Classification | Description | Example |
|---|---|---|
| Stable fields | Appear in most events | id, type, timestamp |
| Dynamic fields | Unique per event, high cardinality | sess_*, trace_*, uuid_* |

When dynamic fields are detected, the schema sensor:

  1. Normalizes keys: Replaces sess_abc123 with <dynamic> placeholder
  2. Uses adaptive hashing: Structure cache ignores dynamic key names
  3. Produces stable fingerprints: Same schema despite different keys
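
To make the normalization step concrete, here is a simplified sketch. It swaps the probabilistic structures (HyperLogLog, SpaceSaving) for exact counters, so it only suits small examples, and it ignores `confidence_threshold` and `reevaluate_interval`. The class and function names are hypothetical. The parameter names mirror the configuration options on this page, but how DeltaForge applies them internally is an assumption.

```python
from collections import Counter

DYNAMIC = "<dynamic>"


class KeyClassifier:
    """Exact-count stand-in for the probabilistic sketches, for one JSON path."""

    def __init__(self, min_events=100, stable_threshold=0.5, min_dynamic_fields=5):
        self.min_events = min_events
        self.stable_threshold = stable_threshold
        self.min_dynamic_fields = min_dynamic_fields
        self.events = 0
        self.key_counts = Counter()

    def observe(self, obj):
        """Record the keys of one object seen at this path."""
        self.events += 1
        self.key_counts.update(obj.keys())

    def stable_keys(self):
        """Keys that appear in at least `stable_threshold` of observed events."""
        cutoff = self.stable_threshold * self.events
        return {k for k, n in self.key_counts.items() if n >= cutoff}

    def is_dynamic_map(self):
        """True once enough events are seen and enough non-stable keys exist."""
        if self.events < self.min_events:
            return False
        unstable = len(self.key_counts) - len(self.stable_keys())
        return unstable >= self.min_dynamic_fields


def normalize_keys(obj, classifier):
    """Replace non-stable keys with the placeholder when the path is a dynamic map.

    All dynamic keys collapse into a single placeholder entry; the last value
    wins, which is enough for structure analysis.
    """
    if not classifier.is_dynamic_map():
        return dict(obj)
    stable = classifier.stable_keys()
    return {(k if k in stable else DYNAMIC): v for k, v in obj.items()}
```

After normalization, `{"sess_abc123": {...}}` and `{"sess_xyz789": {...}}` reduce to the same keys, so hashing the normalized structure yields the same fingerprint for both.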

Results

| Scenario | Without HC | With HC |
|---|---|---|
| Nested dynamic keys | 100% evolution rate | <1% evolution rate |
| Top-level dynamic keys | 0% cache hits | >99% cache hits |
| Stable structs | Baseline | ~20% overhead during warmup, then ~0% |

Configuration

Example

spec:
  schema_sensing:
    enabled: true
    
    deep_inspect:
      enabled: true
      max_depth: 3
      max_sample_size: 500
    
    sampling:
      warmup_events: 50
      sample_rate: 5
      structure_cache: true
      structure_cache_size: 50
    
    high_cardinality:
      enabled: true
      min_events: 100
      stable_threshold: 0.5
      min_dynamic_fields: 5
      confidence_threshold: 0.7
      reevaluate_interval: 10000

Options

| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable schema sensing |
| deep_inspect | | | |
| enabled | bool | false | Inspect nested JSON |
| max_depth | int | 3 | Max nesting depth |
| max_sample_size | int | 500 | Max events for deep analysis |
| sampling | | | |
| warmup_events | int | 50 | Events fully analyzed before sampling starts |
| sample_rate | int | 5 | After warmup, analyze 1 in N events |
| structure_cache | bool | true | Cache structure hashes |
| structure_cache_size | int | 50 | Max cached structures per table |
| high_cardinality | | | |
| enabled | bool | true | Detect dynamic map keys |
| min_events | int | 100 | Events observed before classification |
| stable_threshold | float | 0.5 | Frequency at which a field counts as stable |
| min_dynamic_fields | int | 5 | Min unique keys for map detection |
| confidence_threshold | float | 0.7 | Required classification confidence |
| reevaluate_interval | int | 10000 | Re-check interval (0=never) |

Inferred Types

Schema sensing infers these JSON types:

| Type | Description |
|---|---|
| null | JSON null value |
| boolean | true/false |
| integer | Whole numbers |
| number | Floating point numbers |
| string | Text values |
| array | JSON arrays (element types tracked) |
| object | Nested objects (fields recursively analyzed) |

For fields with varying types across events, all observed types are recorded.
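
As an illustration of how observed types accumulate per field, the following sketch maps Python values (as produced by `json.loads`) to the type names in the table above and records every type seen for each top-level field. The function names are hypothetical, and the sketch only covers top-level fields.

```python
def json_type(value):
    """Map a decoded JSON value to one of the inferred type names."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # must precede int: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "number"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return "array"
    if isinstance(value, dict):
        return "object"
    raise TypeError(f"not a JSON value: {type(value).__name__}")


def observe_types(events):
    """Collect the set of types seen for each top-level field across events."""
    observed = {}
    for event in events:
        for field, value in event.items():
            observed.setdefault(field, set()).add(json_type(value))
    return {field: sorted(types) for field, types in observed.items()}
```

For example, a field that is `1` in one event and `2.5` in another ends up with both `integer` and `number` recorded.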

Schema Evolution

When schema structure changes, the sensor:

  1. Detects change: Fingerprint differs from previous version
  2. Increments sequence: Monotonic version number increases
  3. Logs evolution: Emits structured log with old/new fingerprints
  4. Updates cache: New structure becomes current

Evolution events are available via the REST API and can trigger alerts.
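
The four steps above amount to a small state machine per table. A minimal sketch, assuming a hypothetical `SchemaVersionTracker` class and treating the first observed fingerprint as version 1 (DeltaForge's actual numbering and log fields are not specified on this page):

```python
import logging

log = logging.getLogger("schema_sensing_example")


class SchemaVersionTracker:
    """Tracks the current fingerprint and a monotonic sequence for one table."""

    def __init__(self, table):
        self.table = table
        self.fingerprint = None
        self.sequence = 0

    def observe(self, fingerprint):
        """Return True if the fingerprint differs from the current version."""
        if fingerprint == self.fingerprint:
            return False
        old = self.fingerprint
        self.fingerprint = fingerprint  # new structure becomes current
        self.sequence += 1              # monotonic version number
        log.info(
            "schema evolution table=%s sequence=%d old=%s new=%s",
            self.table, self.sequence, old, fingerprint,
        )
        return True
```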

Stabilization

After observing enough events, a schema “stabilizes”:

  • Warmup phase completes
  • Structure stops changing
  • Sampling rate takes effect
  • Cache hit rate increases

Stabilized schemas have stabilized: true in API responses.

API Access

List Inferred Schemas

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas

Get Schema Details

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders

Export as JSON Schema

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/json-schema

Cache Statistics

curl http://localhost:8080/pipelines/my-pipeline/sensing/stats

Dynamic Map Classifications

curl http://localhost:8080/pipelines/my-pipeline/sensing/schemas/orders/classifications

Response:

{
  "table": "orders",
  "paths": {
    "": {"stable_fields": ["id", "type", "timestamp"], "has_dynamic_fields": false},
    "sessions": {"stable_fields": [], "has_dynamic_fields": true, "unique_keys": 1523},
    "metadata": {"stable_fields": ["version"], "has_dynamic_fields": true, "unique_keys": 42}
  }
}
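
A consumer might summarize this response by listing only the paths flagged as dynamic maps. The sketch below parses the sample response shown above; `dynamic_paths` is a hypothetical helper, and treating a missing `unique_keys` as 0 is an assumption.

```python
import json

SAMPLE_RESPONSE = """
{
  "table": "orders",
  "paths": {
    "": {"stable_fields": ["id", "type", "timestamp"], "has_dynamic_fields": false},
    "sessions": {"stable_fields": [], "has_dynamic_fields": true, "unique_keys": 1523},
    "metadata": {"stable_fields": ["version"], "has_dynamic_fields": true, "unique_keys": 42}
  }
}
"""


def dynamic_paths(response):
    """Map each path with dynamic fields to its reported unique key count."""
    return {
        path: info.get("unique_keys", 0)
        for path, info in response["paths"].items()
        if info["has_dynamic_fields"]
    }


if __name__ == "__main__":
    for path, count in dynamic_paths(json.loads(SAMPLE_RESPONSE)).items():
        print(f"{path}: {count} unique keys")
```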

Drift Detection

Schema sensing integrates with drift detection to compare:

  • Expected schema: From database metadata (schema registry)
  • Observed schema: From event payloads (schema sensing)

When mismatches occur, drift events are recorded:

| Drift Type | Description |
|---|---|
| unexpected_null | Non-nullable column has null values |
| type_mismatch | Observed type differs from declared type |
| undeclared_column | Field in data not in schema |
| missing_column | Schema field never seen in data |
| json_structure_change | JSON column structure changed |

Access drift data via:

curl http://localhost:8080/pipelines/my-pipeline/drift
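
Conceptually, the comparison between expected and observed schemas looks like the sketch below. It covers four of the five drift types (`json_structure_change` is omitted) and uses hypothetical input shapes: `expected` maps a column to its declared type and nullability, `observed` maps a column to the set of sensed types. The real drift detector's data model is not documented here, so treat this as an illustration only.

```python
def detect_drift(expected, observed):
    """Compare declared columns against sensed types; return (drift_type, column) pairs."""
    drifts = []
    for column, declared in expected.items():
        types = observed.get(column)
        if types is None:
            drifts.append(("missing_column", column))
            continue
        if "null" in types and not declared["nullable"]:
            drifts.append(("unexpected_null", column))
        # Nulls are handled above; any other type must match the declaration.
        if any(t not in ("null", declared["type"]) for t in types):
            drifts.append(("type_mismatch", column))
    for column in observed:
        if column not in expected:
            drifts.append(("undeclared_column", column))
    return drifts
```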

Performance Considerations

Sampling Tradeoffs

| Setting | Effect |
|---|---|
| Higher warmup_events | Better initial accuracy, slower stabilization |
| Higher sample_rate | Lower CPU usage, slower evolution detection |
| Larger structure_cache_size | More memory, better hit rate |

High-throughput pipelines (>10k events/sec):

sampling:
  warmup_events: 100
  sample_rate: 10
  structure_cache: true
  structure_cache_size: 100
high_cardinality:
  enabled: true
  min_events: 200

Schema evolution monitoring:

sampling:
  warmup_events: 25
  sample_rate: 2
  structure_cache: true
high_cardinality:
  enabled: true
  min_events: 50

Payloads with dynamic keys (session stores, feature flags):

sampling:
  structure_cache: true
  structure_cache_size: 50
high_cardinality:
  enabled: true
  min_events: 100
  min_dynamic_fields: 3
  stable_threshold: 0.5

Development/debugging:

sampling:
  warmup_events: 10
  sample_rate: 1  # Analyze every event
high_cardinality:
  enabled: true
  min_events: 20  # Faster classification

Example: JSON Column Sensing

For tables with JSON columns, sensing reveals the internal structure:

# Database schema shows: metadata JSON
# Sensing reveals:
fields:
  - name: "metadata.user_agent"
    types: ["string"]
    nullable: false
  - name: "metadata.ip_address"
    types: ["string"]
    nullable: true
  - name: "metadata.tags"
    types: ["array"]
    array_element_types: ["string"]

This enables downstream consumers to understand JSON column structure without manual documentation.
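
A rough sketch of how such a field listing could be derived from sampled JSON column values follows. The output keys (`name`, `types`, `nullable`, `array_element_types`) follow the example above; the function names are hypothetical, and marking a field nullable only when an explicit null is observed is an assumption about the sensor's behavior.

```python
def json_type(value):
    """Map a decoded JSON value to an inferred type name."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # must precede int: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "number"
    if isinstance(value, str):
        return "string"
    return "array" if isinstance(value, list) else "object"


def describe_column(column, samples):
    """Summarize dotted field paths, types, and nullability for a JSON column."""
    fields = {}

    def walk(prefix, obj):
        for key, value in obj.items():
            path = f"{prefix}.{key}"
            entry = fields.setdefault(
                path, {"types": set(), "nullable": False, "elements": set()}
            )
            kind = json_type(value)
            if kind == "null":
                entry["nullable"] = True
                continue
            entry["types"].add(kind)
            if kind == "array":
                entry["elements"].update(json_type(item) for item in value)
            elif kind == "object":
                walk(path, value)  # recurse into nested objects

    for sample in samples:
        walk(column, sample)

    result = []
    for path in sorted(fields):
        entry = fields[path]
        field = {"name": path, "types": sorted(entry["types"]), "nullable": entry["nullable"]}
        if entry["elements"]:
            field["array_element_types"] = sorted(entry["elements"])
        result.append(field)
    return result
```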

Metrics

Schema sensing emits these Prometheus metrics:

| Metric | Type | Labels | Description |
|---|---|---|---|
| deltaforge_schema_events_total | Counter | table | Total events observed |
| deltaforge_schema_cache_hits_total | Counter | table | Structure cache hits |
| deltaforge_schema_cache_misses_total | Counter | table | Structure cache misses |
| deltaforge_schema_evolutions_total | Counter | table | Schema evolutions detected |
| deltaforge_schema_tables_total | Gauge | - | Tables with detected schemas |
| deltaforge_schema_dynamic_maps_total | Gauge | - | Paths classified as dynamic maps |
| deltaforge_schema_sensing_seconds | Histogram | table | Per-event sensing latency |

Example Queries

# Cache hit rate per table
sum(rate(deltaforge_schema_cache_hits_total[5m])) by (table)
/
sum(rate(deltaforge_schema_events_total[5m])) by (table)

# Schema evolution rate (should be near zero after warmup)
sum(rate(deltaforge_schema_evolutions_total[5m])) by (table)

# P99 sensing latency
histogram_quantile(0.99, rate(deltaforge_schema_sensing_seconds_bucket[5m]))