Using Flows in Qarion ETL

A comprehensive guide to using flows to build data transformation pipelines in Qarion ETL.

What Are Flows?

A flow is a declarative definition of a data transformation pipeline. It describes:

  • What data to process
  • How to transform it
  • What the output should look like

Flows are defined in TOML files and processed by Qarion ETL to generate datasets, execution plans, and code.

Flow Types

Qarion ETL supports multiple flow types, each designed for specific use cases. The following table provides a quick overview:

Flow Type         | Primary Purpose          | Category          | Key Use Cases
------------------|--------------------------|-------------------|--------------
Change Feed       | Change Detection         | Transformation    | Track data changes, audit trails, incremental processing
Delta Publishing  | Transaction Processing   | Integration       | Financial transactions, accounting systems, transaction history
Quality Check     | Data Quality Validation  | Quality           | Data quality monitoring, ETL validation, compliance checks
Error Parking     | Data Quality Filtering   | Quality           | Split data by quality, park failed records, ensure only valid data reaches production
Export Flow       | Data Export              | Export            | Export data to files with multiple modes (full, batch, incremental, changes_only)
Sessionization    | Event Grouping           | Event Processing  | Web analytics, user behavior analysis, event grouping
Growth Accounting | Growth Analysis          | Analytics         | User growth metrics, churn analysis, retention
Outbox            | Event Publishing         | Integration       | Event-driven architectures, reliable message delivery
SCD2              | Historical Tracking      | Transformation    | Dimension history, point-in-time queries, data warehousing
Standard          | Custom Pipelines         | Transformation    | Flexible task-based pipelines, SQL processing, custom scripts
Orchestrator      | Flow Coordination        | Orchestration     | Coordinate multiple flows, manage dependencies, conditional execution

Flow Categories

  • Integration Flows: Delta Publishing, Outbox
  • Transformation Flows: Change Feed, SCD2, Standard
  • Quality Flows: Quality Check, Error Parking
  • Export Flows: Export Flow
  • Event Processing Flows: Sessionization
  • Analytics Flows: Growth Accounting
  • Orchestration Flows: Orchestrator

Change Feed Flow

Tracks changes to data over time by comparing batches and classifying records.

Use when:

  • You need to track what changed between data loads
  • You want to identify new, modified, deleted, or unchanged records
  • You need to maintain a history of all changes
  • You're building audit trails or change tracking systems

Example:

# flows/orders_change_feed.toml
id = "orders_change_feed"
name = "Orders Change Feed"
flow_type = "change_feed"

[input]
primary_key = ["order_id"]
columns = ["order_id", "customer_id", "order_date", "total_amount", "status"]

[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
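
The comparison a change feed performs can be sketched in Python. This is a simplified illustration, not Qarion ETL's internal implementation; the label names (new, changed, deleted, unchanged) mirror the classification described above:

```python
def classify_changes(previous, current, primary_key="order_id"):
    """Compare two batches keyed on the primary key and classify
    each record as new, changed, deleted, or unchanged."""
    prev_by_key = {row[primary_key]: row for row in previous}
    curr_by_key = {row[primary_key]: row for row in current}
    changes = {"new": [], "changed": [], "deleted": [], "unchanged": []}
    for key, row in curr_by_key.items():
        if key not in prev_by_key:
            changes["new"].append(key)
        elif row != prev_by_key[key]:
            changes["changed"].append(key)
        else:
            changes["unchanged"].append(key)
    # Keys present in the previous batch but missing now were deleted
    changes["deleted"] = [k for k in prev_by_key if k not in curr_by_key]
    return changes
```

Persisting these classifications per batch is what builds the change history and audit trail.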

Delta Publishing Flow

Processes financial transactions with merge operations and incremental processing, using either batch-based or timestamp-based change detection.

Use when:

  • You're processing financial transactions
  • You need to handle incremental updates
  • You want to maintain transaction history
  • You're building a data warehouse for financial data

Incremental Detection:

  • Batch-based: Uses xt_batch_id or batch_id columns for incremental processing
  • Timestamp-based: Automatically detects timestamp columns (e.g., timestamp, created_at, updated_at, date, datetime) for time-based incremental processing
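
As a rough sketch of how this detection could work (the candidate column names come from the list above; giving timestamp columns precedence over batch columns is an assumption, not documented behavior):

```python
# Timestamp column names Delta Publishing is said to recognize
TIMESTAMP_CANDIDATES = ("timestamp", "created_at", "updated_at", "date", "datetime")

def pick_incremental_strategy(columns):
    """Choose an incremental-detection strategy from the input schema:
    a recognized timestamp column first, then a batch column, else full."""
    for col in columns:
        if col.lower() in TIMESTAMP_CANDIDATES:
            return ("timestamp", col)
    for batch_col in ("xt_batch_id", "batch_id"):
        if batch_col in columns:
            return ("batch", batch_col)
    return ("full", None)
```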

Example:

# flows/transactions_delta.toml
id = "transactions_delta"
name = "Transactions Delta Publishing"
flow_type = "delta_publishing"

[input]
primary_key = ["transaction_id"]
columns = ["transaction_id", "account_id", "amount", "transaction_date", "type"]

[properties]
namespace = "finance"

Timestamp-based Incremental: If your input data includes timestamp columns, Delta Publishing will automatically detect them and use time-based incremental processing instead of batch-based.

Export Flow

Exports data from existing datasets (change_feed, delta_transaction, etc.) to external destinations with support for different export modes.

Use when:

  • You need to export processed data to files for downstream consumption
  • You want to export only changed data (incremental exports)
  • You need to export specific batches for reprocessing
  • You want to export data in different formats (CSV, JSON, Parquet)
  • You're integrating with external systems that consume files

Export Modes:

  1. Full Export (export_mode: "full"): Exports all data from the source dataset
  2. Batch Export (export_mode: "batch"): Exports only data from a specific batch_id
  3. Incremental Export (export_mode: "incremental"): Exports only new and changed records (for change_feed)
  4. Changes Only Export (export_mode: "changes_only"): Exports only changed records (new, changed, deleted)
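
The four modes boil down to different row filters over the source dataset. A simplified Python sketch, assuming change-feed rows carry an xt_batch_id and a change_type column (the actual column names in Qarion ETL may differ):

```python
def select_rows(rows, export_mode="full", batch_id=None):
    """Filter source-dataset rows according to the export mode."""
    if export_mode == "full":
        return rows  # everything in the source dataset
    if export_mode == "batch":
        return [r for r in rows if r["xt_batch_id"] == batch_id]
    if export_mode == "incremental":
        return [r for r in rows if r["change_type"] in ("new", "changed")]
    if export_mode == "changes_only":
        return [r for r in rows if r["change_type"] in ("new", "changed", "deleted")]
    raise ValueError(f"unknown export_mode: {export_mode}")
```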

Example - Full Export:

# flows/export_orders.toml
id = "export_orders"
name = "Export Orders"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders.csv"
export_mode = "full"
format = "csv"

Example - Incremental Export from Change Feed:

# flows/export_orders_incremental.toml
id = "export_orders_incremental"
name = "Export Orders Incremental"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_incremental_{batch_id}.csv"
export_mode = "incremental"
format = "csv"
exporter_config = {delimiter = ",", include_header = true}

Example - Batch Export:

# flows/export_orders_batch.toml
id = "export_orders_batch"
name = "Export Orders Batch"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_batch_42.csv"
export_mode = "batch"
batch_id = 42
format = "csv"

Example - Changes Only Export:

# flows/export_orders_changes.toml
id = "export_orders_changes"
name = "Export Orders Changes"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_changes.parquet"
export_mode = "changes_only"
format = "parquet"
exporter_config = {compression = "snappy"}

Integration with Change Feed:

Export flows work seamlessly with change_feed flows. After processing data through a change_feed flow, you can export only the changed records:

# First, create a change_feed flow
id = "orders_change_feed"
name = "Orders Change Feed"
flow_type = "change_feed"

[input]
primary_key = ["order_id"]
columns = ["order_id", "customer_id", "order_date", "total_amount"]

# Then, create an export flow that exports from the change_feed
id = "export_orders_changes"
name = "Export Order Changes"
flow_type = "export_flow"

[properties.export]
source_dataset_id = "orders_change_feed_change_feed" # Change feed dataset ID
file_path = "exports/orders_changes_{batch_id}.csv"
export_mode = "incremental"
format = "csv"

Integration with Delta Publishing:

Export flows can also export delta transactions from delta_publishing flows:

# Export delta transactions
id = "export_delta_transactions"
name = "Export Delta Transactions"
flow_type = "export_flow"

[properties.export]
source_dataset_id = "transactions_delta_delta_transaction" # Delta transaction dataset ID
file_path = "exports/delta_transactions.parquet"
export_mode = "full"
format = "parquet"
exporter_config = {compression = "snappy"}

Supported Formats:

  • CSV (.csv)
  • JSON (.json, .jsonl)
  • Parquet (.parquet)

Configuration Options:

  • source_dataset_id: ID of the source dataset to export from (required)
  • file_path or destination: Path to the export file (required)
  • export_mode: Export mode (full, batch, incremental, changes_only); default: full
  • batch_id: Batch ID for batch-mode exports
  • format: Export format (csv, json, parquet); auto-detected from the file extension if not specified
  • exporter_config: Format-specific configuration (delimiter, encoding, compression, etc.)

For complete documentation, see the Export Flow Reference, which covers all export modes, formats, and configuration options in detail.

Quality Check Flow

Purpose: Run systematic data quality validation using Quality Check Suites.

Pattern: Quality validation pattern with reusable suites.

When to Use

Use Quality Check Flow when you need to:

  • Validate data quality across multiple datasets
  • Run reusable quality check suites
  • Track quality check results over time
  • Integrate quality validation into your data pipeline

Use Cases

  • Data Quality Monitoring: Systematic validation of data quality
  • ETL Validation: Validate data after transformations
  • Compliance: Ensure data meets regulatory requirements
  • Data Governance: Maintain data quality standards

How It Works

  1. Quality Check Suites: Define reusable collections of quality checks in data_quality/ directory
  2. Data Sources: Run checks against datasets, queries, subsets, tables, or views
  3. Quality Results Table: Stores all quality check execution results with pass/fail status

Example

# flows/orders_quality_check.toml
id = "orders_quality_check"
name = "Orders Quality Check Flow"
flow_type = "quality_check"

[input]
columns = [] # Quality check flows don't require input columns

[properties]
[properties.quality_suites]
# Reference quality check suites defined in data_quality/ directory

For complete details, see the Quality Check Flow guide, which covers Quality Check Suites, data sources, and configuration in depth.

Error Parking Flow

Automatically splits incoming data based on data quality check results, writing passed records to the normal target table and failed records to an error parking table with error metadata.

Use when:

  • You need to ensure only high-quality data reaches production tables
  • You want to capture and review data quality issues without blocking the pipeline
  • You need to enable data correction workflows for failed records
  • You want to automatically separate valid and invalid records based on quality checks
  • You're building data pipelines where data quality is critical

Key Features:

  • Automatic Splitting: Records are automatically split into passed and failed based on quality check results
  • Error Metadata: Failed records include detailed error information (check ID, check name, check type, error message, timestamp)
  • Non-Blocking: Failed records do not stop the pipeline; valid records continue to production
  • Review and Correction: Error parking table enables review and correction workflows
  • Multiple Quality Checks: Supports multiple quality checks (completeness, uniqueness, range, pattern, etc.)

Generated Datasets:

  1. Staging Dataset ({flow_id}_staging): Landing table for initial data loading

    • All input columns
    • xt_batch_id: Identifies the batch
  2. Target Dataset ({flow_id}_target): Normal target table for passed records

    • All input columns
    • Only contains records that pass all quality checks
  3. Error Parking Dataset ({flow_id}_error_parking): Parking table for failed records

    • All input columns
    • xt_error_check_id: ID of the quality check that failed
    • xt_error_check_name: Name of the quality check that failed
    • xt_error_check_type: Type of quality check (completeness, uniqueness, range, pattern, etc.)
    • xt_error_message: Error message describing the failure
    • xt_error_timestamp: Timestamp when the error was detected
    • xt_batch_id: Batch ID for tracking

Execution Flow:

  1. Ingestion: Load data into staging table
  2. Quality Checks: Run all defined quality checks on staging data
  3. Record Identification: Identify records that fail any quality check
  4. Split Records: Split records into passed and failed sets
  5. Write Passed Records: Write passed records to normal target table
  6. Write Failed Records: Write failed records to error parking table with error metadata
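
Steps 3 to 6 amount to routing each record by its first failing check and attaching the error metadata columns listed above. A simplified Python sketch (the predicate-based check representation is an illustration, not Qarion ETL's API):

```python
from datetime import datetime, timezone

def split_records(records, checks):
    """Route each record to the target set or, on its first failing
    check, to the error parking set with error metadata attached."""
    passed, parked = [], []
    for record in records:
        failure = next((c for c in checks if not c["predicate"](record)), None)
        if failure is None:
            passed.append(record)
        else:
            parked.append({
                **record,
                "xt_error_check_id": failure["check_id"],
                "xt_error_check_name": failure["check_name"],
                "xt_error_check_type": failure["check_type"],
                "xt_error_message": failure["message"],
                "xt_error_timestamp": datetime.now(timezone.utc).isoformat(),
            })
    return passed, parked
```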

Example:

# flows/orders_error_parking.toml
id = "orders_error_parking"
name = "Orders Error Parking"
flow_type = "error_parking"
namespace = "production"

[input]
primary_key = ["order_id"]
columns = [
{name = "order_id", schema_type = "integer", required = true},
{name = "customer_id", schema_type = "integer", required = true},
{name = "order_date", schema_type = "date", required = true},
{name = "total_amount", schema_type = "decimal", required = false}
]

# TOML inline tables cannot span multiple lines, so each quality check
# is declared as an array-of-tables entry
[[properties.quality_checks]]
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
columns = ["order_id", "customer_id", "order_date"]
severity = "error"
config = {threshold = 1.0}

[[properties.quality_checks]]
check_id = "uniqueness_check"
check_name = "Uniqueness Check"
check_type = "uniqueness"
columns = ["order_id"]
severity = "error"

[[properties.quality_checks]]
check_id = "amount_range_check"
check_name = "Amount Range Check"
check_type = "range"
columns = ["total_amount"]
severity = "error"
config = {min_value = 0, max_value = 1000000}

Quality Check Types Supported:

  • Completeness: Check for missing/null values in required columns
  • Uniqueness: Verify unique constraints on specified columns
  • Range: Validate value ranges (min/max)
  • Pattern: Validate data patterns using SQL LIKE patterns
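
Minimal reference implementations of these four check types might look as follows. These are approximations; the exact semantics in Qarion ETL (for instance, how the completeness threshold is applied) depend on configuration:

```python
import re

def completeness(rows, columns, threshold=1.0):
    """Fraction of rows with all listed columns non-null must meet threshold."""
    if not rows:
        return True
    complete = sum(all(r.get(c) is not None for c in columns) for r in rows)
    return (complete / len(rows)) >= threshold

def uniqueness(rows, columns):
    """No duplicate values for the given column combination."""
    keys = [tuple(r.get(c) for c in columns) for r in rows]
    return len(keys) == len(set(keys))

def in_range(rows, column, min_value, max_value):
    """All non-null values fall within [min_value, max_value]."""
    return all(min_value <= r[column] <= max_value
               for r in rows if r.get(column) is not None)

def like_to_regex(pattern):
    """Translate a SQL LIKE pattern (% and _) into an anchored regex."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return re.compile("^" + "".join(parts) + "$")
```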

Querying Error Parking Table:

Find all failed records:

SELECT *
FROM orders_error_parking_error_parking
ORDER BY xt_error_timestamp DESC;

Find records by check type:

SELECT *
FROM orders_error_parking_error_parking
WHERE xt_error_check_type = 'completeness';

Count failed records by check:

SELECT
xt_error_check_id,
xt_error_check_name,
COUNT(*) as failed_count
FROM orders_error_parking_error_parking
GROUP BY xt_error_check_id, xt_error_check_name
ORDER BY failed_count DESC;

Data Correction Workflow:

  1. Review Failed Records: Query error parking table to identify issues
  2. Correct Records: Update records in error parking table to fix issues
  3. Extract Corrected Records: Select corrected records (without error metadata columns)
  4. Insert into Target: Insert corrected records into normal target table
  5. Remove from Parking: Delete processed records from error parking table

Best Practices:

  1. Define Clear Quality Rules: Use specific, measurable quality checks with appropriate severity levels
  2. Monitor Error Parking Table: Set up alerts for high failure rates and regularly review failed records
  3. Error Metadata: Use descriptive check names and IDs, include helpful error messages
  4. Performance: Index error parking table on primary key and error check ID, consider partitioning by batch_id
  5. Data Correction Workflow: Establish a process for reviewing and correcting failed records

For complete documentation, see the Error Parking Flow guide, which covers configuration, quality checks, querying, and correction workflows in detail.

Sessionization Flow

Groups events into sessions based on time windows and user identifiers.

Use when:

  • You need to group events into sessions
  • You're analyzing user behavior
  • You're processing web analytics data
  • You want to identify user sessions

Example:

# flows/user_sessions.toml
id = "user_sessions"
name = "User Sessionization"
flow_type = "sessionization"

[input]
primary_key = ["event_id"]
columns = ["event_id", "user_id", "event_time", "event_type", "page_url"]

[properties]
session_timeout_minutes = 30
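
The core gap-based grouping, using the 30-minute timeout from the example, can be sketched as follows. This is a simplified model; the session ID format is an illustration, not Qarion ETL's output schema:

```python
from datetime import timedelta

def sessionize(events, timeout_minutes=30):
    """Assign session IDs per user: a new session starts whenever the
    gap since the user's previous event exceeds the timeout."""
    timeout = timedelta(minutes=timeout_minutes)
    last_seen, session_counter, out = {}, {}, []
    for event in sorted(events, key=lambda e: (e["user_id"], e["event_time"])):
        user = event["user_id"]
        previous = last_seen.get(user)
        if previous is None or event["event_time"] - previous > timeout:
            session_counter[user] = session_counter.get(user, 0) + 1
        last_seen[user] = event["event_time"]
        out.append({**event, "session_id": f"{user}-{session_counter[user]}"})
    return out
```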

Growth Accounting Flow

Growth accounting decomposes user growth into actionable components to understand the true drivers of growth. Instead of just tracking total user count, it breaks down growth into:

  • Acquisitions: New users who engage for the first time
  • Churn: Users who have ceased engagement
  • Resurrections: Users who previously churned but have returned
  • Net Growth: Calculated as Acquisitions - Churn + Resurrections

Use when:

  • You need to understand the true drivers of user growth
  • You want to identify which experiences reduce churn
  • You need to discover what brings inactive users back
  • You want to focus on sustainable growth metrics, not just acquisition

Important Notes:

  • Requires user activity data with user identifiers and timestamps
  • Supports daily, weekly, or monthly period analysis
  • Configurable churn threshold (days of inactivity)
  • A 1% improvement in churn or resurrections can have twice the impact of a 1% lift in acquisition

Growth Accounting Formula:

Net Growth = Acquisitions - Churn + Resurrections

Example:

# flows/user_growth.toml
id = "user_growth"
name = "User Growth Analysis"
flow_type = "growth_accounting"
namespace = "analytics"

[input]
columns = [
{name = "user_id", schema_type = "string", required = true},
{name = "timestamp", schema_type = "timestamp", required = true},
{name = "activity_type", schema_type = "string", required = false}
]
primary_key = ["user_id", "timestamp"]

[properties]
user_id_column = "user_id"
timestamp_column = "timestamp"
period_type = "daily" # daily, weekly, or monthly
churn_threshold_days = 30 # Days of inactivity to consider churned

Generated Datasets:

  1. Activity Periods: User activity aggregated by time period
  2. Acquisitions: New users per period
  3. Churn: Users who churned per period
  4. Resurrections: Users who returned per period
  5. Growth Summary: Aggregated metrics with net growth calculation
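
The decomposition behind these datasets can be modeled with set arithmetic over per-period active-user sets. The sketch below simplifies churn to "absent this period after being active before", rather than applying the configurable churn_threshold_days:

```python
def growth_accounting(active_by_period):
    """Decompose growth per period into acquisitions, churn, and
    resurrections, with net growth = acquisitions - churn + resurrections."""
    ever_active, prev_active = set(), set()
    summary = []
    for period, active in active_by_period:
        acquisitions = active - ever_active            # first-ever activity
        resurrections = (active & ever_active) - prev_active  # returned after absence
        churn = prev_active - active                   # active before, absent now
        summary.append({
            "period": period,
            "acquisitions": len(acquisitions),
            "churn": len(churn),
            "resurrections": len(resurrections),
            "net_growth": len(acquisitions) - len(churn) + len(resurrections),
        })
        ever_active |= active
        prev_active = active
    return summary
```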

Key Insights:

Growth accounting shifts focus from just acquisition to understanding:

  • Which experiences reduce churn the most
  • What moments bring inactive users back
  • Where the compounding value really comes from

This enables sustainable growth by focusing on retention and reactivation, not just counting new arrivals.

Outbox Flow

Ensures reliable message delivery in distributed systems by writing events to an outbox table as part of the same transaction as source data. A separate publisher process reads from the outbox and publishes messages.

Use when:

  • You need to build event-driven architectures with reliable message delivery
  • You want to ensure events are never lost (atomic writes with source data)
  • You need to support exactly-once or at-least-once message publishing
  • You're implementing event sourcing patterns
  • You want to decouple event publishing from business logic
  • You need retry mechanisms for failed message publishing

Important Notes:

  • Events are written atomically with source data (same transaction)
  • Events start with status = 'pending'
  • Publisher process is separate from the flow (not included in flow execution)
  • Supports idempotent publishing (check if already published before publishing)
  • Retry logic should be implemented in the publisher process

Key Concepts:

  • Event Type: Type of event (e.g., 'order_created', 'user_updated')
  • Aggregate ID: ID of the aggregate root (e.g., order_id, user_id)
  • Payload: Event data as JSON
  • Status: Tracks event state: 'pending', 'published', 'failed'
  • Retry Count: Number of publish attempts

Generated Datasets:

  1. Landing Table ({flow_id}_landing): Receives raw input data/events

    • All input columns
    • xt_batch_id: Identifies the batch/snapshot
    • timestamp: Ingestion timestamp
  2. Outbox Table ({flow_id}_outbox): Events to be published

    • outbox_id: Surrogate key (PRIMARY KEY, auto-incrementing)
    • event_type: Type of event (extracted from input)
    • aggregate_id: ID of the aggregate root (extracted from input)
    • payload: Event payload as JSON (all or selected input columns)
    • status: Status ('pending', 'published', 'failed')
    • created_at: When the event was created
    • published_at: When the event was published (nullable)
    • retry_count: Number of publish attempts
    • error_message: Error message if publish failed (nullable)
    • xt_batch_id: Batch that created this event

Example:

# flows/order_events_outbox.toml
id = "order_events_outbox"
name = "Order Events Outbox"
flow_type = "outbox"
namespace = "events"

[input]
columns = [
{name = "order_id", schema_type = "string", required = true},
{name = "event_type", schema_type = "string", required = true},
{name = "customer_id", schema_type = "string", required = false},
{name = "amount", schema_type = "float", required = false},
{name = "status", schema_type = "string", required = false}
]

[properties]
event_type_column = "event_type"
aggregate_id_column = "order_id"
payload_columns = [] # Empty means all columns

Publisher Process:

The outbox table is populated by the flow, but a separate publisher process is needed to:

  1. Read pending events: SELECT * FROM {flow_id}_outbox WHERE status = 'pending'
  2. Publish messages to message queue/stream (Kafka, RabbitMQ, SQS, etc.)
  3. Update status: Set status = 'published', published_at = NOW() on success
  4. Handle failures: Set status = 'failed', increment retry_count, store error_message
  5. Implement retry logic with exponential backoff
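
A skeleton of such a publisher loop, with fetch_pending, publish, and mark standing in for your database access and broker client (hypothetical names, not part of Qarion ETL):

```python
def run_publisher(fetch_pending, publish, mark, max_retries=5):
    """Drain pending outbox events, marking each one published or,
    after too many attempts, failed. Events below the retry limit
    stay pending so a later run can retry them."""
    for event in fetch_pending():
        try:
            publish(event["event_type"], event["payload"])
            mark(event["outbox_id"], status="published")
        except Exception as exc:
            retries = event["retry_count"] + 1
            status = "failed" if retries >= max_retries else "pending"
            mark(event["outbox_id"], status=status,
                 retry_count=retries, error_message=str(exc))
```

In production this loop would run on a schedule with exponential backoff between retries, as noted above.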

Querying Outbox Data:

Get pending events:

SELECT * FROM order_events_outbox
WHERE status = 'pending'
ORDER BY created_at
LIMIT 100

Mark event as published:

UPDATE order_events_outbox
SET status = 'published',
published_at = NOW()
WHERE outbox_id = :outbox_id

SCD2 Flow

Tracks historical changes to dimension data by creating new records with effective dates when attributes change, maintaining a complete history.

Use when:

  • You need to track historical changes to dimension data (customer, product, employee, etc.)
  • You want to maintain a complete audit trail of all attribute changes
  • You need to support point-in-time queries (e.g., "What was the customer's address on Jan 1st?")
  • You're building data warehouses with slowly changing dimensions
  • You need to preserve history when business keys change over time

Important Notes:

  • SCD2 flows require a primary_key definition in the input (used as business key)
  • When attributes change, old records are closed (end date set) and new records are created
  • Only one record per business key has is_current = true at any time
  • Historical records are never deleted, only closed with an end date
  • Supports complete audit trail of all changes over time

Key Concepts:

  • Business Key: The natural key from your input (defined by primary_key). Used to identify which records represent the same entity.
  • Surrogate Key: Auto-incrementing scd2_id that uniquely identifies each version of a record.
  • Effective Dates: effective_start_date and effective_end_date define when each version was valid.
  • Current Record: Only one record per business key has is_current = true at any time.

Generated Datasets:

  1. Landing Table ({flow_id}_landing): Receives raw batch data

    • All input columns
    • xt_batch_id: Identifies the batch/snapshot
    • timestamp: Ingestion timestamp
  2. SCD2 Table ({flow_id}_scd2): Tracks historical changes

    • scd2_id: Surrogate key (PRIMARY KEY, auto-incrementing)
    • All input columns (dimension attributes)
    • effective_start_date: When this version became effective
    • effective_end_date: When this version ended (NULL for current records)
    • is_current: Boolean flag indicating if this is the current version
    • xt_batch_id: Batch that created this version

Example:

# flows/customer_scd2.toml
id = "customer_scd2"
name = "Customer SCD2"
flow_type = "scd2"
namespace = "dimensions"

[input]
primary_key = ["customer_id"]
columns = [
{name = "customer_id", schema_type = "string", required = true},
{name = "customer_name", schema_type = "string", required = true},
{name = "email", schema_type = "string", required = false},
{name = "address", schema_type = "string", required = false},
{name = "status", schema_type = "string", required = false}
]

Querying SCD2 Data:

Get current version of all records:

SELECT * FROM customer_scd2 WHERE is_current = true

Get point-in-time snapshot:

SELECT * FROM customer_scd2
WHERE effective_start_date <= '2024-01-15'
AND (effective_end_date IS NULL OR effective_end_date > '2024-01-15')

Get all versions of a specific customer:

SELECT * FROM customer_scd2
WHERE customer_id = 'CUST001'
ORDER BY effective_start_date
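
The close-and-insert mechanics described above can be sketched in Python. This is an in-memory illustration of the SCD2 merge, not the code Qarion ETL generates:

```python
def apply_scd2(history, incoming, business_key, effective_date):
    """Merge a batch into SCD2 history: unchanged records keep their
    current version open; changed or new records close the old current
    version (if any) and append a new current version."""
    current = {h[business_key]: h for h in history if h["is_current"]}
    for row in incoming:
        existing = current.get(row[business_key])
        attrs_match = existing is not None and all(
            existing.get(k) == v for k, v in row.items())
        if attrs_match:
            continue  # no attribute change: nothing to do
        if existing is not None:
            existing["is_current"] = False
            existing["effective_end_date"] = effective_date
        history.append({**row,
                        "effective_start_date": effective_date,
                        "effective_end_date": None,
                        "is_current": True})
    return history
```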

Standard Flow

Task-based flow that supports ingestion, SQL processing, and custom script tasks. Tasks are executed directly by engines without conversion to instruction sets.

Use when:

  • You need flexible, task-based data pipelines
  • You want to execute SQL directly on your engine
  • You need custom script execution (engine-specific)
  • You're working with SQL-capable engines (SQLite, PostgreSQL, etc.)
  • You want fine-grained control over each step in your pipeline

Important Notes:

  • Standard flows execute tasks directly on engines, making them engine-specific
  • SQL processing tasks require SQL-capable engines
  • Custom script tasks require engines that support script execution
  • Tasks are not converted to instruction sets, so they only work with compatible engines

Task Types:

  1. Ingestion Tasks: Load data from external sources (CSV, JSON, etc.)
  2. SQL Processing Tasks: Execute SQL queries directly
  3. Custom Script Tasks: Execute custom scripts (engine-specific)

Example:

# flows/data_pipeline.toml
id = "data_pipeline"
name = "Data Processing Pipeline"
flow_type = "standard"
namespace = "raw"

# Flow-level variables (available to all tasks)
[variables]
min_value = 0
status_filter = "active"
date_format = "%Y-%m-%d"

[[tasks]]
id = "ingest_raw_data"
type = "ingestion"
name = "Ingest Raw Data"
description = "Load data from CSV file"
target_dataset = "raw_data"
[tasks.config]
# Variables can be used in config paths
path = "data/input_{{ batch_id }}.csv"
format = "csv"
delimiter = ","
header = true

[[tasks]]
id = "process_data"
type = "sql_processing"
name = "Process Data"
description = "Transform data using SQL"
source_dataset = "raw_data"
target_dataset = "processed_data"
sql = """
INSERT INTO {{ target_dataset }}
SELECT
id,
name,
UPPER(email) AS email,
created_at
FROM {{ source_dataset }}
WHERE status = '{{ var("status_filter") }}'
AND value > {{ var("threshold") }}
"""
# Task-specific variables (override flow-level variables); declared after
# the sql key so that sql stays a key of the task, not of [tasks.variables]
[tasks.variables]
threshold = 100

[[tasks]]
id = "aggregate_data"
type = "sql_processing"
name = "Aggregate Data"
description = "Create summary statistics"
source_dataset = "processed_data"
target_dataset = "summary_stats"
sql = """
INSERT INTO {{ target_dataset }}
SELECT
DATE(created_at) AS date,
COUNT(*) AS total_records,
COUNT(DISTINCT email) AS unique_emails
FROM {{ source_dataset }}
WHERE created_at >= DATE('now', '-{{ var("lookback_days", 30) }} days')
GROUP BY DATE(created_at)
"""

Task Dependencies:

Standard flows support both implicit and explicit task dependencies, enabling FAN IN and FAN OUT patterns.

Implicit Dependencies: Tasks automatically depend on datasets produced by other tasks. In the example above:

  • process_data depends on raw_data (produced by ingest_raw_data)
  • aggregate_data depends on processed_data (produced by process_data)

Explicit Dependencies (FAN IN and FAN OUT):

You can explicitly define task dependencies to support complex patterns:

FAN OUT Pattern (One task → Multiple tasks):

[[tasks]]
id = "ingest_data"
type = "ingestion"
target_dataset = "raw_data"
downstream_tasks = ["process_data", "validate_data"] # FAN OUT: feeds multiple tasks
[tasks.config]
path = "data/input.csv"
format = "csv"

[[tasks]]
id = "process_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "processed_data"
dependencies = ["ingest_data"] # Explicit dependency
sql = "SELECT * FROM {{ source_dataset }}"

[[tasks]]
id = "validate_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "validated_data"
dependencies = ["ingest_data"] # Explicit dependency
sql = "SELECT * FROM {{ source_dataset }} WHERE status IS NOT NULL"

FAN IN Pattern (Multiple tasks → One task):

[[tasks]]
id = "process_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "processed_data"
sql = "SELECT * FROM {{ source_dataset }}"

[[tasks]]
id = "validate_data"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "validated_data"
sql = "SELECT * FROM {{ source_dataset }} WHERE status IS NOT NULL"

[[tasks]]
id = "merge_data"
type = "sql_processing"
# Multiple source datasets (FAN IN)
source_datasets = ["processed_data", "validated_data"]
target_dataset = "merged_data"
# Explicit dependencies (FAN IN)
dependencies = ["process_data", "validate_data"]
sql = """
SELECT * FROM {{ source_datasets[0] }}
UNION ALL
SELECT * FROM {{ source_datasets[1] }}
"""

Dependency Fields:

  • dependencies: List of task IDs this task depends on (FAN IN support)
  • downstream_tasks: List of task IDs that depend on this task (FAN OUT support)
  • source_datasets: List of source dataset IDs (for multi-source tasks, FAN IN)

Multiple Source Datasets: When a task needs to combine data from multiple sources, use source_datasets:

[[tasks]]
id = "join_data"
type = "sql_processing"
source_datasets = ["table_a", "table_b", "table_c"] # Multiple sources
target_dataset = "joined_data"
dependencies = ["task_a", "task_b", "task_c"] # All upstream tasks
sql = """
SELECT
a.id,
a.name,
b.value,
c.category
FROM {{ source_datasets[0] }} a
JOIN {{ source_datasets[1] }} b ON a.id = b.id
JOIN {{ source_datasets[2] }} c ON a.id = c.id
"""

Accessing Multiple Source Datasets:

You can access datasets by ID using the source_dataset() function:

sql = """
SELECT
a.id,
a.name,
b.value
FROM {{ source_dataset('processed_data') }} a
JOIN {{ source_dataset('validated_data') }} b ON a.id = b.id
"""

Benefits of source_dataset() function:

  • ✅ More readable and maintainable than {{ source_datasets[0] }}
  • ✅ Self-documenting (dataset ID is clear)
  • ✅ Order-independent (no need to worry about list order)

Alternative access methods (for backward compatibility):

  • {{ source_datasets[0] }}, {{ source_datasets[1] }} - Access by index
  • {{ source_datasets_dict['dataset_id'] }} - Access by ID from dictionary

The execution order is automatically determined based on both implicit and explicit dependencies.
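
Conceptually, the scheduler builds a dependency graph from both mechanisms and topologically sorts it. A sketch using Python's graphlib, assuming task dictionaries with the field names from the configuration above:

```python
from graphlib import TopologicalSorter  # Python 3.9+

def execution_order(tasks):
    """Order tasks so producers run before consumers, combining
    implicit (dataset-based) and explicit (dependencies) edges."""
    producers = {t["target_dataset"]: t["id"]
                 for t in tasks if t.get("target_dataset")}
    graph = {}
    for t in tasks:
        deps = set(t.get("dependencies", []))
        sources = list(t.get("source_datasets", []))
        if t.get("source_dataset"):
            sources.append(t["source_dataset"])
        for src in sources:
            if src in producers:  # implicit dependency via the dataset
                deps.add(producers[src])
        graph[t["id"]] = deps
    return list(TopologicalSorter(graph).static_order())
```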

Orchestrator Flow

Orchestrates the execution of multiple other flows, managing dependencies, conditional execution, and data passing between flows.

Use when:

  • You need to coordinate execution of multiple flows
  • You want to define dependencies between flows (e.g., flow B depends on flow A)
  • You need conditional execution (e.g., run flow C only if flow A succeeds)
  • You want to pass variables and batch IDs between flows
  • You're building complex multi-flow pipelines
  • You need to execute flows in parallel or sequentially

Key Features:

  • Dependency Management: Define which flows depend on which
  • Conditional Execution: Execute flows based on success/failure of others (always, success, failure)
  • Data Passing: Pass variables and batch IDs between flows
  • Parallel Execution: Execute independent flows in parallel
  • Sequential Execution: Execute flows in a specific order

Example:

# flows/data_pipeline_orchestrator.toml
id = "data_pipeline_orchestrator"
name = "Data Pipeline Orchestrator"
flow_type = "orchestrator"
namespace = "orchestration"

# Define flows to orchestrate; TOML inline tables cannot span multiple
# lines, so each flow is declared as an array-of-tables entry
[[properties.orchestration.flows]]
flow_id = "ingest_orders"
task_id = "trigger_ingest_orders"
name = "Trigger Orders Ingestion"
description = "Load orders data from source"
wait_for_completion = true
pass_batch_id = true
condition = "success" # Execute if previous flows succeed
dependencies = [] # No dependencies; runs first
variables = {} # Variables to pass to triggered flow

[[properties.orchestration.flows]]
flow_id = "ingest_customers"
task_id = "trigger_ingest_customers"
name = "Trigger Customers Ingestion"
description = "Load customers data from source"
wait_for_completion = true
pass_batch_id = true
condition = "success"
dependencies = [] # Can run in parallel with orders ingestion
variables = {}

[[properties.orchestration.flows]]
flow_id = "transform_orders"
task_id = "trigger_transform_orders"
name = "Trigger Orders Transformation"
description = "Transform orders data"
wait_for_completion = true
pass_batch_id = true
condition = "success"
dependencies = ["trigger_ingest_orders"] # Depends on orders ingestion
variables = {source_batch = "{{ batch_id }}"} # Pass batch ID as variable

[[properties.orchestration.flows]]
flow_id = "export_results"
task_id = "trigger_export"
name = "Trigger Export"
description = "Export final results"
wait_for_completion = true
pass_batch_id = true
condition = "success"
dependencies = ["trigger_transform_orders"] # Depends on transformation
variables = {}

[[properties.orchestration.flows]]
flow_id = "send_notification"
task_id = "trigger_notification"
name = "Trigger Notification"
description = "Send notification on failure"
wait_for_completion = false # Don't wait for notification
pass_batch_id = false
condition = "failure" # Only run if previous flows fail
dependencies = ["trigger_export"] # Depends on export
variables = {error_message = "Pipeline failed"}

Configuration Options:

Each flow in the orchestration configuration supports:

  • flow_id (required): ID of the flow to trigger
  • task_id (optional): Task identifier (defaults to trigger_{flow_id})
  • name (optional): Human-readable name for the task
  • description (optional): Description of what this task does
  • wait_for_completion (optional, default: true): Whether to wait for the triggered flow to complete before continuing
  • pass_batch_id (optional, default: true): Whether to pass the current batch ID to the triggered flow
  • batch_id (optional): Explicit batch ID to pass (overrides pass_batch_id if specified)
  • condition (optional, default: success): When to execute the flow:
    • always: Always execute regardless of previous flow results
    • success: Execute only if all dependencies succeeded
    • failure: Execute only if any dependency failed
  • dependencies (optional, default: []): List of task IDs this flow depends on (defines execution order)
  • variables (optional, default: {}): Variables to pass to the triggered flow (can use template syntax)
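
Applied together, these defaulting rules can be sketched as a small normalization helper. This is purely illustrative (`normalize_flow_entry` is a hypothetical function, not part of the Qarion ETL API):

```python
# Sketch of the defaulting rules listed above. Hypothetical helper,
# not the real Qarion ETL configuration loader.
def normalize_flow_entry(entry: dict) -> dict:
    if "flow_id" not in entry:
        raise ValueError("flow_id is required")
    flow_id = entry["flow_id"]
    return {
        "flow_id": flow_id,
        "task_id": entry.get("task_id", f"trigger_{flow_id}"),  # default: trigger_{flow_id}
        "name": entry.get("name"),
        "description": entry.get("description"),
        "wait_for_completion": entry.get("wait_for_completion", True),
        "pass_batch_id": entry.get("pass_batch_id", True),
        "condition": entry.get("condition", "success"),
        "dependencies": entry.get("dependencies", []),
        "variables": entry.get("variables", {}),
    }

task = normalize_flow_entry({"flow_id": "ingest_orders"})
# task["task_id"] == "trigger_ingest_orders", task["condition"] == "success"
```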

Execution Patterns:

  1. Sequential Execution: Flows execute one after another

    flows = [
    { flow_id = "flow_a", dependencies = [] },
    { flow_id = "flow_b", dependencies = ["trigger_flow_a"] },
    { flow_id = "flow_c", dependencies = ["trigger_flow_b"] }
    ]
  2. Parallel Execution: Independent flows execute simultaneously

    flows = [
    { flow_id = "flow_a", dependencies = [] },
    { flow_id = "flow_b", dependencies = [] }, # No dependencies = parallel
    { flow_id = "flow_c", dependencies = [] }
    ]
  3. Fan-in Pattern: Multiple flows feed into one

    flows = [
    { flow_id = "flow_a", dependencies = [] },
    { flow_id = "flow_b", dependencies = [] },
    { flow_id = "flow_c", dependencies = ["trigger_flow_a", "trigger_flow_b"] }
    ]
  4. Conditional Execution: Execute flows based on conditions

    flows = [
    { flow_id = "main_flow", dependencies = [] },
    { flow_id = "success_handler", condition = "success", dependencies = ["trigger_main_flow"] },
    { flow_id = "failure_handler", condition = "failure", dependencies = ["trigger_main_flow"] }
    ]
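
The dependency and condition semantics behind these patterns can be sketched as a tiny scheduler. This is illustrative only, not the Qarion ETL implementation; one assumption is made explicit in the comments (a skipped dependency counts as neither succeeded nor failed):

```python
# Minimal sketch of orchestrator ordering/condition semantics.
# Illustrative only; not the Qarion ETL scheduler.
from graphlib import TopologicalSorter

def run_orchestration(tasks: list[dict], runner) -> dict:
    """tasks: dicts with task_id, dependencies, condition.
    runner(task_id) -> True on success. Returns task_id -> outcome."""
    graph = {t["task_id"]: set(t.get("dependencies", [])) for t in tasks}
    by_id = {t["task_id"]: t for t in tasks}
    results = {}
    # static_order() yields dependencies before dependents
    for task_id in TopologicalSorter(graph).static_order():
        t = by_id[task_id]
        dep_results = [results[d] for d in t.get("dependencies", [])]
        cond = t.get("condition", "success")
        if cond == "always":
            runnable = True
        elif cond == "success":
            # assumption: a skipped dependency is treated as not-successful
            runnable = all(r == "success" for r in dep_results)
        else:  # "failure"
            runnable = any(r == "failure" for r in dep_results)
        results[task_id] = ("success" if runner(task_id) else "failure") if runnable else "skipped"
    return results

outcomes = run_orchestration(
    [{"task_id": "main", "dependencies": []},
     {"task_id": "on_ok", "dependencies": ["main"], "condition": "success"},
     {"task_id": "on_fail", "dependencies": ["main"], "condition": "failure"}],
    runner=lambda tid: True)
# main and on_ok run; the failure handler on_fail is skipped
```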

Integration with Flow Trigger Tasks:

Orchestrator flows use the same flow trigger task mechanism as standard flows. This means:

  • Triggered flows receive the same batch ID (if pass_batch_id = true)
  • Variables can be passed to triggered flows
  • XCom can be used to exchange data between flows
  • Execution metadata (batch_id, execution_date) is available in triggered flows

Best Practices:

  1. Use descriptive task IDs: Make task IDs clear and meaningful
  2. Define clear dependencies: Explicitly define dependencies to ensure correct execution order
  3. Use conditional execution wisely: Use failure condition for error handling, success for normal flow
  4. Pass variables when needed: Use the variables field to pass context between flows
  5. Consider wait_for_completion: Set to false for fire-and-forget flows (like notifications)
  6. Keep orchestrator flows simple: Focus on coordination, not business logic

Data Ingestion

Most flows in Qarion ETL start with an ingestion step that loads data from external sources (files, APIs, databases) into landing tables. Ingestion is automatically configured when you define [properties.load] in your flow definition.

Quick Example

[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
format = "csv"

This configuration automatically:

  1. Finds files matching the pattern
  2. Loads them into the landing table
  3. Tags all records with xt_batch_id
  4. Makes data available for transformation steps
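
Those steps can be sketched in a few lines of stdlib Python, assuming CSV input. This is an illustration of the behavior, not the actual Qarion ETL loader:

```python
# Illustrative sketch of ingestion: find files by pattern, load rows,
# tag every record with xt_batch_id. Not the real Qarion ETL loader.
import csv
import glob
import os

def load_landing(source_path: str, file_pattern: str, batch_id: int) -> list[dict]:
    rows = []
    for path in sorted(glob.glob(os.path.join(source_path, file_pattern))):
        with open(path, newline="") as f:
            for record in csv.DictReader(f):
                record["xt_batch_id"] = batch_id  # tag each record with the batch
                rows.append(record)
    return rows
```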

Ingestion Features

  • File Formats: CSV, JSON, JSONL, Parquet
  • Pattern Matching: Glob or regex patterns for file discovery
  • Batch Processing: Automatic batch ID assignment
  • Directory Loading: Load multiple files from directories
  • Template Variables: Use {{ batch_id }}, {{ ds }}, etc. in file paths

For complete ingestion documentation, see the Data Ingestion Guide.

Batch Processing and Incrementality

Flows in Qarion ETL process data in batches. Each batch represents a snapshot of data at a point in time. Understanding batch processing and incrementality is crucial for building efficient and reliable data pipelines.

Batch Processing Overview

What is a Batch?

  • A batch is a unit of work that processes a snapshot of data
  • Each batch has a unique batch_id (integer)
  • Batches are processed sequentially or in parallel (depending on dependencies)
  • Batch IDs are used to track data lineage and enable reprocessing

How Batches Work:

  1. Ingestion: Data is loaded into landing tables with xt_batch_id
  2. Transformation: Data is transformed between datasets, using batch IDs for filtering
  3. Tracking: Batch IDs enable tracking what data came from which batch
  4. Reprocessing: You can reprocess specific batches if needed

Incremental vs Full Refresh Processing

Qarion ETL supports two processing strategies, determined by the task's processing_type:

Incremental Processing

Default for: Change Feed, SCD2, Delta Publishing, Outbox flows

How It Works:

  • Compares current batch with previous batch
  • Only processes records that are new, changed, or deleted
  • Preserves existing data while updating changes
  • Requires previous_batch_id to determine what's changed

Benefits:

  • ✅ Faster execution (only processes changes)
  • ✅ Lower resource usage
  • ✅ Maintains historical state
  • ✅ Suitable for large datasets

Example:

from qarion_etl.flows.execution import FlowExecutionService

service = FlowExecutionService(engine, transformation_service)

# Incremental execution (default)
result = service.execute_flow(
    execution_plan=plan,
    batch_id=5,
    previous_batch_id=4,  # Compares with batch 4
)
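
The batch comparison itself can be sketched with plain dictionaries keyed by primary key. This is an illustration of the change-detection idea only, not the Qarion ETL merge logic:

```python
# Sketch of incremental change detection: compare two batch snapshots
# keyed by primary key and classify each record. Illustrative only.
def diff_batches(previous: dict, current: dict) -> dict:
    inserted = [k for k in current if k not in previous]
    deleted = [k for k in previous if k not in current]
    updated = [k for k in current if k in previous and current[k] != previous[k]]
    return {"inserted": inserted, "updated": updated, "deleted": deleted}

prev = {1: {"status": "open"}, 2: {"status": "open"}}
curr = {1: {"status": "closed"}, 3: {"status": "open"}}
# diff_batches(prev, curr): record 3 inserted, 1 updated, 2 deleted
```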

Full Refresh Processing

Default for: Standard flows (can be configured)

How It Works:

  • Processes ALL data from source, regardless of previous batches
  • Replaces entire target dataset
  • No dependency on previous batch IDs
  • Treats execution as first batch scenario

When to Use:

  • Initial data load
  • Reprocessing all historical data
  • Fixing data quality issues
  • Schema changes requiring complete rebuild
  • Testing with complete dataset

How to Trigger Full Refresh:

Option 1: Force Full Refresh Flag

# Force full refresh even for incremental tasks
result = service.execute_flow(
    execution_plan=plan,
    batch_id=1,
    force_full_refresh=True,  # Forces full refresh
)

Option 2: Set previous_batch_id to None

# Triggers first batch scenario (full refresh)
result = service.execute_flow(
    execution_plan=plan,
    batch_id=1,
    previous_batch_id=None,  # No previous batch = full refresh
)

Option 3: For Code Generation

# Generate code for full refresh
qarion-etl generate-code \
  --flow my_flow \
  --batch-id 1 \
  --previous-batch-id null \
  --format sql

Processing Type by Flow Type

| Flow Type         | Default Processing Type | Strategy     | Can Force Full Refresh?              |
|-------------------|-------------------------|--------------|--------------------------------------|
| Change Feed       | INCREMENTAL             | Merge        | ✅ Yes                               |
| Delta Publishing  | INCREMENTAL             | Merge        | ✅ Yes                               |
| Export Flow       | N/A                     | Export       | N/A (exports from existing datasets) |
| Error Parking     | INCREMENTAL             | Split        | ✅ Yes                               |
| SCD2              | INCREMENTAL             | Merge        | ✅ Yes                               |
| Outbox            | INCREMENTAL             | Insert       | ✅ Yes                               |
| Sessionization    | FULL_REFRESH            | Replace      | N/A (always full)                    |
| Growth Accounting | FULL_REFRESH            | Replace      | N/A (always full)                    |
| Standard          | Configurable            | Configurable | ✅ Yes                               |

Full Historical Processing

To reprocess all historical data from scratch:

1. Ensure Source Data Contains All History

  • Source data must include all historical records, not just current batch
  • For file-based sources, include all historical files
  • For database sources, query all historical data

2. Execute with Full Refresh

# Reprocess all historical data
result = service.execute_flow(
    execution_plan=plan,
    batch_id=1,  # Start from batch 1
    force_full_refresh=True,  # Process all data
)

3. Process Multiple Batches Sequentially

# Process historical batches in sequence
for batch_id in range(1, 11):  # Batches 1-10
    result = service.execute_flow(
        execution_plan=plan,
        batch_id=batch_id,
        previous_batch_id=batch_id - 1 if batch_id > 1 else None,
        force_full_refresh=(batch_id == 1),  # Full refresh for first batch
    )

Important Considerations

Incremental Processing:

  • ⚠️ Requires previous batch to exist for change detection
  • ⚠️ First batch scenario is automatically detected (no previous batch)
  • ⚠️ Missing batches can cause issues (use full refresh to recover)
  • ✅ Most efficient for ongoing operations

Full Refresh Processing:

  • ⚠️ Processes ALL source data, not just current batch
  • ⚠️ May take significantly longer than incremental
  • ⚠️ Replaces entire target dataset (data loss if source incomplete)
  • ✅ Ensures complete data consistency
  • ✅ Useful for reprocessing after fixes

Best Practices:

  1. Use incremental processing for regular operations
  2. Use full refresh for initial loads and reprocessing
  3. Always validate source data completeness before full refresh
  4. Test with small batches before full historical reprocessing
  5. Monitor execution time and resource usage

Task Types

Flows internally use a task system to define logical operations. Each task type has explicit properties that define its purpose and behavior. For more information about tasks, see the Task System Guide.

Variables and Templating:

Standard flows support variables and templating for dynamic configuration. Variables are organized into categories:

1. Built-in Runtime Variables:

These variables are automatically available in all tasks:

  • {{ source_dataset }}: Source dataset table name (single source)
  • {{ source_datasets }}: List of source dataset table names (multiple sources, FAN IN)
    • Access individual sources: {{ source_datasets[0] }}, {{ source_datasets[1] }}, etc.
  • {{ target_dataset }}: Target dataset table name
  • {{ batch_id }}: Current batch ID for this execution

2. Execution Date and Time Variables:

When an execution date is provided, these date/time variables are automatically available:

  • {{ ds }}: Execution date as YYYY-MM-DD (e.g., "2024-01-15")
  • {{ ds_nodash }}: Execution date as YYYYMMDD (e.g., "20240115")
  • {{ ts }}: Execution timestamp as YYYY-MM-DDTHH:MM:SS (e.g., "2024-01-15T10:30:00")
  • {{ ts_nodash }}: Execution timestamp as YYYYMMDDTHHMMSS (e.g., "20240115T103000")
  • {{ yesterday_ds }}: Previous day as YYYY-MM-DD
  • {{ tomorrow_ds }}: Next day as YYYY-MM-DD
  • {{ prev_ds }}: Alias for yesterday_ds
  • {{ next_ds }}: Alias for tomorrow_ds
  • {{ execution_date }}: Full datetime object
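
All of these can be derived from a single execution datetime. The following is a stdlib sketch of that derivation, not the actual template context builder:

```python
# Illustrative derivation of the date/time template variables from
# one execution datetime. Not the real Qarion ETL context builder.
from datetime import datetime, timedelta

def date_variables(execution_date: datetime) -> dict:
    yesterday = (execution_date - timedelta(days=1)).strftime("%Y-%m-%d")
    tomorrow = (execution_date + timedelta(days=1)).strftime("%Y-%m-%d")
    return {
        "ds": execution_date.strftime("%Y-%m-%d"),
        "ds_nodash": execution_date.strftime("%Y%m%d"),
        "ts": execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
        "ts_nodash": execution_date.strftime("%Y%m%dT%H%M%S"),
        "yesterday_ds": yesterday,
        "tomorrow_ds": tomorrow,
        "prev_ds": yesterday,   # alias for yesterday_ds
        "next_ds": tomorrow,    # alias for tomorrow_ds
        "execution_date": execution_date,
    }

ctx = date_variables(datetime(2024, 1, 15, 10, 30))
# ctx["ds"] == "2024-01-15", ctx["ts_nodash"] == "20240115T103000"
```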

3. Custom Variables:

You can define custom variables at two levels:

  • Flow-level variables: Defined in [variables] section, available to all tasks
  • Task-level variables: Defined in [tasks.variables], override flow-level variables

Variable Syntax:

  • {{ var('variable_name') }}: Access a variable (raises error if not found)
  • {{ var('variable_name', 'default_value') }}: Access with default value
  • {{ variable_name }}: Direct variable reference (shorthand, when variable is in context)

Variable Examples:

# Flow-level variables (available to all tasks)
[variables]
threshold = 100
status = "active"
lookback_days = 30

[[tasks]]
id = "filter_data"
type = "sql_processing"
# Note: sql must be set before the [tasks.variables] table is opened,
# otherwise TOML would parse it as a key of tasks.variables
sql = """
SELECT * FROM {{ source_dataset }}
WHERE value > {{ var("threshold") }}
  AND status = '{{ var("status") }}'
  AND created_at >= DATE('now', '-{{ var("lookback_days") }} days')
"""

# Task-specific variables (override flow-level)
[tasks.variables]
threshold = 200

Template Features:

  • Jinja2 syntax: Full Jinja2 templating support
  • Built-in filters: upper, lower, default
  • Template rendering: All SQL, scripts, and config paths support templating

Example with Execution Date Variables:

[[tasks]]
id = "daily_aggregation"
type = "sql_processing"
source_dataset = "raw_data"
target_dataset = "daily_stats"
sql = """
INSERT INTO {{ target_dataset }}
SELECT
    DATE(created_at) AS date,
    COUNT(*) AS records,
    SUM(amount) AS total
FROM {{ source_dataset }}
WHERE DATE(created_at) = '{{ ds }}'
GROUP BY DATE(created_at)
"""

Execution Date Use Cases:

Execution date variables are useful for:

  • Date Partitioning: Process data for specific dates
  • Backfilling: Re-run historical data processing
  • Time-based Filtering: Filter data by execution date
  • File Paths: Use in file paths for date-partitioned data

[[tasks]]
id = "ingest_daily_data"
type = "ingestion"
target_dataset = "daily_data"
[tasks.config]
# Use execution date in file path
path = "data/input/{{ ds_nodash }}.csv"
format = "csv"

Custom Script Templates:

Custom script tasks also support templating:

[[tasks]]
id = "custom_processing"
type = "custom_script"
source_dataset = "raw_data"
target_dataset = "processed_data"
# Note: script must be set before the [tasks.variables] table is opened,
# otherwise TOML would parse it as a key of tasks.variables
script = """
# Custom Python script
import pandas as pd
df = context.get_dataframe('{{ source_dataset }}')
df['value'] = df['value'] * {{ var("multiplier") }}
context.save_dataframe('{{ target_dataset }}', df)
"""

[tasks.variables]
multiplier = 2

Creating a Flow

1. Define the Flow File

Create a TOML file in your flows/ directory:

# flows/my_flow.toml
id = "my_flow"
name = "My Flow"
flow_type = "change_feed"
namespace = "raw"

[input]
primary_key = ["id"]
columns = ["id", "name", "email", "created_at"]

[properties]
# Flow-specific properties

2. Define Input Structure

Specify the input columns and primary key:

[input]
primary_key = ["id"] # Can be a list for composite keys
columns = ["id", "name", "email", "created_at", "updated_at"]

3. Configure Flow Properties

Each flow type has its own properties. The fragments below come from different flow definitions:

# Change feed flow: ingestion properties
[properties.load]
source_path = "data/input"
file_pattern = "*.csv"

# Export flow: export properties
[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders.csv"
export_mode = "incremental"
format = "csv"

# Sessionization flow: top-level property
[properties]
session_timeout_minutes = 30

4. Generate Datasets

Qarion ETL automatically generates datasets based on your flow definition:

qarion-etl generate-docs

This creates dataset definitions in your datasets/ directory.

Working with Flows

Generating Code

Generate executable code from your flows:

# Generate SQL
qarion-etl generate-code --format sql --flow my_flow --output-dir output

# Generate DBT
qarion-etl generate-code --format dbt --flow my_flow --output-dir dbt_project --dialect postgres

# Generate both SQL and Python
qarion-etl generate-code --format both --flow my_flow --output-dir output

Building Projects

Build your project to generate datasets and migrations:

qarion-etl build --flow my_flow

Viewing Flow Documentation

Generate and view flow documentation:

qarion-etl generate-docs --output-dir docs_generated

Then open docs_generated/index.html in your browser.

Flow Best Practices

  1. Choose the Right Flow Type: Select the flow type that matches your use case
  2. Define Clear Primary Keys: Ensure primary keys uniquely identify records
  3. Use Namespaces: Organize flows and datasets with namespaces
  4. Document Your Flows: Add descriptions and comments to flow definitions
  5. Test Incrementally: Test flows with small batches first
  6. Understand Processing Types: Know when your flow uses incremental vs full refresh
  7. Use Full Refresh Strategically: Use full refresh for initial loads and reprocessing, incremental for regular operations
  8. Validate Source Data: Ensure source data completeness before full refresh
  9. Monitor Performance: Track execution time and resource usage for both processing types

Common Patterns

Pattern 1: Simple Change Tracking

id = "simple_change_tracking"
flow_type = "change_feed"

[input]
primary_key = ["id"]
columns = ["id", "name", "status"]

Pattern 2: Financial Transactions

id = "financial_transactions"
flow_type = "delta_publishing"

[input]
primary_key = ["transaction_id"]
columns = ["transaction_id", "account_id", "amount", "date"]

Pattern 3: Event Sessionization

id = "event_sessions"
flow_type = "sessionization"

[input]
primary_key = ["event_id"]
columns = ["event_id", "user_id", "timestamp", "event_type"]

[properties]
session_timeout_minutes = 30

Data Validation and Quality

Qarion ETL provides automatic validation and quality checking capabilities that run during flow execution:

Contract Validation After Ingestion

After data is ingested into landing tables, Qarion ETL can automatically validate the data against data contracts if contracts are configured in your dataset properties.

How It Works:

  1. After successful ingestion, Qarion ETL extracts the target tables from load operations
  2. For each loaded dataset, it checks if a contract is configured in properties.contract
  3. If a contract is found, it validates the dataset schema against the contract
  4. Validation results are logged, and in strict mode, failures will stop the flow

Configuring Contracts:

Contracts can be configured in dataset properties in two ways:

  1. Inline Contract Definition:
# datasets/orders.toml
id = "orders"
name = "Orders Dataset"

[properties.contract]
id = "orders_contract"
name = "Orders Data Contract"
mode = "strict" # strict, lenient, or monitor

[[properties.contract.columns]]
name = "order_id"
schema_type = "integer"
required = true
nullable = false

[[properties.contract.columns]]
name = "amount"
schema_type = "float"
required = true
nullable = false
min_value = 0
  2. Contract ID Reference:
# datasets/orders.toml
[properties]
contract = "orders_contract" # Reference to a contract ID

Contract Modes:

  • strict: All violations are errors and will fail the flow
  • lenient: Warnings for non-critical violations, errors for critical ones
  • monitor: Log violations but don't fail the flow
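
The three modes can be summarized in a short sketch. This is illustrative only; the real contract validator is internal to Qarion ETL, and the shape of a "violation" record here is an assumption:

```python
# Sketch of contract-mode handling: every violation is collected, and the
# mode decides whether the flow fails. Illustrative; violation records
# with a 'critical' flag are an assumed shape, not the real API.
def apply_contract_mode(violations: list[dict], mode: str) -> bool:
    """Return True if the flow should fail given these violations."""
    if mode == "monitor":
        for v in violations:
            print("contract violation:", v)  # log only, never fail
        return False
    if mode == "lenient":
        # only critical violations (e.g. a missing required column) fail the flow
        return any(v.get("critical") for v in violations)
    return bool(violations)  # strict: any violation fails the flow
```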

Example:

# flows/orders_flow.toml
id = "orders_flow"
flow_type = "change_feed"

[input]
primary_key = ["order_id"]
columns = ["order_id", "customer_id", "amount", "order_date"]

[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"

With contract validation, if the ingested data doesn't match the contract schema, the flow will fail with detailed error messages.

Automatic Quality Checks After Transformations

After transformations complete, Qarion ETL can automatically run quality checks if they are configured in the target dataset properties.

How It Works:

  1. After a transformation completes successfully, Qarion ETL checks the target dataset for quality check configuration
  2. If properties.quality_checks is defined, it creates a quality check plan
  3. All configured quality checks are executed automatically
  4. Results are logged, and failures can optionally stop the flow

Configuring Automatic Quality Checks:

Add quality checks to your dataset properties:

# datasets/processed_orders.toml
id = "processed_orders"
name = "Processed Orders"

[properties]
# Configure automatic quality checks
quality_stop_on_first_failure = false # Optional: stop on first failure
quality_fail_on_error = false # Optional: fail flow on quality check errors

[[properties.quality_checks]]
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
enabled = true
[properties.quality_checks.config]
columns = ["order_id", "customer_id", "amount"]
threshold = 0.95
allow_null = false

[[properties.quality_checks]]
check_id = "uniqueness_check"
check_name = "Uniqueness Check"
check_type = "uniqueness"
enabled = true
[properties.quality_checks.config]
columns = ["order_id"]

[[properties.quality_checks]]
check_id = "range_check"
check_name = "Amount Range Check"
check_type = "range"
enabled = true
[properties.quality_checks.config]
columns = ["amount"]
min_value = 0
max_value = 1000000

Quality Check Types:

  • completeness: Check for missing/null values
  • uniqueness: Verify unique constraints
  • range: Validate value ranges
  • pattern: Validate data patterns (regex)
  • referential_integrity: Check foreign key relationships

Configuration Options:

  • quality_stop_on_first_failure: If true, stops executing checks after first failure
  • quality_fail_on_error: If true, fails the entire flow when quality checks fail
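
How the configured checks and the stop-on-first-failure option interact can be sketched as follows. This is illustrative only (plain dicts stand in for the TOML configuration; the real check runner is internal to Qarion ETL):

```python
# Illustrative runner for completeness / uniqueness / range checks,
# honoring stop_on_first_failure. Not the real Qarion ETL implementation.
def run_quality_checks(rows: list[dict], checks: list[dict],
                       stop_on_first_failure: bool = False) -> list[tuple]:
    results = []
    for check in checks:
        cfg, ok = check["config"], True
        if check["check_type"] == "completeness":
            # fraction of non-null values per column must meet the threshold
            for col in cfg["columns"]:
                non_null = sum(1 for r in rows if r.get(col) is not None)
                ok = ok and (non_null / len(rows)) >= cfg.get("threshold", 1.0)
        elif check["check_type"] == "uniqueness":
            keys = [tuple(r[c] for c in cfg["columns"]) for r in rows]
            ok = len(keys) == len(set(keys))
        elif check["check_type"] == "range":
            for col in cfg["columns"]:
                ok = ok and all(cfg["min_value"] <= r[col] <= cfg["max_value"] for r in rows)
        results.append((check["check_id"], ok))
        if stop_on_first_failure and not ok:
            break
    return results
```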

Example Flow with Automatic Quality Checks:

# flows/orders_processing.toml
id = "orders_processing"
flow_type = "standard"

[input]
primary_key = ["order_id"]
columns = ["order_id", "customer_id", "amount", "order_date"]

[[tasks]]
id = "transform_orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "processed_orders" # This dataset has quality checks configured

When the transformation to processed_orders completes, all configured quality checks will run automatically.

Benefits:

  • Automatic Validation: No need to manually add quality check nodes
  • Consistent Quality: Ensures data quality at every transformation step
  • Early Detection: Catch data quality issues immediately after transformations
  • Configurable: Control failure behavior per dataset

Execution Tracking and Monitoring

Qarion ETL automatically tracks all flow executions, providing comprehensive visibility into your pipeline operations. Every flow execution is recorded with:

  • Execution metadata: Flow ID, batch ID, execution date, variables
  • Timing information: Start time, end time, duration
  • Results: Success status, rows loaded, rows processed, error messages
  • Task-level tracking: Individual task executions within the flow
  • Operation details: Load operations, transformations, quality checks, exports, etc.

All execution metadata is stored in database tables (prefixed with xt_ by default) and can be queried for monitoring, debugging, and analysis.

For detailed information about accessing and using execution metadata, see the Metadata Tracking Guide.