# CDC Streaming

## Overview

Change Data Capture (CDC) streams row-level changes (inserts, updates, and deletes) from a source database to downstream consumers in near real time, without repeated full-table scans.
## Architecture

```mermaid
graph TD
    DB[Source Database] -->|CDC| Airbyte[Airbyte CDC]
    Airbyte -->|Events| Kafka[Kafka]
    Kafka -->|Consumer| Pipeline[Dataflow Pipeline]
    Pipeline --> Dest[Destination]
```
## Supported Sources

- PostgreSQL (via Debezium)
- MySQL (via Debezium)
- MongoDB (via the MongoDB Connector)
## Configuration

### 1. Enable CDC on Source

```json
{
  "id": "cdc-source",
  "type": "airbyte",
  "data": {
    "connection_type": "postgresql",
    "host": "db.example.com",
    "port": 5432,
    "database": "sales",
    "cdc": {
      "enabled": true,
      "replication_slot": "dataflow_slot",
      "publication": "dataflow_pub"
    }
  }
}
```
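For a PostgreSQL source, logical replication must already be in place: the server needs `wal_level = logical`, and the publication and replication slot named in the config must exist. A minimal sketch of the setup statements, built from the config above (the `cdc_setup_sql` helper is hypothetical; the SQL it emits is standard Postgres):

```python
def cdc_setup_sql(source_config: dict) -> list:
    """Build the Postgres statements that must run before this source can stream.

    The helper is illustrative and not part of the platform; only the
    SQL itself is standard PostgreSQL logical replication.
    """
    cdc = source_config["data"]["cdc"]
    return [
        # A publication declares which tables logical replication emits.
        "CREATE PUBLICATION {} FOR ALL TABLES;".format(cdc["publication"]),
        # The slot tracks this consumer's position in the WAL.
        "SELECT pg_create_logical_replication_slot('{}', 'pgoutput');".format(
            cdc["replication_slot"]
        ),
    ]

source = {"data": {"cdc": {"enabled": True,
                           "replication_slot": "dataflow_slot",
                           "publication": "dataflow_pub"}}}
statements = cdc_setup_sql(source)
```

Run the generated statements as a superuser (or a role with `REPLICATION`) on the source database before starting the connection.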
### 2. Configure Destination

```json
{
  "id": "streaming-dest",
  "type": "kafka",
  "data": {
    "brokers": ["kafka:9092"],
    "topic": "db_changes.orders",
    "format": "json"
  }
}
```
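With `"format": "json"`, each change event is serialized as a JSON message on the topic. A minimal sketch of one way to build the producer record, keying messages by the row id so all changes for one row land in the same partition and keep their order (the keying choice and the `encode_event` helper are illustrative, not mandated by the config):

```python
import json

def encode_event(event: dict):
    """Build a (key, value) pair for a Kafka producer.

    Keying by the row's id preserves per-row ordering within a partition.
    DELETE events carry the row only in "before", so fall back to it.
    """
    row = event["after"] or event["before"]
    key = str(row["id"]).encode("utf-8")
    value = json.dumps(event).encode("utf-8")
    return key, value

key, value = encode_event(
    {"before": None, "after": {"id": 123, "amount": 100.0}, "op": "I", "ts_ms": 0}
)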
## Event Types

| Operation | Code | Description |
|---|---|---|
| INSERT | `I` | New row created |
| UPDATE | `U` | Existing row modified |
| DELETE | `D` | Row deleted |
| READ | `R` | Row read during the initial snapshot |
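The single-letter codes used in the event payload (`I`, `U`, `D`, and, by the same convention, `R` for snapshot reads) can drive a simple dispatch table. A sketch with placeholder handler names:

```python
def route_event(event: dict) -> str:
    """Return the handler name for an event, based on its op code.

    Handler names are placeholders; wire them to real processing steps.
    """
    handlers = {
        "I": "insert",    # new row created
        "U": "update",    # existing row modified
        "D": "delete",    # row deleted
        "R": "snapshot",  # initial snapshot read
    }
    try:
        return handlers[event["op"]]
    except KeyError:
        raise ValueError("unknown op: {!r}".format(event["op"]))
```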
## Event Schema

```json
{
  "before": null,
  "after": {
    "id": 123,
    "name": "Updated Name",
    "amount": 100.00
  },
  "op": "U",
  "ts_ms": 1705312800000
}
```

`after` holds the new row state; `before` holds the prior state when the source provides it (for Postgres, updates carry old values only if the table's replica identity includes them). Deletes carry the old row in `before` with `after` set to null.
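A consumer usually wants the row image it should act on: `after` for inserts, updates, and snapshot reads, `before` for deletes. A minimal sketch over the schema above (the `row_image` helper is illustrative):

```python
def row_image(event: dict) -> dict:
    """Return the row state a consumer should act on.

    Inserts, updates, and snapshot reads carry the new state in "after";
    deletes carry only the old state in "before".
    """
    if event["op"] == "D":
        return event["before"]
    return event["after"]
```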
## Pipeline Processing

### Handle Different Operations

```python
import pandas as pd

def process_cdc_event(df: pd.DataFrame) -> pd.DataFrame:
    # Separate the batch by operation code
    inserts = df[df['op'] == 'I']
    updates = df[df['op'] == 'U']
    deletes = df[df['op'] == 'D']
    # Process each subset accordingly (upsert inserts/updates, remove deletes),
    # then return the handled events
    processed_df = pd.concat([inserts, updates, deletes])
    return processed_df
```
### Merge to Target

```json
{
  "id": "merge-node",
  "type": "database",
  "data": {
    "mode": "upsert",
    "unique_key": "id",
    "delete_marker": "op == 'D'"
  }
}
```
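The upsert mode above can be pictured as replaying events into a table keyed by `unique_key`: inserts and updates write the new row, and events matching the delete marker remove it. A minimal in-memory sketch of that semantics (a real destination would issue MERGE/UPSERT statements instead; `apply_events` is illustrative):

```python
def apply_events(table: dict, events: list, unique_key: str = "id") -> dict:
    """Replay CDC events into an in-memory 'table' keyed by unique_key."""
    for event in events:
        if event["op"] == "D":
            # Delete marker matched: drop the row, keyed by its old state
            table.pop(event["before"][unique_key], None)
        else:
            # Insert or update both become an upsert on the unique key
            row = event["after"]
            table[row[unique_key]] = row
    return table

state = apply_events({}, [
    {"op": "I", "before": None, "after": {"id": 1, "amount": 50.0}},
    {"op": "U", "before": None, "after": {"id": 1, "amount": 100.0}},
])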
## Performance

| Metric | Value |
|---|---|
| Latency | < 1 second |
| Throughput | 10K events/sec |
| Parallelism | Configurable |
## Troubleshooting

### Replication lagging

- Check the Debezium connector status
- Check Kafka consumer lag
- Increase worker parallelism

### Missing events

- Confirm the source database still retains the required WAL segments
- Verify the replication slot exists and is active
- Review the pipeline error logs
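On the Postgres side, slot lag is the distance between the server's current WAL position (`pg_current_wal_lsn()`) and the slot's `confirmed_flush_lsn` in `pg_replication_slots`, both reported as `pg_lsn` values like `0/16B3748` (two hex words). A small sketch for turning those into a byte count when diagnosing lag (the helper functions are illustrative; the LSN format is standard Postgres):

```python
def lsn_to_bytes(lsn: str) -> int:
    """Convert a Postgres pg_lsn string ('high/low', both hex) to a byte offset."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)

def slot_lag_bytes(current_lsn: str, confirmed_lsn: str) -> int:
    """Bytes of WAL the replication slot has not yet confirmed."""
    return lsn_to_bytes(current_lsn) - lsn_to_bytes(confirmed_lsn)

lag = slot_lag_bytes("0/16B3748", "0/16B3700")
```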