Flow Types Reference

Complete reference for available flow types in Qarion ETL.

Processing Types

All flows support two processing strategies:

  • INCREMENTAL: Processes only new/changed data since last run (default for most flows)
  • FULL_REFRESH: Rebuilds entire dataset from scratch (can be forced for any flow)

Forcing Full Refresh:

  • Set force_full_refresh=True in execution API
  • Set previous_batch_id=None when executing
  • Useful for reprocessing historical data or fixing data issues
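The two triggers above can be sketched as a small helper. This is a hypothetical illustration of the decision logic, not the actual Qarion execution API:

```python
from typing import Optional

def resolve_processing_type(flow_default: str,
                            force_full_refresh: bool = False,
                            previous_batch_id: Optional[str] = None) -> str:
    """Pick the processing strategy for a run, mirroring the two triggers above:
    an explicit force flag, or the absence of a previous batch."""
    if force_full_refresh or previous_batch_id is None:
        return "FULL_REFRESH"
    return flow_default  # usually "INCREMENTAL"
```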

See Batch Processing and Incrementality for detailed information.

Flow Types Overview

Qarion ETL supports multiple flow types, each designed for specific data processing patterns. The following table provides a quick overview:

| Flow Type | Primary Purpose | Category | Key Use Cases | Main Output |
|---|---|---|---|---|
| Change Feed | Change Detection | Transformation | Track data changes, audit trails, incremental processing | Change feed table with record classifications (NEW, CHANGED, DELETED, etc.) |
| Delta Publishing | Transaction Processing | Integration | Financial transactions, accounting systems, transaction history | Delta transaction table with merge operations |
| Export Flow | Data Export | Export | Export data to files, APIs, databases with different modes | External files (CSV, JSON, Parquet) |
| Sessionization | Event Grouping | Event Processing | Web analytics, user behavior analysis, event grouping | Sessionized events with session IDs |
| Growth Accounting | Growth Analysis | Analytics | User growth metrics, churn analysis, retention | Growth summary with acquisitions, churn, resurrections |
| Outbox | Event Publishing | Integration | Event-driven architectures, reliable message delivery | Outbox table with events ready for publishing |
| SCD2 | Historical Tracking | Transformation | Dimension history, point-in-time queries, data warehousing | SCD2 table with effective dates and versioning |
| Standard | Custom Pipelines | Transformation | Flexible task-based pipelines, SQL processing, custom scripts | User-defined datasets via tasks |

Flow Categories

Integration Flows

  • Delta Publishing: Processes financial transactions with merge operations
  • Outbox: Ensures reliable event publishing in distributed systems

Transformation Flows

  • Change Feed: Tracks and classifies data changes over time
  • SCD2: Maintains historical versions of dimension data
  • Standard: Flexible task-based transformation pipelines

Export Flows

  • Export Flow: Exports data from datasets to external destinations with multiple modes

Event Processing Flows

  • Sessionization: Groups events into sessions based on time windows

Analytics Flows

  • Growth Accounting: Analyzes user growth by decomposing it into acquisitions, churn, and resurrections

Choosing the Right Flow Type

  • Need to track what changed? → Use Change Feed Flow
  • Processing financial transactions? → Use Delta Publishing Flow
  • Need to export data to files? → Use Export Flow
  • Grouping events into sessions? → Use Sessionization Flow
  • Analyzing user growth? → Use Growth Accounting Flow
  • Publishing events reliably? → Use Outbox Flow
  • Tracking dimension history? → Use SCD2 Flow
  • Need custom transformations? → Use Standard Flow

Change Feed Flow

Type: change_feed

Processing Type: INCREMENTAL (default) - Can be forced to FULL_REFRESH

Tracks changes to data over time by comparing batches and classifying records.

When to Use

  • Track what changed between data loads
  • Identify new, modified, deleted, or unchanged records
  • Maintain a history of all changes
  • Build audit trails or change tracking systems
  • Support incremental processing based on changes

Classification Types

  • NEW: First appearance (not in historical state)
  • CHANGED: Exists in history but values differ from latest state
  • DELETED: Exists in history but not in current batch
  • STALE: Exists in both with identical values to latest state
  • REACTIVATED: Previously deleted record that reappears
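The five classifications can be illustrated with a minimal in-memory sketch. This is an assumption about the comparison logic, not the engine's actual implementation; `deleted_ids` here stands in for records whose latest classification was DELETED:

```python
def classify_batch(current: dict, latest_state: dict, deleted_ids: set) -> dict:
    """Classify each record ID by comparing the incoming batch to history.

    current      -- {id: values} for the incoming batch
    latest_state -- {id: values} latest known version of each record
    deleted_ids  -- IDs whose most recent classification was DELETED
    """
    result = {}
    for rid, values in current.items():
        if rid in deleted_ids:
            result[rid] = "REACTIVATED"   # was deleted, has reappeared
        elif rid not in latest_state:
            result[rid] = "NEW"           # first appearance
        elif latest_state[rid] != values:
            result[rid] = "CHANGED"       # values differ from latest state
        else:
            result[rid] = "STALE"         # identical to latest state
    for rid in latest_state:
        if rid not in current and rid not in deleted_ids:
            result[rid] = "DELETED"       # in history, absent from batch
    return result
```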

Example

id = "orders_change_feed"
name = "Orders Change Feed"
flow_type = "change_feed"

[input]
primary_key = ["order_id"]
columns = ["order_id", "customer_id", "order_date", "total_amount", "status"]

[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"

Export Flow

Type: export_flow

Processing Type: N/A (exports from existing datasets)

Exports data from existing datasets (change_feed, delta_transaction, etc.) to external destinations with support for different export modes.

When to Use

  • Export processed data to files for downstream consumption
  • Export only changed data (incremental exports)
  • Export specific batches for reprocessing
  • Export data in different formats (CSV, JSON, Parquet)
  • Integrate with external systems that consume files

Export Modes

  • full: Export all data from the source dataset
  • batch: Export only data from a specific batch_id
  • incremental: Export only new and changed records (for change_feed)
  • changes_only: Export only changed records (new, changed, deleted)
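The four modes amount to different row filters over the source dataset. A hedged sketch, assuming rows carry the `xt_batch_id` and (for change feeds) `change_type` columns described elsewhere in this document:

```python
def select_rows(rows, mode, batch_id=None):
    """Pick the rows to export for each export mode."""
    if mode == "full":
        return list(rows)
    if mode == "batch":
        return [r for r in rows if r["xt_batch_id"] == batch_id]
    if mode == "incremental":
        return [r for r in rows if r.get("change_type") in ("NEW", "CHANGED")]
    if mode == "changes_only":
        return [r for r in rows if r.get("change_type") in ("NEW", "CHANGED", "DELETED")]
    raise ValueError(f"unknown export mode: {mode}")
```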

Example

id = "export_orders"
name = "Export Orders"
flow_type = "export_flow"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_changes.csv"
export_mode = "incremental"
format = "csv"
exporter_config = { delimiter = ",", include_header = true }

Integration with Change Feed

Export only new and changed records from a change_feed dataset:

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_incremental_{batch_id}.csv"
export_mode = "incremental"
format = "csv"

Integration with Delta Publishing

Export delta transactions:

[properties.export]
source_dataset_id = "delta_transaction_orders"
file_path = "exports/delta_transactions.parquet"
export_mode = "full"
format = "parquet"
exporter_config = { compression = "snappy" }

See the Export Flow documentation for complete details.

Delta Publishing Flow

Type: delta_publishing

Processing Type: INCREMENTAL (default) - Can be forced to FULL_REFRESH

Processes financial transactions with merge operations, supporting incremental processing.

When to Use

  • Process financial transactions
  • Handle incremental updates
  • Maintain transaction history
  • Build data warehouses for financial data

Datasets Created

  1. Landing Table: Receives raw batch data
  2. Change Feed Table: Tracks changes
  3. Delta Transaction Table: Final transaction state
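At its core, the final transaction state is a merge (upsert) of each batch into the existing table. A minimal in-memory sketch of that merge, not the engine's actual SQL:

```python
def merge_transactions(existing, batch, key="transaction_id"):
    """Upsert a batch into the existing transaction state: new keys are
    inserted, existing keys are overwritten with the latest values."""
    state = {row[key]: row for row in existing}
    for row in batch:
        state[row[key]] = row
    return list(state.values())
```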

Example

id = "transactions_delta"
name = "Transactions Delta Publishing"
flow_type = "delta_publishing"

[input]
primary_key = ["transaction_id"]
columns = ["transaction_id", "account_id", "amount", "transaction_date", "type"]

[properties]
namespace = "finance"

Sessionization Flow

Type: sessionization

Processing Type: FULL_REFRESH (always processes all events)

Groups events into sessions based on time windows and user identifiers.

When to Use

  • Group events into sessions
  • Analyze user behavior
  • Process web analytics data
  • Identify user sessions

Properties

  • session_timeout_minutes: Time window for session grouping (default: 30)
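The timeout-based grouping can be sketched as follows: events are ordered per user, and any gap longer than the timeout starts a new session. This is an illustrative sketch of the standard algorithm, not the flow's actual implementation:

```python
from datetime import timedelta

def sessionize(events, timeout_minutes=30):
    """Assign a session ID per user; a gap longer than the timeout
    starts a new session for that user."""
    counters, last_seen, out = {}, {}, []
    for ev in sorted(events, key=lambda e: (e["user_id"], e["event_time"])):
        uid, t = ev["user_id"], ev["event_time"]
        gap_too_long = (uid in last_seen and
                        t - last_seen[uid] > timedelta(minutes=timeout_minutes))
        if uid not in last_seen or gap_too_long:
            counters[uid] = counters.get(uid, 0) + 1   # open a new session
        last_seen[uid] = t
        out.append({**ev, "session_id": f"{uid}-{counters[uid]}"})
    return out
```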

Example

id = "user_sessions"
name = "User Sessionization"
flow_type = "sessionization"

[input]
primary_key = ["event_id"]
columns = ["event_id", "user_id", "event_time", "event_type", "page_url"]

[properties]
session_timeout_minutes = 30

Growth Accounting Flow

Type: growth_accounting

Processing Type: FULL_REFRESH (recalculates all periods)

Purpose: Analyzes user growth by decomposing it into acquisitions, churn, and resurrections.

When to Use:

  • You need to understand the true drivers of user growth
  • You want to identify which experiences reduce churn
  • You need to discover what brings inactive users back
  • You want to focus on sustainable growth metrics

Important Notes:

  • Requires user activity data with user identifiers and timestamps
  • Supports daily, weekly, or monthly period analysis
  • Configurable churn threshold (days of inactivity)
  • Growth formula: Net Growth = Acquisitions - Churn + Resurrections

Required Input:

  • user_id: User identifier column
  • timestamp: Activity timestamp column
  • Optional: Additional activity metadata columns

Configuration Properties:

  • user_id_column: Name of user identifier column (default: "user_id")
  • timestamp_column: Name of timestamp column (default: "timestamp")
  • period_type: Analysis period - "daily", "weekly", or "monthly" (default: "daily")
  • churn_threshold_days: Days of inactivity to consider churned (default: 30)

Generated Datasets:

  1. {flow_id}_activity_periods: User activity by time period
  2. {flow_id}_acquisitions: New user acquisitions per period
  3. {flow_id}_churn: User churn per period
  4. {flow_id}_resurrections: User resurrections per period
  5. {flow_id}_growth_summary: Growth accounting summary with net growth

Example:

id = "user_growth"
name = "User Growth Analysis"
flow_type = "growth_accounting"
namespace = "analytics"

[input]
columns = [
{name = "user_id", schema_type = "string", required = true},
{name = "timestamp", schema_type = "timestamp", required = true}
]
primary_key = ["user_id", "timestamp"]

[properties]
period_type = "daily"
churn_threshold_days = 30

Growth Metrics:

  • Acquisitions: Count of new users per period
  • Churn: Count of users who stopped being active per period
  • Resurrections: Count of previously churned users who returned per period
  • Net Growth: Acquisitions - Churn + Resurrections
  • Active Users: Total active users per period
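The decomposition above can be sketched over per-period sets of active users. This simplified version treats one missed period as churn rather than applying `churn_threshold_days`, so it illustrates the formula, not the flow's exact semantics:

```python
def growth_accounting(active_by_period):
    """Decompose growth per period: Net = Acquisitions - Churn + Resurrections.
    `active_by_period` is a list of sets of user IDs active in each period."""
    ever_seen, prev, summary = set(), set(), []
    for period, active in enumerate(active_by_period):
        acquisitions = len(active - ever_seen)            # never seen before
        resurrections = len((active & ever_seen) - prev)  # returned after absence
        churn = len(prev - active)                        # active before, gone now
        summary.append({
            "period": period,
            "acquisitions": acquisitions,
            "churn": churn,
            "resurrections": resurrections,
            "net_growth": acquisitions - churn + resurrections,
            "active_users": len(active),
        })
        ever_seen |= active
        prev = active
    return summary
```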

Outbox Flow

Type: outbox

Processing Type: INCREMENTAL (default) - Always inserts new events, can be forced to FULL_REFRESH

Ensures reliable message delivery in distributed systems by writing events to an outbox table as part of the same transaction as source data. A separate publisher process reads from the outbox and publishes messages.

When to Use

  • Build event-driven architectures with reliable message delivery
  • Ensure events are never lost (atomic writes with source data)
  • Support exactly-once or at-least-once message publishing
  • Implement event sourcing patterns
  • Decouple event publishing from business logic
  • Enable retry mechanisms for failed message publishing
  • Build microservices with reliable inter-service communication

How It Works

The outbox pattern solves the "dual-write problem" in distributed systems where you need to:

  1. Write to a database (source data)
  2. Publish an event (message queue/stream)

Instead of doing both in separate transactions (which can fail independently), the outbox pattern:

  1. Writes source data and event to outbox in the same transaction (atomic)
  2. A separate publisher process reads from the outbox and publishes messages
  3. Updates outbox status to "published" after successful publishing
  4. Supports retries for failed publishes

Key Concepts

  • Event Type: Type of event (e.g., 'order_created', 'user_updated')
  • Aggregate ID: ID of the aggregate root (e.g., order_id, user_id)
  • Payload: Event data as JSON
  • Status: Tracks event state: 'pending', 'published', 'failed'
  • Retry Count: Number of publish attempts
  • Idempotency: Supports idempotent message publishing

Datasets Created

  1. Landing Table ({flow_id}_landing): Receives raw input data/events

    • All input columns
    • xt_batch_id: Identifies the batch/snapshot
    • timestamp: Ingestion timestamp
    • Primary key: (input_columns, xt_batch_id)
  2. Outbox Table ({flow_id}_outbox): Events to be published

    • outbox_id: Surrogate key (PRIMARY KEY, auto-incrementing)
    • event_type: Type of event (extracted from input)
    • aggregate_id: ID of the aggregate root (extracted from input)
    • payload: Event payload as JSON (all or selected input columns)
    • status: Status ('pending', 'published', 'failed')
    • created_at: When the event was created
    • published_at: When the event was published (nullable)
    • retry_count: Number of publish attempts
    • error_message: Error message if publish failed (nullable)
    • xt_batch_id: Batch that created this event

Required Input

  • columns: List of input column definitions
  • properties.event_type_column: Name of column containing event type (default: 'event_type')
  • properties.aggregate_id_column: Name of column containing aggregate ID (default: 'id')
  • properties.payload_columns: List of columns to include in payload (empty = all columns)

Configuration Properties

  • event_type_column: Name of column in input that contains event type (default: "event_type")
  • aggregate_id_column: Name of column in input that contains aggregate ID (default: "id")
  • payload_columns: List of column names to include in payload JSON (empty list = all columns)

Example

id = "order_events_outbox"
name = "Order Events Outbox"
flow_type = "outbox"
namespace = "events"

[input]
columns = [
{name = "order_id", schema_type = "string", required = true},
{name = "event_type", schema_type = "string", required = true},
{name = "customer_id", schema_type = "string", required = false},
{name = "amount", schema_type = "float", required = false},
{name = "status", schema_type = "string", required = false},
{name = "created_at", schema_type = "datetime", required = false}
]

[properties]
event_type_column = "event_type"
aggregate_id_column = "order_id"
payload_columns = [] # Empty means all columns

Publisher Process

The outbox table is populated by the flow, but a separate publisher process is needed to:

  1. Read pending events: Query SELECT * FROM {flow_id}_outbox WHERE status = 'pending' ORDER BY created_at
  2. Publish messages: Send to message queue/stream (Kafka, RabbitMQ, SQS, etc.)
  3. Update status: Set status = 'published', published_at = NOW() on success
  4. Handle failures: Set status = 'failed', increment retry_count, store error_message
  5. Retry logic: Implement retry with exponential backoff for failed events
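Steps 1-4 can be sketched as a single publisher pass over a SQLite-backed outbox. The table name, `send` callback, and schema here are assumptions for illustration; a real publisher would target your message broker and add the retry/backoff from step 5:

```python
import sqlite3

def publish_pending(conn: sqlite3.Connection, table: str, send) -> int:
    """One publisher pass: read pending events, try to send each,
    then mark it published or failed. Returns the publish count."""
    rows = conn.execute(
        f"SELECT outbox_id, event_type, payload FROM {table} "
        "WHERE status = 'pending' ORDER BY created_at"
    ).fetchall()
    published = 0
    for outbox_id, event_type, payload in rows:
        try:
            send(event_type, payload)  # e.g. produce to Kafka/RabbitMQ/SQS
            conn.execute(
                f"UPDATE {table} SET status = 'published', "
                "published_at = datetime('now') WHERE outbox_id = ?",
                (outbox_id,))
            published += 1
        except Exception as exc:
            conn.execute(
                f"UPDATE {table} SET status = 'failed', "
                "retry_count = retry_count + 1, error_message = ? "
                "WHERE outbox_id = ?", (str(exc), outbox_id))
    conn.commit()
    return published
```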

Querying Outbox Data

Get pending events:

SELECT * FROM order_events_outbox
WHERE status = 'pending'
ORDER BY created_at
LIMIT 100

Get published events:

SELECT * FROM order_events_outbox
WHERE status = 'published'
ORDER BY published_at DESC

Get failed events for retry:

SELECT * FROM order_events_outbox
WHERE status = 'failed'
AND retry_count < 5
ORDER BY created_at

Mark event as published:

UPDATE order_events_outbox
SET status = 'published',
published_at = NOW()
WHERE outbox_id = :outbox_id

Important Notes

  • Events are written atomically with source data (same transaction)
  • Events start with status = 'pending'
  • Publisher process is separate from the flow (not included in flow execution)
  • Supports idempotent publishing (check if already published before publishing)
  • Retry logic should be implemented in the publisher process
  • Consider implementing dead-letter queue for events that fail after max retries

SCD2 Flow

Type: scd2

Processing Type: INCREMENTAL (default) - Can be forced to FULL_REFRESH

Tracks historical changes to dimension data by creating new records with effective dates when attributes change, maintaining a complete history.

When to Use

  • Track historical changes to dimension data (customer, product, employee, etc.)
  • Maintain complete audit trail of all attribute changes
  • Support point-in-time queries (e.g., "What was the customer's address on Jan 1st?")
  • Build data warehouses with slowly changing dimensions
  • Preserve history when business keys change over time
  • Enable time-based analysis and reporting

How It Works

SCD2 (Slowly Changing Dimension Type 2) is a data warehousing pattern that tracks historical changes by creating new records instead of updating existing ones. When a dimension attribute changes:

  1. The old record is closed by setting its effective_end_date
  2. The is_current flag is set to false on the old record
  3. A new record is created with:
    • New attribute values
    • effective_start_date set to the change date
    • effective_end_date set to NULL (or far future date)
    • is_current flag set to true

Key Concepts

  • Business Key: The natural key from your input (defined by primary_key in the flow definition). Used to identify which records represent the same entity.
  • Surrogate Key: Auto-incrementing scd2_id that uniquely identifies each version of a record.
  • Effective Dates: effective_start_date and effective_end_date define when each version was valid.
  • Current Record: Only one record per business key has is_current = true at any time.
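The close-and-insert mechanics can be sketched in memory. This is a simplified illustration of the SCD2 pattern (it omits the surrogate key and batch columns), not the flow's actual merge:

```python
def apply_scd2(history, batch, batch_date, business_key="customer_id"):
    """Apply one snapshot to an SCD2 table: close changed current rows
    and append new versions. `batch` maps business key -> attribute dict."""
    current = {r[business_key]: r for r in history if r["is_current"]}
    for key, attrs in batch.items():
        old = current.get(key)
        if old is not None:
            if all(old.get(col) == val for col, val in attrs.items()):
                continue                              # unchanged: keep as-is
            old["effective_end_date"] = batch_date    # close the old version
            old["is_current"] = False
        history.append({business_key: key, **attrs,
                        "effective_start_date": batch_date,
                        "effective_end_date": None,   # open-ended current row
                        "is_current": True})
    return history
```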

Datasets Created

  1. Landing Table ({flow_id}_landing): Receives raw batch data

    • All input columns
    • xt_batch_id: Identifies the batch/snapshot
    • timestamp: Ingestion timestamp
    • Primary key: (primary_key_columns, xt_batch_id)
  2. SCD2 Table ({flow_id}_scd2): Tracks historical changes

    • scd2_id: Surrogate key (PRIMARY KEY, auto-incrementing)
    • All input columns (dimension attributes)
    • effective_start_date: When this version became effective
    • effective_end_date: When this version ended (NULL for current records)
    • is_current: Boolean flag indicating if this is the current version
    • xt_batch_id: Batch that created this version

Required Input

  • primary_key: One or more columns that uniquely identify the dimension entity (used as business key)
  • columns: List of dimension attribute columns

Example

id = "customer_scd2"
name = "Customer SCD2"
flow_type = "scd2"
namespace = "dimensions"

[input]
primary_key = ["customer_id"]
columns = [
{name = "customer_id", schema_type = "string", required = true},
{name = "customer_name", schema_type = "string", required = true},
{name = "email", schema_type = "string", required = false},
{name = "address", schema_type = "string", required = false},
{name = "status", schema_type = "string", required = false}
]

Querying SCD2 Data

Get current version of all records:

SELECT * FROM customer_scd2 WHERE is_current = true

Get point-in-time snapshot:

SELECT * FROM customer_scd2
WHERE effective_start_date <= '2024-01-15'
AND (effective_end_date IS NULL OR effective_end_date > '2024-01-15')

Get all versions of a specific customer:

SELECT * FROM customer_scd2
WHERE customer_id = 'CUST001'
ORDER BY effective_start_date

Important Notes

  • SCD2 flows require a primary_key definition in the input (used as business key)
  • The business key is used to match records and determine when changes occur
  • Only one record per business key has is_current = true at any time
  • Historical records are never deleted, only closed with an end date
  • Supports complete audit trail of all changes over time

Standard Flow

Type: standard

Processing Type: Configurable per task - Can be FULL_REFRESH or INCREMENTAL depending on task configuration

Task-based flow that supports ingestion, SQL processing, and custom script tasks executed directly by engines.

When to Use

  • Build flexible, task-based data pipelines
  • Execute SQL directly on your engine
  • Run custom scripts (engine-specific)
  • Work with SQL-capable engines (SQLite, PostgreSQL, MySQL, etc.)
  • Have fine-grained control over each pipeline step

Important Notes

  • Standard flows execute tasks directly on engines, making them engine-specific
  • SQL processing tasks require SQL-capable engines
  • Custom script tasks require engines that support script execution
  • Tasks are not converted to instruction sets, so they only work with compatible engines

Task Types

Ingestion Tasks

Load data from external sources (CSV, JSON, etc.).

[[tasks]]
id = "ingest_data"
type = "ingestion"
target_dataset = "raw_data"
[tasks.config]
path = "data/input.csv"
format = "csv"
delimiter = ","
header = true

SQL Processing Tasks

Execute SQL queries directly.

[[tasks]]
id = "process_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "processed_data"
sql = """
INSERT INTO {target_dataset}
SELECT * FROM {source_dataset}
WHERE status = 'active'
"""

Variables and Templating:

Standard flows use Jinja2 templating with support for multiple variable types:

1. Built-in Runtime Variables:

  • {{ source_dataset }}: Source dataset table name
  • {{ target_dataset }}: Target dataset table name
  • {{ batch_id }}: Current batch ID

2. Execution Date and Time Variables: Available when an execution date is provided:

  • {{ ds }}: Execution date as YYYY-MM-DD (e.g., "2024-01-15")
  • {{ ds_nodash }}: Execution date as YYYYMMDD (e.g., "20240115")
  • {{ ts }}: Execution timestamp as YYYY-MM-DDTHH:MM:SS
  • {{ ts_nodash }}: Execution timestamp as YYYYMMDDTHHMMSS
  • {{ yesterday_ds }}: Previous day as YYYY-MM-DD
  • {{ tomorrow_ds }}: Next day as YYYY-MM-DD
  • {{ prev_ds }}: Alias for yesterday_ds
  • {{ next_ds }}: Alias for tomorrow_ds
  • {{ execution_date }}: Full datetime object

3. Custom Variables:

  • Flow-level: Defined in [variables] section, available to all tasks
  • Task-level: Defined in [tasks.variables], override flow-level variables
  • Syntax: {{ var('variable_name') }} or {{ var('variable_name', 'default') }}
  • Shorthand: {{ variable_name }} (when variable is in context)
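Rendering with these variables can be sketched using the Jinja2 library directly. The `var()` helper shown here is an assumption about how the lookup is wired into the template context:

```python
from jinja2 import Template

def render_sql(sql, context, variables):
    """Render task SQL with runtime variables plus a var() lookup
    that supports an optional default (hypothetical wiring)."""
    def var(name, default=None):
        return variables.get(name, default)
    return Template(sql).render(**context, **variables, var=var)

sql = """SELECT * FROM {{ source_dataset }}
WHERE status = '{{ var("status_filter", "active") }}'
AND load_date = '{{ ds }}'"""

print(render_sql(sql,
                 {"source_dataset": "raw_data", "ds": "2024-01-15"},
                 {"status_filter": "active"}))
```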

Custom Script Tasks

Execute custom scripts (engine-specific).

[[tasks]]
id = "custom_processing"
type = "custom_script"
source_dataset = "processed_data"
target_dataset = "final_data"
script = "# Custom script content"

Task Dependencies

Standard flows support both implicit and explicit task dependencies, enabling FAN IN and FAN OUT patterns.

Implicit Dependencies

Tasks automatically depend on datasets produced by other tasks. If task B's source_dataset matches task A's target_dataset, task B will automatically depend on task A.
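This dataset-lineage rule can be sketched as a small resolver (an illustration, not the engine's DAG builder); any explicit `dependencies` lists would then be unioned with the result:

```python
def implicit_dependencies(tasks):
    """Derive implicit dependencies: a task depends on whichever task
    produces each of its source datasets."""
    producer = {t["target_dataset"]: t["id"]
                for t in tasks if t.get("target_dataset")}
    deps = {}
    for t in tasks:
        sources = t.get("source_datasets") or (
            [t["source_dataset"]] if t.get("source_dataset") else [])
        deps[t["id"]] = sorted({producer[s] for s in sources if s in producer})
    return deps
```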

Explicit Dependencies

You can explicitly define task dependencies to support complex patterns:

FAN OUT Pattern (One task → Multiple tasks):

[[tasks]]
id = "ingest_data"
type = "ingestion"
target_dataset = "raw_data"
downstream_tasks = ["process_data", "validate_data"] # FAN OUT
[tasks.config]
path = "data/input.csv"
format = "csv"

[[tasks]]
id = "process_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "processed_data"
dependencies = ["ingest_data"] # Explicit dependency
sql = "SELECT * FROM {{ source_dataset }}"

FAN IN Pattern (Multiple tasks → One task):

[[tasks]]
id = "merge_data"
type = "sql_processing"
# Multiple source datasets (FAN IN)
source_datasets = ["processed_data", "validated_data"]
target_dataset = "merged_data"
# Explicit dependencies (FAN IN)
dependencies = ["process_data", "validate_data"]
sql = """
SELECT * FROM {{ source_datasets[0] }}
UNION ALL
SELECT * FROM {{ source_datasets[1] }}
"""

Dependency Fields

  • dependencies: List of task IDs this task depends on (FAN IN support)

    • Example: dependencies = ["task_1", "task_2"]
    • Ensures this task waits for all listed tasks to complete
  • downstream_tasks: List of task IDs that depend on this task (FAN OUT support)

    • Example: downstream_tasks = ["task_3", "task_4"]
    • Declares that multiple tasks depend on this task
  • source_datasets: List of source dataset IDs (for multi-source tasks, FAN IN)

    • Example: source_datasets = ["dataset_a", "dataset_b"]
    • Use when a task needs to combine data from multiple sources
    • Access in templates: {{ source_datasets[0] }}, {{ source_datasets[1] }}, etc.

Multiple Source Datasets

When a task needs to combine data from multiple sources, use source_datasets:

[[tasks]]
id = "join_data"
type = "sql_processing"
source_datasets = ["table_a", "table_b", "table_c"] # Multiple sources
target_dataset = "joined_data"
dependencies = ["task_a", "task_b", "task_c"] # All upstream tasks
sql = """
SELECT
a.id,
a.name,
b.value,
c.category
FROM {{ source_dataset('table_a') }} a
JOIN {{ source_dataset('table_b') }} b ON a.id = b.id
JOIN {{ source_dataset('table_c') }} c ON a.id = c.id
"""

Note: When using source_datasets, you can still provide source_dataset for backward compatibility. The first item in source_datasets will be used as the primary source.

Accessing Multiple Source Datasets:

You can access datasets by ID using the source_dataset() function:

sql = """
SELECT
a.id,
a.name,
b.value
FROM {{ source_dataset('processed_data') }} a
JOIN {{ source_dataset('validated_data') }} b ON a.id = b.id
"""

Benefits of source_dataset() function:

  • ✅ More readable and maintainable than {{ source_datasets[0] }}
  • ✅ Self-documenting (dataset ID is clear)
  • ✅ Order-independent (no need to worry about list order)

Alternative access methods (for backward compatibility):

  • {{ source_datasets[0] }}, {{ source_datasets[1] }} - Access by index
  • {{ source_datasets_dict['dataset_id'] }} - Access by ID from dictionary

Dependencies are resolved automatically when generating the execution DAG, combining both implicit (dataset-based) and explicit (task-based) dependencies.

Example

id = "data_pipeline"
name = "Data Processing Pipeline"
flow_type = "standard"
namespace = "raw"

# Flow-level variables (available to all tasks)
[variables]
min_value = 0
status_filter = "active"
lookback_days = 30

[[tasks]]
id = "ingest_raw_data"
type = "ingestion"
target_dataset = "raw_data"
[tasks.config]
# Variables can be used in file paths
path = "data/input_{{ batch_id }}.csv"
format = "csv"

[[tasks]]
id = "process_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "processed_data"
sql = """
INSERT INTO {{ target_dataset }}
SELECT
id,
name,
UPPER(email) AS email
FROM {{ source_dataset }}
WHERE status = '{{ var("status_filter") }}'
AND value > {{ var("threshold") }}
"""
# Task-specific variables (override flow-level)
[tasks.variables]
threshold = 100

Engine Compatibility

| Engine | Ingestion | SQL Processing | Custom Script |
|---|---|---|---|
| SQLite | | ✅ | |
| PostgreSQL | | ✅ | |
| MySQL | | ✅ | |
| Pandas | | | ✅ (Python) |
| DuckDB | | ✅ | |