DataFrame Adapter

Overview

The system supports both Pandas and Dask DataFrames for distributed data processing. The migration is fully backward compatible: all existing pipelines work without modification.

Key Features

Automatic Type Selection

  • Small datasets (< 10,000 rows): Uses Pandas for lower overhead
  • Large datasets (≥ 10,000 rows): Automatically uses Dask for distributed processing
  • Transparent: Users don't need to change anything
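The selection rule above can be sketched as a simple predicate. The 10,000-row threshold comes from this doc; the function name is illustrative, not the adapter's actual API:

```python
# Illustrative sketch of the size-based routing rule (threshold per this doc).
DASK_THRESHOLD = 10_000

def uses_dask(num_rows: int, threshold: int = DASK_THRESHOLD) -> bool:
    """Return True when a dataset is large enough to route to Dask."""
    return num_rows >= threshold
```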

Distributed Processing

  • Map Nodes: Already use Dask distributed for sub-pipeline execution
  • Filter/Reduce/SQL Nodes: Optimized with DuckDB + automatic Dask support
  • Scalable: Can handle datasets larger than memory

Backward Compatibility

  • All existing pipeline configurations work unchanged
  • User Python code still receives pd.DataFrame
  • API responses unchanged
  • No breaking changes

Usage

from dataframe_adapter import DataFrameAdapter, DataFrame

# Create DataFrame (auto-selects Pandas or Dask based on size)
df = DataFrameAdapter.create(data)

# Check type
is_dask = DataFrameAdapter.is_dask(df)

# Convert between types
pandas_df = DataFrameAdapter.to_pandas(df)
dask_df = DataFrameAdapter.to_dask(df)

# Common operations
length = DataFrameAdapter.get_length(df)
is_empty = DataFrameAdapter.is_empty(df)
head_df = DataFrameAdapter.head(df, 10)
concat_df = DataFrameAdapter.concat([df1, df2])
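A minimal sketch of the adapter surface used above, written against pure Pandas so it runs without a Dask install. The method names match this doc; the internals are assumptions, not the real `dataframe_adapter.py`:

```python
import pandas as pd

class DataFrameAdapter:
    """Sketch of the adapter API; internals are illustrative assumptions."""

    DASK_THRESHOLD = 10_000

    @staticmethod
    def create(data):
        df = pd.DataFrame(data)
        # The real adapter would route frames with len(df) >= DASK_THRESHOLD
        # through dask.dataframe.from_pandas(df, npartitions=...) here.
        return df

    @staticmethod
    def is_dask(df) -> bool:
        # True only for dask.dataframe objects in the real adapter.
        return type(df).__module__.startswith("dask")

    @staticmethod
    def to_pandas(df):
        # Dask frames materialize via .compute(); Pandas frames pass through.
        return df.compute() if DataFrameAdapter.is_dask(df) else df

    @staticmethod
    def get_length(df) -> int:
        return len(DataFrameAdapter.to_pandas(df))

    @staticmethod
    def is_empty(df) -> bool:
        return DataFrameAdapter.get_length(df) == 0

    @staticmethod
    def head(df, n: int = 5):
        return df.head(n)

    @staticmethod
    def concat(dfs):
        return pd.concat(
            [DataFrameAdapter.to_pandas(d) for d in dfs], ignore_index=True
        )
```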

Performance Benefits

When Dask Helps

  • Large datasets (> 10K rows)
  • Memory-intensive operations
  • Parallel transformations
  • Map-reduce patterns
  • Out-of-core processing (data larger than RAM)

When Pandas is Better

  • Small datasets (< 10K rows)
  • Interactive exploration
  • Simple transformations
  • Operations requiring full materialization

Configuration

Current Settings

  • Threshold: 10,000 rows
  • Dask Workers: Auto-detected (LocalCluster)
  • Partitions: Auto-calculated (1 partition per 5,000 rows)
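The "1 partition per 5,000 rows" rule above can be written out as follows (the function name is illustrative; a minimum of one partition is an assumption for tiny frames):

```python
import math

ROWS_PER_PARTITION = 5_000  # "1 partition per 5,000 rows", per Current Settings

def n_partitions(num_rows: int) -> int:
    """Auto-calculated partition count; never fewer than one partition."""
    return max(1, math.ceil(num_rows / ROWS_PER_PARTITION))
```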

Future Configuration (Planned)

DASK_ENABLED=true
DASK_THRESHOLD=10000
DASK_SCHEDULER_ADDRESS=tcp://scheduler:8786
DASK_N_WORKERS=4
DASK_THREADS_PER_WORKER=2
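Since this configuration is still planned, here is one hedged sketch of how those environment variables might be parsed. The variable names come from this doc; the defaults and parsing logic are assumptions:

```python
import os

def load_dask_config(env=os.environ):
    """Parse the planned DASK_* environment variables (defaults assumed)."""
    return {
        "enabled": env.get("DASK_ENABLED", "true").lower() == "true",
        "threshold": int(env.get("DASK_THRESHOLD", "10000")),
        # e.g. tcp://scheduler:8786; None means use a LocalCluster.
        "scheduler_address": env.get("DASK_SCHEDULER_ADDRESS"),
        "n_workers": int(env.get("DASK_N_WORKERS", "4")),
        "threads_per_worker": int(env.get("DASK_THREADS_PER_WORKER", "2")),
    }
```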

Monitoring

Dask Dashboard

When Dask client initializes, check logs for:

INFO: Dask Client initialized: http://localhost:8787/status

Visit the dashboard to see:

  • Task graph
  • Worker status
  • Memory usage
  • Task progress
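The dashboard URL in that log line comes from the Dask client itself; `Client.dashboard_link` is the documented attribute for it. A guarded sketch, so the code still runs in environments where Dask is unavailable and the Pandas fallback applies (the helper name is illustrative):

```python
import logging

def log_dashboard(logger=logging.getLogger(__name__)):
    """Start a local Dask client and log its dashboard URL, if possible."""
    try:
        from dask.distributed import Client
        client = Client(processes=False)  # LocalCluster, as in Current Settings
        logger.info("Dask Client initialized: %s", client.dashboard_link)
        return client.dashboard_link
    except Exception:
        logger.info("Dask unavailable; falling back to Pandas")
        return None
```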

Troubleshooting

Issue: "Dask Client initialization failed"

Solution: This is expected in some environments (e.g. no cluster or ports available). The system automatically falls back to Pandas.

Issue: "Memory usage increased"

Solution: Dask uses lazy evaluation. Call .compute() only when needed.

Issue: "Performance degraded for small data"

Solution: The threshold is set to 10,000 rows; datasets below it use Pandas automatically, so Dask overhead should not apply.

Issue: "User Python code fails with Dask DataFrame"

Solution: The adapter automatically converts Dask DataFrames to Pandas before executing user code, so user code always receives a pd.DataFrame.

Rollback Plan

If issues arise:

  1. Disable auto-selection in dataframe_adapter.py:

    DASK_THRESHOLD = float('inf')  # Never use Dask
  2. Force Pandas everywhere:

    df = DataFrameAdapter.to_pandas(df)