REST API Reference

DeltaForge exposes a REST API for health checks, pipeline management, schema inspection, and drift detection. All endpoints return JSON.

Base URL

Default: http://localhost:8080

Configure with --api-addr:

deltaforge --config pipelines.yaml --api-addr 0.0.0.0:9090

Health Endpoints

Liveness Probe

GET /health

Returns ok when the process is running and all pipelines are healthy. Returns 503 if any pipeline has entered a failed state (e.g. position lost after failover, binlog purged, unrecoverable source error). Use for Kubernetes liveness probes — a 503 indicates the process should be restarted.

Response: 200 OK — all pipelines healthy

{"status": "healthy", "pipelines": 3}

Response: 503 Service Unavailable — one or more pipelines failed

{"status": "unhealthy", "failed_pipelines": ["orders-cdc"]}

Readiness Probe

GET /ready

Returns pipeline states. Use for Kubernetes readiness probes.

Response: 200 OK

{
  "status": "ready",
  "pipelines": [
    {
      "name": "orders-cdc",
      "status": "running",
      "spec": { ... }
    }
  ]
}

Pipeline Management

List Pipelines

GET /pipelines
GET /pipelines?label=env:prod
GET /pipelines?label=env:prod&label=team:platform

Returns all pipelines with their current status. Multiple label filters combine with AND logic; a key-only filter (?label=env) matches any value for that key.

Response: 200 OK

[
  {
    "name": "orders-cdc",
    "status": "running",
    "spec": {
      "metadata": { "name": "orders-cdc", "tenant": "acme" },
      "spec": { ... }
    }
  }
]
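
From client code, the AND filter means repeating the label query parameter once per filter. A sketch of the encoding (the helper name and its signature are illustrative, not part of DeltaForge; a None value produces a key-only filter):

```python
from urllib.parse import urlencode

def pipelines_url(base: str, labels: dict) -> str:
    """Build a GET /pipelines URL with one repeated `label` param per filter."""
    params = [("label", key if value is None else f"{key}:{value}")
              for key, value in labels.items()]
    query = urlencode(params)
    return f"{base}/pipelines?{query}" if query else f"{base}/pipelines"

url = pipelines_url("http://localhost:8080", {"env": "prod", "team": "platform"})
# the ':' in each label value is percent-encoded as %3A
```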

Get Pipeline

GET /pipelines/{name}

Returns a single pipeline by name with operational status.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... },
  "ops": {
    "uptime_seconds": 3600.5,
    "dlq_entries": 0,
    "sink_errors": {},
    "checkpoints": [
      {"sink_id": "kafka-primary", "position": {"file": "mysql-bin.000005", "pos": 12345}, "age_seconds": 0.3}
    ]
  }
}

Errors:

  • 404 Not Found - Pipeline doesn’t exist

Create Pipeline

POST /pipelines
Content-Type: application/json

Creates a new pipeline from a full spec.

Request:

{
  "metadata": {
    "name": "orders-cdc",
    "tenant": "acme"
  },
  "spec": {
    "source": {
      "type": "mysql",
      "config": {
        "id": "mysql-1",
        "dsn": "mysql://user:pass@host/db",
        "tables": ["shop.orders"]
      }
    },
    "processors": [],
    "sinks": [
      {
        "type": "kafka",
        "config": {
          "id": "kafka-1",
          "brokers": "localhost:9092",
          "topic": "orders"
        }
      }
    ]
  }
}

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Errors:

  • 409 Conflict - Pipeline already exists

Update Pipeline

PATCH /pipelines/{name}
Content-Type: application/json

Applies a partial update to an existing pipeline. The spec is merged, the pipeline is restarted from its last saved checkpoint, and the new config takes effect immediately. Only the fields present in the request body are changed — omitted fields retain their current values.

If the pipeline is currently stopped, PATCH applies the new config and restarts it from the saved checkpoint. This is the recommended way to tune throughput settings before resuming after a planned stop.

Request:

{
  "spec": {
    "batch": {
      "max_events": 1000,
      "max_ms": 500
    }
  }
}

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Errors:

  • 404 Not Found - Pipeline doesn’t exist
  • 400 Bad Request - Invalid field value or name mismatch in patch
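
The partial-update behavior (fields present in the patch overwrite, omitted fields keep their current values) can be modeled as a recursive overlay. The sketch below illustrates the semantics only; it is not DeltaForge's actual merge implementation:

```python
def merge_spec(current: dict, patch: dict) -> dict:
    """Overlay a partial patch onto the current spec.
    Keys present in the patch replace (or deep-merge into) the current
    values; keys absent from the patch are left untouched."""
    merged = dict(current)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_spec(merged[key], value)
        else:
            merged[key] = value
    return merged

current = {"batch": {"max_events": 200, "max_ms": 100},
           "source": {"type": "mysql"}}
patch = {"batch": {"max_events": 1000, "max_ms": 500}}
merged = merge_spec(current, patch)
# merged["source"] is unchanged; only the batch settings move
```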

Pause Pipeline

POST /pipelines/{name}/pause

Suspends event processing while keeping the source connection alive. No new events are consumed from the binlog/WAL. Resume restarts processing from exactly where it paused — no events are missed.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "paused",
  "spec": { ... }
}

Resume Pipeline

POST /pipelines/{name}/resume

Resumes a paused or stopped pipeline.

  • From paused — restarts event processing immediately; source connection was kept alive.
  • From stopped — reconnects to the source and replays from the last saved checkpoint; any events written to the binlog/WAL while stopped are replayed in order.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "running",
  "spec": { ... }
}

Stop Pipeline

POST /pipelines/{name}/stop

Gracefully stops a pipeline: flushes in-flight events, saves the binlog/WAL checkpoint, and disconnects from the source. The pipeline remains in the registry and can be resumed with POST /pipelines/{name}/resume or by issuing a PATCH with updated config.

Use stop (rather than delete) when you intend to restart the pipeline later — for example, before a planned maintenance window or when tuning config for a backlog drain.

Response: 200 OK

{
  "name": "orders-cdc",
  "status": "stopped",
  "spec": { ... }
}

Delete Pipeline

DELETE /pipelines/{name}

Permanently removes a pipeline from the runtime. The checkpoint is not preserved. Use stop first if you may want to restart the pipeline later.

Response: 204 No Content

Errors:

  • 404 Not Found - Pipeline doesn’t exist

Schema Management

List Database Schemas

GET /pipelines/{name}/schemas

Returns all tracked database schemas for a pipeline. These are the schemas loaded directly from the source database.

Response: 200 OK

[
  {
    "database": "shop",
    "table": "orders",
    "column_count": 5,
    "primary_key": ["id"],
    "fingerprint": "sha256:a1b2c3d4e5f6...",
    "registry_version": 2
  },
  {
    "database": "shop",
    "table": "customers",
    "column_count": 8,
    "primary_key": ["id"],
    "fingerprint": "sha256:f6e5d4c3b2a1...",
    "registry_version": 1
  }
]

Get Schema Details

GET /pipelines/{name}/schemas/{db}/{table}

Returns detailed schema information including all columns.

Response: 200 OK

{
  "database": "shop",
  "table": "orders",
  "columns": [
    {
      "name": "id",
      "type": "bigint(20) unsigned",
      "nullable": false,
      "default": null,
      "extra": "auto_increment"
    },
    {
      "name": "customer_id",
      "type": "bigint(20)",
      "nullable": false,
      "default": null
    }
  ],
  "primary_key": ["id"],
  "fingerprint": "sha256:a1b2c3d4..."
}

Schema Sensing

Schema sensing automatically infers schema structure from JSON event payloads. This is useful for sources that don’t provide schema metadata or for detecting schema evolution in JSON columns.

List Inferred Schemas

GET /pipelines/{name}/sensing/schemas

Returns all schemas inferred via sensing for a pipeline.

Response: 200 OK

[
  {
    "table": "orders",
    "fingerprint": "sha256:abc123...",
    "sequence": 3,
    "event_count": 1500,
    "stabilized": true,
    "first_seen": "2025-01-15T10:30:00Z",
    "last_seen": "2025-01-15T14:22:00Z"
  }
]

Field reference:

  • table - Table name (or table:column for JSON column sensing)
  • fingerprint - SHA-256 content hash of the current schema
  • sequence - Monotonic version number (increments on evolution)
  • event_count - Total events observed
  • stabilized - Whether sampling has stopped because the structure is stable
  • first_seen - First observation timestamp
  • last_seen - Most recent observation timestamp
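
The per-field output (types, nullable, optional) can be approximated from a handful of JSON events. A rough Python sketch of the inference, under the simplifying assumptions that a field is nullable once any null is observed and optional if any sampled event omits it (DeltaForge's actual sampling and caching are more involved):

```python
def infer_fields(events: list) -> dict:
    """Infer a coarse per-field summary from sample JSON events."""
    type_names = {bool: "boolean", int: "integer", float: "number",
                  str: "string", list: "array", dict: "object"}
    fields = {}
    for event in events:
        for name, value in event.items():
            info = fields.setdefault(
                name, {"types": set(), "nullable": False, "seen": 0})
            info["seen"] += 1
            if value is None:
                info["nullable"] = True
            else:
                info["types"].add(type_names[type(value)])
    total = len(events)
    return {name: {"types": sorted(info["types"]),
                   "nullable": info["nullable"],
                   "optional": info["seen"] < total}   # missing from some event
            for name, info in fields.items()}

summary = infer_fields([
    {"id": 1, "metadata": None},
    {"id": 2, "metadata": {"k": "v"}, "tags": ["a"]},
])
# id: integer, required; metadata: nullable object; tags: optional array
```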

Get Inferred Schema Details

GET /pipelines/{name}/sensing/schemas/{table}

Returns detailed inferred schema including all fields.

Response: 200 OK

{
  "table": "orders",
  "fingerprint": "sha256:abc123...",
  "sequence": 3,
  "event_count": 1500,
  "stabilized": true,
  "fields": [
    {
      "name": "id",
      "types": ["integer"],
      "nullable": false,
      "optional": false
    },
    {
      "name": "metadata",
      "types": ["object"],
      "nullable": true,
      "optional": false,
      "nested_field_count": 5
    },
    {
      "name": "tags",
      "types": ["array"],
      "nullable": false,
      "optional": true,
      "array_element_types": ["string"]
    }
  ],
  "first_seen": "2025-01-15T10:30:00Z",
  "last_seen": "2025-01-15T14:22:00Z"
}

Export JSON Schema

GET /pipelines/{name}/sensing/schemas/{table}/json-schema

Exports the inferred schema as a standard JSON Schema document.

Response: 200 OK

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "orders",
  "type": "object",
  "properties": {
    "id": { "type": "integer" },
    "metadata": { "type": ["object", "null"] },
    "tags": {
      "type": "array",
      "items": { "type": "string" }
    }
  },
  "required": ["id", "metadata"]
}
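
The export maps inferred fields onto draft-07 keywords: a nullable field becomes a type union with "null", array element types become items, and only non-optional fields land in required. A sketch of that mapping, assuming the field shape returned by the sensing endpoints above:

```python
def to_json_schema(table: str, fields: list) -> dict:
    """Convert sensed fields into a draft-07 JSON Schema document."""
    properties = {}
    required = []
    for field in fields:
        if "array" in field["types"] and field.get("array_element_types"):
            prop = {"type": "array",
                    "items": {"type": field["array_element_types"][0]}}
        elif field.get("nullable"):
            prop = {"type": field["types"] + ["null"]}
        else:
            prop = {"type": field["types"][0]}
        properties[field["name"]] = prop
        if not field.get("optional"):
            required.append(field["name"])
    return {"$schema": "http://json-schema.org/draft-07/schema#",
            "title": table, "type": "object",
            "properties": properties, "required": required}

schema = to_json_schema("orders", [
    {"name": "id", "types": ["integer"], "nullable": False, "optional": False},
    {"name": "metadata", "types": ["object"], "nullable": True, "optional": False},
    {"name": "tags", "types": ["array"], "nullable": False, "optional": True,
     "array_element_types": ["string"]},
])
# reproduces the example document above
```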

Get Sensing Cache Statistics

GET /pipelines/{name}/sensing/stats

Returns cache performance statistics for schema sensing.

Response: 200 OK

{
  "tables": [
    {
      "table": "orders",
      "cached_structures": 3,
      "max_cache_size": 100,
      "cache_hits": 1450,
      "cache_misses": 50
    }
  ],
  "total_cache_hits": 1450,
  "total_cache_misses": 50,
  "hit_rate": 0.9667
}
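
The aggregate hit_rate appears to be total hits over total lookups, rounded to four places; the example numbers are consistent with that reading:

```python
hits, misses = 1450, 50
hit_rate = round(hits / (hits + misses), 4)   # 1450 / 1500
```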

Drift Detection

Drift detection compares expected database schema against observed data patterns to detect mismatches, unexpected nulls, and type drift.

Get Drift Results

GET /pipelines/{name}/drift

Returns drift detection results for all tables in a pipeline.

Response: 200 OK

[
  {
    "table": "orders",
    "has_drift": true,
    "columns": [
      {
        "column": "amount",
        "expected_type": "decimal(10,2)",
        "observed_types": ["string"],
        "mismatch_count": 42,
        "examples": ["\"99.99\""]
      }
    ],
    "events_analyzed": 1500,
    "events_with_drift": 42
  }
]
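
The per-column report boils down to comparing each column's declared SQL type family with the JSON type actually observed in event payloads. A simplified sketch of that check (the type-family mapping here is illustrative, not DeltaForge's exact rules):

```python
def expected_json_type(sql_type: str) -> str:
    """Map a SQL column type to the JSON type it should serialize as."""
    sql_type = sql_type.lower()
    if sql_type.startswith(("decimal", "float", "double")):
        return "number"
    if sql_type.startswith(("bigint", "int", "smallint", "tinyint")):
        return "integer"
    return "string"

def observed_json_type(value) -> str:
    return {bool: "boolean", int: "integer", float: "number",
            str: "string", list: "array", dict: "object"}[type(value)]

def column_drift(sql_type: str, values: list) -> dict:
    """Summarize mismatches between expected and observed JSON types."""
    observed = [observed_json_type(v) for v in values if v is not None]
    expected = expected_json_type(sql_type)
    mismatches = [t for t in observed if t != expected]
    return {"expected_type": sql_type,
            "observed_types": sorted(set(observed)),
            "mismatch_count": len(mismatches)}

drift = column_drift("decimal(10,2)", ["99.99", "12.50"])
# both values arrived as JSON strings instead of numbers
```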

Get Table Drift

GET /pipelines/{name}/drift/{table}

Returns drift detection results for a specific table.

Response: 200 OK

{
  "table": "orders",
  "has_drift": false,
  "columns": [],
  "events_analyzed": 1000,
  "events_with_drift": 0
}

Errors:

  • 404 Not Found - Table not found or no drift data available

Dead Letter Queue

See the DLQ page for full documentation.

Peek DLQ Entries

GET /pipelines/{name}/journal/dlq?limit=50&sink_id=kafka-primary&error_kind=serialization

Returns DLQ entries (oldest first). All query params are optional.

DLQ Count

GET /pipelines/{name}/journal/dlq/count

Response: 200 OK

{"count": 42}

Acknowledge DLQ Entries

POST /pipelines/{name}/journal/dlq/ack
Content-Type: application/json

{"up_to_seq": 42}

Permanently removes entries from the head up to the given sequence number.

Response: 200 OK

{"acked": 12}

Purge DLQ

DELETE /pipelines/{name}/journal/dlq

Response: 200 OK

{"purged": 42}

Checkpoint Inspection

Get Checkpoints

GET /pipelines/{name}/checkpoints

Returns per-sink checkpoint positions and ages.

Response: 200 OK

[
  {"sink_id": "kafka-primary", "position": {"file": "mysql-bin.000005", "pos": 12345}, "age_seconds": 0.3},
  {"sink_id": "redis-cache", "position": {"file": "mysql-bin.000005", "pos": 11000}, "age_seconds": 2.1}
]
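
With per-sink checkpoints, the slowest sink can be found by ordering MySQL binlog positions: first by file name (zero-padded, so lexicographic order works), then by offset within the file. A sketch using the position shape from the response above:

```python
def binlog_key(position: dict) -> tuple:
    """Sort key ordering MySQL binlog positions by (file, offset)."""
    return (position["file"], position["pos"])

checkpoints = [
    {"sink_id": "kafka-primary",
     "position": {"file": "mysql-bin.000005", "pos": 12345}, "age_seconds": 0.3},
    {"sink_id": "redis-cache",
     "position": {"file": "mysql-bin.000005", "pos": 11000}, "age_seconds": 2.1},
]
slowest = min(checkpoints, key=lambda c: binlog_key(c["position"]))
# the sink holding the oldest position bounds how much binlog must be retained
```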

System Endpoints

Log Level

GET /log-level

Returns the current RUST_LOG value.

Response: 200 OK

{"level": "deltaforge=info,sources=info,sinks=info,warn"}

Validate Config

POST /validate
Content-Type: application/json

Dry-run validation of a pipeline config without creating it.

Response: 200 OK — config is valid

{"valid": true, "pipeline": "orders-cdc", "source_type": "mysql", "sink_count": 2}

Response: 400 Bad Request — config has errors

{"valid": false, "error": "spec: missing field `processors` at line 7 column 3"}

Error Responses

All error responses return structured JSON:

{
  "code": "PIPELINE_NOT_FOUND",
  "message": "pipeline orders-cdc not found"
}

Status code reference:

  • 400 Bad Request - PIPELINE_NAME_MISMATCH - Invalid request body or name mismatch
  • 404 Not Found - PIPELINE_NOT_FOUND - Resource doesn’t exist
  • 409 Conflict - PIPELINE_ALREADY_EXISTS - Resource already exists
  • 500 Internal Server Error - INTERNAL_ERROR - Unexpected server error
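
Clients can branch on the machine-readable code field rather than parsing the message text. A minimal sketch, assuming the error shape shown above (the exception class names are illustrative):

```python
import json

class PipelineNotFound(Exception):
    pass

class PipelineAlreadyExists(Exception):
    pass

def raise_for_error(status_code: int, body: str) -> None:
    """Raise a typed exception for a structured API error response."""
    if status_code < 400:
        return
    payload = json.loads(body)
    code = payload.get("code", "")
    message = payload.get("message", "")
    if code == "PIPELINE_NOT_FOUND":
        raise PipelineNotFound(message)
    if code == "PIPELINE_ALREADY_EXISTS":
        raise PipelineAlreadyExists(message)
    raise RuntimeError(f"{code}: {message}")
```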