Sources
DeltaForge captures database changes through pluggable source connectors. Each source is configured under spec.source with a type field and a config object. Environment variables are expanded before parsing using ${VAR} syntax.
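As a rough illustration of the ${VAR} expansion step, the sketch below substitutes placeholders in a raw config string before it would be handed to a parser. The function names `expand_vars` and `expand_env` are hypothetical, and leaving unknown or unterminated placeholders untouched is an assumption; DeltaForge's actual expander may report an error instead.

```rust
/// Expands `${VAR}` placeholders in `input` using `lookup`.
/// Assumption: unknown variables and unterminated placeholders are
/// copied through unchanged rather than treated as errors.
fn expand_vars(input: &str, lookup: &dyn Fn(&str) -> Option<String>) -> String {
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    while let Some(start) = rest.find("${") {
        // Copy everything before the placeholder.
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        match after.find('}') {
            Some(end) => {
                let name = &after[..end];
                match lookup(name) {
                    Some(value) => out.push_str(&value),
                    None => {
                        // Unknown variable: keep the placeholder as written.
                        out.push_str("${");
                        out.push_str(name);
                        out.push('}');
                    }
                }
                rest = &after[end + 1..];
            }
            None => {
                // Unterminated placeholder: copy the remainder verbatim.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}

/// Convenience wrapper that reads values from the process environment.
fn expand_env(input: &str) -> String {
    expand_vars(input, &|name: &str| std::env::var(name).ok())
}
```

Passing the lookup as a closure keeps the expansion logic testable without touching the real environment; `expand_env` is the variant a config loader would call on the raw file contents.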
Supported Sources
| Source | Status | Description |
|---|---|---|
| mysql | ✅ Production | MySQL binlog CDC with GTID support |
| postgres | ✅ Production | PostgreSQL logical replication via pgoutput |
| turso | 🔧 Beta | Turso/libSQL CDC with multiple modes |
Common Behavior
All sources share these characteristics:
- Checkpointing: Progress is automatically saved and resumed on restart
- Schema tracking: Table schemas are loaded and fingerprinted for change detection
- At-least-once delivery: Events may be redelivered after failures; sinks should be idempotent
- Batching: Events are batched according to the pipeline’s batch configuration
- Transaction boundaries: respect_source_tx: true (default) keeps source transactions intact
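Because delivery is at-least-once, a sink has to tolerate seeing the same event twice. The sketch below shows one common way to do that: remember which event ids have been applied and skip repeats. The `Event` shape, its `id` field, and the `IdempotentSink` type are all hypothetical and are not DeltaForge's actual types; a real sink would more likely rely on an upsert or a durable deduplication key than on an in-memory set.

```rust
use std::collections::HashSet;

/// Hypothetical event shape: `id` is assumed to be unique per source change.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    id: u64,
    payload: String,
}

/// In-memory sink that ignores events it has already applied.
#[derive(Default)]
struct IdempotentSink {
    seen: HashSet<u64>,
    applied: Vec<String>,
}

impl IdempotentSink {
    /// Applies a batch and returns how many events were new.
    fn write_batch(&mut self, batch: &[Event]) -> usize {
        let mut fresh = 0;
        for event in batch {
            // `insert` returns false when the id was already present,
            // which is how a redelivered event is detected.
            if self.seen.insert(event.id) {
                self.applied.push(event.payload.clone());
                fresh += 1;
            }
        }
        fresh
    }
}
```

With this shape, replaying a batch after a restart leaves `applied` unchanged, so resuming from an older checkpoint does not duplicate writes.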
Source Interface
Sources implement a common trait that provides:
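trait Source {
    // Key under which this source's checkpoint is stored.
    fn checkpoint_key(&self) -> &str;
    // Starts capturing changes, sending events on `tx`; returns a handle for controlling the run.
    async fn run(&self, tx: Sender<Event>, checkpoint_store: Arc<dyn CheckpointStore>) -> SourceHandle;
}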
The returned SourceHandle supports pause/resume and graceful cancellation.
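To make the pause/resume/cancel semantics concrete, here is a minimal thread-based sketch of such a handle. It is a stand-in under stated assumptions, not DeltaForge's SourceHandle: the real trait is async, whereas this version uses std threads and atomic flags, and the names `SketchHandle` and `spawn_counter_source` are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical handle: two shared flags plus the worker's join handle.
struct SketchHandle {
    paused: Arc<AtomicBool>,
    cancelled: Arc<AtomicBool>,
    worker: JoinHandle<u64>,
}

impl SketchHandle {
    fn pause(&self) {
        self.paused.store(true, Ordering::SeqCst);
    }

    fn resume(&self) {
        self.paused.store(false, Ordering::SeqCst);
    }

    /// Requests shutdown, waits for the worker to finish its current
    /// iteration, and returns the number of events it emitted.
    fn cancel(self) -> u64 {
        self.cancelled.store(true, Ordering::SeqCst);
        self.worker.join().expect("worker panicked")
    }
}

/// Spawns a toy source that emits increasing integers on `tx`.
fn spawn_counter_source(tx: Sender<u64>) -> SketchHandle {
    let paused = Arc::new(AtomicBool::new(false));
    let cancelled = Arc::new(AtomicBool::new(false));
    let (p, c) = (Arc::clone(&paused), Arc::clone(&cancelled));
    let worker = thread::spawn(move || {
        let mut next = 0u64;
        while !c.load(Ordering::SeqCst) {
            if p.load(Ordering::SeqCst) {
                // Paused: emit nothing, but keep checking for cancellation.
                thread::sleep(Duration::from_millis(1));
                continue;
            }
            if tx.send(next).is_err() {
                break; // receiver dropped, nothing left to do
            }
            next += 1;
            thread::sleep(Duration::from_millis(1));
        }
        next
    });
    SketchHandle { paused, cancelled, worker }
}
```

Cancellation here is graceful in the sense that the worker checks the flag between events and exits on its own, so no event is cut off mid-send.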
Adding Custom Sources
The source interface is pluggable. To add a new source:
- Implement the Source trait
- Add configuration parsing in deltaforge-config
- Register the source type in the pipeline builder
See existing sources for implementation patterns.
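The three steps can be pictured with a small, self-contained sketch. Everything here is hypothetical and simplified: the trait is reduced to a synchronous `checkpoint_key`, configuration is a plain string map rather than whatever deltaforge-config produces, and `CsvSource`, `build_csv`, and `SourceRegistry` are illustrative names, not part of DeltaForge.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the real trait (synchronous, `run` omitted).
trait Source {
    fn checkpoint_key(&self) -> &str;
}

/// Step 1: a hypothetical connector implementing the trait.
struct CsvSource {
    key: String,
}

impl Source for CsvSource {
    fn checkpoint_key(&self) -> &str {
        &self.key
    }
}

/// Constructor signature: raw config key/value pairs in, boxed source out.
type SourceFactory = fn(&HashMap<String, String>) -> Result<Box<dyn Source>, String>;

/// Step 2: parse and validate the connector's config.
fn build_csv(config: &HashMap<String, String>) -> Result<Box<dyn Source>, String> {
    let path = config.get("path").ok_or("csv source requires `path`")?;
    Ok(Box::new(CsvSource { key: format!("csv:{path}") }))
}

/// Step 3: a registry mapping the `type` field to a constructor.
#[derive(Default)]
struct SourceRegistry {
    factories: HashMap<String, SourceFactory>,
}

impl SourceRegistry {
    fn register(&mut self, type_name: &str, factory: SourceFactory) {
        self.factories.insert(type_name.to_string(), factory);
    }

    fn build(
        &self,
        type_name: &str,
        config: &HashMap<String, String>,
    ) -> Result<Box<dyn Source>, String> {
        let factory = self
            .factories
            .get(type_name)
            .ok_or_else(|| format!("unknown source type: {type_name}"))?;
        factory(config)
    }
}
```

Keying the registry on the type string mirrors how spec.source selects a connector by its type field, and returning a `Result` lets a misconfigured source fail at build time rather than at run time.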