Flow Types and Patterns
Overview
Flows in Qarion ETL represent data transformation pipelines that process data in batches. Each flow type implements a specific pattern for handling data changes, transactions, or event grouping. This document explains the available flow types, when to use them, and the design patterns behind each implementation.
What Are Flows?
A flow is a declarative definition of a data transformation pipeline. It describes:
- Input structure: What data comes in (columns, primary keys)
- Transformation logic: How data is processed (flow-specific)
- Output structure: What datasets are created
- Execution order: DAG (Directed Acyclic Graph) of transformation steps
Flows are defined in TOML files and processed by flow-specific plugins that implement the transformation logic.
Flow Architecture
Flow Definition (TOML)
↓
FlowPlugin (flow-specific logic)
├── generate_datasets() → Creates dataset schemas
├── generate_dag() → Creates execution DAG
└── generate_transformation_instructions() → Creates transformation rules
↓
Execution DAG
├── Ingestion nodes (load data from files)
└── Transformation nodes (transform between datasets)
↓
Code Generation / Execution
Flow Types
1. Change Feed Flow
Purpose: Track changes to data over time by comparing batches and classifying records.
Pattern: State comparison pattern with historical tracking.
When to Use
Use change feed flows when you need to:
- Track what changed between data loads
- Identify new, modified, deleted, or unchanged records
- Maintain a history of all changes
- Build audit trails or change tracking systems
- Support incremental processing based on changes
Use Cases
- Data Warehouse ETL: Track changes from source systems
- Audit Logging: Maintain complete change history
- Incremental Processing: Only process records that changed
- Data Synchronization: Keep multiple systems in sync
- Compliance: Track all data modifications for regulatory requirements
How It Works
- Landing Table: Receives raw batch data with `xt_batch_id` and timestamp
- Change Feed Table: Compares current batch with historical state and classifies records:
- NEW: First appearance (not in historical state)
- CHANGED: Exists in history but values differ from latest state
- DELETED: Exists in history but not in current batch
- STALE: Exists in both with identical values to latest state
- REACTIVATED: Previously deleted record that reappears
Pattern Rationale
State View Approach: Instead of comparing only with the previous batch, change feed uses a "state view" that represents the latest known state of each record. This approach:
- Handles Missing Batches: If batch N-1 is missing, you can still process batch N correctly
- Accurate Change Detection: Compares against the true latest state, not just the previous batch
- Supports Replay: Can reprocess historical batches without affecting current state
- Idempotent: Processing the same batch multiple times produces the same result
Why Not Simple Diff?
- Simple batch-to-batch comparison fails when batches are skipped
- Doesn't handle out-of-order processing
- Can't detect reactivations (deleted then re-added records)
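The state-view comparison can be sketched in plain Python. This is an illustrative model only (the `classify_batch` function and its data shapes are invented for the example, not Qarion ETL's API):

```python
# Illustrative sketch of state-view change classification (not the actual
# implementation): compare one batch against the latest known state per key.

def classify_batch(state, batch):
    """state: {pk: {"values": dict, "type": str}}; batch: {pk: values dict}."""
    changes = {}
    for pk, values in batch.items():
        prev = state.get(pk)
        if prev is None:
            changes[pk] = "NEW"                      # first appearance
        elif prev["type"] == "DELETED":
            changes[pk] = "REACTIVATED"              # was deleted, reappeared
        elif prev["values"] != values:
            changes[pk] = "CHANGED"                  # differs from latest state
        else:
            changes[pk] = "STALE"                    # identical to latest state
    for pk, prev in state.items():
        if pk not in batch and prev["type"] != "DELETED":
            changes[pk] = "DELETED"                  # in state, missing from batch
    return changes

state = {
    1: {"values": {"amount": 100}, "type": "NEW"},
    2: {"values": {"amount": 50}, "type": "DELETED"},
    3: {"values": {"amount": 75}, "type": "NEW"},
}
batch = {1: {"amount": 150}, 2: {"amount": 50}, 4: {"amount": 10}}
print(classify_batch(state, batch))
# {1: 'CHANGED', 2: 'REACTIVATED', 4: 'NEW', 3: 'DELETED'}
```

Because the comparison is against the latest known state rather than the previous batch, running the same input twice yields the same classification, which is the idempotency property described above.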
Example
# flows/orders_change_feed.toml
id = "orders_change_feed"
name = "Orders Change Feed"
flow_type = "change_feed"
[input]
primary_key = ["order_id"]
columns = ["order_id", "customer_id", "order_date", "total_amount", "status"]
[properties]
# Optional: configure load behavior
[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
This creates:
- `orders_landing`: Raw batch data
- `orders_change_feed`: Classified changes with `xt_record_type` (NEW, CHANGED, DELETED, STALE, REACTIVATED)
Transformation Logic
The transformation uses SQL window functions and state comparison:
-- Simplified logic
WITH current_batch AS (
SELECT * FROM landing WHERE xt_batch_id = :batch_id
),
historical_state AS (
SELECT DISTINCT ON (primary_key) *
FROM change_feed
WHERE xt_batch_id <= :previous_batch_id
ORDER BY primary_key, xt_batch_id DESC
)
SELECT
CASE
WHEN hist.primary_key IS NULL THEN 'NEW'
WHEN hist.xt_record_type = 'DELETED' THEN 'REACTIVATED'
WHEN curr.values != hist.values THEN 'CHANGED'
ELSE 'STALE'
END AS xt_record_type,
...
FROM current_batch curr
LEFT JOIN historical_state hist ON curr.primary_key = hist.primary_key
-- DELETED rows come from a separate anti-join: historical records absent from current_batch
2. Delta Publishing Flow
Purpose: Generate financial transaction records from data changes, supporting reversals and corrections.
Pattern: Transaction pattern with reversible operations.
When to Use
Use delta publishing flows when you need to:
- Generate financial transactions from data changes
- Support transaction reversals and corrections
- Maintain transaction history with referential integrity
- Implement accounting-style double-entry or transaction logging
- Track transaction lineage (which transaction was reversed/corrected)
Use Cases
- Financial Systems: Generate accounting transactions from business events
- Audit Trails: Track all transaction modifications
- Compliance: Maintain complete transaction history for regulatory reporting
- Data Corrections: Handle corrections to previously published data
- Event Sourcing: Build event-sourced systems with transaction semantics
How It Works
Delta publishing extends change feed with a third table:
- Landing Table: Raw batch data (same as change feed)
- Change Feed Table: Classified changes (same as change feed)
- Delta Transaction Table: Transaction records with:
- `xt_transaction_type`: INITIAL_BOOKING, REVERSAL, REPUBLISH, DELTA_CORRECTION
- `xt_referred_transaction_id`: Links to the transaction being reversed/corrected
Delta Methods
Delta publishing supports two methods:
REVERSE_REPLAY Method
Pattern: Replay all changes from previous batch to current batch.
How it works:
- For each record in change feed:
- If CHANGED: Reverse previous transaction, create new transaction
- If DELETED: Reverse previous transaction
- If NEW: Create initial booking
- If REACTIVATED: Create republish transaction
When to use:
- When you need complete transaction history
- When reversals must reference original transactions
- When transaction order matters
- For audit and compliance requirements
Example:
Batch 1: Order created → INITIAL_BOOKING (txn_id=1)
Batch 2: Order amount changed → REVERSAL (txn_id=2, refers_to=1) + INITIAL_BOOKING (txn_id=3)
Batch 3: Order deleted → REVERSAL (txn_id=4, refers_to=3)
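The bookkeeping behind that example can be sketched as follows; the `reverse_replay` function, record shapes, and field names are hypothetical, not the actual implementation:

```python
# Illustrative sketch of REVERSE_REPLAY: emit transactions for one
# change-feed batch, tracking which booking is currently "open" per key.

def reverse_replay(changes, open_txns, next_id):
    """changes: list of (pk, record_type); open_txns: {pk: open txn_id}."""
    txns = []
    def emit(txn_type, pk, refers_to=None):
        nonlocal next_id
        txns.append({"txn_id": next_id, "type": txn_type,
                     "pk": pk, "refers_to": refers_to})
        next_id += 1
        return next_id - 1
    for pk, record_type in changes:
        if record_type == "NEW":
            open_txns[pk] = emit("INITIAL_BOOKING", pk)
        elif record_type == "CHANGED":
            emit("REVERSAL", pk, refers_to=open_txns[pk])   # undo prior booking
            open_txns[pk] = emit("INITIAL_BOOKING", pk)     # re-book new values
        elif record_type == "DELETED":
            emit("REVERSAL", pk, refers_to=open_txns.pop(pk))
        elif record_type == "REACTIVATED":
            open_txns[pk] = emit("REPUBLISH", pk)
    return txns

# Batches 1 and 2 of the example above:
txns = reverse_replay([("order_1", "NEW")], {}, next_id=1)
txns += reverse_replay([("order_1", "CHANGED")], {"order_1": 1}, next_id=2)
print([(t["txn_id"], t["type"], t["refers_to"]) for t in txns])
# [(1, 'INITIAL_BOOKING', None), (2, 'REVERSAL', 1), (3, 'INITIAL_BOOKING', None)]
```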
DELTA_CORRECTION Method
Pattern: Only generate transactions for net changes.
How it works:
- For each record in change feed:
- If CHANGED: Create DELTA_CORRECTION with net difference
- If DELETED: Create REVERSAL
- If NEW: Create INITIAL_BOOKING
- If REACTIVATED: Create REPUBLISH
When to use:
- When you only care about net changes
- When transaction volume is a concern
- When you don't need full replay capability
- For performance optimization
Example:
Batch 1: Order created → INITIAL_BOOKING (amount=100)
Batch 2: Order amount changed to 150 → DELTA_CORRECTION (delta=+50)
Batch 3: Order deleted → REVERSAL
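The net-change logic can be sketched in a few lines; the `delta_correction` function and its tuple shape are invented for illustration, not the actual API:

```python
# Illustrative sketch of DELTA_CORRECTION: emit only the net change per
# record instead of a reversal plus a fresh booking.

def delta_correction(changes):
    """changes: list of (pk, record_type, old_amount, new_amount)."""
    txns = []
    for pk, record_type, old, new in changes:
        if record_type == "NEW":
            txns.append((pk, "INITIAL_BOOKING", new))
        elif record_type == "CHANGED":
            txns.append((pk, "DELTA_CORRECTION", new - old))  # net difference
        elif record_type == "DELETED":
            txns.append((pk, "REVERSAL", -old))               # back out the amount
        elif record_type == "REACTIVATED":
            txns.append((pk, "REPUBLISH", new))
    return txns

print(delta_correction([("order_1", "CHANGED", 100, 150)]))
# [('order_1', 'DELTA_CORRECTION', 50)]
```

Note the trade-off visible here: batch 2 produces one transaction instead of the two that REVERSE_REPLAY emits.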
Pattern Rationale
Why Transaction Pattern?
- Reversibility: Can undo any transaction by creating a reversal
- Auditability: Complete history of all transactions and their reversals
- Referential Integrity: Transactions reference each other, enabling lineage tracking
- Financial Compliance: Matches accounting practices (debits/credits, reversals)
Why Two Methods?
- REVERSE_REPLAY: Maximum auditability, complete history, but more transactions
- DELTA_CORRECTION: Efficient, fewer transactions, but less detailed history
Trade-offs:
- REVERSE_REPLAY: More storage, better audit trail, easier to understand
- DELTA_CORRECTION: Less storage, faster processing, but harder to reconstruct full history
Example
# flows/accounting_transactions.toml
id = "accounting_transactions"
name = "Accounting Transactions"
flow_type = "delta_publishing"
[input]
primary_key = ["transaction_id"]
columns = ["transaction_id", "account_id", "amount", "transaction_date"]
[properties]
DELTA_METHOD = "REVERSE_REPLAY" # or "DELTA_CORRECTION"
This creates:
- `accounting_transactions_landing`: Raw batch data
- `accounting_transactions_change_feed`: Classified changes
- `accounting_transactions_delta_transaction`: Transaction records with types and references
3. Standard Flow
Purpose: Task-based flow that supports ingestion, SQL processing, and custom script tasks executed directly by engines.
Pattern: Task orchestration pattern with direct engine execution.
When to Use
Use standard flows when you need to:
- Build flexible, task-based data pipelines
- Execute SQL directly on your engine
- Run custom scripts (engine-specific)
- Work with SQL-capable engines (SQLite, PostgreSQL, MySQL, etc.)
- Have fine-grained control over each pipeline step
- Avoid the overhead of instruction set conversion
Use Cases
- SQL-Based ETL: Direct SQL transformations without abstraction layer
- Custom Processing: Engine-specific custom scripts
- Data Pipelines: Flexible pipelines with mixed task types
- Prototyping: Quick pipelines without full flow type definition
- Legacy Integration: Integrate existing SQL scripts into Qarion ETL flows
How It Works
Standard flows use a task-based approach:
- Task Definitions: Each task defines what to do (ingestion, SQL, or script)
- Dataset References: Tasks reference source and target datasets
- Dependency Resolution: Dependencies are automatically resolved based on dataset references
- Direct Execution: Tasks execute directly on engines without instruction set conversion
Task Types
Ingestion Tasks
Load data from external sources (files, APIs, etc.).
[[tasks]]
id = "ingest_data"
type = "ingestion"
target_dataset = "raw_data"
[tasks.config]
path = "data/input.csv"
format = "csv"
delimiter = ","
header = true
Configuration:
- `path`: Source file path
- `format`: File format (csv, json, parquet, etc.)
- `delimiter`: Field delimiter (for CSV)
- `header`: Whether the file has a header row
SQL Processing Tasks
Execute SQL queries directly on the engine.
[[tasks]]
id = "process_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "processed_data"
sql = """
INSERT INTO {{ target_dataset }}
SELECT
id,
name,
UPPER(email) AS email,
created_at
FROM {{ source_dataset }}
WHERE status = 'active'
"""
SQL Templating:
- `{{ source_dataset }}`: Replaced with the source dataset table name (single source)
- `{{ source_datasets }}`: List of source dataset table names (multiple sources, FAN IN)
  - Access individual sources: `{{ source_datasets[0] }}`, `{{ source_datasets[1] }}`, etc.
- `{{ target_dataset }}`: Replaced with the target dataset table name
- `{{ batch_id }}`: Replaced with the current batch ID
Requirements:
- Engine must support SQL execution (`execute_query()` method)
- SQL must be valid for the engine's dialect
Custom Script Tasks
Execute custom scripts (engine-specific).
[[tasks]]
id = "custom_processing"
type = "custom_script"
source_dataset = "processed_data"
target_dataset = "final_data"
script = """
# Custom Python script for Pandas engine
import pandas as pd
df = context.get_dataframe('processed_data')
df['processed'] = df['value'] * 2
context.save_dataframe('final_data', df)
"""
Requirements:
- Engine must support script execution (`execute_script()` method)
- Script format depends on engine type
Task Dependencies
Standard flows support both implicit and explicit task dependencies, enabling FAN IN and FAN OUT patterns.
Implicit Dependencies: Tasks automatically depend on datasets produced by other tasks:
Task A (target: dataset_1)
↓
Task B (source: dataset_1, target: dataset_2)
↓
Task C (source: dataset_2, target: dataset_3)
Explicit Dependencies (FAN IN and FAN OUT):
You can explicitly define task dependencies to support complex patterns:
FAN OUT Pattern (One task → Multiple tasks):
[[tasks]]
id = "ingest_data"
type = "ingestion"
target_dataset = "raw_data"
downstream_tasks = ["process_data", "validate_data"] # FAN OUT
[tasks.config]
path = "data/input.csv"
format = "csv"
FAN IN Pattern (Multiple tasks → One task):
[[tasks]]
id = "merge_data"
type = "sql_processing"
source_datasets = ["processed_data", "validated_data"] # Multiple sources
target_dataset = "merged_data"
dependencies = ["process_data", "validate_data"] # Explicit dependencies
sql = """
SELECT * FROM {{ source_datasets[0] }}
UNION ALL
SELECT * FROM {{ source_datasets[1] }}
"""
Dependency Fields:
- `dependencies`: List of task IDs this task depends on (FAN IN support)
- `downstream_tasks`: List of task IDs that depend on this task (FAN OUT support)
- `source_datasets`: List of source dataset IDs (for multi-source tasks, FAN IN)
Multiple Source Datasets:
When a task needs to combine data from multiple sources, use `source_datasets`:
[[tasks]]
id = "join_data"
type = "sql_processing"
source_datasets = ["table_a", "table_b"] # Multiple sources
target_dataset = "joined_data"
dependencies = ["task_a", "task_b"] # All upstream tasks
sql = """
SELECT a.*, b.*
FROM {{ source_datasets[0] }} a
JOIN {{ source_datasets[1] }} b ON a.id = b.id
"""
Accessing Multiple Source Datasets:
You can access datasets by ID using the `source_dataset()` function:
sql = """
SELECT
a.id,
a.name,
b.value
FROM {{ source_dataset('processed_data') }} a
JOIN {{ source_dataset('validated_data') }} b ON a.id = b.id
"""
Benefits of the `source_dataset()` function:
- ✅ More readable and maintainable than `{{ source_datasets[0] }}`
- ✅ Self-documenting (dataset ID is clear)
- ✅ Order-independent (no need to worry about list order)
Alternative access methods (for backward compatibility):
- `{{ source_datasets[0] }}`, `{{ source_datasets[1] }}`: Access by index
- `{{ source_datasets_dict['dataset_id'] }}`: Access by ID from a dictionary
Dependencies are resolved automatically when generating the execution DAG, combining both implicit (dataset-based) and explicit (task-based) dependencies.
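That resolution step can be sketched with Python's standard `graphlib`; the task dictionaries mirror the TOML fields above, but this is an illustrative model, not the actual resolver:

```python
# Sketch: combine implicit (dataset-based) and explicit (task-based)
# dependencies into one topological execution order.
from graphlib import TopologicalSorter

def execution_order(tasks):
    producers = {}                                   # dataset id -> producing task
    for t in tasks:
        if "target_dataset" in t:
            producers[t["target_dataset"]] = t["id"]
    deps = {}
    for t in tasks:
        d = set(t.get("dependencies", []))           # explicit deps (FAN IN)
        sources = list(t.get("source_datasets", []))
        if "source_dataset" in t:
            sources.append(t["source_dataset"])
        for ds in sources:                           # implicit deps via datasets
            if ds in producers:
                d.add(producers[ds])
        deps[t["id"]] = d
    for t in tasks:                                  # explicit FAN OUT edges
        for downstream in t.get("downstream_tasks", []):
            deps.setdefault(downstream, set()).add(t["id"])
    return list(TopologicalSorter(deps).static_order())

tasks = [
    {"id": "ingest", "target_dataset": "raw", "downstream_tasks": ["clean", "validate"]},
    {"id": "clean", "source_dataset": "raw", "target_dataset": "clean_data"},
    {"id": "validate", "source_dataset": "raw", "target_dataset": "valid_data"},
    {"id": "merge", "source_datasets": ["clean_data", "valid_data"],
     "dependencies": ["clean", "validate"], "target_dataset": "merged"},
]
order = execution_order(tasks)
print(order)  # e.g. ['ingest', 'clean', 'validate', 'merge']
```

The ingestion task fans out to two downstream tasks, and the merge task fans in from both, matching the patterns above.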
Pattern Rationale
Why Task-Based Approach?
- Flexibility: Mix different task types in one flow
- Direct Execution: No abstraction overhead for simple operations
- Engine-Specific: Leverage engine-specific features directly
- Simplicity: Easier to understand and debug than instruction sets
Why Not Instruction Sets?
- Overhead: Instruction set conversion adds complexity
- Limitations: Some operations are easier to express directly
- Engine Features: Direct execution allows using engine-specific features
- Performance: Direct execution can be more efficient
Trade-offs:
- ✅ More flexible and direct
- ✅ Better for SQL-heavy workflows
- ✅ Easier to integrate existing SQL scripts
- ❌ Less portable (engine-specific)
- ❌ No automatic engine abstraction
- ❌ Requires compatible engines
Example
# flows/sql_pipeline.toml
id = "sql_pipeline"
name = "SQL Processing Pipeline"
flow_type = "standard"
namespace = "raw"
# Flow-level variables
[variables]
min_amount = 0.01
status = "completed"
lookback_days = 30
[[tasks]]
id = "ingest_orders"
type = "ingestion"
name = "Ingest Orders"
target_dataset = "orders_raw"
[tasks.config]
# Variables can be used in file paths
path = "data/orders_{{ batch_id }}.csv"
format = "csv"
[[tasks]]
id = "clean_orders"
type = "sql_processing"
name = "Clean Orders"
source_dataset = "orders_raw"
target_dataset = "orders_clean"
# Use DBT-like variable syntax
sql = """
INSERT INTO {{ target_dataset }}
SELECT
order_id,
customer_id,
TRIM(order_date) AS order_date,
CAST(amount AS DECIMAL(10,2)) AS amount
FROM {{ source_dataset }}
WHERE order_id IS NOT NULL
AND amount >= {{ var("min_amount") }}
"""
[[tasks]]
id = "aggregate_sales"
type = "sql_processing"
name = "Aggregate Sales"
source_dataset = "orders_clean"
target_dataset = "daily_sales"
# Task-specific variable override
[tasks.variables]
lookback_days = 60
sql = """
INSERT INTO {{ target_dataset }}
SELECT
DATE(order_date) AS sale_date,
COUNT(*) AS order_count,
SUM(amount) AS total_sales,
AVG(amount) AS avg_order_value
FROM {{ source_dataset }}
WHERE order_date >= DATE('now', '-{{ var("lookback_days") }} days')
AND status = '{{ var("status") }}'
GROUP BY DATE(order_date)
"""
This creates:
- `orders_raw`: Raw ingested data
- `orders_clean`: Cleaned and validated data
- `daily_sales`: Aggregated daily statistics
Variables and Templating
Standard flows support variables and templating for dynamic configuration. Variables are organized into categories:
1. Built-in Runtime Variables:
These variables are automatically available in all tasks:
- `{{ source_dataset }}`: Source dataset table name
- `{{ target_dataset }}`: Target dataset table name
- `{{ batch_id }}`: Current batch ID for this execution
2. Execution Date and Time Variables:
When an execution date is provided, these date/time variables are automatically available:
- `{{ ds }}`: Execution date as YYYY-MM-DD (e.g., "2024-01-15")
- `{{ ds_nodash }}`: Execution date as YYYYMMDD (e.g., "20240115")
- `{{ ts }}`: Execution timestamp as YYYY-MM-DDTHH:MM:SS
- `{{ ts_nodash }}`: Execution timestamp as YYYYMMDDTHHMMSS
- `{{ yesterday_ds }}`: Previous day as YYYY-MM-DD
- `{{ tomorrow_ds }}`: Next day as YYYY-MM-DD
- `{{ prev_ds }}`: Alias for `yesterday_ds`
- `{{ next_ds }}`: Alias for `tomorrow_ds`
- `{{ execution_date }}`: Full datetime object
3. Custom Variables:
You can define custom variables at two levels:
- Flow-level variables: Defined in the `[variables]` section, available to all tasks
- Task-level variables: Defined in `[tasks.variables]`, override flow-level variables
Variable Syntax:
- `{{ var('variable_name') }}`: Access a variable (raises an error if not found)
- `{{ var('variable_name', 'default_value') }}`: Access with a default value
- `{{ variable_name }}`: Direct variable reference (shorthand, when the variable is in context)
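The precedence rules can be sketched as a simple dictionary merge, assuming task-level values shadow flow-level ones (the `resolve_variables` helper is hypothetical, not the actual implementation):

```python
# Sketch of variable precedence: builtins < flow-level < task-level.

def resolve_variables(flow_vars, task_vars, builtins):
    merged = {**builtins, **flow_vars, **task_vars}  # task-level wins
    def var(name, default=None):
        if name in merged:
            return merged[name]
        if default is not None:
            return default                           # var('x', fallback) form
        raise KeyError(f"undefined variable: {name}")
    return merged, var

flow_vars = {"threshold": 100, "status": "active"}
task_vars = {"threshold": 200}                       # overrides flow level
merged, var = resolve_variables(flow_vars, task_vars, {"batch_id": 7})
print(var("threshold"), var("status"), var("lookback_days", 30))
# 200 active 30
```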
Template Features:
- Jinja2 syntax: Full Jinja2 templating support
- Built-in filters: `upper`, `lower`, `default`
- Template rendering: All SQL, scripts, and config paths support templating
Example with Variables:
[variables]
threshold = 100
status = "active"
date_format = "%Y-%m-%d"
[[tasks]]
id = "filtered_query"
type = "sql_processing"
[tasks.variables]
threshold = 200 # Overrides flow-level threshold
sql = """
SELECT * FROM {{ source_dataset }}
WHERE value > {{ var("threshold") }}
AND status = '{{ var("status") }}'
AND created_at >= DATE('now', '-{{ var("lookback_days", 30) }} days')
"""
Custom Script Templates:
Custom script tasks also support templating:
[[tasks]]
id = "custom_transform"
type = "custom_script"
[tasks.variables]
multiplier = 2.5
script = """
# Python script with variables
import pandas as pd
df = context.get_dataframe('{{ source_dataset }}')
df['value'] = df['value'] * {{ var("multiplier") }}
df['status'] = '{{ var("status") }}'
df['execution_date'] = '{{ ds }}'
context.save_dataframe('{{ target_dataset }}', df)
"""
Execution Date Use Cases:
Execution date variables are useful for:
- Date Partitioning: Process data for specific dates
- Backfilling: Re-run historical data processing
- Time-based Filtering: Filter data by execution date
- File Paths: Use in file paths for date-partitioned data
Example:
[[tasks]]
id = "daily_aggregation"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "daily_stats"
sql = """
INSERT INTO {{ target_dataset }}
SELECT
DATE(created_at) AS date,
COUNT(*) AS records,
SUM(amount) AS total
FROM {{ source_dataset }}
WHERE DATE(created_at) = '{{ ds }}'
GROUP BY DATE(created_at)
"""
[[tasks]]
id = "ingest_daily_data"
type = "ingestion"
target_dataset = "daily_data"
[tasks.config]
# Use execution date in file path
path = "data/input/{{ ds_nodash }}.csv"
format = "csv"
Execution Date in Flow Execution:
Execution dates can be passed when executing flows:
from datetime import datetime
from flows import FlowExecutionService
service = FlowExecutionService(engine, transformation_service)
# Plan with execution date
plan = service.plan_flow_execution(
flow_definition=flow_def,
datasets=datasets,
batch_id=1,
execution_date=datetime(2024, 1, 15) # Process data for Jan 15, 2024
)
# Execute with execution date
result = service.execute_flow(
execution_plan=plan,
batch_id=1,
execution_date=datetime(2024, 1, 15)
)
If not provided, the execution date defaults to the current date/time.
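The date/time variables listed earlier can all be derived from a single execution datetime. The format strings below are inferred from that list, not taken from the actual code:

```python
# Sketch: derive the execution-date template variables from one datetime.
from datetime import datetime, timedelta

def date_variables(execution_date):
    v = {
        "ds": execution_date.strftime("%Y-%m-%d"),
        "ds_nodash": execution_date.strftime("%Y%m%d"),
        "ts": execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
        "ts_nodash": execution_date.strftime("%Y%m%dT%H%M%S"),
        "yesterday_ds": (execution_date - timedelta(days=1)).strftime("%Y-%m-%d"),
        "tomorrow_ds": (execution_date + timedelta(days=1)).strftime("%Y-%m-%d"),
        "execution_date": execution_date,
    }
    v["prev_ds"], v["next_ds"] = v["yesterday_ds"], v["tomorrow_ds"]  # aliases
    return v

vars_ = date_variables(datetime(2024, 1, 15))
print(vars_["ds"], vars_["ds_nodash"], vars_["yesterday_ds"])
# 2024-01-15 20240115 2024-01-14
```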
Execution Flow
- Ingestion Tasks: Execute first, loading data into target datasets
- SQL/Script Tasks: Execute in dependency order (based on dataset references)
- Error Handling: Failed tasks stop the pipeline (no automatic retry)
Engine Compatibility
| Engine | Ingestion | SQL Processing | Custom Script |
|---|---|---|---|
| SQLite | ✅ | ✅ | ❌ |
| PostgreSQL | ✅ | ✅ | ❌ |
| MySQL | ✅ | ✅ | ❌ |
| Pandas | ✅ | ❌ | ✅ (Python) |
| DuckDB | ✅ | ✅ | ❌ |
Note: Custom script support depends on engine implementation.
4. Growth Accounting Flow
Purpose: Analyze user growth by decomposing it into acquisitions, churn, and resurrections.
Pattern: Growth decomposition pattern with time-period analysis.
When to Use
Use growth accounting flows when you need to:
- Understand the true drivers of user growth
- Identify which experiences reduce churn
- Discover what brings inactive users back
- Focus on sustainable growth metrics
- Make data-driven decisions about retention and reactivation
Use Cases
- User Analytics: Analyze user base dynamics over time
- Product Growth: Understand growth drivers beyond acquisition
- Retention Analysis: Identify churn patterns and reactivation opportunities
- Growth Strategy: Make informed decisions about where to invest resources
How It Works
Growth accounting decomposes user growth using the formula:
Net Growth = Acquisitions - Churn + Resurrections
Processing Steps:
- Activity Periods: Aggregate user activity by time period (daily/weekly/monthly)
- Acquisitions: Identify new users who engaged for the first time
- Churn: Identify users who stopped being active (based on threshold)
- Resurrections: Identify users who were previously churned but returned
- Growth Summary: Calculate net growth and active user counts
Key Insight: A 1% improvement in churn or resurrections can have twice the impact of a 1% lift in acquisition.
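The decomposition can be illustrated with a small, self-contained sketch. It is simplified on purpose: churn is counted the period after a user's last activity (a one-period threshold), unlike the configurable `churn_threshold_days` above:

```python
# Sketch of growth accounting over per-period active-user sets.

def growth_accounting(daily_active):
    """daily_active: ordered list of (period, set_of_user_ids)."""
    seen, active_prev, churned = set(), set(), set()
    summary = []
    for period, active in daily_active:
        acquisitions = active - seen                 # first-ever activity
        resurrections = active & churned             # previously churned, back now
        churn = active_prev - active                 # active last period, gone now
        churned = (churned - active) | churn
        seen |= active
        active_prev = active
        net = len(acquisitions) - len(churn) + len(resurrections)
        summary.append({"period": period, "acquisitions": len(acquisitions),
                        "churn": len(churn), "resurrections": len(resurrections),
                        "net_growth": net, "active_users": len(active)})
    return summary

days = [("d1", {"a", "b"}), ("d2", {"b", "c"}), ("d3", {"a", "c"})]
for row in growth_accounting(days):
    print(row)
```

On day 3, user "a" counts as a resurrection rather than an acquisition, which is exactly the distinction the flow is designed to surface.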
Example
# flows/user_growth.toml
id = "user_growth"
name = "User Growth Analysis"
flow_type = "growth_accounting"
namespace = "analytics"
[input]
columns = [
{name = "user_id", schema_type = "string", required = true},
{name = "timestamp", schema_type = "timestamp", required = true},
{name = "activity_type", schema_type = "string", required = false}
]
primary_key = ["user_id", "timestamp"]
[properties]
user_id_column = "user_id"
timestamp_column = "timestamp"
period_type = "daily" # daily, weekly, or monthly
churn_threshold_days = 30
Generated Datasets:
- `user_growth_activity_periods`: User activity by time period
- `user_growth_acquisitions`: New user acquisitions per period
- `user_growth_churn`: User churn per period
- `user_growth_resurrections`: User resurrections per period
- `user_growth_growth_summary`: Growth accounting summary
Growth Summary Columns:
- `period`: Time period (date)
- `acquisitions`: Count of new users
- `churn`: Count of users who churned
- `resurrections`: Count of users who returned
- `net_growth`: Acquisitions - Churn + Resurrections
- `active_users`: Total active users in the period
Execution Flow
- Activity Periods: Aggregate user activity data by configured period
- Acquisitions: Identify first-time users per period
- Churn: Identify users who stopped being active (based on threshold)
- Resurrections: Match churned users who became active again
- Growth Summary: Aggregate all metrics and calculate net growth
Configuration
Required Properties:
- `user_id_column`: User identifier column name (default: "user_id")
- `timestamp_column`: Timestamp column name (default: "timestamp")
Optional Properties:
- `period_type`: Analysis period - "daily", "weekly", or "monthly" (default: "daily")
- `churn_threshold_days`: Days of inactivity before a user is considered churned (default: 30)
Benefits
- ✅ Provides actionable insights into growth drivers
- ✅ Shifts focus from acquisition to retention and reactivation
- ✅ Enables data-driven decisions about product improvements
- ✅ Identifies high-impact opportunities (churn reduction, reactivation)
- ✅ Measures sustainable growth, not just user count
5. Sessionization Flow
Purpose: Group events into sessions based on user and time-based rules.
Pattern: Time-window grouping pattern with configurable session rules.
When to Use
Use sessionization flows when you need to:
- Group events by user into sessions
- Define sessions based on time windows or gaps
- Analyze user behavior within sessions
- Build session-based analytics
- Process event streams into sessionized data
Use Cases
- Web Analytics: Group page views into user sessions
- Mobile App Analytics: Track user sessions in mobile apps
- IoT Data: Group sensor readings into sessions
- Customer Journey: Analyze customer interactions within sessions
- Fraud Detection: Identify suspicious session patterns
How It Works
- Landing Table: Raw events with user identifier and timestamp
- Sessions Table: Grouped sessions with:
- Session identifier
- User identifier
- Session start/end times
- Session attributes (first event, last event, event count, etc.)
Session Rules
Sessions are created based on configurable rules:
Time Window (Session Timeout)
Events within a time window belong to the same session.
[properties]
session_timeout_seconds = 1800 # 30 minutes
Logic: If time between events > timeout, start new session.
Gap Threshold
Events separated by more than a gap threshold start a new session.
[properties]
session_gap_threshold_seconds = 600 # 10 minutes
Logic: If gap between events > threshold, start new session.
Note: You can use either timeout or gap threshold, or both.
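For a single user's event stream, the gap rule reduces to a simple scan over sorted timestamps. This sketch is illustrative, not the engine's implementation:

```python
# Sketch of gap-based sessionization for one user's event timestamps
# (in seconds): a new session starts when the gap exceeds the timeout.

def sessionize(timestamps, timeout_seconds=1800):
    sessions, current = [], []
    for ts in sorted(timestamps):
        if current and ts - current[-1] > timeout_seconds:
            sessions.append(current)                 # gap exceeded: close session
            current = []
        current.append(ts)
    if current:
        sessions.append(current)
    return [{"start": s[0], "end": s[-1], "event_count": len(s)}
            for s in sessions]

events = [0, 300, 600, 4000, 4100]                   # 3400s gap after t=600
print(sessionize(events))
# [{'start': 0, 'end': 600, 'event_count': 3}, {'start': 4000, 'end': 4100, 'event_count': 2}]
```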
Pattern Rationale
Why Sessionization?
- User Behavior Analysis: Understand how users interact over time
- Event Grouping: Transform event streams into meaningful sessions
- Analytics: Enable session-based metrics (session duration, events per session)
- Business Logic: Many business processes operate on session-level data
Why Time-Based Rules?
- Flexibility: Different use cases need different session definitions
- Configurability: Can adjust session boundaries without code changes
- Real-World Patterns: Matches how users actually behave (sessions end after inactivity)
Why Not Fixed Windows?
- Fixed windows (e.g., hourly sessions) don't match user behavior
- Users don't align to arbitrary time boundaries
- Gap-based sessions are more intuitive and accurate
Example
# flows/user_sessions.toml
id = "user_sessions"
name = "User Session Tracking"
flow_type = "sessionization"
[input]
columns = ["event_id", "user_id", "timestamp", "event_type", "page_url"]
[properties]
user_id_field = "user_id"
timestamp_field = "timestamp"
session_timeout_seconds = 1800 # 30 minutes
This creates:
- `user_sessions_landing`: Raw events
- `user_sessions_sessions`: Grouped sessions with session_id, start_time, end_time, event_count
Transformation Logic
The transformation groups events using window functions:
-- Simplified logic
WITH ranked_events AS (
SELECT *,
LAG(timestamp) OVER (PARTITION BY user_id ORDER BY timestamp) AS prev_timestamp
FROM landing
WHERE xt_batch_id = :batch_id
),
session_boundaries AS (
SELECT *,
CASE
WHEN prev_timestamp IS NULL THEN 1 -- First event
WHEN timestamp - prev_timestamp > :session_timeout THEN 1 -- Gap detected
ELSE 0
END AS is_session_start
FROM ranked_events
),
session_ids AS (
SELECT *,
SUM(is_session_start) OVER (PARTITION BY user_id ORDER BY timestamp) AS session_number
FROM session_boundaries
)
SELECT
user_id,
CONCAT(user_id, '_', session_number) AS session_id,
MIN(timestamp) AS session_start,
MAX(timestamp) AS session_end,
COUNT(*) AS event_count
FROM session_ids
GROUP BY user_id, session_number
Design Patterns Explained
Why Plugin Architecture?
Problem: Different flow types need completely different transformation logic.
Solution: Plugin pattern allows each flow type to encapsulate its own logic.
Benefits:
- Extensibility: Add new flow types without modifying core code
- Isolation: Flow-specific bugs don't affect other flows
- Testability: Each plugin can be tested independently
- Maintainability: Changes to one flow type don't impact others
Alternative (Not Used): Single class with if/else statements
- Would lead to massive, unmaintainable code
- Hard to test individual flow types
- Violates Open/Closed Principle
Why Engine-Agnostic Instructions?
Problem: Need to support multiple execution engines (SQL, Pandas, PySpark).
Solution: Transformation instructions describe "what" to do, not "how".
Benefits:
- Portability: Same flow works with different engines
- Code Generation: Can generate SQL, Python, or other code
- Testing: Can test logic without specific engine
- Future-Proof: Easy to add new engines
Alternative (Not Used): Generate engine-specific code directly
- Would require separate code paths for each engine
- Hard to maintain consistency across engines
- Can't easily switch engines
Why State View for Change Feed?
Problem: Need to accurately detect changes even when batches are missing or out of order.
Solution: Compare against latest known state, not just previous batch.
Benefits:
- Robustness: Handles missing batches gracefully
- Accuracy: Always compares against true latest state
- Idempotency: Reprocessing batches produces same result
- Flexibility: Can process batches in any order
Alternative (Not Used): Simple batch-to-batch comparison
- Fails when batches are skipped
- Can't handle out-of-order processing
- Less accurate change detection
Why Transaction Pattern for Delta Publishing?
Problem: Need to support reversals and corrections in financial systems.
Solution: Generate explicit transaction records with types and references.
Benefits:
- Reversibility: Can undo any transaction
- Auditability: Complete transaction history
- Referential Integrity: Transactions reference each other
- Compliance: Matches accounting practices
Alternative (Not Used): Just update records in place
- Can't track reversals
- No audit trail
- Doesn't match financial system requirements
Why Time-Based Sessionization?
Problem: Need to group events into meaningful sessions that match user behavior.
Solution: Configurable time windows and gap thresholds.
Benefits:
- Flexibility: Different use cases need different rules
- Accuracy: Matches real user behavior
- Configurability: Adjust without code changes
- Business Logic: Aligns with how businesses think about sessions
Alternative (Not Used): Fixed time windows (hourly, daily)
- Doesn't match user behavior
- Arbitrary boundaries
- Less accurate session definitions
Choosing the Right Flow Type
Decision Tree
Do you need to track changes over time?
├─ Yes → Do you need financial transactions?
│ ├─ Yes → Use Delta Publishing
│ │ └─ Choose DELTA_METHOD:
│ │ ├─ Need complete audit trail? → REVERSE_REPLAY
│ │ └─ Need efficiency? → DELTA_CORRECTION
│ └─ No → Use Change Feed
│
└─ No → Do you need to group events by user?
├─ Yes → Use Sessionization
└─ No → Do you need direct SQL/script execution?
├─ Yes → Use Standard Flow
└─ No → Consider custom flow type
Comparison Table
| Feature | Change Feed | Delta Publishing | Sessionization | Standard Flow |
|---|---|---|---|---|
| Primary Use Case | Change tracking | Financial transactions | Event grouping | Task-based pipelines |
| Output Tables | 2 (landing, change_feed) | 3 (landing, change_feed, delta_transaction) | 2 (landing, sessions) | Variable (task-defined) |
| Change Classification | ✅ (NEW, CHANGED, DELETED, STALE, REACTIVATED) | ✅ (same as change feed) | ❌ | ❌ |
| Transaction Types | ❌ | ✅ (INITIAL_BOOKING, REVERSAL, etc.) | ❌ | ❌ |
| Session Grouping | ❌ | ❌ | ✅ | ❌ |
| Time-Based Rules | ❌ | ❌ | ✅ | ❌ |
| State Comparison | ✅ | ✅ | ❌ | ❌ |
| Reversibility | ❌ | ✅ | ❌ | ❌ |
| Audit Trail | ✅ | ✅✅ (enhanced) | ❌ | ❌ |
| Direct SQL Execution | ❌ | ❌ | ❌ | ✅ |
| Custom Scripts | ❌ | ❌ | ❌ | ✅ |
| Engine Portability | ✅ | ✅ | ✅ | ❌ (engine-specific) |
| Instruction Sets | ✅ | ✅ | ✅ | ❌ (direct execution) |
Flow Execution
Execution Order
Flows execute in topological order (DAG):
- Ingestion Steps: Load data from files into landing tables
- Transformation Steps: Transform between datasets (in dependency order)
Batch Processing
Each flow processes data in batches:
- Each batch has a unique `batch_id`
- The previous batch ID is used for change detection
- Transformations are idempotent (can reprocess safely)
Error Handling
- Validation: Flow definitions are validated before execution
- Rollback: Errors trigger transaction rollback
- Logging: Comprehensive error logging for debugging
- Recovery: Can reprocess failed batches
Extending Flows
Creating a Custom Flow Type
- Create Plugin Class: Inherit from `FlowPlugin`
- Implement Methods:
  - `generate_datasets()`: Define output schemas
  - `generate_dag()`: Define execution order
  - `generate_transformation_instructions()`: Define transformation logic
- Register Plugin: Use `register_flow_plugin()`
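A hypothetical end-to-end sketch of those steps follows. The `FlowPlugin` base class here is a stand-in, and all method signatures and return shapes are assumptions rather than the real interface:

```python
# Hypothetical custom flow type; consult the Developer Guide for the
# actual FlowPlugin interface and return types.

class FlowPlugin:                                    # stand-in for the real base class
    def generate_datasets(self, flow_definition): ...
    def generate_dag(self, flow_definition): ...
    def generate_transformation_instructions(self, flow_definition): ...

class DedupFlowPlugin(FlowPlugin):
    """Example flow type that deduplicates records on the primary key."""

    def generate_datasets(self, flow_definition):
        fid = flow_definition["id"]
        return [f"{fid}_landing", f"{fid}_deduped"]  # output schemas (simplified)

    def generate_dag(self, flow_definition):
        fid = flow_definition["id"]
        # one transformation node fed by one ingestion node
        return {f"{fid}_deduped": [f"{fid}_landing"]}

    def generate_transformation_instructions(self, flow_definition):
        pk = flow_definition["input"]["primary_key"]
        return [{"op": "deduplicate", "keys": pk}]

plugin = DedupFlowPlugin()
flow_def = {"id": "orders_dedup", "input": {"primary_key": ["order_id"]}}
print(plugin.generate_dag(flow_def))
# {'orders_dedup_deduped': ['orders_dedup_landing']}
```

In a real project you would import the actual `FlowPlugin` base class and register the plugin with `register_flow_plugin()` as listed above.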
See Developer Guide for detailed instructions.
Best Practices
Flow Design
- Clear Primary Keys: Always define primary keys for accurate change detection
- Descriptive Names: Use clear, descriptive names for flows and datasets
- Documentation: Add descriptions to flow definitions
- Validation: Validate flow definitions before deployment
Performance
- Batch Size: Process data in appropriate batch sizes
- Indexing: Ensure primary keys are indexed
- Partitioning: Consider partitioning large tables
- Monitoring: Monitor batch processing times
Maintenance
- Version Control: Keep flow definitions in version control
- Testing: Test flows with sample data before production
- Documentation: Document flow-specific business logic
- Monitoring: Monitor flow execution and data quality
Related Documentation
- Architecture Overview - System architecture
- Transformation Instructions - How transformations work
- Developer Guide - Creating custom flow types
- Testing Guide - Testing flows