# Dataflow Engine

## PipelineRunner
The `PipelineRunner` is the core execution engine for visual pipelines; it executes nodes asynchronously.
### Key Features

- Async execution with `asyncio`
- Topological sorting for dependency resolution
- Parallel execution of independent nodes
- Data flow through a NetworkX graph
- Smart merging (Union vs Join, chosen based on schema)
- Staging support via DuckDB
- Dask/Pandas adapter for distributed processing
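The dependency-resolution and parallelism features above can be sketched as follows. This is a minimal sketch, not the engine's actual implementation: it uses the stdlib `graphlib` in place of NetworkX, and `demo_execute` is a hypothetical stand-in for the real node executors.

```python
import asyncio
from graphlib import TopologicalSorter

async def run_graph(graph: dict, execute) -> dict:
    """Run nodes level by level: all currently-ready (independent) nodes
    execute concurrently; downstream nodes wait for their dependencies.

    `graph` maps each node to the set of nodes it depends on.
    """
    ts = TopologicalSorter(graph)
    ts.prepare()
    results = {}
    while ts.is_active():
        ready = list(ts.get_ready())
        # Independent nodes run in parallel via asyncio.gather
        outputs = await asyncio.gather(*(execute(n, results) for n in ready))
        for node, out in zip(ready, outputs):
            results[node] = out
            ts.done(node)
    return results

async def demo_execute(node, upstream_results):
    await asyncio.sleep(0)  # stand-in for real node work
    return f"ran {node}"

# Example: b and c both depend only on a, so they execute in the same batch.
graph = {"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
results = asyncio.run(run_graph(graph, demo_execute))
```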
### Processing Modes

- `auto`: Smart switching based on data size
- `dask`: Force Dask for distributed processing
- `pandas`: Force Pandas for in-memory processing
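A minimal sketch of how the `auto` mode's size-based switch could work. The 500 MB threshold and the `choose_backend` name are assumptions for illustration, not the engine's actual cutoff or API.

```python
# Hypothetical cutoff: above this in-memory size, fall back to Dask.
DASK_THRESHOLD_BYTES = 500 * 1024 * 1024

def choose_backend(size_bytes: int, mode: str = "auto") -> str:
    """Pick the execution backend under the three processing modes."""
    if mode in ("dask", "pandas"):
        return mode  # explicit override: force the requested backend
    # auto: switch to Dask once data no longer fits comfortably in memory
    return "dask" if size_bytes > DASK_THRESHOLD_BYTES else "pandas"

print(choose_backend(1024))            # small data stays in Pandas
print(choose_backend(10**9))           # large data goes to Dask
```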
### Node Executors

All node executors handle both Pandas and Dask DataFrame types:

- `SourceNodeExecutor`: Creates the appropriate type based on data size
- `PythonNodeExecutor`: Converts to Pandas for user code, preserves the input type in the output
- `FilterNodeExecutor`: Uses DuckDB with automatic conversion
- `ReduceNodeExecutor`: Uses DuckDB with automatic conversion
- `MapNodeExecutor`: Handles both types, uses Dask distributed for large datasets
- `SqlNodeExecutor`: Uses DuckDB with automatic conversion
- `RestNodeExecutor`: Creates the appropriate type based on response size
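The convert-to-Pandas-then-restore pattern described for `PythonNodeExecutor` can be sketched like this. `FakeDask` is a toy stand-in for a real Dask DataFrame (only the `.compute()` hook matters for the sketch); the real executors work on actual `dask.dataframe` objects.

```python
class FakeDask:
    """Toy stand-in for a Dask DataFrame; exposes only .compute()."""
    def __init__(self, rows):
        self.rows = rows
    def compute(self):
        # In real Dask, .compute() materializes the distributed frame in memory.
        return list(self.rows)

def run_user_code(df, user_fn):
    """Hand user code an in-memory frame, then restore the original type
    so downstream nodes see the same kind of object they were given."""
    is_distributed = hasattr(df, "compute")
    local = df.compute() if is_distributed else df
    out = user_fn(local)
    return FakeDask(out) if is_distributed else out
```

A Dask-like input comes back wrapped as a Dask-like output, while a plain in-memory input passes through unchanged.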
### Usage Example

```python
from pipeline_runner import PipelineRunner

runner = PipelineRunner(
    config=pipeline_config,
    supabase_client=supabase,
    run_id="run-001",
    use_mock_data=False,
    processing_mode='auto'
)
result = await runner.run_async()
```
### Common Patterns

#### Adding a New Node Type

- Create an executor class in `pipeline_runner.py`:

```python
class MyNodeExecutor(NodeExecutor):
    async def execute(self, node_id: str, config: Dict, input_df: DataFrame) -> DataFrame:
        # Implementation
        return output_df
```

- Register it in `PipelineRunner.__init__`:

```python
self.executors = {
    'my_node': MyNodeExecutor(self),
    # ...
}
```