CDC Streaming

Overview

Change Data Capture (CDC) streams row-level inserts, updates, and deletes from a source database in near real time, rather than re-extracting full tables in batches.

Architecture

```mermaid
graph TD
    DB[Source Database] -->|CDC| Airbyte[Airbyte CDC]
    Airbyte -->|Events| Kafka[Kafka]
    Kafka -->|Consumer| Pipeline[Dataflow Pipeline]
    Pipeline --> Dest[Destination]
```

Supported Sources

  • PostgreSQL (via Debezium)
  • MySQL (via Debezium)
  • MongoDB (via MongoDB Connector)

Configuration

1. Enable CDC on Source

```json
{
  "id": "cdc-source",
  "type": "airbyte",
  "data": {
    "connection_type": "postgresql",
    "host": "db.example.com",
    "port": 5432,
    "database": "sales",
    "cdc": {
      "enabled": true,
      "replication_slot": "dataflow_slot",
      "publication": "dataflow_pub"
    }
  }
}
```
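PostgreSQL logical replication requires the replication slot and publication named in the config to exist on the source before the connector attaches. A minimal sketch of the required statements, wrapped in a hypothetical helper (`cdc_setup_statements` is not part of any connector API; run the statements with a suitably privileged role):

```python
# Hypothetical helper: builds the PostgreSQL statements that must run on
# the source database before CDC can start.
def cdc_setup_statements(slot, publication, tables=None):
    # A publication scopes which tables' changes are streamed.
    table_clause = f"FOR TABLE {', '.join(tables)}" if tables else "FOR ALL TABLES"
    return [
        f"CREATE PUBLICATION {publication} {table_clause};",
        # 'pgoutput' is PostgreSQL's built-in logical decoding plugin.
        f"SELECT pg_create_logical_replication_slot('{slot}', 'pgoutput');",
    ]

for stmt in cdc_setup_statements("dataflow_slot", "dataflow_pub"):
    print(stmt)
```

The slot and publication names must match `replication_slot` and `publication` in the config above exactly, or the connector will fail to attach.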

2. Configure Destination

```json
{
  "id": "streaming-dest",
  "type": "kafka",
  "data": {
    "brokers": ["kafka:9092"],
    "topic": "db_changes.orders",
    "format": "json"
  }
}
```

Event Types

| Operation | Code | Description |
|-----------|------|-------------|
| INSERT | `I` | New row created |
| UPDATE | `U` | Existing row modified |
| DELETE | `D` | Row removed |
| READ | `R` | Row emitted during the initial snapshot |

Event Schema

An example UPDATE event (`"op": "U"`). `before` is null here; some sources only populate it when full before-images are enabled:

```json
{
  "before": null,
  "after": {
    "id": 123,
    "name": "Updated Name",
    "amount": 100.00
  },
  "op": "U",
  "ts_ms": 1705312800000
}
```
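Consumers can route on the `op` field. A small sketch of decoding one event, assuming the JSON layout above:

```python
import json

raw = """{
  "before": null,
  "after": {"id": 123, "name": "Updated Name", "amount": 100.00},
  "op": "U",
  "ts_ms": 1705312800000
}"""

event = json.loads(raw)
# For inserts and updates, "after" carries the new row state;
# for deletes, only "before" identifies the removed row.
if event["op"] in ("I", "U"):
    row = event["after"]
elif event["op"] == "D":
    row = event["before"]
print(row["id"], row["amount"])
```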

Pipeline Processing

Handle Different Operations

```python
import pandas as pd

def process_cdc_event(df: pd.DataFrame) -> pd.DataFrame:
    # Separate the batch by operation code
    inserts = df[df["op"] == "I"]
    updates = df[df["op"] == "U"]
    deletes = df[df["op"] == "D"]

    # Process each subset as needed (validation, enrichment, ...),
    # then recombine for the downstream merge step.
    processed_df = pd.concat([inserts, updates, deletes], ignore_index=True)
    return processed_df
```
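For example, a pandas sketch of applying a batch of split events to an in-memory target table; the table and events here are made up for illustration:

```python
import pandas as pd

# Hypothetical in-memory target table and a small batch of CDC events.
target = pd.DataFrame([{"id": 1, "amount": 10.0}, {"id": 2, "amount": 20.0}])
events = pd.DataFrame([
    {"op": "U", "id": 2, "amount": 25.0},   # update row 2
    {"op": "I", "id": 3, "amount": 30.0},   # insert row 3
    {"op": "D", "id": 1, "amount": None},   # delete row 1
])

# Upsert: drop deleted keys, then let the newest event win per id.
deletes = set(events.loc[events["op"] == "D", "id"])
upserts = events[events["op"].isin(["I", "U"])].drop(columns="op")
merged = pd.concat([target, upserts]).drop_duplicates(subset="id", keep="last")
merged = merged[~merged["id"].isin(deletes)].sort_values("id")
print(merged.to_dict("records"))
```

This assumes events arrive in commit order within the batch; out-of-order batches would need ordering by `ts_ms` first.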

Merge to Target

```json
{
  "id": "merge-node",
  "type": "database",
  "data": {
    "mode": "upsert",
    "unique_key": "id",
    "delete_marker": "op == 'D'"
  }
}
```

Performance

| Metric | Value |
|--------|-------|
| Latency | < 1 second |
| Throughput | 10K events/sec |
| Parallelism | Configurable |

Troubleshooting

Replication lagging

  1. Check the Debezium connector status
  2. Measure Kafka consumer group lag
  3. Increase pipeline worker parallelism
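Consumer lag (step 2) is the gap between each partition's latest offset and the consumer group's committed offset. A minimal sketch of the arithmetic, with made-up offsets:

```python
# Lag per partition = latest (end) offset minus the committed offset.
# In practice these numbers come from the Kafka admin API or
# `kafka-consumer-groups.sh --describe`; the values here are illustrative.
def consumer_lag(end_offsets, committed):
    return {p: end_offsets[p] - committed.get(p, 0) for p in end_offsets}

lag = consumer_lag({0: 1500, 1: 900}, {0: 1480, 1: 900})
print(lag)  # {0: 20, 1: 0}
```

A lag that grows steadily across all partitions usually means the pipeline is under-provisioned; lag on a single partition points at key skew.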

Missing events

  1. Confirm WAL retention on the source database (if the replication slot was dropped and recreated, changes written in between are lost)
  2. Verify the replication slot exists and is active
  3. Review connector and pipeline error logs
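For step 2, PostgreSQL exposes slot state in the `pg_replication_slots` view. A sketch of a diagnostic query; run it with any SQL client, binding the slot name as the parameter:

```python
# Diagnostic query: does the slot exist, is a consumer attached ("active"),
# and how many WAL bytes has the server written past the slot's confirmed
# position (a growing lag_bytes means the consumer is falling behind)?
CHECK_SLOT_SQL = """
SELECT slot_name,
       active,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = %s;
"""

print(CHECK_SLOT_SQL.strip())
```

If the query returns no rows, the slot was never created or has been dropped, and a new snapshot will be required.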