Getting Started with Qarion ETL

This guide will help you get started with Qarion ETL in minutes.

What is Qarion ETL?

Qarion ETL is a flexible, extensible data transformation framework for building scalable data pipelines. It provides:

  • Flow-Based Architecture: Define data pipelines using declarative flow definitions
  • Plugin System: Extensible architecture for custom flow types, engines, and code generators
  • Multiple Engines: Support for SQLite, Pandas, DuckDB, and more
  • Code Generation: Generate SQL, DBT, or Airflow code from flows
  • Schema Evolution: Manage schema changes with forward/strict compatibility modes

Installation

Prerequisites

  • Python 3.11 or higher
  • pip or poetry for package management

Install from PyPI

pip install qarion-etl

Install from Source

git clone https://github.com/yourorg/qarion-etl.git
cd qarion-etl
pip install -e .

Quick Start

1. Create a New Project

You can create a new project in two ways:

Option 1: Basic project

Creates a clean project structure with only directories and configuration:

qarion-etl new-project my_project

This creates:

my_project/
├── qarion-etl.toml # Project configuration
├── data/ # Data directory
├── datasets/ # Dataset definitions (empty)
├── flows/ # Flow definitions (empty)
├── migrations/ # Migration files (empty)
└── plugins/ # Plugin directory (empty)
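The same layout can be scaffolded by hand; here is a minimal Python sketch (the directory names come from the tree above, and the TOML content is a placeholder, not the configuration the real `new-project` command writes):

```python
from pathlib import Path

def scaffold_project(root: str) -> Path:
    """Create the project layout shown above under `root`."""
    project = Path(root)
    for sub in ("data", "datasets", "flows", "migrations", "plugins"):
        (project / sub).mkdir(parents=True, exist_ok=True)
    # Placeholder project configuration; the real file is generated by the CLI.
    (project / "qarion-etl.toml").write_text('name = "my_project"\n')
    return project
```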

Option 2: Project with examples

Creates a project with example datasets and flows for each flow type:

qarion-etl new-project my_project --with-examples

This creates the same structure as above, plus:

  • datasets/example_dataset.toml - Example dataset definition
  • flows/example_*.toml - Example flows for each flow type

Initialize Database

After creating a project, initialize the database:

cd my_project
qarion-etl init

Or initialize during project creation:

qarion-etl new-project my_project --init-db

2. Define a Dataset

Create a dataset definition in datasets/orders.toml:

# datasets/orders.toml
name = "orders"
namespace = "raw"
description = "Customer orders dataset"

[columns]
[columns.id]
schema_type = "integer"
required = true
primary_key = true
description = "Order identifier"

[columns.customer_id]
schema_type = "integer"
required = true
description = "Customer identifier"

[columns.amount]
schema_type = "float"
required = true
description = "Order total amount"

[columns.created_at]
schema_type = "timestamp"
required = false
description = "Order creation timestamp"

[properties]
table_type = "landing"
schema_evolution = { mode = "forward" }
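As a rough mental model of the two evolution modes (an assumption about their semantics, not Qarion ETL's actual implementation): `strict` rejects any change to the column set, while `forward` allows new optional columns as long as existing columns are untouched. A Python sketch of that check:

```python
def is_compatible(old: dict, new: dict, mode: str = "forward") -> bool:
    """Check whether `new` column definitions may replace `old`.

    Each dict maps column name -> {"schema_type": ..., "required": bool}.
    This models one plausible reading of the modes, not the real engine.
    """
    if mode == "strict":
        return old == new  # no changes allowed at all
    if mode == "forward":
        for name, spec in old.items():
            if new.get(name) != spec:  # existing columns must be unchanged
                return False
        # added columns must be optional so existing writers keep working
        return all(not spec.get("required", False)
                   for name, spec in new.items() if name not in old)
    raise ValueError(f"unknown mode: {mode}")
```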

3. Define a Flow

Create a flow definition in flows/process_orders.toml:

# flows/process_orders.toml
id = "process_orders"
name = "Process Orders"
flow_type = "change_feed"
namespace = "raw"
description = "Track changes in order data over time"

[input]
primary_key = ["id"]
columns = [
{ name = "id", schema_type = "integer", required = true },
{ name = "customer_id", schema_type = "integer", required = true },
{ name = "amount", schema_type = "float", required = true },
{ name = "created_at", schema_type = "timestamp", required = false }
]

[properties]
change_detection_columns = ["amount", "customer_id"]

# Optional: Configure ingestion
[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
format = "csv"
loader_config = { delimiter = ",", header = true, encoding = "utf-8" }

# Optional: Add triggers
[[triggers]]
id = "cli_trigger"
type = "cli"
enabled = true
description = "Manual trigger via CLI"
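The `loader_config` values map directly onto standard CSV reading options. A hedged sketch of what ingestion does with them, using Python's `csv` module (the helper name and file path are illustrative, not part of the Qarion ETL API):

```python
import csv

def load_csv(path: str, delimiter: str = ",", header: bool = True,
             encoding: str = "utf-8") -> list[dict]:
    """Read one orders file into a list of row dicts."""
    with open(path, newline="", encoding=encoding) as f:
        if header:
            # first row names the columns
            return list(csv.DictReader(f, delimiter=delimiter))
        # headerless files: fall back to positional column keys
        return [dict(enumerate(row)) for row in csv.reader(f, delimiter=delimiter)]
```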

4. Generate Code

Generate SQL code:

qarion-etl generate-code --format sql --flow process_orders --output-dir output

Generate DBT code:

qarion-etl generate-code --format dbt --flow process_orders --output-dir dbt_project --dialect postgres

5. Build Project

Generate datasets from flows and migrations from datasets:

qarion-etl build

This will:

  1. Generate dataset definitions from your flow definitions
  2. Generate migration files from your dataset definitions

6. Apply Migrations

Apply the generated migrations to create database tables:

qarion-etl apply-migrations

Next Steps

Common Use Cases

Change Detection

Track changes in data over time:

flow_type = "change_feed"
[properties]
change_detection_columns = ["status", "amount"]
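One way to picture what a `change_feed` flow does (an illustrative sketch, not the actual implementation): compare the incoming snapshot to the previous one, keyed by primary key, and emit rows whose watched columns changed:

```python
def detect_changes(previous, current, key="id", watch=("status", "amount")):
    """Return rows from `current` that are new or changed on watched columns."""
    prev_by_key = {row[key]: row for row in previous}
    changes = []
    for row in current:
        old = prev_by_key.get(row[key])
        if old is None:                             # unseen key: new row
            changes.append({**row, "_change": "insert"})
        elif any(row[c] != old[c] for c in watch):  # a watched column changed
            changes.append({**row, "_change": "update"})
    return changes
```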

Delta Publishing

Publish incremental deltas, e.g. for financial transaction processing:

flow_type = "delta_publishing"
[properties]
delta_method = "merge"
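`delta_method = "merge"` suggests upsert semantics: apply a batch of delta rows to the target keyed by primary key, updating matches and inserting the rest. A minimal sketch of that idea (an assumption about the method's meaning, not the engine's code):

```python
def merge_deltas(target: dict, deltas: list[dict], key: str = "id") -> dict:
    """Upsert delta rows into `target`, a mapping of key -> row."""
    merged = dict(target)                # leave the input mapping untouched
    for row in deltas:
        merged[row[key]] = row           # update if present, insert otherwise
    return merged
```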

Sessionization

Group events into sessions:

flow_type = "sessionization"
[properties]
user_id_field = "user_id"
timestamp_field = "event_time"
session_timeout = "30 minutes"
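The three properties above are enough to sketch the core algorithm: sort each user's events by time and start a new session whenever the idle gap exceeds the timeout (illustrative Python, not Qarion ETL's engine; the field names match the config above):

```python
from datetime import timedelta
from itertools import groupby

def sessionize(events, user_id_field="user_id", timestamp_field="event_time",
               session_timeout=timedelta(minutes=30)):
    """Group events into per-user sessions, splitting on idle gaps."""
    events = sorted(events, key=lambda e: (e[user_id_field], e[timestamp_field]))
    sessions = []
    for _, user_events in groupby(events, key=lambda e: e[user_id_field]):
        current, last = [], None
        for ev in user_events:
            # gap larger than the timeout closes the current session
            if last is not None and ev[timestamp_field] - last > session_timeout:
                sessions.append(current)
                current = []
            current.append(ev)
            last = ev[timestamp_field]
        sessions.append(current)
    return sessions
```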

Getting Help

  • Documentation: See the Documentation Index
  • Examples: Check the examples/ directory
  • Issues: Report issues on GitHub