Core Concepts

Understanding the fundamental concepts of Qarion ETL.

Flows

A flow is a declarative definition of a data transformation pipeline. It describes:

  • What data to process
  • How to transform it
  • What the output should look like

Flows are defined in TOML files and processed by flow plugins.

Flow Types

Qarion ETL supports multiple flow types, each implementing a specific pattern:

  • change_feed: Tracks changes in data over time
  • delta_publishing: Financial transaction processing with merge operations
  • export_flow: Exports data from datasets to external destinations with multiple modes
  • sessionization: Groups events into sessions based on time windows
  • growth_accounting: Analyzes user growth metrics (acquisitions, churn, resurrections)
  • outbox: Ensures reliable message delivery in distributed systems
  • scd2: Tracks historical changes to dimension data with effective dates
  • standard: Task-based flow with direct SQL/script execution (engine-specific)

Flow Definition Structure

Basic Flow Example:

# flows/my_flow.toml
id = "my_flow"
name = "My Flow"
flow_type = "change_feed"
namespace = "raw"
description = "Track changes in order data"

[input]
primary_key = ["id"]
columns = [
    { name = "id", schema_type = "integer", required = true },
    { name = "customer_id", schema_type = "integer", required = true },
    { name = "amount", schema_type = "float", required = true },
    { name = "order_date", schema_type = "timestamp", required = false },
    { name = "status", schema_type = "string", required = false }
]

[properties]
change_detection_columns = ["amount", "status"]

Flow with Ingestion:

# flows/orders_flow.toml
id = "orders_flow"
name = "Orders Processing Flow"
flow_type = "change_feed"
namespace = "production"

[input]
primary_key = ["order_id"]
columns = [
    { name = "order_id", schema_type = "integer", required = true },
    { name = "customer_id", schema_type = "integer", required = true },
    { name = "amount", schema_type = "float", required = true },
    { name = "order_date", schema_type = "timestamp", required = true },
    { name = "status", schema_type = "string", required = true }
]

[properties]
change_detection_columns = ["amount", "status"]

[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
format = "csv"

Flow with Triggers:

# flows/orders_flow.toml
id = "orders_flow"
flow_type = "change_feed"

[input]
primary_key = ["order_id"]
columns = ["order_id", "customer_id", "amount", "order_date", "status"]

[properties]
change_detection_columns = ["amount", "status"]

[[triggers]]
id = "daily_trigger"
type = "schedule"
schedule = "0 0 * * *"
enabled = true
description = "Daily execution at midnight"

[[triggers]]
id = "cli_trigger"
type = "cli"
enabled = true
description = "Manual trigger via CLI"

Datasets

A dataset defines the structure of data at a specific point in a pipeline. It includes:

  • Column definitions with types
  • Constraints (primary keys, required fields)
  • Metadata (descriptions, properties)

Dataset Structure

Basic Dataset Example:

# datasets/orders.toml
name = "orders"
namespace = "raw"
description = "Customer orders dataset"

[columns]
[columns.id]
schema_type = "integer"
required = true
primary_key = true
description = "Order identifier"

[columns.customer_id]
schema_type = "integer"
required = true
description = "Customer identifier"

[columns.amount]
schema_type = "float"
required = true
description = "Order total amount"

[columns.order_date]
schema_type = "timestamp"
required = false
description = "Order creation date"

[columns.status]
schema_type = "string"
required = true
description = "Order status"
[columns.status.properties]
enum_values = ["pending", "completed", "cancelled"]

[properties]
table_type = "landing"
schema_evolution = { mode = "forward" }

Dataset with Properties:

# datasets/orders_staging.toml
name = "orders_staging"
namespace = "staging"

[columns]
[columns.id]
schema_type = "integer"
required = true
primary_key = true

[columns.customer_id]
schema_type = "integer"
required = true

[columns.amount]
schema_type = "float"
required = true

[columns.value_category]
schema_type = "string"
required = false

[properties]
table_type = "staging"

# Automatic quality checks configuration
quality_stop_on_first_failure = false
quality_fail_on_error = false

[[properties.quality_checks]]
check_id = "completeness"
check_type = "completeness"
enabled = true
[properties.quality_checks.config]
columns = ["id", "customer_id", "amount"]
threshold = 0.95

# Data contract configuration
[properties.contract]
id = "orders_contract"
mode = "strict"
[[properties.contract.columns]]
name = "id"
schema_type = "integer"
required = true
nullable = false
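
The completeness check configured above requires that the fraction of non-null values in each listed column meets the threshold. A plain-Python sketch of that semantics (an illustration, not Qarion ETL's implementation):

```python
# Sketch of a completeness check: the share of non-null values in each
# configured column must meet the threshold (0.95 in the config above).
def completeness_check(rows, columns, threshold):
    results = {}
    for col in columns:
        values = [row.get(col) for row in rows]
        non_null = sum(1 for v in values if v is not None)
        ratio = non_null / len(values) if values else 1.0
        results[col] = ratio >= threshold
    return results

rows = [
    {"id": 1, "customer_id": 10, "amount": 99.5},
    {"id": 2, "customer_id": None, "amount": 12.0},
]
result = completeness_check(rows, ["id", "customer_id"], 0.95)
print(result)  # "id" passes (2/2 non-null); "customer_id" fails (1/2 < 0.95)
```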

Dataset Materialization

Datasets are materialized as database tables when migrations are applied. This means:

  • Dataset Definition → Migration File → Database Table

When you define a dataset, Qarion ETL can generate migration files that create the corresponding database tables. The dataset definition serves as the source of truth for the table schema.

How Materialization Works

  1. Define Dataset: Create a dataset definition in TOML format
  2. Generate Migrations: Qarion ETL compares dataset definitions with the current database schema
  3. Apply Migrations: Migration files are executed to create or alter tables
  4. Table Created: The dataset is now materialized as a physical table in the database

Example

# datasets/orders.toml
name = "orders"
namespace = "raw"

[columns]
[columns.id]
schema_type = "integer"
required = true
primary_key = true

[columns.customer_id]
schema_type = "integer"
required = true

[columns.amount]
schema_type = "float"
required = true

This dataset definition will generate a migration that creates a table:

CREATE TABLE raw_orders (
    id INTEGER PRIMARY KEY,
    customer_id INTEGER NOT NULL,
    amount REAL NOT NULL
);
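
As an illustration, the generated DDL can be applied to a SQLite engine directly with Python's sqlite3 module — a sketch of what materialization produces, not the actual migration runner:

```python
import sqlite3

# Apply the generated DDL to an in-memory SQLite database.
conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE raw_orders (
    id INTEGER PRIMARY KEY,
    customer_id INTEGER NOT NULL,
    amount REAL NOT NULL
)
""")

# The dataset is now materialized: the table exists with the defined schema.
cols = [row[1] for row in conn.execute("PRAGMA table_info(raw_orders)")]
print(cols)  # ['id', 'customer_id', 'amount']
```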

Transformations

A transformation is the process of converting data from one dataset to another. Transformations are:

  • Engine-agnostic: Described using transformation instructions
  • Executable: Can be run on various engines (SQLite, Pandas, etc.)
  • Code-generatable: Can be converted to SQL, DBT, or Airflow code

Transformation Instructions

Transformation instructions describe what to transform without specifying how. In flows, transformations are typically defined implicitly by the flow type, or explicitly as SQL tasks in standard flows:

Standard Flow SQL Transformation:

# flows/my_standard_flow.toml
flow_type = "standard"

[[tasks]]
id = "transform_orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_staging"
[tasks.config]
sql = """
SELECT
    id,
    customer_id,
    amount,
    order_date,
    CASE
        WHEN amount > 1000 THEN 'high_value'
        ELSE 'standard'
    END AS value_category
FROM {{ source_dataset }}
WHERE order_date >= '2024-01-01'
"""

Python API Example:

instruction = TransformationInstruction(
    source_table="orders",
    target_table="orders_changes",
    rules=[
        TransformationRule(
            operation="select",
            columns=["id", "customer_id", "amount"]
        ),
        TransformationRule(
            operation="filter",
            condition="batch_id = :batch_id"
        )
    ]
)

Flow DAGs

A FlowDAG (Directed Acyclic Graph) represents the execution plan for a flow. It consists of:

  • Nodes: Individual steps (ingestion or transformation)
  • Edges: Dependencies between steps

Node Types

  • Ingestion: Load data from external sources (files, APIs, databases) into landing tables
  • Export: Export data from tables to files
  • Transformation: Transform data between datasets

For detailed information about ingestion, see the Data Ingestion Guide.

DAG Example

[File] → [Ingestion Node] → [Landing Table] → [Export Node] → [File]
                                  ↓
                  [Transformation Node] → [Change Feed Table]
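
Execution order follows from a topological sort of the DAG's nodes. A minimal sketch using Python's standard graphlib (node names illustrative):

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on.
dag = {
    "ingest_orders": set(),
    "transform_changes": {"ingest_orders"},
    "export_orders": {"ingest_orders"},
}
order = list(TopologicalSorter(dag).static_order())
print(order)  # ingestion runs first, then its dependents
```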

Engines

An engine is the execution environment for transformations. It provides the database or data processing capabilities where your transformations run.

Available Engines

Qarion ETL supports multiple engines:

  • SQLite: File-based SQL database (good for development)
  • Pandas Memory: In-memory Pandas DataFrames (fast, no persistence)
  • Pandas Local: Pandas with Parquet file persistence
  • DuckDB: In-process analytical database (optimized for analytics)
  • PostgreSQL: Client-server relational database (suited to production deployments)

Engine Configuration

Engines are configured in qarion-etl.toml (or config.toml):

SQLite Engine:

# qarion-etl.toml
[engine]
name = "sqlite"
[engine.config]
path = "data/qarion-etl.db"

Pandas In-Memory Engine:

[engine]
name = "pandas_memory"
[engine.config]
# No configuration required for in-memory engine

Pandas Local Storage Engine:

[engine]
name = "pandas_local"
[engine.config]
storage_dir = "data/pandas"

DuckDB Engine:

[engine]
name = "duckdb"
[engine.config]
path = "data/qarion-etl.duckdb"

PostgreSQL Engine:

[engine]
name = "postgresql"
[engine.config]
host = "localhost"
port = 5432
database = "mydb"
user = "myuser"
password = "${credential:db_password}" # Using credential store

Or using connection string:

[engine]
name = "postgresql"
[engine.config]
connection_string = "postgresql://user:password@localhost:5432/mydb"

Note: PostgreSQL engine requires psycopg2 or psycopg2-binary:

pip install psycopg2-binary

Separate Metadata Engine:

# Processing engine
[engine]
name = "pandas_memory"

# Metadata engine (for storing metadata in database)
[metadata_engine]
name = "sqlite"
[metadata_engine.config]
path = "data/metadata.db"

# Use database storage for metadata
dataset_storage = "database"
flow_storage = "database"
schema_storage = "database"

Engine Abstraction

Engines are abstracted through:

  • BaseEngine: Common interface for all engines
  • BaseDialectAdapter: Converts transformation instructions to engine-specific code

See Engines and Storage for detailed engine configuration and selection guidance.

Plugins

Qarion ETL uses a plugin architecture for extensibility:

Flow Plugins

Define flow-specific logic:

  • Dataset generation
  • DAG generation
  • Transformation instruction generation

Engine Plugins

Provide engine implementations:

  • Connection management
  • Query execution
  • Data retrieval

Code Generator Plugins

Generate code in various formats:

  • SQL files
  • DBT projects
  • Airflow DAGs

Repository Plugins

Provide storage backends:

  • Local file storage
  • Database storage

Code Generation

Qarion ETL can generate executable code from flows:

SQL Generation

Generates SQL files with parameterized queries:

INSERT INTO orders_changes
SELECT * FROM orders
WHERE batch_id = :batch_id
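
Named parameters such as :batch_id are bound at execution time. As an illustration, the same query can be run with Python's sqlite3 module (a sketch, not the generated runner):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, batch_id INTEGER)")
conn.execute("CREATE TABLE orders_changes (id INTEGER, batch_id INTEGER)")
conn.executemany("INSERT INTO orders VALUES (?, ?)",
                 [(1, 100), (2, 100), (3, 101)])

# Bind :batch_id when executing the generated query.
conn.execute(
    "INSERT INTO orders_changes SELECT * FROM orders WHERE batch_id = :batch_id",
    {"batch_id": 101},
)
n = conn.execute("SELECT COUNT(*) FROM orders_changes").fetchone()[0]
print(n)  # 1 — only the batch-101 row was copied
```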

DBT Generation

Generates complete DBT projects:

  • Model files (.sql)
  • Schema definitions (schema.yml)
  • Project configuration (dbt_project.yml)
  • Documentation (.md files)

Example Generated DBT Project Structure:

dbt_project/
├── dbt_project.yml
├── models/
│   ├── staging/
│   │   ├── stg_orders.sql
│   │   └── schema.yml
│   └── marts/
│       ├── orders_changes.sql
│       └── schema.yml
└── README.md

Generated Model Example:

-- models/staging/stg_orders.sql
{{ config(materialized='incremental', unique_key=['id', 'xt_batch_id']) }}

SELECT
    id,
    customer_id,
    amount,
    order_date,
    xt_batch_id
FROM {{ ref('orders_landing') }}
{% if is_incremental() %}
WHERE xt_batch_id > (SELECT MAX(xt_batch_id) FROM {{ this }})
{% endif %}

Airflow Generation

Generates Airflow DAGs:

  • Task definitions
  • Dependencies
  • Execution functions

Schema Evolution

Qarion ETL supports schema evolution with different modes:

Strict Mode

No schema changes allowed. Raises errors on schema mismatches.

Forward Mode

Allows adding new columns. Existing columns cannot be removed or changed.

Normal Mode

Allows schema changes with validation.
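
The three modes can be summarized as a small decision rule — an illustrative sketch of the semantics above, not Qarion ETL's implementation:

```python
# Hypothetical sketch of how each mode treats a proposed schema change.
def change_allowed(mode, change):
    if mode == "strict":
        return False                   # no changes at all
    if mode == "forward":
        return change == "add_column"  # additions only
    if mode == "normal":
        return True                    # any change, subject to validation
    raise ValueError(f"unknown mode: {mode}")

print(change_allowed("forward", "add_column"))     # True
print(change_allowed("forward", "remove_column"))  # False
```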

Batch Processing

Qarion ETL processes data in batches:

  • Batch ID: Unique identifier for each batch
  • Previous Batch ID: Reference to the previous batch for incremental processing
  • Batch Filtering: Automatic filtering by batch ID

Processing Types

Each task has a processing type that determines how data is processed:

  • FULL_REFRESH: Rebuilds entire dataset from scratch. Processes all data regardless of previous batches.
  • INCREMENTAL: Processes only new/changed data since last run. Requires previous batch for comparison.

Forcing Full Refresh:

  • Set force_full_refresh=True in execution API
  • Set previous_batch_id=None when executing
  • Useful for reprocessing all historical data
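
The distinction between the two processing types comes down to how the input is filtered — a conceptual sketch (function and field names are hypothetical):

```python
# FULL_REFRESH ignores the previous batch; INCREMENTAL only keeps newer rows.
# A missing previous_batch_id also forces a full refresh, as described above.
def select_rows(rows, processing_type, previous_batch_id=None):
    if processing_type == "FULL_REFRESH" or previous_batch_id is None:
        return rows  # process everything
    return [r for r in rows if r["batch_id"] > previous_batch_id]

rows = [{"id": 1, "batch_id": 100}, {"id": 2, "batch_id": 101}]
print(len(select_rows(rows, "FULL_REFRESH")))      # 2
print(len(select_rows(rows, "INCREMENTAL", 100)))  # 1
```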

See Batch Processing and Incrementality for detailed information.

Migrations

Migrations are the mechanism for managing database schema changes. They ensure that your database schema stays in sync with your dataset definitions.

What Are Migrations?

Migrations are files that contain DDL (Data Definition Language) operations to:

  • Create new tables from dataset definitions
  • Alter existing tables when datasets change
  • Track schema history and changes over time

How Migrations Work

  1. Dataset Definitions: You define datasets in TOML files
  2. Migration Generation: Qarion ETL compares dataset definitions with the current database schema
  3. Migration Files: JSON files are generated containing the necessary DDL operations
  4. Migration Execution: Migration files are applied to the database to create or update tables

Migration Generation

Migrations are automatically generated from dataset definitions:

# Generate migrations from all datasets
qarion-etl build

# Or specifically generate migrations
qarion-etl generate-migrations

Qarion ETL will:

  • Scan all dataset definition files
  • Compare them with the current database schema
  • Generate migration files for any differences
  • Create migration files in the migrations/ directory

Migration Files

Migration files are JSON files with a timestamp prefix:

migrations/
  20241205120000_create_orders.json
  20241205120001_alter_orders_add_status.json

Each migration file contains:

  • Metadata: Dataset ID, name, action (CREATE/ALTER), version
  • UP DDL: Operations to apply the migration (create/alter table)
  • DOWN DDL: Operations to rollback the migration (optional)
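
Put together, a migration file might look roughly like this — the field names here are illustrative, not the exact schema:

```json
{
  "dataset_id": "orders",
  "action": "CREATE",
  "version": 1,
  "up": [
    "CREATE TABLE raw_orders (id INTEGER PRIMARY KEY, customer_id INTEGER NOT NULL, amount REAL NOT NULL)"
  ],
  "down": [
    "DROP TABLE raw_orders"
  ]
}
```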

Migration Actions

Migrations can have different actions:

  • CREATE: Create a new table from a dataset definition
  • ALTER: Modify an existing table when a dataset definition changes
  • NO_CHANGE: Dataset matches current schema, no migration needed

Schema Evolution Modes

Migrations respect the schema evolution mode configured for each dataset:

  • Strict Mode: No schema changes allowed (raises errors)
  • Forward Mode: Only allows adding new columns
  • Normal Mode: Allows all schema changes with validation

Configuration Example:

# datasets/orders.toml
name = "orders"
namespace = "raw"

[schema_evolution]
mode = "forward" # strict, forward, or normal
allow_column_addition = true
allow_column_removal = false
allow_type_changes = false

Applying Migrations

After generating migrations, apply them to the database:

# Apply all pending migrations
qarion-etl apply-migrations

# Dry run (validate without executing)
qarion-etl apply-migrations --dry-run

Migration History

Qarion ETL tracks migration history to:

  • Prevent duplicate migrations
  • Ensure migrations are applied in order
  • Support rollback operations
  • Track schema evolution over time

Best Practices

  1. Version Control: Commit migration files to version control
  2. Review Before Apply: Always review generated migrations before applying
  3. Test First: Test migrations in a development environment
  4. Backup: Backup your database before applying migrations in production
  5. Incremental Changes: Make small, incremental schema changes rather than large ones

Execution and Monitoring

Qarion ETL provides comprehensive execution tracking and monitoring capabilities:

  • Automatic Operation Tracking: All flow executions, task runs, and operations are automatically tracked
  • Execution History: Complete history of all executions with timing, status, and results
  • Performance Metrics: Track rows processed, execution times, and success rates
  • Error Tracking: Detailed error information for failed operations
  • Queryable Metadata: All execution metadata is stored in database tables for analysis

For detailed information about accessing execution metadata and monitoring your pipelines, see the Metadata Tracking Guide.