Dataflow Engine

PipelineRunner

The PipelineRunner is the core execution engine for visual pipelines. It executes pipeline nodes asynchronously, resolving dependencies between them and running independent nodes in parallel.

Key Features

  • Async execution with asyncio
  • Topological sorting for dependency resolution
  • Parallel execution of independent nodes
  • Data flow through NetworkX graph
  • Smart merging (Union vs Join based on schema)
  • Staging support via DuckDB
  • Dask/Pandas adapter for distributed processing
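Two of the features above, topological sorting and parallel execution of independent nodes, combine naturally in asyncio. The following is a minimal sketch of that pattern; the function names, node representation, and edge format are illustrative assumptions, not the PipelineRunner's actual internals:

```python
import asyncio
from collections import defaultdict

def topo_levels(nodes, edges):
    """Group nodes into dependency levels via Kahn's algorithm.

    Nodes in the same level have no edges between them, so they
    can execute concurrently. `edges` is a list of (src, dst) pairs.
    """
    indegree = {n: 0 for n in nodes}
    children = defaultdict(list)
    for src, dst in edges:
        indegree[dst] += 1
        children[src].append(dst)
    level = [n for n in nodes if indegree[n] == 0]
    levels = []
    while level:
        levels.append(level)
        nxt = []
        for n in level:
            for c in children[n]:
                indegree[c] -= 1
                if indegree[c] == 0:
                    nxt.append(c)
        level = nxt
    return levels

async def run_pipeline(nodes, edges, execute):
    """Run `execute(node)` for every node, level by level.

    Nodes within a level are independent and run in parallel
    via asyncio.gather; levels run in dependency order.
    """
    results = {}
    for level in topo_levels(nodes, edges):
        outputs = await asyncio.gather(*(execute(n) for n in level))
        results.update(zip(level, outputs))
    return results
```

In the real engine the graph lives in NetworkX, which provides equivalent topological ordering out of the box; the sketch inlines it only to show the mechanics.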

Processing Modes

  • auto: Smart switching based on data size
  • dask: Force Dask for distributed processing
  • pandas: Force Pandas for in-memory processing
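The mode selection logic can be sketched as follows. The row-count threshold and function name here are assumptions for illustration; the engine's actual cutoff may differ:

```python
# Hypothetical sketch of processing-mode selection. The explicit modes
# force a backend; "auto" switches based on data size.
PANDAS_MAX_ROWS = 1_000_000  # assumed cutoff, not the engine's real value

def choose_backend(mode: str, row_count: int) -> str:
    if mode == "pandas":
        return "pandas"   # force in-memory processing
    if mode == "dask":
        return "dask"     # force distributed processing
    # auto: small data stays in Pandas, large data goes to Dask
    return "pandas" if row_count <= PANDAS_MAX_ROWS else "dask"
```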

Node Executors

All node executors handle both Pandas and Dask types:

  1. SourceNodeExecutor: Creates appropriate type based on data size
  2. PythonNodeExecutor: Converts to Pandas for user code, preserves type in output
  3. FilterNodeExecutor: Uses DuckDB with automatic conversion
  4. ReduceNodeExecutor: Uses DuckDB with automatic conversion
  5. MapNodeExecutor: Handles both types, uses Dask distributed for large datasets
  6. SqlNodeExecutor: Uses DuckDB with automatic conversion
  7. RestNodeExecutor: Creates appropriate type based on response size
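The PythonNodeExecutor pattern described above, convert to Pandas for user code and preserve the incoming type in the output, can be sketched like this. The duck-typed `npartitions` check and helper name are assumptions used to detect a Dask frame without a hard import:

```python
# Hypothetical sketch: run user code on a Pandas frame, then restore
# the input's type so downstream nodes see what they were given.
def run_user_code(df, user_fn):
    is_dask = hasattr(df, "npartitions")   # assumed Dask-frame marker
    pdf = df.compute() if is_dask else df  # materialize to Pandas for user code
    out = user_fn(pdf)                     # user code always sees Pandas
    if is_dask:
        import dask.dataframe as dd
        # Preserve the Dask type (and partition count) in the output.
        out = dd.from_pandas(out, npartitions=df.npartitions)
    return out
```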

Usage Example

```python
from pipeline_runner import PipelineRunner

runner = PipelineRunner(
    config=pipeline_config,
    supabase_client=supabase,
    run_id="run-001",
    use_mock_data=False,
    processing_mode='auto'
)

result = await runner.run_async()
```

Common Patterns

Adding a New Node Type

  1. Create executor class in pipeline_runner.py:

```python
class MyNodeExecutor(NodeExecutor):
    async def execute(self, node_id: str, config: Dict, input_df: DataFrame) -> DataFrame:
        # Implementation
        return output_df
```
  2. Register in PipelineRunner.__init__:

```python
self.executors = {
    'my_node': MyNodeExecutor(self),
    # ...
}
```
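Putting both steps together, the runner can then dispatch a node to its registered executor by type. This is a minimal self-contained sketch; the `Runner` class, `execute_node` method, and placeholder executor body are assumptions for illustration:

```python
import asyncio

class NodeExecutor:
    """Base class: executors keep a reference back to the runner."""
    def __init__(self, runner):
        self.runner = runner

class MyNodeExecutor(NodeExecutor):
    async def execute(self, node_id, config, input_df):
        # Placeholder implementation: pass the input through unchanged.
        return input_df

class Runner:
    def __init__(self):
        # Registration maps a node type string to an executor instance.
        self.executors = {'my_node': MyNodeExecutor(self)}

    async def execute_node(self, node_type, node_id, config, input_df):
        # Dispatch: look up the executor for this node type and await it.
        executor = self.executors[node_type]
        return await executor.execute(node_id, config, input_df)
```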