
Flow Types and Patterns

Overview

Flows in Qarion ETL represent data transformation pipelines that process data in batches. Each flow type implements a specific pattern for handling data changes, transactions, or event grouping. This document explains the available flow types, when to use them, and the design patterns behind each implementation.

What Are Flows?

A flow is a declarative definition of a data transformation pipeline. It describes:

  • Input structure: What data comes in (columns, primary keys)
  • Transformation logic: How data is processed (flow-specific)
  • Output structure: What datasets are created
  • Execution order: DAG (Directed Acyclic Graph) of transformation steps

Flows are defined in TOML files and processed by flow-specific plugins that implement the transformation logic.

Flow Architecture

Flow Definition (TOML)

FlowPlugin (flow-specific logic)
├── generate_datasets() → Creates dataset schemas
├── generate_dag() → Creates execution DAG
└── generate_transformation_instructions() → Creates transformation rules

Execution DAG
├── Ingestion nodes (load data from files)
└── Transformation nodes (transform between datasets)

Code Generation / Execution

Flow Types

1. Change Feed Flow

Purpose: Track changes to data over time by comparing batches and classifying records.

Pattern: State comparison pattern with historical tracking.

When to Use

Use change feed flows when you need to:

  • Track what changed between data loads
  • Identify new, modified, deleted, or unchanged records
  • Maintain a history of all changes
  • Build audit trails or change tracking systems
  • Support incremental processing based on changes

Use Cases

  • Data Warehouse ETL: Track changes from source systems
  • Audit Logging: Maintain complete change history
  • Incremental Processing: Only process records that changed
  • Data Synchronization: Keep multiple systems in sync
  • Compliance: Track all data modifications for regulatory requirements

How It Works

  1. Landing Table: Receives raw batch data with xt_batch_id and timestamp
  2. Change Feed Table: Compares current batch with historical state and classifies records:
    • NEW: First appearance (not in historical state)
    • CHANGED: Exists in history but values differ from latest state
    • DELETED: Exists in history but not in current batch
    • STALE: Exists in both with identical values to latest state
    • REACTIVATED: Previously deleted record that reappears

Pattern Rationale

State View Approach: Instead of comparing only with the previous batch, change feed uses a "state view" that represents the latest known state of each record. This approach:

  • Handles Missing Batches: If batch N-1 is missing, you can still process batch N correctly
  • Accurate Change Detection: Compares against the true latest state, not just the previous batch
  • Supports Replay: Can reprocess historical batches without affecting current state
  • Idempotent: Processing the same batch multiple times produces the same result

Why Not Simple Diff?

  • Simple batch-to-batch comparison fails when batches are skipped
  • Doesn't handle out-of-order processing
  • Can't detect reactivations (deleted then re-added records)
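
The state-view classification described above can be sketched in plain Python (an illustrative model, not the plugin's implementation; the dict shapes and the classify_batch name are assumptions):

```python
# Illustrative model of state-view change classification (not the plugin code).
# `state` maps primary key -> (latest values, latest record type);
# `batch` maps primary key -> incoming values for the current batch.

def classify_batch(state, batch):
    changes = {}
    for key, values in batch.items():
        if key not in state:
            changes[key] = "NEW"
        elif state[key][1] == "DELETED":
            changes[key] = "REACTIVATED"  # previously deleted record reappears
        elif state[key][0] != values:
            changes[key] = "CHANGED"
        else:
            changes[key] = "STALE"
    # Keys in the latest state but missing from the batch are newly DELETED.
    for key, (_, record_type) in state.items():
        if key not in batch and record_type != "DELETED":
            changes[key] = "DELETED"
    return changes

state = {1: ({"amount": 100}, "NEW"), 2: ({"amount": 50}, "DELETED"), 3: ({"amount": 75}, "NEW")}
batch = {1: {"amount": 150}, 2: {"amount": 50}, 4: {"amount": 20}}
print(classify_batch(state, batch))
# {1: 'CHANGED', 2: 'REACTIVATED', 4: 'NEW', 3: 'DELETED'}
```

Because the comparison runs against the latest state rather than the previous batch, reprocessing the same batch yields the same result.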

Example

# flows/orders_change_feed.toml
id = "orders_change_feed"
name = "Orders Change Feed"
flow_type = "change_feed"

[input]
primary_key = ["order_id"]
columns = ["order_id", "customer_id", "order_date", "total_amount", "status"]

[properties]
# Optional: configure load behavior
[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"

This creates:

  • orders_landing: Raw batch data
  • orders_change_feed: Classified changes with xt_record_type (NEW, CHANGED, DELETED, STALE, REACTIVATED)

Transformation Logic

The transformation uses SQL window functions and state comparison:

-- Simplified logic
WITH current_batch AS (
    SELECT * FROM landing WHERE xt_batch_id = :batch_id
),
historical_state AS (
    SELECT DISTINCT ON (primary_key) *
    FROM change_feed
    WHERE xt_batch_id <= :previous_batch_id
    ORDER BY primary_key, xt_batch_id DESC
)
SELECT
    CASE
        WHEN hist.primary_key IS NULL THEN 'NEW'
        WHEN hist.xt_record_type = 'DELETED' THEN 'REACTIVATED'
        WHEN curr.values != hist.values THEN 'CHANGED'
        ELSE 'STALE'
    END AS xt_record_type,
    ...
FROM current_batch curr
LEFT JOIN historical_state hist ON curr.primary_key = hist.primary_key

2. Delta Publishing Flow

Purpose: Generate financial transaction records from data changes, supporting reversals and corrections.

Pattern: Transaction pattern with reversible operations.

When to Use

Use delta publishing flows when you need to:

  • Generate financial transactions from data changes
  • Support transaction reversals and corrections
  • Maintain transaction history with referential integrity
  • Implement accounting-style double-entry or transaction logging
  • Track transaction lineage (which transaction was reversed/corrected)

Use Cases

  • Financial Systems: Generate accounting transactions from business events
  • Audit Trails: Track all transaction modifications
  • Compliance: Maintain complete transaction history for regulatory reporting
  • Data Corrections: Handle corrections to previously published data
  • Event Sourcing: Build event-sourced systems with transaction semantics

How It Works

Delta publishing extends change feed with a third table:

  1. Landing Table: Raw batch data (same as change feed)
  2. Change Feed Table: Classified changes (same as change feed)
  3. Delta Transaction Table: Transaction records with:
    • xt_transaction_type: INITIAL_BOOKING, REVERSAL, REPUBLISH, DELTA_CORRECTION
    • xt_referred_transaction_id: Links to the transaction being reversed/corrected

Delta Methods

Delta publishing supports two methods:

REVERSE_REPLAY Method

Pattern: Replay all changes from previous batch to current batch.

How it works:

  • For each record in change feed:
    • If CHANGED: Reverse previous transaction, create new transaction
    • If DELETED: Reverse previous transaction
    • If NEW: Create initial booking
    • If REACTIVATED: Create republish transaction

When to use:

  • When you need complete transaction history
  • When reversals must reference original transactions
  • When transaction order matters
  • For audit and compliance requirements

Example:

Batch 1: Order created → INITIAL_BOOKING (txn_id=1)
Batch 2: Order amount changed → REVERSAL (txn_id=2, refers_to=1) + INITIAL_BOOKING (txn_id=3)
Batch 3: Order deleted → REVERSAL (txn_id=4, refers_to=3)

DELTA_CORRECTION Method

Pattern: Only generate transactions for net changes.

How it works:

  • For each record in change feed:
    • If CHANGED: Create DELTA_CORRECTION with net difference
    • If DELETED: Create REVERSAL
    • If NEW: Create INITIAL_BOOKING
    • If REACTIVATED: Create REPUBLISH

When to use:

  • When you only care about net changes
  • When transaction volume is a concern
  • When you don't need full replay capability
  • For performance optimization

Example:

Batch 1: Order created → INITIAL_BOOKING (amount=100)
Batch 2: Order amount changed to 150 → DELTA_CORRECTION (delta=+50)
Batch 3: Order deleted → REVERSAL
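
The two methods can be compared side by side in a sketch (generate_transactions and its arguments are assumed names; the transaction types follow the tables above):

```python
# Illustrative comparison of the two delta methods (generate_transactions and
# its arguments are assumed names; the transaction types follow the tables above).

def generate_transactions(record_type, amount, prev_amount, prev_txn_id, method):
    if record_type == "NEW":
        return [{"xt_transaction_type": "INITIAL_BOOKING", "amount": amount}]
    if record_type == "REACTIVATED":
        return [{"xt_transaction_type": "REPUBLISH", "amount": amount}]
    if record_type == "DELETED":
        return [{"xt_transaction_type": "REVERSAL",
                 "xt_referred_transaction_id": prev_txn_id, "amount": -prev_amount}]
    if record_type == "CHANGED":
        if method == "REVERSE_REPLAY":
            # Reverse the previous booking in full, then re-book the new amount.
            return [{"xt_transaction_type": "REVERSAL",
                     "xt_referred_transaction_id": prev_txn_id, "amount": -prev_amount},
                    {"xt_transaction_type": "INITIAL_BOOKING", "amount": amount}]
        # DELTA_CORRECTION: book only the net difference.
        return [{"xt_transaction_type": "DELTA_CORRECTION",
                 "xt_referred_transaction_id": prev_txn_id, "amount": amount - prev_amount}]
    return []  # STALE records generate no transactions

# Order amount changed from 100 to 150 (previous transaction id 1):
print(generate_transactions("CHANGED", 150, 100, 1, "REVERSE_REPLAY"))   # two transactions
print(generate_transactions("CHANGED", 150, 100, 1, "DELTA_CORRECTION")) # one +50 correction
```

The CHANGED branch makes the trade-off concrete: REVERSE_REPLAY emits two fully-referenced transactions, DELTA_CORRECTION emits one net-difference record.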

Pattern Rationale

Why Transaction Pattern?

  • Reversibility: Can undo any transaction by creating a reversal
  • Auditability: Complete history of all transactions and their reversals
  • Referential Integrity: Transactions reference each other, enabling lineage tracking
  • Financial Compliance: Matches accounting practices (debits/credits, reversals)

Why Two Methods?

  • REVERSE_REPLAY: Maximum auditability, complete history, but more transactions
  • DELTA_CORRECTION: Efficient, fewer transactions, but less detailed history

Trade-offs:

  • REVERSE_REPLAY: More storage, better audit trail, easier to understand
  • DELTA_CORRECTION: Less storage, faster processing, but harder to reconstruct full history

Example

# flows/accounting_transactions.toml
id = "accounting_transactions"
name = "Accounting Transactions"
flow_type = "delta_publishing"

[input]
primary_key = ["transaction_id"]
columns = ["transaction_id", "account_id", "amount", "transaction_date"]

[properties]
DELTA_METHOD = "REVERSE_REPLAY" # or "DELTA_CORRECTION"

This creates:

  • accounting_transactions_landing: Raw batch data
  • accounting_transactions_change_feed: Classified changes
  • accounting_transactions_delta_transaction: Transaction records with types and references

3. Standard Flow

Purpose: Task-based flow that supports ingestion, SQL processing, and custom script tasks executed directly by engines.

Pattern: Task orchestration pattern with direct engine execution.

When to Use

Use standard flows when you need to:

  • Build flexible, task-based data pipelines
  • Execute SQL directly on your engine
  • Run custom scripts (engine-specific)
  • Work with SQL-capable engines (SQLite, PostgreSQL, MySQL, etc.)
  • Have fine-grained control over each pipeline step
  • Avoid the overhead of instruction set conversion

Use Cases

  • SQL-Based ETL: Direct SQL transformations without abstraction layer
  • Custom Processing: Engine-specific custom scripts
  • Data Pipelines: Flexible pipelines with mixed task types
  • Prototyping: Quick pipelines without full flow type definition
  • Legacy Integration: Integrate existing SQL scripts into Qarion ETL flows

How It Works

Standard flows use a task-based approach:

  1. Task Definitions: Each task defines what to do (ingestion, SQL, or script)
  2. Dataset References: Tasks reference source and target datasets
  3. Dependency Resolution: Dependencies are automatically resolved based on dataset references
  4. Direct Execution: Tasks execute directly on engines without instruction set conversion

Task Types

Ingestion Tasks

Load data from external sources (files, APIs, etc.).

[[tasks]]
id = "ingest_data"
type = "ingestion"
target_dataset = "raw_data"
[tasks.config]
path = "data/input.csv"
format = "csv"
delimiter = ","
header = true

Configuration:

  • path: Source file path
  • format: File format (csv, json, parquet, etc.)
  • delimiter: Field delimiter (for CSV)
  • header: Whether file has header row

SQL Processing Tasks

Execute SQL queries directly on the engine.

[[tasks]]
id = "process_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "processed_data"
sql = """
INSERT INTO {{ target_dataset }}
SELECT
    id,
    name,
    UPPER(email) AS email,
    created_at
FROM {{ source_dataset }}
WHERE status = 'active'
"""

SQL Templating:

  • {{ source_dataset }}: Replaced with source dataset table name (single source)
  • {{ source_datasets }}: List of source dataset table names (multiple sources, FAN IN)
    • Access individual sources: {{ source_datasets[0] }}, {{ source_datasets[1] }}, etc.
  • {{ target_dataset }}: Replaced with target dataset table name
  • {{ batch_id }}: Replaced with current batch ID

Requirements:

  • Engine must support SQL execution (execute_query() method)
  • SQL must be valid for the engine's dialect

Custom Script Tasks

Execute custom scripts (engine-specific).

[[tasks]]
id = "custom_processing"
type = "custom_script"
source_dataset = "processed_data"
target_dataset = "final_data"
script = """
# Custom Python script for Pandas engine
import pandas as pd
df = context.get_dataframe('processed_data')
df['processed'] = df['value'] * 2
context.save_dataframe('final_data', df)
"""

Requirements:

  • Engine must support script execution (execute_script() method)
  • Script format depends on engine type

Task Dependencies

Standard flows support both implicit and explicit task dependencies, enabling FAN IN and FAN OUT patterns.

Implicit Dependencies: Tasks automatically depend on datasets produced by other tasks:

Task A (target: dataset_1)

Task B (source: dataset_1, target: dataset_2)

Task C (source: dataset_2, target: dataset_3)

Explicit Dependencies (FAN IN and FAN OUT):

You can explicitly define task dependencies to support complex patterns:

FAN OUT Pattern (One task → Multiple tasks):

[[tasks]]
id = "ingest_data"
type = "ingestion"
target_dataset = "raw_data"
downstream_tasks = ["process_data", "validate_data"] # FAN OUT
[tasks.config]
path = "data/input.csv"
format = "csv"

FAN IN Pattern (Multiple tasks → One task):

[[tasks]]
id = "merge_data"
type = "sql_processing"
source_datasets = ["processed_data", "validated_data"] # Multiple sources
target_dataset = "merged_data"
dependencies = ["process_data", "validate_data"] # Explicit dependencies
sql = """
SELECT * FROM {{ source_datasets[0] }}
UNION ALL
SELECT * FROM {{ source_datasets[1] }}
"""

Dependency Fields:

  • dependencies: List of task IDs this task depends on (FAN IN support)
  • downstream_tasks: List of task IDs that depend on this task (FAN OUT support)
  • source_datasets: List of source dataset IDs (for multi-source tasks, FAN IN)

Multiple Source Datasets: When a task needs to combine data from multiple sources, use source_datasets:

[[tasks]]
id = "join_data"
type = "sql_processing"
source_datasets = ["table_a", "table_b"] # Multiple sources
target_dataset = "joined_data"
dependencies = ["task_a", "task_b"] # All upstream tasks
sql = """
SELECT a.*, b.*
FROM {{ source_datasets[0] }} a
JOIN {{ source_datasets[1] }} b ON a.id = b.id
"""

Accessing Multiple Source Datasets:

You can access datasets by ID using the source_dataset() function:

sql = """
SELECT
a.id,
a.name,
b.value
FROM {{ source_dataset('processed_data') }} a
JOIN {{ source_dataset('validated_data') }} b ON a.id = b.id
"""

Benefits of source_dataset() function:

  • ✅ More readable and maintainable than {{ source_datasets[0] }}
  • ✅ Self-documenting (dataset ID is clear)
  • ✅ Order-independent (no need to worry about list order)

Alternative access methods (for backward compatibility):

  • {{ source_datasets[0] }}, {{ source_datasets[1] }} - Access by index
  • {{ source_datasets_dict['dataset_id'] }} - Access by ID from dictionary

Dependencies are resolved automatically when generating the execution DAG, combining both implicit (dataset-based) and explicit (task-based) dependencies.
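
The combined resolution can be sketched with Python's standard graphlib (a simplified model of the task dicts above; resolve_dependencies is an assumed name, not the actual service API):

```python
# Sketch of combining implicit (dataset-based) and explicit (task-based)
# dependencies into a DAG. resolve_dependencies is an assumed name.
from graphlib import TopologicalSorter

def resolve_dependencies(tasks):
    # Map each dataset to the task that produces it.
    producers = {t["target_dataset"]: t["id"] for t in tasks if "target_dataset" in t}
    graph = {}
    for t in tasks:
        deps = set(t.get("dependencies", []))  # explicit (FAN IN)
        sources = list(t.get("source_datasets", []))
        if "source_dataset" in t:
            sources.append(t["source_dataset"])
        deps |= {producers[ds] for ds in sources if ds in producers}  # implicit
        graph[t["id"]] = deps
    for t in tasks:  # explicit FAN OUT: downstream tasks depend on this one
        for downstream in t.get("downstream_tasks", []):
            graph.setdefault(downstream, set()).add(t["id"])
    return graph

tasks = [
    {"id": "ingest", "target_dataset": "raw_data", "downstream_tasks": ["process", "validate"]},
    {"id": "process", "source_dataset": "raw_data", "target_dataset": "processed_data"},
    {"id": "validate", "source_dataset": "raw_data", "target_dataset": "validated_data"},
    {"id": "merge", "source_datasets": ["processed_data", "validated_data"],
     "target_dataset": "merged_data", "dependencies": ["process", "validate"]},
]
graph = resolve_dependencies(tasks)
print(list(TopologicalSorter(graph).static_order()))
# e.g. ['ingest', 'process', 'validate', 'merge']
```

The topological order guarantees ingestion runs first and the FAN IN merge runs last.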

Pattern Rationale

Why Task-Based Approach?

  • Flexibility: Mix different task types in one flow
  • Direct Execution: No abstraction overhead for simple operations
  • Engine-Specific: Leverage engine-specific features directly
  • Simplicity: Easier to understand and debug than instruction sets

Why Not Instruction Sets?

  • Overhead: Instruction set conversion adds complexity
  • Limitations: Some operations are easier to express directly
  • Engine Features: Direct execution allows using engine-specific features
  • Performance: Direct execution can be more efficient

Trade-offs:

  • ✅ More flexible and direct
  • ✅ Better for SQL-heavy workflows
  • ✅ Easier to integrate existing SQL scripts
  • ❌ Less portable (engine-specific)
  • ❌ No automatic engine abstraction
  • ❌ Requires compatible engines

Example

# flows/sql_pipeline.toml
id = "sql_pipeline"
name = "SQL Processing Pipeline"
flow_type = "standard"
namespace = "raw"

# Flow-level variables
[variables]
min_amount = 0.01
status = "completed"
lookback_days = 30

[[tasks]]
id = "ingest_orders"
type = "ingestion"
name = "Ingest Orders"
target_dataset = "orders_raw"
[tasks.config]
# Variables can be used in file paths
path = "data/orders_{{ batch_id }}.csv"
format = "csv"

[[tasks]]
id = "clean_orders"
type = "sql_processing"
name = "Clean Orders"
source_dataset = "orders_raw"
target_dataset = "orders_clean"
# Use DBT-like variable syntax
sql = """
INSERT INTO {{ target_dataset }}
SELECT
    order_id,
    customer_id,
    TRIM(order_date) AS order_date,
    CAST(amount AS DECIMAL(10,2)) AS amount
FROM {{ source_dataset }}
WHERE order_id IS NOT NULL
AND amount >= {{ var("min_amount") }}
"""

[[tasks]]
id = "aggregate_sales"
type = "sql_processing"
name = "Aggregate Sales"
source_dataset = "orders_clean"
target_dataset = "daily_sales"
sql = """
INSERT INTO {{ target_dataset }}
SELECT
    DATE(order_date) AS sale_date,
    COUNT(*) AS order_count,
    SUM(amount) AS total_sales,
    AVG(amount) AS avg_order_value
FROM {{ source_dataset }}
WHERE order_date >= DATE('now', '-{{ var("lookback_days") }} days')
AND status = '{{ var("status") }}'
GROUP BY DATE(order_date)
"""
# Task-specific variable override (must come after top-level keys like sql)
[tasks.variables]
lookback_days = 60

This creates:

  • orders_raw: Raw ingested data
  • orders_clean: Cleaned and validated data
  • daily_sales: Aggregated daily statistics

Variables and Templating

Standard flows support variables and templating for dynamic configuration. Variables are organized into categories:

1. Built-in Runtime Variables:

These variables are automatically available in all tasks:

  • {{ source_dataset }}: Source dataset table name
  • {{ target_dataset }}: Target dataset table name
  • {{ batch_id }}: Current batch ID for this execution

2. Execution Date and Time Variables:

When an execution date is provided, these date/time variables are automatically available:

  • {{ ds }}: Execution date as YYYY-MM-DD (e.g., "2024-01-15")
  • {{ ds_nodash }}: Execution date as YYYYMMDD (e.g., "20240115")
  • {{ ts }}: Execution timestamp as YYYY-MM-DDTHH:MM:SS
  • {{ ts_nodash }}: Execution timestamp as YYYYMMDDTHHMMSS
  • {{ yesterday_ds }}: Previous day as YYYY-MM-DD
  • {{ tomorrow_ds }}: Next day as YYYY-MM-DD
  • {{ prev_ds }}: Alias for yesterday_ds
  • {{ next_ds }}: Alias for tomorrow_ds
  • {{ execution_date }}: Full datetime object
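
These variables can all be derived from a single execution date; the sketch below shows one plausible construction (build_date_variables is an assumed name, not the actual API):

```python
# Sketch of deriving the date/time template variables from one execution date.
# build_date_variables is an assumed name, not the actual API.
from datetime import datetime, timedelta

def build_date_variables(execution_date):
    yesterday = execution_date - timedelta(days=1)
    tomorrow = execution_date + timedelta(days=1)
    return {
        "ds": execution_date.strftime("%Y-%m-%d"),
        "ds_nodash": execution_date.strftime("%Y%m%d"),
        "ts": execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
        "ts_nodash": execution_date.strftime("%Y%m%dT%H%M%S"),
        "yesterday_ds": yesterday.strftime("%Y-%m-%d"),
        "tomorrow_ds": tomorrow.strftime("%Y-%m-%d"),
        "prev_ds": yesterday.strftime("%Y-%m-%d"),  # alias for yesterday_ds
        "next_ds": tomorrow.strftime("%Y-%m-%d"),   # alias for tomorrow_ds
        "execution_date": execution_date,
    }

ctx = build_date_variables(datetime(2024, 1, 15))
print(ctx["ds"], ctx["ds_nodash"], ctx["yesterday_ds"])
# 2024-01-15 20240115 2024-01-14
```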

3. Custom Variables:

You can define custom variables at two levels:

  • Flow-level variables: Defined in [variables] section, available to all tasks
  • Task-level variables: Defined in [tasks.variables], override flow-level variables

Variable Syntax:

  • {{ var('variable_name') }}: Access a variable (raises error if not found)
  • {{ var('variable_name', 'default_value') }}: Access with default value
  • {{ variable_name }}: Direct variable reference (shorthand, when variable is in context)

Template Features:

  • Jinja2 syntax: Full Jinja2 templating support
  • Built-in filters: upper, lower, default
  • Template rendering: All SQL, scripts, and config paths support templating

Example with Variables:

[variables]
threshold = 100
status = "active"
date_format = "%Y-%m-%d"

[[tasks]]
id = "filtered_query"
type = "sql_processing"
sql = """
SELECT * FROM {{ source_dataset }}
WHERE value > {{ var("threshold") }}
AND status = '{{ var("status") }}'
AND created_at >= DATE('now', '-{{ var("lookback_days", 30) }} days')
"""
# Sub-table must come after top-level keys like sql
[tasks.variables]
threshold = 200 # Overrides flow-level threshold
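
Under the hood, a var() lookup with task-level overrides can be wired into Jinja2 roughly as follows (a sketch assuming the jinja2 package; render_sql is an assumed name, not the actual service API):

```python
# Sketch of var() lookup with task-level overrides in Jinja2.
# render_sql is an assumed name; assumes the jinja2 package is installed.
from jinja2 import Environment

def render_sql(sql, flow_vars, task_vars, runtime):
    merged = {**flow_vars, **task_vars}  # task-level overrides flow-level

    def var(name, default=None):
        if name in merged:
            return merged[name]
        if default is not None:
            return default
        raise KeyError(f"Undefined variable: {name}")

    return Environment().from_string(sql).render(var=var, **runtime)

sql = "SELECT * FROM {{ source_dataset }} WHERE value > {{ var('threshold') }}"
print(render_sql(sql, {"threshold": 100}, {"threshold": 200},
                 {"source_dataset": "raw_data"}))
# SELECT * FROM raw_data WHERE value > 200
```

Note how the task-level threshold (200) wins over the flow-level value (100), matching the override rules above.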

Custom Script Templates:

Custom script tasks also support templating:

[[tasks]]
id = "custom_transform"
type = "custom_script"
script = """
# Python script with variables
import pandas as pd
df = context.get_dataframe('{{ source_dataset }}')
df['value'] = df['value'] * {{ var("multiplier") }}
df['status'] = '{{ var("status") }}'
df['execution_date'] = '{{ ds }}'
context.save_dataframe('{{ target_dataset }}', df)
"""
# Sub-table must come after top-level keys like script
[tasks.variables]
multiplier = 2.5

Execution Date Use Cases:

Execution date variables are useful for:

  • Date Partitioning: Process data for specific dates
  • Backfilling: Re-run historical data processing
  • Time-based Filtering: Filter data by execution date
  • File Paths: Use in file paths for date-partitioned data

Example:

[[tasks]]
id = "daily_aggregation"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "daily_stats"
sql = """
INSERT INTO {{ target_dataset }}
SELECT
    DATE(created_at) AS date,
    COUNT(*) AS records,
    SUM(amount) AS total
FROM {{ source_dataset }}
WHERE DATE(created_at) = '{{ ds }}'
GROUP BY DATE(created_at)
"""

[[tasks]]
id = "ingest_daily_data"
type = "ingestion"
target_dataset = "daily_data"
[tasks.config]
# Use execution date in file path
path = "data/input/{{ ds_nodash }}.csv"
format = "csv"

Execution Date in Flow Execution:

Execution dates can be passed when executing flows:

from datetime import datetime
from flows import FlowExecutionService

service = FlowExecutionService(engine, transformation_service)

# Plan with execution date
plan = service.plan_flow_execution(
    flow_definition=flow_def,
    datasets=datasets,
    batch_id=1,
    execution_date=datetime(2024, 1, 15),  # Process data for Jan 15, 2024
)

# Execute with execution date
result = service.execute_flow(
    execution_plan=plan,
    batch_id=1,
    execution_date=datetime(2024, 1, 15),
)

If not provided, execution date defaults to current date/time.

Execution Flow

  1. Ingestion Tasks: Execute first, loading data into target datasets
  2. SQL/Script Tasks: Execute in dependency order (based on dataset references)
  3. Error Handling: Failed tasks stop the pipeline (no automatic retry)

Engine Compatibility

Engine     | Ingestion | SQL Processing | Custom Script
-----------|-----------|----------------|--------------
SQLite     | ✅        | ✅             | —
PostgreSQL | ✅        | ✅             | —
MySQL      | ✅        | ✅             | —
Pandas     | ✅        | —              | ✅ (Python)
DuckDB     | ✅        | ✅             | —

Note: Custom script support depends on engine implementation.


4. Growth Accounting Flow

Purpose: Analyze user growth by decomposing it into acquisitions, churn, and resurrections.

Pattern: Growth decomposition pattern with time-period analysis.

When to Use

Use growth accounting flows when you need to:

  • Understand the true drivers of user growth
  • Identify which experiences reduce churn
  • Discover what brings inactive users back
  • Focus on sustainable growth metrics
  • Make data-driven decisions about retention and reactivation

Use Cases

  • User Analytics: Analyze user base dynamics over time
  • Product Growth: Understand growth drivers beyond acquisition
  • Retention Analysis: Identify churn patterns and reactivation opportunities
  • Growth Strategy: Make informed decisions about where to invest resources

How It Works

Growth accounting decomposes user growth using the formula:

Net Growth = Acquisitions - Churn + Resurrections

Processing Steps:

  1. Activity Periods: Aggregate user activity by time period (daily/weekly/monthly)
  2. Acquisitions: Identify new users who engaged for the first time
  3. Churn: Identify users who stopped being active (based on threshold)
  4. Resurrections: Identify users who were previously churned but returned
  5. Growth Summary: Calculate net growth and active user counts

Key Insight: A 1% improvement in churn or resurrections can have twice the impact of a 1% lift in acquisition.
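
The decomposition can be sketched with plain Python sets (a simplification in which a user counts as churned in the first period they are absent, whereas the flow uses churn_threshold_days as a grace window; growth_accounting is an assumed name):

```python
# Sketch of growth accounting over per-period activity sets.
# Simplified: churn occurs in the first absent period (no threshold window).

def growth_accounting(active_by_period):
    """active_by_period: ordered list of sets of user ids active in each period."""
    seen, prev_active, summary = set(), set(), []
    for period, active in enumerate(active_by_period):
        acquisitions = active - seen                   # first-ever activity
        resurrections = (active & seen) - prev_active  # returned after absence
        churn = prev_active - active                   # went inactive this period
        summary.append({
            "period": period,
            "acquisitions": len(acquisitions),
            "churn": len(churn),
            "resurrections": len(resurrections),
            "net_growth": len(acquisitions) - len(churn) + len(resurrections),
            "active_users": len(active),
        })
        seen |= active
        prev_active = active
    return summary

periods = [{"a", "b"}, {"a", "c"}, {"b", "c"}]
for row in growth_accounting(periods):
    print(row)
```

Note that net_growth always equals the change in active users between consecutive periods, which is the point of the decomposition: the same flat user count can hide very different acquisition, churn, and resurrection dynamics.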

Example

# flows/user_growth.toml
id = "user_growth"
name = "User Growth Analysis"
flow_type = "growth_accounting"
namespace = "analytics"

[input]
columns = [
    {name = "user_id", schema_type = "string", required = true},
    {name = "timestamp", schema_type = "timestamp", required = true},
    {name = "activity_type", schema_type = "string", required = false}
]
primary_key = ["user_id", "timestamp"]

[properties]
user_id_column = "user_id"
timestamp_column = "timestamp"
period_type = "daily" # daily, weekly, or monthly
churn_threshold_days = 30

Generated Datasets:

  • user_growth_activity_periods: User activity by time period
  • user_growth_acquisitions: New user acquisitions per period
  • user_growth_churn: User churn per period
  • user_growth_resurrections: User resurrections per period
  • user_growth_growth_summary: Growth accounting summary

Growth Summary Columns:

  • period: Time period (date)
  • acquisitions: Count of new users
  • churn: Count of users who churned
  • resurrections: Count of users who returned
  • net_growth: Acquisitions - Churn + Resurrections
  • active_users: Total active users in period

Execution Flow

  1. Activity Periods: Aggregate user activity data by configured period
  2. Acquisitions: Identify first-time users per period
  3. Churn: Identify users who stopped being active (based on threshold)
  4. Resurrections: Match churned users who became active again
  5. Growth Summary: Aggregate all metrics and calculate net growth

Configuration

Required Properties:

  • user_id_column: User identifier column name (default: "user_id")
  • timestamp_column: Timestamp column name (default: "timestamp")

Optional Properties:

  • period_type: Analysis period - "daily", "weekly", or "monthly" (default: "daily")
  • churn_threshold_days: Days of inactivity to consider churned (default: 30)

Benefits

  • ✅ Provides actionable insights into growth drivers
  • ✅ Shifts focus from acquisition to retention and reactivation
  • ✅ Enables data-driven decisions about product improvements
  • ✅ Identifies high-impact opportunities (churn reduction, reactivation)
  • ✅ Measures sustainable growth, not just user count

5. Sessionization Flow

Purpose: Group events into sessions based on user and time-based rules.

Pattern: Time-window grouping pattern with configurable session rules.

When to Use

Use sessionization flows when you need to:

  • Group events by user into sessions
  • Define sessions based on time windows or gaps
  • Analyze user behavior within sessions
  • Build session-based analytics
  • Process event streams into sessionized data

Use Cases

  • Web Analytics: Group page views into user sessions
  • Mobile App Analytics: Track user sessions in mobile apps
  • IoT Data: Group sensor readings into sessions
  • Customer Journey: Analyze customer interactions within sessions
  • Fraud Detection: Identify suspicious session patterns

How It Works

  1. Landing Table: Raw events with user identifier and timestamp
  2. Sessions Table: Grouped sessions with:
    • Session identifier
    • User identifier
    • Session start/end times
    • Session attributes (first event, last event, event count, etc.)

Session Rules

Sessions are created based on configurable rules:

Time Window (Session Timeout)

Events within a time window belong to the same session.

[properties]
session_timeout_seconds = 1800 # 30 minutes

Logic: If time between events > timeout, start new session.

Gap Threshold

Events separated by more than a gap threshold start a new session.

[properties]
session_gap_threshold_seconds = 600 # 10 minutes

Logic: If gap between events > threshold, start new session.

Note: You can use either timeout or gap threshold, or both.

Pattern Rationale

Why Sessionization?

  • User Behavior Analysis: Understand how users interact over time
  • Event Grouping: Transform event streams into meaningful sessions
  • Analytics: Enable session-based metrics (session duration, events per session)
  • Business Logic: Many business processes operate on session-level data

Why Time-Based Rules?

  • Flexibility: Different use cases need different session definitions
  • Configurability: Can adjust session boundaries without code changes
  • Real-World Patterns: Matches how users actually behave (sessions end after inactivity)

Why Not Fixed Windows?

  • Fixed windows (e.g., hourly sessions) don't match user behavior
  • Users don't align to arbitrary time boundaries
  • Gap-based sessions are more intuitive and accurate

Example

# flows/user_sessions.toml
id = "user_sessions"
name = "User Session Tracking"
flow_type = "sessionization"

[input]
columns = ["event_id", "user_id", "timestamp", "event_type", "page_url"]

[properties]
user_id_field = "user_id"
timestamp_field = "timestamp"
session_timeout_seconds = 1800 # 30 minutes

This creates:

  • user_sessions_landing: Raw events
  • user_sessions_sessions: Grouped sessions with session_id, start_time, end_time, event_count

Transformation Logic

The transformation groups events using window functions:

-- Simplified logic
WITH ranked_events AS (
SELECT *,
LAG(timestamp) OVER (PARTITION BY user_id ORDER BY timestamp) AS prev_timestamp
FROM landing
WHERE xt_batch_id = :batch_id
),
session_boundaries AS (
SELECT *,
CASE
WHEN prev_timestamp IS NULL THEN 1 -- First event
WHEN timestamp - prev_timestamp > :session_timeout THEN 1 -- Gap detected
ELSE 0
END AS is_session_start
FROM ranked_events
),
session_ids AS (
SELECT *,
SUM(is_session_start) OVER (PARTITION BY user_id ORDER BY timestamp) AS session_number
FROM session_boundaries
)
SELECT
user_id,
CONCAT(user_id, '_', session_number) AS session_id,
MIN(timestamp) AS session_start,
MAX(timestamp) AS session_end,
COUNT(*) AS event_count
FROM session_ids
GROUP BY user_id, session_number
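
With the Pandas engine in mind, the same gap-based logic can be sketched as follows (assumes pandas; column names follow the example above, and sessionize is an assumed name):

```python
# Sketch of gap-based sessionization with pandas (sessionize is an assumed name).
import pandas as pd

def sessionize(df, timeout_seconds=1800):
    df = df.sort_values(["user_id", "timestamp"])
    # Time gap to the previous event of the same user (NaT for first event).
    gap = df.groupby("user_id")["timestamp"].diff()
    is_start = (gap.isna() | (gap.dt.total_seconds() > timeout_seconds)).astype(int)
    # Cumulative count of session starts per user numbers the sessions.
    df["session_number"] = is_start.groupby(df["user_id"]).cumsum()
    df["session_id"] = df["user_id"] + "_" + df["session_number"].astype(str)
    return (df.groupby("session_id")
              .agg(session_start=("timestamp", "min"),
                   session_end=("timestamp", "max"),
                   event_count=("timestamp", "count"))
              .reset_index())

events = pd.DataFrame({
    "user_id": ["u1", "u1", "u1", "u2"],
    "timestamp": pd.to_datetime([
        "2024-01-01 10:00", "2024-01-01 10:10",  # 10-minute gap: same session
        "2024-01-01 12:00",                      # > 30-minute gap: new session
        "2024-01-01 10:05",
    ]),
})
print(sessionize(events))
```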

Design Patterns Explained

Why Plugin Architecture?

Problem: Different flow types need completely different transformation logic.

Solution: Plugin pattern allows each flow type to encapsulate its own logic.

Benefits:

  • Extensibility: Add new flow types without modifying core code
  • Isolation: Flow-specific bugs don't affect other flows
  • Testability: Each plugin can be tested independently
  • Maintainability: Changes to one flow type don't impact others

Alternative (Not Used): Single class with if/else statements

  • Would lead to massive, unmaintainable code
  • Hard to test individual flow types
  • Violates Open/Closed Principle

Why Engine-Agnostic Instructions?

Problem: Need to support multiple execution engines (SQL, Pandas, PySpark).

Solution: Transformation instructions describe "what" to do, not "how".

Benefits:

  • Portability: Same flow works with different engines
  • Code Generation: Can generate SQL, Python, or other code
  • Testing: Can test logic without specific engine
  • Future-Proof: Easy to add new engines

Alternative (Not Used): Generate engine-specific code directly

  • Would require separate code paths for each engine
  • Hard to maintain consistency across engines
  • Can't easily switch engines

Why State View for Change Feed?

Problem: Need to accurately detect changes even when batches are missing or out of order.

Solution: Compare against latest known state, not just previous batch.

Benefits:

  • Robustness: Handles missing batches gracefully
  • Accuracy: Always compares against true latest state
  • Idempotency: Reprocessing batches produces same result
  • Flexibility: Can process batches in any order

Alternative (Not Used): Simple batch-to-batch comparison

  • Fails when batches are skipped
  • Can't handle out-of-order processing
  • Less accurate change detection

Why Transaction Pattern for Delta Publishing?

Problem: Need to support reversals and corrections in financial systems.

Solution: Generate explicit transaction records with types and references.

Benefits:

  • Reversibility: Can undo any transaction
  • Auditability: Complete transaction history
  • Referential Integrity: Transactions reference each other
  • Compliance: Matches accounting practices

Alternative (Not Used): Just update records in place

  • Can't track reversals
  • No audit trail
  • Doesn't match financial system requirements

Why Time-Based Sessionization?

Problem: Need to group events into meaningful sessions that match user behavior.

Solution: Configurable time windows and gap thresholds.

Benefits:

  • Flexibility: Different use cases need different rules
  • Accuracy: Matches real user behavior
  • Configurability: Adjust without code changes
  • Business Logic: Aligns with how businesses think about sessions

Alternative (Not Used): Fixed time windows (hourly, daily)

  • Doesn't match user behavior
  • Arbitrary boundaries
  • Less accurate session definitions

Choosing the Right Flow Type

Decision Tree

Do you need to track changes over time?
├─ Yes → Do you need financial transactions?
│   ├─ Yes → Use Delta Publishing
│   │        └─ Choose DELTA_METHOD:
│   │            ├─ Need complete audit trail? → REVERSE_REPLAY
│   │            └─ Need efficiency? → DELTA_CORRECTION
│   └─ No → Use Change Feed
└─ No → Do you need user growth metrics (acquisitions, churn, resurrections)?
    ├─ Yes → Use Growth Accounting
    └─ No → Do you need to group events by user?
        ├─ Yes → Use Sessionization
        └─ No → Do you need direct SQL/script execution?
            ├─ Yes → Use Standard Flow
            └─ No → Consider custom flow type

Comparison Table

Feature               | Change Feed | Delta Publishing | Sessionization | Standard Flow
----------------------|-------------|------------------|----------------|---------------
Primary Use Case      | Change tracking | Financial transactions | Event grouping | Task-based pipelines
Output Tables         | 2 (landing, change_feed) | 3 (landing, change_feed, delta_transaction) | 2 (landing, sessions) | Variable (task-defined)
Change Classification | ✅ (NEW, CHANGED, DELETED, STALE, REACTIVATED) | ✅ (same as change feed) | ❌ | ❌
Transaction Types     | ❌ | ✅ (INITIAL_BOOKING, REVERSAL, etc.) | ❌ | ❌
Session Grouping      | ❌ | ❌ | ✅ | ❌
Time-Based Rules      | ❌ | ❌ | ✅ | ❌
State Comparison      | ✅ | ✅ | ❌ | ❌
Reversibility         | ❌ | ✅ | ❌ | ❌
Audit Trail           | ✅ | ✅ (enhanced) | ❌ | ❌
Direct SQL Execution  | ❌ | ❌ | ❌ | ✅
Custom Scripts        | ❌ | ❌ | ❌ | ✅
Engine Portability    | ✅ | ✅ | ✅ | ❌ (engine-specific)
Instruction Sets      | ✅ | ✅ | ✅ | ❌ (direct execution)

Flow Execution

Execution Order

Flows execute in topological order (DAG):

  1. Ingestion Steps: Load data from files into landing tables
  2. Transformation Steps: Transform between datasets (in dependency order)

Batch Processing

Each flow processes data in batches:

  • Each batch has a unique batch_id
  • Previous batch ID is used for change detection
  • Transformations are idempotent (can reprocess safely)

Error Handling

  • Validation: Flow definitions are validated before execution
  • Rollback: Errors trigger transaction rollback
  • Logging: Comprehensive error logging for debugging
  • Recovery: Can reprocess failed batches

Extending Flows

Creating a Custom Flow Type

  1. Create Plugin Class: Inherit from FlowPlugin
  2. Implement Methods:
    • generate_datasets(): Define output schemas
    • generate_dag(): Define execution order
    • generate_transformation_instructions(): Define transformation logic
  3. Register Plugin: Use register_flow_plugin()

See Developer Guide for detailed instructions.
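
As a shape sketch, a custom plugin might look like this (method names follow the steps above; the exact FlowPlugin base-class signatures are in the Developer Guide, and the deduplication flow type is a hypothetical example):

```python
# Shape sketch of a custom flow plugin. The deduplication flow type is
# hypothetical; the real FlowPlugin signatures may differ.

class DeduplicationFlowPlugin:  # would inherit from FlowPlugin
    flow_type = "deduplication"  # hypothetical custom flow type

    def generate_datasets(self, flow_definition):
        # Define output schemas, e.g. a landing table plus a deduplicated table.
        ...

    def generate_dag(self, flow_definition):
        # Define execution order: ingestion node -> dedup transformation node.
        ...

    def generate_transformation_instructions(self, flow_definition):
        # Define engine-agnostic transformation rules for the dedup step.
        ...

# register_flow_plugin(DeduplicationFlowPlugin)  # registration per step 3
```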

Best Practices

Flow Design

  1. Clear Primary Keys: Always define primary keys for accurate change detection
  2. Descriptive Names: Use clear, descriptive names for flows and datasets
  3. Documentation: Add descriptions to flow definitions
  4. Validation: Validate flow definitions before deployment

Performance

  1. Batch Size: Process data in appropriate batch sizes
  2. Indexing: Ensure primary keys are indexed
  3. Partitioning: Consider partitioning large tables
  4. Monitoring: Monitor batch processing times

Maintenance

  1. Version Control: Keep flow definitions in version control
  2. Testing: Test flows with sample data before production
  3. Documentation: Document flow-specific business logic
  4. Monitoring: Monitor flow execution and data quality