Flow Types Reference
Complete reference for available flow types in Qarion ETL.
Processing Types
All flows support two processing strategies:
- INCREMENTAL: Processes only new/changed data since last run (default for most flows)
- FULL_REFRESH: Rebuilds entire dataset from scratch (can be forced for any flow)
Forcing Full Refresh:
- Set `force_full_refresh=True` in the execution API
- Set `previous_batch_id=None` when executing
- Useful for reprocessing historical data or fixing data issues
See Batch Processing and Incrementality for detailed information.
Flow Types Overview
Qarion ETL supports multiple flow types, each designed for specific data processing patterns. The following table provides a quick overview:
| Flow Type | Primary Purpose | Category | Key Use Cases | Main Output |
|---|---|---|---|---|
| Change Feed | Change Detection | Transformation | Track data changes, audit trails, incremental processing | Change feed table with record classifications (NEW, CHANGED, DELETED, etc.) |
| Delta Publishing | Transaction Processing | Integration | Financial transactions, accounting systems, transaction history | Delta transaction table with merge operations |
| Export Flow | Data Export | Export | Export data to files, APIs, databases with different modes | External files (CSV, JSON, Parquet) |
| Sessionization | Event Grouping | Event Processing | Web analytics, user behavior analysis, event grouping | Sessionized events with session IDs |
| Growth Accounting | Growth Analysis | Analytics | User growth metrics, churn analysis, retention | Growth summary with acquisitions, churn, resurrections |
| Outbox | Event Publishing | Integration | Event-driven architectures, reliable message delivery | Outbox table with events ready for publishing |
| SCD2 | Historical Tracking | Transformation | Dimension history, point-in-time queries, data warehousing | SCD2 table with effective dates and versioning |
| Standard | Custom Pipelines | Transformation | Flexible task-based pipelines, SQL processing, custom scripts | User-defined datasets via tasks |
Flow Categories
Integration Flows
- Delta Publishing: Processes financial transactions with merge operations
- Outbox: Ensures reliable event publishing in distributed systems
Transformation Flows
- Change Feed: Tracks and classifies data changes over time
- SCD2: Maintains historical versions of dimension data
- Standard: Flexible task-based transformation pipelines
Export Flows
- Export Flow: Exports data from datasets to external destinations with multiple modes
Event Processing Flows
- Sessionization: Groups events into sessions based on time windows
Analytics Flows
- Growth Accounting: Analyzes user growth by decomposing into components
Choosing the Right Flow Type
- Need to track what changed? → Use Change Feed Flow
- Processing financial transactions? → Use Delta Publishing Flow
- Need to export data to files? → Use Export Flow
- Grouping events into sessions? → Use Sessionization Flow
- Analyzing user growth? → Use Growth Accounting Flow
- Publishing events reliably? → Use Outbox Flow
- Tracking dimension history? → Use SCD2 Flow
- Need custom transformations? → Use Standard Flow
Change Feed Flow
Type: change_feed
Processing Type: INCREMENTAL (default) - Can be forced to FULL_REFRESH
Tracks changes to data over time by comparing batches and classifying records.
When to Use
- Track what changed between data loads
- Identify new, modified, deleted, or unchanged records
- Maintain a history of all changes
- Build audit trails or change tracking systems
- Support incremental processing based on changes
Classification Types
- NEW: First appearance (not in historical state)
- CHANGED: Exists in history but values differ from latest state
- DELETED: Exists in history but not in current batch
- STALE: Exists in both with identical values to latest state
- REACTIVATED: Previously deleted record that reappears
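The classification rules above can be sketched as a small Python function. The in-memory `history`/`batch` dictionaries and the function name are illustrative assumptions, not the Qarion API:

```python
# Illustrative sketch of change-feed classification (not the Qarion API).
# `history` maps primary key -> (latest_values, is_deleted);
# `batch` maps primary key -> values in the current batch.

def classify_batch(history, batch):
    changes = {}
    for key, values in batch.items():
        if key not in history:
            changes[key] = "NEW"            # first appearance
        else:
            latest, deleted = history[key]
            if deleted:
                changes[key] = "REACTIVATED"  # previously deleted, now back
            elif latest != values:
                changes[key] = "CHANGED"      # values differ from latest state
            else:
                changes[key] = "STALE"        # identical to latest state
    for key, (_, deleted) in history.items():
        if key not in batch and not deleted:
            changes[key] = "DELETED"          # in history, absent from batch
    return changes
```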
Example
id = "orders_change_feed"
name = "Orders Change Feed"
flow_type = "change_feed"
[input]
primary_key = ["order_id"]
columns = ["order_id", "customer_id", "order_date", "total_amount", "status"]
[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
Export Flow
Type: export_flow
Processing Type: N/A (exports from existing datasets)
Exports data from existing datasets (change_feed, delta_transaction, etc.) to external destinations with support for different export modes.
When to Use
- Export processed data to files for downstream consumption
- Export only changed data (incremental exports)
- Export specific batches for reprocessing
- Export data in different formats (CSV, JSON, Parquet)
- Integrate with external systems that consume files
Export Modes
- full: Export all data from the source dataset
- batch: Export only data from a specific batch_id
- incremental: Export only new and changed records (for change_feed)
- changes_only: Export only changed records (new, changed, deleted)
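As a hedged sketch of how these four modes narrow a dataset, assume records carry the `change_type` and `xt_batch_id` fields described elsewhere in this reference; the function itself is illustrative, not the exporter's implementation:

```python
# Illustrative selection logic for the four export modes.
# Records are dicts with `change_type` and `xt_batch_id` fields (assumed names).

def select_records(records, mode, batch_id=None):
    if mode == "full":
        return records                                     # everything
    if mode == "batch":
        return [r for r in records if r["xt_batch_id"] == batch_id]
    if mode == "incremental":
        return [r for r in records if r["change_type"] in ("NEW", "CHANGED")]
    if mode == "changes_only":
        return [r for r in records
                if r["change_type"] in ("NEW", "CHANGED", "DELETED")]
    raise ValueError(f"unknown export mode: {mode}")
```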
Example
id = "export_orders"
name = "Export Orders"
flow_type = "export_flow"
[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_changes.csv"
export_mode = "incremental"
format = "csv"
exporter_config = {
delimiter = ","
include_header = true
}
Integration with Change Feed
Export only new and changed records from a change_feed dataset:
[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_incremental_{batch_id}.csv"
export_mode = "incremental"
format = "csv"
Integration with Delta Publishing
Export delta transactions:
[properties.export]
source_dataset_id = "delta_transaction_orders"
file_path = "exports/delta_transactions.parquet"
export_mode = "full"
format = "parquet"
exporter_config = {
compression = "snappy"
}
See the Export Flow documentation for complete details.
Delta Publishing Flow
Type: delta_publishing
Processing Type: INCREMENTAL (default) - Can be forced to FULL_REFRESH
Processes financial transactions with merge operations, supporting incremental processing.
When to Use
- Process financial transactions
- Handle incremental updates
- Maintain transaction history
- Build data warehouses for financial data
Datasets Created
- Landing Table: Receives raw batch data
- Change Feed Table: Tracks changes
- Delta Transaction Table: Final transaction state
Example
id = "transactions_delta"
name = "Transactions Delta Publishing"
flow_type = "delta_publishing"
[input]
primary_key = ["transaction_id"]
columns = ["transaction_id", "account_id", "amount", "transaction_date", "type"]
[properties]
namespace = "finance"
Sessionization Flow
Type: sessionization
Processing Type: FULL_REFRESH (always processes all events)
Groups events into sessions based on time windows and user identifiers.
When to Use
- Group events into sessions
- Analyze user behavior
- Process web analytics data
- Identify user sessions
Properties
- `session_timeout_minutes`: Time window for session grouping (default: 30)
Example
id = "user_sessions"
name = "User Sessionization"
flow_type = "sessionization"
[input]
primary_key = ["event_id"]
columns = ["event_id", "user_id", "event_time", "event_type", "page_url"]
[properties]
session_timeout_minutes = 30
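The grouping behind `session_timeout_minutes` can be sketched in Python; the dict-based events and synthetic session IDs below are assumptions for illustration, not the Qarion implementation:

```python
# Illustrative sessionization: a gap longer than the timeout starts a new session.
from datetime import datetime, timedelta

def sessionize(events, timeout_minutes=30):
    gap = timedelta(minutes=timeout_minutes)
    last_seen = {}  # user_id -> (last event time, session counter)
    out = []
    for ev in sorted(events, key=lambda e: (e["user_id"], e["event_time"])):
        user = ev["user_id"]
        prev = last_seen.get(user)
        if prev is None or ev["event_time"] - prev[0] > gap:
            counter = 0 if prev is None else prev[1] + 1  # new session
        else:
            counter = prev[1]                             # same session
        last_seen[user] = (ev["event_time"], counter)
        out.append({**ev, "session_id": f"{user}-{counter}"})
    return out
```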
Growth Accounting Flow
Type: growth_accounting
Processing Type: FULL_REFRESH (recalculates all periods)
Purpose: Analyzes user growth by decomposing it into acquisitions, churn, and resurrections.
When to Use:
- You need to understand the true drivers of user growth
- You want to identify which experiences reduce churn
- You need to discover what brings inactive users back
- You want to focus on sustainable growth metrics
Important Notes:
- Requires user activity data with user identifiers and timestamps
- Supports daily, weekly, or monthly period analysis
- Configurable churn threshold (days of inactivity)
- Growth formula: Net Growth = Acquisitions - Churn + Resurrections
Required Input:
- `user_id`: User identifier column
- `timestamp`: Activity timestamp column
- Optional: Additional activity metadata columns
Configuration Properties:
- `user_id_column`: Name of user identifier column (default: "user_id")
- `timestamp_column`: Name of timestamp column (default: "timestamp")
- `period_type`: Analysis period - "daily", "weekly", or "monthly" (default: "daily")
- `churn_threshold_days`: Days of inactivity to consider churned (default: 30)
Generated Datasets:
- `{flow_id}_activity_periods`: User activity by time period
- `{flow_id}_acquisitions`: New user acquisitions per period
- `{flow_id}_churn`: User churn per period
- `{flow_id}_resurrections`: User resurrections per period
- `{flow_id}_growth_summary`: Growth accounting summary with net growth
Example:
id = "user_growth"
name = "User Growth Analysis"
flow_type = "growth_accounting"
namespace = "analytics"
[input]
columns = [
{name = "user_id", schema_type = "string", required = true},
{name = "timestamp", schema_type = "timestamp", required = true}
]
primary_key = ["user_id", "timestamp"]
[properties]
period_type = "daily"
churn_threshold_days = 30
Growth Metrics:
- Acquisitions: Count of new users per period
- Churn: Count of users who stopped being active per period
- Resurrections: Count of previously churned users who returned per period
- Net Growth: Acquisitions - Churn + Resurrections
- Active Users: Total active users per period
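The decomposition above can be computed from per-period activity sets. This Python sketch assumes sets of user IDs (`prev_active`, `curr_active`, and `ever_seen` for users active in any earlier period) and is illustrative only:

```python
# Illustrative growth accounting for one period from user-ID sets.
# Note: Net Growth = Acquisitions - Churn + Resurrections equals the
# change in active users between the two periods.

def growth_accounting(prev_active, curr_active, ever_seen):
    acquisitions = curr_active - ever_seen                    # brand-new users
    churn = prev_active - curr_active                         # went inactive
    resurrections = (curr_active & ever_seen) - prev_active   # returned users
    return {
        "acquisitions": len(acquisitions),
        "churn": len(churn),
        "resurrections": len(resurrections),
        "net_growth": len(acquisitions) - len(churn) + len(resurrections),
    }
```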
Outbox Flow
Type: outbox
Processing Type: INCREMENTAL (default) - Always inserts new events, can be forced to FULL_REFRESH
Ensures reliable message delivery in distributed systems by writing events to an outbox table as part of the same transaction as source data. A separate publisher process reads from the outbox and publishes messages.
When to Use
- Build event-driven architectures with reliable message delivery
- Ensure events are never lost (atomic writes with source data)
- Support exactly-once or at-least-once message publishing
- Implement event sourcing patterns
- Decouple event publishing from business logic
- Enable retry mechanisms for failed message publishing
- Build microservices with reliable inter-service communication
How It Works
The outbox pattern solves the "dual-write problem" in distributed systems where you need to:
- Write to a database (source data)
- Publish an event (message queue/stream)
Instead of doing both in separate transactions (which can fail independently), the outbox pattern:
- Writes source data and event to outbox in the same transaction (atomic)
- A separate publisher process reads from the outbox and publishes messages
- Updates outbox status to "published" after successful publishing
- Supports retries for failed publishes
Key Concepts
- Event Type: Type of event (e.g., 'order_created', 'user_updated')
- Aggregate ID: ID of the aggregate root (e.g., order_id, user_id)
- Payload: Event data as JSON
- Status: Tracks event state: 'pending', 'published', 'failed'
- Retry Count: Number of publish attempts
- Idempotency: Supports idempotent message publishing
Datasets Created
- Landing Table (`{flow_id}_landing`): Receives raw input data/events
  - All input columns
  - `xt_batch_id`: Identifies the batch/snapshot
  - `timestamp`: Ingestion timestamp
  - Primary key: `(input_columns, xt_batch_id)`
- Outbox Table (`{flow_id}_outbox`): Events to be published
  - `outbox_id`: Surrogate key (PRIMARY KEY, auto-incrementing)
  - `event_type`: Type of event (extracted from input)
  - `aggregate_id`: ID of the aggregate root (extracted from input)
  - `payload`: Event payload as JSON (all or selected input columns)
  - `status`: Status ('pending', 'published', 'failed')
  - `created_at`: When the event was created
  - `published_at`: When the event was published (nullable)
  - `retry_count`: Number of publish attempts
  - `error_message`: Error message if publish failed (nullable)
  - `xt_batch_id`: Batch that created this event
Required Input
- `columns`: List of input column definitions
- `properties.event_type_column`: Name of column containing event type (default: 'event_type')
- `properties.aggregate_id_column`: Name of column containing aggregate ID (default: 'id')
- `properties.payload_columns`: List of columns to include in payload (empty = all columns)
Configuration Properties
- `event_type_column`: Name of column in input that contains event type (default: "event_type")
- `aggregate_id_column`: Name of column in input that contains aggregate ID (default: "id")
- `payload_columns`: List of column names to include in payload JSON (empty list = all columns)
Example
id = "order_events_outbox"
name = "Order Events Outbox"
flow_type = "outbox"
namespace = "events"
[input]
columns = [
{name = "order_id", schema_type = "string", required = true},
{name = "event_type", schema_type = "string", required = true},
{name = "customer_id", schema_type = "string", required = false},
{name = "amount", schema_type = "float", required = false},
{name = "status", schema_type = "string", required = false},
{name = "created_at", schema_type = "datetime", required = false}
]
[properties]
event_type_column = "event_type"
aggregate_id_column = "order_id"
payload_columns = [] # Empty means all columns
Publisher Process
The outbox table is populated by the flow, but a separate publisher process is needed to:
- Read pending events: Query
SELECT * FROM {flow_id}_outbox WHERE status = 'pending' ORDER BY created_at - Publish messages: Send to message queue/stream (Kafka, RabbitMQ, SQS, etc.)
- Update status: Set
status = 'published',published_at = NOW()on success - Handle failures: Set
status = 'failed', incrementretry_count, storeerror_message - Retry logic: Implement retry with exponential backoff for failed events
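A minimal publisher loop along these lines could look as follows, using SQLite and a pluggable `publish` callable. The loop, error handling, and names are assumptions for illustration, not a shipped Qarion component:

```python
# Illustrative outbox publisher pass: read pending events, publish each,
# and record success or failure back to the outbox table.
import sqlite3

def drain_outbox(conn, publish):
    rows = conn.execute(
        "SELECT outbox_id, event_type, payload FROM order_events_outbox "
        "WHERE status = 'pending' ORDER BY created_at LIMIT 100"
    ).fetchall()
    for outbox_id, event_type, payload in rows:
        try:
            publish(event_type, payload)  # e.g. produce to Kafka/RabbitMQ/SQS
            conn.execute(
                "UPDATE order_events_outbox "
                "SET status = 'published', published_at = CURRENT_TIMESTAMP "
                "WHERE outbox_id = ?", (outbox_id,))
        except Exception as exc:
            conn.execute(
                "UPDATE order_events_outbox "
                "SET status = 'failed', retry_count = retry_count + 1, "
                "error_message = ? WHERE outbox_id = ?",
                (str(exc), outbox_id))
    conn.commit()
```

A real publisher would run this on a schedule and add retry with exponential backoff plus a dead-letter path, as noted below.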
Querying Outbox Data
Get pending events:
SELECT * FROM order_events_outbox
WHERE status = 'pending'
ORDER BY created_at
LIMIT 100
Get published events:
SELECT * FROM order_events_outbox
WHERE status = 'published'
ORDER BY published_at DESC
Get failed events for retry:
SELECT * FROM order_events_outbox
WHERE status = 'failed'
AND retry_count < 5
ORDER BY created_at
Mark event as published:
UPDATE order_events_outbox
SET status = 'published',
published_at = NOW()
WHERE outbox_id = :outbox_id
Important Notes
- Events are written atomically with source data (same transaction)
- Events start with `status = 'pending'`
- Publisher process is separate from the flow (not included in flow execution)
- Supports idempotent publishing (check if already published before publishing)
- Retry logic should be implemented in the publisher process
- Consider implementing dead-letter queue for events that fail after max retries
SCD2 Flow
Type: scd2
Processing Type: INCREMENTAL (default) - Can be forced to FULL_REFRESH
Tracks historical changes to dimension data by creating new records with effective dates when attributes change, maintaining a complete history.
When to Use
- Track historical changes to dimension data (customer, product, employee, etc.)
- Maintain complete audit trail of all attribute changes
- Support point-in-time queries (e.g., "What was the customer's address on Jan 1st?")
- Build data warehouses with slowly changing dimensions
- Preserve history when business keys change over time
- Enable time-based analysis and reporting
How It Works
SCD2 (Slowly Changing Dimension Type 2) is a data warehousing pattern that tracks historical changes by creating new records instead of updating existing ones. When a dimension attribute changes:
- The old record is closed by setting its `effective_end_date`
- The `is_current` flag is set to `false` on the old record
- A new record is created with:
  - New attribute values
  - `effective_start_date` set to the change date
  - `effective_end_date` set to `NULL` (or a far-future date)
  - `is_current` flag set to `true`
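The close-and-insert step can be sketched for a single business key; rows are plain dicts, the column names follow this section, and the function itself is illustrative rather than the flow's implementation:

```python
# Illustrative SCD2 update for one business key: close the current version,
# then insert a new open-ended version with the changed attribute values.
from datetime import date

def apply_scd2_change(rows, business_key, new_values, change_date):
    for row in rows:
        if row["customer_id"] == business_key and row["is_current"]:
            row["effective_end_date"] = change_date  # close old version
            row["is_current"] = False
    rows.append({
        "customer_id": business_key,
        **new_values,
        "effective_start_date": change_date,
        "effective_end_date": None,                  # NULL = current version
        "is_current": True,
    })
    return rows
```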
Key Concepts
- Business Key: The natural key from your input (defined by `primary_key` in the flow definition). Used to identify which records represent the same entity.
- Surrogate Key: Auto-incrementing `scd2_id` that uniquely identifies each version of a record.
- Effective Dates: `effective_start_date` and `effective_end_date` define when each version was valid.
- Current Record: Only one record per business key has `is_current = true` at any time.
Datasets Created
- Landing Table (`{flow_id}_landing`): Receives raw batch data
  - All input columns
  - `xt_batch_id`: Identifies the batch/snapshot
  - `timestamp`: Ingestion timestamp
  - Primary key: `(primary_key_columns, xt_batch_id)`
- SCD2 Table (`{flow_id}_scd2`): Tracks historical changes
  - `scd2_id`: Surrogate key (PRIMARY KEY, auto-incrementing)
  - All input columns (dimension attributes)
  - `effective_start_date`: When this version became effective
  - `effective_end_date`: When this version ended (NULL for current records)
  - `is_current`: Boolean flag indicating if this is the current version
  - `xt_batch_id`: Batch that created this version
Required Input
- `primary_key`: One or more columns that uniquely identify the dimension entity (used as business key)
- `columns`: List of dimension attribute columns
Example
id = "customer_scd2"
name = "Customer SCD2"
flow_type = "scd2"
namespace = "dimensions"
[input]
primary_key = ["customer_id"]
columns = [
{name = "customer_id", schema_type = "string", required = true},
{name = "customer_name", schema_type = "string", required = true},
{name = "email", schema_type = "string", required = false},
{name = "address", schema_type = "string", required = false},
{name = "status", schema_type = "string", required = false}
]
Querying SCD2 Data
Get current version of all records:
SELECT * FROM customer_scd2 WHERE is_current = true
Get point-in-time snapshot:
SELECT * FROM customer_scd2
WHERE effective_start_date <= '2024-01-15'
AND (effective_end_date IS NULL OR effective_end_date > '2024-01-15')
Get all versions of a specific customer:
SELECT * FROM customer_scd2
WHERE customer_id = 'CUST001'
ORDER BY effective_start_date
Important Notes
- SCD2 flows require a `primary_key` definition in the input (used as business key)
- The business key is used to match records and determine when changes occur
- Only one record per business key has `is_current = true` at any time
- Historical records are never deleted, only closed with an end date
- Supports complete audit trail of all changes over time
Standard Flow
Type: standard
Processing Type: Configurable per task - Can be FULL_REFRESH or INCREMENTAL depending on task configuration
Task-based flow that supports ingestion, SQL processing, and custom script tasks executed directly by engines.
When to Use
- Build flexible, task-based data pipelines
- Execute SQL directly on your engine
- Run custom scripts (engine-specific)
- Work with SQL-capable engines (SQLite, PostgreSQL, MySQL, etc.)
- Have fine-grained control over each pipeline step
Important Notes
- Standard flows execute tasks directly on engines, making them engine-specific
- SQL processing tasks require SQL-capable engines
- Custom script tasks require engines that support script execution
- Tasks are not converted to instruction sets, so they only work with compatible engines
Task Types
Ingestion Tasks
Load data from external sources (CSV, JSON, etc.).
[[tasks]]
id = "ingest_data"
type = "ingestion"
target_dataset = "raw_data"
[tasks.config]
path = "data/input.csv"
format = "csv"
delimiter = ","
header = true
SQL Processing Tasks
Execute SQL queries directly.
[[tasks]]
id = "process_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "processed_data"
sql = """
INSERT INTO {{ target_dataset }}
SELECT * FROM {{ source_dataset }}
WHERE status = 'active'
"""
Variables and Templating:
Standard flows use Jinja2 templating with support for multiple variable types:
1. Built-in Runtime Variables:
- `{{ source_dataset }}`: Source dataset table name
- `{{ target_dataset }}`: Target dataset table name
- `{{ batch_id }}`: Current batch ID
2. Execution Date and Time Variables: Available when an execution date is provided:
- `{{ ds }}`: Execution date as YYYY-MM-DD (e.g., "2024-01-15")
- `{{ ds_nodash }}`: Execution date as YYYYMMDD (e.g., "20240115")
- `{{ ts }}`: Execution timestamp as YYYY-MM-DDTHH:MM:SS
- `{{ ts_nodash }}`: Execution timestamp as YYYYMMDDTHHMMSS
- `{{ yesterday_ds }}`: Previous day as YYYY-MM-DD
- `{{ tomorrow_ds }}`: Next day as YYYY-MM-DD
- `{{ prev_ds }}`: Alias for `yesterday_ds`
- `{{ next_ds }}`: Alias for `tomorrow_ds`
- `{{ execution_date }}`: Full datetime object
3. Custom Variables:
- Flow-level: Defined in the `[variables]` section, available to all tasks
- Task-level: Defined in `[tasks.variables]`, override flow-level variables
- Syntax: `{{ var('variable_name') }}` or `{{ var('variable_name', 'default') }}`
- Shorthand: `{{ variable_name }}` (when the variable is in context)
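The execution date and time variables could be derived from a single datetime roughly as follows; the dict keys mirror this reference, but the builder function itself is an illustration, not Qarion's implementation:

```python
# Illustrative builder for the execution-date template context.
from datetime import datetime, timedelta

def date_context(execution_date):
    prev_day = execution_date - timedelta(days=1)
    next_day = execution_date + timedelta(days=1)
    ctx = {
        "ds": execution_date.strftime("%Y-%m-%d"),
        "ds_nodash": execution_date.strftime("%Y%m%d"),
        "ts": execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
        "ts_nodash": execution_date.strftime("%Y%m%dT%H%M%S"),
        "yesterday_ds": prev_day.strftime("%Y-%m-%d"),
        "tomorrow_ds": next_day.strftime("%Y-%m-%d"),
        "execution_date": execution_date,
    }
    ctx["prev_ds"] = ctx["yesterday_ds"]   # aliases
    ctx["next_ds"] = ctx["tomorrow_ds"]
    return ctx
```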
Custom Script Tasks
Execute custom scripts (engine-specific).
[[tasks]]
id = "custom_processing"
type = "custom_script"
source_dataset = "processed_data"
target_dataset = "final_data"
script = "# Custom script content"
Task Dependencies
Standard flows support both implicit and explicit task dependencies, enabling FAN IN and FAN OUT patterns.
Implicit Dependencies
Tasks automatically depend on datasets produced by other tasks. If task B's source_dataset matches task A's target_dataset, task B will automatically depend on task A.
Explicit Dependencies
You can explicitly define task dependencies to support complex patterns:
FAN OUT Pattern (One task → Multiple tasks):
[[tasks]]
id = "ingest_data"
type = "ingestion"
target_dataset = "raw_data"
downstream_tasks = ["process_data", "validate_data"] # FAN OUT
[tasks.config]
path = "data/input.csv"
format = "csv"
[[tasks]]
id = "process_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "processed_data"
dependencies = ["ingest_data"] # Explicit dependency
sql = "SELECT * FROM {{ source_dataset }}"
FAN IN Pattern (Multiple tasks → One task):
[[tasks]]
id = "merge_data"
type = "sql_processing"
# Multiple source datasets (FAN IN)
source_datasets = ["processed_data", "validated_data"]
target_dataset = "merged_data"
# Explicit dependencies (FAN IN)
dependencies = ["process_data", "validate_data"]
sql = """
SELECT * FROM {{ source_datasets[0] }}
UNION ALL
SELECT * FROM {{ source_datasets[1] }}
"""
Dependency Fields
- `dependencies`: List of task IDs this task depends on (FAN IN support)
  - Example: `dependencies = ["task_1", "task_2"]`
  - Ensures this task waits for all listed tasks to complete
- `downstream_tasks`: List of task IDs that depend on this task (FAN OUT support)
  - Example: `downstream_tasks = ["task_3", "task_4"]`
  - Declares that multiple tasks depend on this task
- `source_datasets`: List of source dataset IDs (for multi-source tasks, FAN IN)
  - Example: `source_datasets = ["dataset_a", "dataset_b"]`
  - Use when a task needs to combine data from multiple sources
  - Access in templates: `{{ source_datasets[0] }}`, `{{ source_datasets[1] }}`, etc.
Multiple Source Datasets
When a task needs to combine data from multiple sources, use source_datasets:
[[tasks]]
id = "join_data"
type = "sql_processing"
source_datasets = ["table_a", "table_b", "table_c"] # Multiple sources
target_dataset = "joined_data"
dependencies = ["task_a", "task_b", "task_c"] # All upstream tasks
sql = """
SELECT
a.id,
a.name,
b.value,
c.category
FROM {{ source_dataset('table_a') }} a
JOIN {{ source_dataset('table_b') }} b ON a.id = b.id
JOIN {{ source_dataset('table_c') }} c ON a.id = c.id
"""
Note: When using source_datasets, you can still provide source_dataset for backward compatibility. The first item in source_datasets will be used as the primary source.
Accessing Multiple Source Datasets:
You can access datasets by ID using the source_dataset() function:
sql = """
SELECT
a.id,
a.name,
b.value
FROM {{ source_dataset('processed_data') }} a
JOIN {{ source_dataset('validated_data') }} b ON a.id = b.id
"""
Benefits of source_dataset() function:
- ✅ More readable and maintainable than `{{ source_datasets[0] }}`
- ✅ Self-documenting (dataset ID is clear)
- ✅ Order-independent (no need to worry about list order)
Alternative access methods (for backward compatibility):
- `{{ source_datasets[0] }}`, `{{ source_datasets[1] }}` - Access by index
- `{{ source_datasets_dict['dataset_id'] }}` - Access by ID from dictionary
Dependencies are resolved automatically when generating the execution DAG, combining both implicit (dataset-based) and explicit (task-based) dependencies.
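A resolver combining both kinds of dependency might look roughly like this; task dicts mirror the fields described above, and the function is illustrative, not the Qarion scheduler:

```python
# Illustrative resolver merging implicit (dataset-based) and explicit
# (task-based) dependencies into one map of task id -> upstream task ids.

def resolve_dependencies(tasks):
    producers = {}  # dataset id -> task id that produces it
    for task in tasks:
        if task.get("target_dataset"):
            producers[task["target_dataset"]] = task["id"]
    deps = {task["id"]: set(task.get("dependencies", [])) for task in tasks}
    for task in tasks:
        sources = list(task.get("source_datasets") or [])
        if task.get("source_dataset"):
            sources.append(task["source_dataset"])
        for ds in sources:                              # implicit dependency
            if ds in producers:
                deps[task["id"]].add(producers[ds])
        for down in task.get("downstream_tasks", []):   # FAN OUT declaration
            deps.setdefault(down, set()).add(task["id"])
    return deps
```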
Example
id = "data_pipeline"
name = "Data Processing Pipeline"
flow_type = "standard"
namespace = "raw"
# Flow-level variables (available to all tasks)
[variables]
min_value = 0
status_filter = "active"
lookback_days = 30
[[tasks]]
id = "ingest_raw_data"
type = "ingestion"
target_dataset = "raw_data"
[tasks.config]
# Variables can be used in file paths
path = "data/input_{{ batch_id }}.csv"
format = "csv"
[[tasks]]
id = "process_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "processed_data"
# Task-specific variables (override flow-level)
[tasks.variables]
threshold = 100
sql = """
INSERT INTO {{ target_dataset }}
SELECT
id,
name,
UPPER(email) AS email
FROM {{ source_dataset }}
WHERE status = '{{ var("status_filter") }}'
AND value > {{ var("threshold") }}
"""
Engine Compatibility
| Engine | Ingestion | SQL Processing | Custom Script |
|---|---|---|---|
| SQLite | ✅ | ✅ | ❌ |
| PostgreSQL | ✅ | ✅ | ❌ |
| MySQL | ✅ | ✅ | ❌ |
| Pandas | ✅ | ❌ | ✅ (Python) |
| DuckDB | ✅ | ✅ | ❌ |