Observability System
Overview
Sensyze Dataflow provides an asynchronous, config-driven observability system. All logs and metrics are consolidated in a portable SQLite store (observability_logs).
Key Features
- Async Logging: Events are flushed in the background, keeping log writes off the main execution thread.
- Unified Storage: All logs and metrics are consolidated in SQLite.
- Data Lineage: Automatic tracking of data flow between nodes.
- Rich Metadata: Captures node labels, data samples, and schema definitions.
- Multi-Tenant Isolation: Native support for user_id propagation.
- System Metrics: Auto-captures CPU usage (%) and Memory (MB).
Architecture
graph LR
    App[Host App] -->|queue_event| Q[Memory Queue]
    subgraph Background Process
        Q -->|batch| W[Worker Thread]
        W -->|persist| S[(SQLite Store)]
        W -->|evaluate| AE[Alert Engine]
        AE -->|broadcast| SSE[SSE / Webhooks]
    end
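The queue-and-worker flow in the diagram can be sketched with the standard library alone. This is a minimal illustration, not the Sensyze implementation: the class name AsyncEventWriter, the events table, and the batch_size and flush_interval parameters are all assumptions made for the example, and the alert engine and SSE stages are left out.

```python
import queue
import sqlite3
import threading


class AsyncEventWriter:
    """Hypothetical stand-in for the worker in the diagram (illustration only)."""

    _STOP = object()  # sentinel that tells the worker to flush and exit

    def __init__(self, db_path: str, batch_size: int = 50, flush_interval: float = 0.5):
        self._queue: queue.Queue = queue.Queue()
        self._db_path = db_path
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def queue_event(self, run_id: str, level: str, message: str) -> None:
        # Called by the host app: only enqueues, never touches SQLite.
        self._queue.put((run_id, level, message))

    def close(self) -> None:
        # Ask the worker to persist whatever is left, then wait for it.
        self._queue.put(self._STOP)
        self._thread.join()

    def _run(self) -> None:
        # The connection is opened inside the worker because sqlite3
        # connections are bound to the thread that created them by default.
        conn = sqlite3.connect(self._db_path)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS events (run_id TEXT, level TEXT, message TEXT)"
        )
        batch, stopping = [], False
        while not stopping:
            try:
                item = self._queue.get(timeout=self._flush_interval)
            except queue.Empty:
                item = None  # idle: flush any partial batch
            if item is self._STOP:
                stopping = True
            elif item is not None:
                batch.append(item)
            if batch and (stopping or item is None or len(batch) >= self._batch_size):
                conn.executemany("INSERT INTO events VALUES (?, ?, ?)", batch)
                conn.commit()
                batch = []
        conn.close()
```

In this sketch the host thread pays only for a queue put; batching trades a short delay (at most flush_interval while idle) for fewer SQLite commits.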
Configuration
The system is entirely config-driven via environment variables.
| Variable | Default | Description |
|---|---|---|
| OBSERVABILITY_STORE_BACKEND | sqlite | Storage backend: sqlite or memory. |
| OBSERVABILITY_SQLITE_PATH | auto | Path to the SQLite file. |
| OBSERVABILITY_MAX_RUNS | 5 | Retention policy: maximum number of runs to keep. |
| OBSERVABILITY_MIN_LOG_LEVEL | INFO | Minimum level recorded: DEBUG, INFO, WARNING, ERROR, or CRITICAL. |
| OBSERVABILITY_SAMPLE_SIZE | 5 | Max rows per data snapshot. |
| OBSERVABILITY_CAPTURE_INPUT | true | Capture input data samples. |
| OBSERVABILITY_CAPTURE_OUTPUT | true | Capture output data samples. |
| OBSERVABILITY_CAPTURE_METRICS | true | Auto-capture Memory and CPU. |
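To make the table concrete, here is a rough sketch of how such variables could be parsed into a typed object. SketchConfig is a hypothetical class written for this example; the internals of the real ObservabilityConfig.from_env are not shown in this document, and the accepted boolean spellings and the validation behaviour below are assumptions.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")


def _as_bool(value: str) -> bool:
    # Assumption: "true", "1", "yes" and "on" (any case) count as enabled.
    return value.strip().lower() in ("1", "true", "yes", "on")


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical mirror of the table above; not the real ObservabilityConfig."""

    store_backend: str = "sqlite"
    sqlite_path: str = "auto"
    max_runs: int = 5
    min_log_level: str = "INFO"
    sample_size: int = 5
    capture_input: bool = True
    capture_output: bool = True
    capture_metrics: bool = True

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "SketchConfig":
        # Accept an explicit mapping so the parser can be exercised without
        # mutating the real process environment.
        env = os.environ if env is None else env

        def get(name: str, default: str) -> str:
            return env.get("OBSERVABILITY_" + name, default)

        backend = get("STORE_BACKEND", "sqlite").lower()
        if backend not in ("sqlite", "memory"):
            raise ValueError(f"unsupported store backend: {backend!r}")
        level = get("MIN_LOG_LEVEL", "INFO").upper()
        if level not in LOG_LEVELS:
            raise ValueError(f"unknown log level: {level!r}")
        return cls(
            store_backend=backend,
            sqlite_path=get("SQLITE_PATH", "auto"),
            max_runs=int(get("MAX_RUNS", "5")),
            min_log_level=level,
            sample_size=int(get("SAMPLE_SIZE", "5")),
            capture_input=_as_bool(get("CAPTURE_INPUT", "true")),
            capture_output=_as_bool(get("CAPTURE_OUTPUT", "true")),
            capture_metrics=_as_bool(get("CAPTURE_METRICS", "true")),
        )

    def should_log(self, level: str) -> bool:
        # Events below the configured minimum level are dropped.
        return LOG_LEVELS.index(level.upper()) >= LOG_LEVELS.index(self.min_log_level)
```

For example, SketchConfig.from_env({"OBSERVABILITY_MIN_LOG_LEVEL": "warning"}).should_log("INFO") evaluates to False, while an empty mapping yields the defaults listed in the table.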
Data Captured
Pipeline Events
- Start/Stop timestamps
- Global row counts and node execution status
- user_id for multi-tenant tracing
Node Events
- Node Label: Human-friendly name
- Data Samples: Input/output snapshots (5 rows by default, set by OBSERVABILITY_SAMPLE_SIZE)
- Schema: Column names and types
- Metrics: Duration (ms), Memory (MB), CPU (%)
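As an illustration of what a node event might contain, the sketch below wraps a node function and collects a sample, a schema, and basic metrics. The helper names snapshot and run_node are hypothetical, rows are modelled as plain dicts, and the measurement approach (tracemalloc peak for memory, process CPU time divided by wall time for CPU %) is an assumption; how Sensyze Dataflow measures these values is not described here.

```python
import time
import tracemalloc
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]


def snapshot(rows: List[Row], sample_size: int = 5) -> Dict[str, Any]:
    """Return the row count, the first `sample_size` rows, and a column schema."""
    schema: Dict[str, str] = {}
    for row in rows:
        for column, value in row.items():
            # The first value seen for a column decides its recorded type.
            schema.setdefault(column, type(value).__name__)
    return {"row_count": len(rows), "sample": rows[:sample_size], "schema": schema}


def run_node(
    label: str,
    fn: Callable[[List[Row]], List[Row]],
    rows: List[Row],
    sample_size: int = 5,
) -> Tuple[List[Row], Dict[str, Any]]:
    """Run `fn` on `rows` and return (output, event dict)."""
    tracemalloc.start()
    wall_start, cpu_start = time.perf_counter(), time.process_time()
    output = fn(rows)
    wall = time.perf_counter() - wall_start
    cpu = time.process_time() - cpu_start
    _, peak_bytes = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    event = {
        "node_label": label,
        "input": snapshot(rows, sample_size),
        "output": snapshot(output, sample_size),
        "duration_ms": wall * 1000.0,
        "memory_mb": peak_bytes / (1024 * 1024),  # peak Python allocations only
        "cpu_percent": 100.0 * cpu / wall if wall > 0 else 0.0,
    }
    return output, event
```

Because only the first sample_size rows are stored, the event size stays bounded regardless of how many rows flow through the node.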
Data Lineage
- Tracks edges between nodes
- Records source_node_id → target_node_id
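One way to picture the recorded edges is as rows in a SQLite table that can be walked with a recursive query. The table name lineage_edges and the helper functions below are hypothetical; only the source_node_id and target_node_id field names come from this document, and the actual store schema may differ.

```python
import sqlite3
from typing import List


def create_lineage_table(conn: sqlite3.Connection) -> None:
    # Hypothetical schema: one row per edge, scoped to a run.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS lineage_edges ("
        "run_id TEXT, source_node_id TEXT, target_node_id TEXT)"
    )


def record_edge(conn: sqlite3.Connection, run_id: str, source_node_id: str,
                target_node_id: str) -> None:
    conn.execute(
        "INSERT INTO lineage_edges VALUES (?, ?, ?)",
        (run_id, source_node_id, target_node_id),
    )


def upstream_nodes(conn: sqlite3.Connection, run_id: str, node_id: str) -> List[str]:
    """Return every node that feeds `node_id`, directly or transitively."""
    rows = conn.execute(
        """
        WITH RECURSIVE upstream(node_id) AS (
            SELECT source_node_id FROM lineage_edges
            WHERE run_id = ? AND target_node_id = ?
            UNION  -- UNION (not UNION ALL) deduplicates, so cycles terminate
            SELECT e.source_node_id
            FROM lineage_edges AS e
            JOIN upstream AS u ON e.target_node_id = u.node_id
            WHERE e.run_id = ?
        )
        SELECT node_id FROM upstream ORDER BY node_id
        """,
        (run_id, node_id, run_id),
    ).fetchall()
    return [row[0] for row in rows]
```

With edges source_1 → transform_1 and transform_1 → sink_1 recorded for a run, upstream_nodes for sink_1 returns both source_1 and transform_1.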
Usage Example
from observability_logger import get_logger
from observability_config import ObservabilityConfig

# Build the configuration from the OBSERVABILITY_* environment variables.
config = ObservabilityConfig.from_env()
obs_logger = get_logger(config=config)

# Record the completion of a single node within a pipeline run.
obs_logger.log_node_complete(
    run_id="run-001",
    node_id="transform_1",
    node_type="python",
    input_rows=1000,
    output_rows=950,
    duration_ms=45.2,
    user_id="user_abc",  # propagated for multi-tenant tracing
    metadata={"node_label": "Filter Inactive Users"},
)
Troubleshooting
Logs not showing in UI
- Verify OBSERVABILITY_SSE_ENABLED=true
- Check the logs/observability/observability.sqlite file size
- Ensure the browser is connected to the SSE stream
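The first two checks can be scripted. The helper below is a hypothetical convenience, not part of the observability package; it assumes the flag is enabled only by the literal value true and that the store lives at whatever path you pass in. The SSE connection itself still has to be checked in the browser.

```python
import os
from typing import List, Mapping, Optional


def diagnose(db_path: str, env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return a list of human-readable problems; an empty list means both checks passed."""
    env = os.environ if env is None else env
    problems: List[str] = []
    # Check 1: the SSE flag must be switched on.
    if env.get("OBSERVABILITY_SSE_ENABLED", "").strip().lower() != "true":
        problems.append("OBSERVABILITY_SSE_ENABLED is not set to true")
    # Check 2: the SQLite file should exist and contain data.
    if not os.path.exists(db_path):
        problems.append(f"{db_path} does not exist")
    elif os.path.getsize(db_path) == 0:
        problems.append(f"{db_path} is empty")
    return problems
```

For example, diagnose("logs/observability/observability.sqlite") reads the flag from the current process environment and inspects the file at that relative path.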
Storage Growth
- Adjust OBSERVABILITY_MAX_RUNS to a lower value
- Disable data samples: OBSERVABILITY_CAPTURE_INPUT=false and, if output samples are not needed, OBSERVABILITY_CAPTURE_OUTPUT=false
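To show why lowering the run limit reduces storage, here is a sketch of a run-based retention pass. The events table, its created_at column, and the function names are assumptions made for the example; the real store schema and pruning logic are not documented here.

```python
import sqlite3


def create_events_table(conn: sqlite3.Connection) -> None:
    # Hypothetical schema used only for this illustration.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS events (run_id TEXT, created_at REAL, message TEXT)"
    )


def prune_runs(conn: sqlite3.Connection, max_runs: int) -> int:
    """Delete events from all but the `max_runs` most recent runs.

    A run's recency is taken from its latest event. Returns the number of
    deleted rows.
    """
    cursor = conn.execute(
        """
        DELETE FROM events
        WHERE run_id NOT IN (
            SELECT run_id FROM events
            GROUP BY run_id
            ORDER BY MAX(created_at) DESC, run_id DESC
            LIMIT ?
        )
        """,
        (max_runs,),
    )
    conn.commit()
    return cursor.rowcount
```

Note that deleting rows frees pages inside the SQLite file for reuse but, unless auto-vacuum is enabled, does not shrink the file on disk; running VACUUM afterwards reclaims the space.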