# DataFrame Adapter

## Overview

The system supports both Pandas and Dask DataFrames for distributed data processing. The migration is fully backward compatible: all existing pipelines work without modification.
## Key Features

### Automatic Type Selection

- Small datasets (< 10,000 rows): uses Pandas for lower overhead
- Large datasets (≥ 10,000 rows): automatically uses Dask for distributed processing
- Transparent: users don't need to change anything
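The selection rule above can be sketched as a simple row-count check. This is a hypothetical helper (`select_backend` and the module-level constant are illustrative names, not the adapter's actual internals):

```python
# Threshold from the docs: datasets at or above this size go to Dask.
DASK_THRESHOLD = 10_000  # rows

def select_backend(num_rows: int) -> str:
    """Illustrative sketch of the backend the adapter would pick."""
    return "dask" if num_rows >= DASK_THRESHOLD else "pandas"

print(select_backend(500))     # pandas
print(select_backend(25_000))  # dask
```

Because the rule depends only on row count, callers never have to state a preference; the same pipeline definition works at any scale.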
### Distributed Processing

- Map nodes: already use Dask distributed for sub-pipeline execution
- Filter/Reduce/SQL nodes: optimized with DuckDB, with automatic Dask support
- Scalable: can handle datasets larger than memory
### Backward Compatibility

- All existing pipeline configurations work unchanged
- User Python code still receives `pd.DataFrame`; API responses are unchanged
- No breaking changes
## Usage

```python
from dataframe_adapter import DataFrameAdapter, DataFrame

# Create a DataFrame (auto-selects Pandas or Dask based on size)
df = DataFrameAdapter.create(data)

# Check type
is_dask = DataFrameAdapter.is_dask(df)

# Convert between types
pandas_df = DataFrameAdapter.to_pandas(df)
dask_df = DataFrameAdapter.to_dask(df)

# Common operations
length = DataFrameAdapter.get_length(df)
is_empty = DataFrameAdapter.is_empty(df)
head_df = DataFrameAdapter.head(df, 10)
concat_df = DataFrameAdapter.concat([df1, df2])
```
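The conversion helpers can be approximated with duck typing, since Dask DataFrames expose `.compute()` while Pandas frames do not. A minimal sketch, assuming this is roughly how `to_pandas` behaves (the function name and check are illustrative, not the adapter's real implementation):

```python
import pandas as pd

def to_pandas_sketch(df):
    """Illustrative approximation of DataFrameAdapter.to_pandas:
    Dask frames are materialized via .compute(); Pandas frames
    pass through unchanged."""
    if hasattr(df, "compute"):  # duck-typing check for dask.dataframe
        return df.compute()
    return df

df = to_pandas_sketch(pd.DataFrame({"x": [1, 2, 3]}))
print(type(df).__name__)  # DataFrame
```

Duck typing keeps the helper usable even when Dask is not installed, which matters because the adapter must also run in Pandas-only environments.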
## Performance Benefits

### When Dask Helps

- Large datasets (> 10K rows)
- Memory-intensive operations
- Parallel transformations
- Map-reduce patterns
- Out-of-core processing (data larger than RAM)

### When Pandas is Better

- Small datasets (< 10K rows)
- Interactive exploration
- Simple transformations
- Operations requiring full materialization
## Configuration

### Current Settings

- Threshold: 10,000 rows
- Dask workers: auto-detected (LocalCluster)
- Partitions: auto-calculated (one partition per 5,000 rows)
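The partition rule above can be expressed as a one-liner. A hypothetical mirror of the auto-calculation (the helper name is illustrative):

```python
import math

PARTITION_SIZE = 5_000  # one partition per 5,000 rows, per the settings above

def n_partitions(num_rows: int) -> int:
    # Round up so a partial chunk still gets a partition; never fewer than 1.
    return max(1, math.ceil(num_rows / PARTITION_SIZE))

print(n_partitions(4_000))   # 1
print(n_partitions(12_500))  # 3
```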
### Future Configuration (Planned)

```shell
DASK_ENABLED=true
DASK_THRESHOLD=10000
DASK_SCHEDULER_ADDRESS=tcp://scheduler:8786
DASK_N_WORKERS=4
DASK_THREADS_PER_WORKER=2
```
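Once these variables land, a settings reader might look like the following. This is a sketch only: the variable names come from the list above, but the defaults and function name are illustrative assumptions:

```python
import os

def load_dask_settings() -> dict:
    """Hypothetical reader for the planned environment variables.
    Defaults here are illustrative, not the project's actual defaults."""
    return {
        "enabled": os.getenv("DASK_ENABLED", "true").lower() == "true",
        "threshold": int(os.getenv("DASK_THRESHOLD", "10000")),
        "scheduler": os.getenv("DASK_SCHEDULER_ADDRESS"),  # None -> LocalCluster
        "n_workers": int(os.getenv("DASK_N_WORKERS", "4")),
        "threads_per_worker": int(os.getenv("DASK_THREADS_PER_WORKER", "2")),
    }
```

Reading everything through `os.getenv` with defaults keeps current deployments working when none of the variables are set.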
## Monitoring

### Dask Dashboard

When the Dask client initializes, check the logs for:

```
INFO: Dask Client initialized: http://localhost:8787/status
```

Visit the dashboard to see:

- Task graph
- Worker status
- Memory usage
- Task progress
## Troubleshooting

**Issue:** "Dask Client initialization failed"
**Solution:** This is expected in some environments; the system falls back to Pandas.
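The fallback can be sketched as a guarded initialization. This is an illustrative function (the name and exact log wording are assumptions), showing one way to degrade gracefully when Dask is unavailable:

```python
import logging

def init_dask_client():
    """Illustrative sketch: try to start a local Dask client; on any
    failure, log a warning and return None so callers continue with
    plain Pandas."""
    try:
        from dask.distributed import Client, LocalCluster  # optional dependency
        client = Client(LocalCluster())
        logging.info("Dask Client initialized: %s", client.dashboard_link)
        return client
    except Exception as exc:
        logging.warning("Dask Client init failed (%s); falling back to Pandas", exc)
        return None
```

Catching the import inside the function means environments without Dask installed take the same fallback path as environments where the cluster fails to start.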
**Issue:** "Memory usage increased"
**Solution:** Dask uses lazy evaluation; call `.compute()` only when you actually need materialized results.

**Issue:** "Performance degraded for small data"
**Solution:** The threshold is 10,000 rows; small datasets use Pandas automatically.

**Issue:** "User Python code fails with Dask DataFrame"
**Solution:** The adapter automatically converts to Pandas before executing user code.
## Rollback Plan

If issues arise, disable auto-selection in `dataframe_adapter.py`:

```python
DASK_THRESHOLD = float('inf')  # Never use Dask
```

Or force Pandas everywhere:

```python
df = DataFrameAdapter.to_pandas(df)
```