Data Ingestion

Data ingestion is the process of loading data from external sources (files, APIs, databases) into Qarion ETL datasets. This guide covers how ingestion works in Qarion ETL and how to configure it for different flow types.

Overview

Ingestion is typically the first step in a data pipeline. It loads raw data from external sources into landing tables, which are then processed by transformation steps.

Ingestion Flow

External Source (Files/API/DB) → Ingestion Step → Landing Table → Transformation Steps

Key Concepts

  • Landing Tables: The initial destination for ingested data; typically tables with table_type = "landing" that receive raw, unprocessed records.
  • Load Plans: Execution plans that define how data should be loaded from sources.
  • Batch Processing: All ingested data is tagged with a batch_id for tracking and reprocessing.
  • Source Types: Supports file-based ingestion (CSV, JSON, JSONL, Parquet) and database ingestion (SQL queries).

Supported Source Types

Qarion ETL supports multiple ingestion source types:

  1. File-Based Ingestion: Load data from files (CSV, JSON, JSONL, Parquet)

    • Single file loading
    • Directory-based loading with pattern matching
    • Remote file loading (S3, FTP, SFTP, etc.)
  2. Database Ingestion: Load data directly from databases using SQL queries

    • Execute SQL queries on source databases
    • Support for all Qarion ETL engines (PostgreSQL, MySQL, SQLite, DuckDB, etc.)
    • Incremental loading support
    • See Database Ingestion Guide for details

How Ingestion Works

1. Flow-Level Ingestion (Automatic)

Most flow types (Change Feed, Delta Publishing, SCD2, etc.) automatically generate ingestion steps when you configure [properties.load]:

# flows/orders_change_feed.toml
id = "orders_change_feed"
flow_type = "change_feed"

[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
format = "csv"

When you execute this flow:

  1. Qarion ETL generates a load plan from the flow definition
  2. The load plan identifies files matching the pattern
  3. Files are loaded into the landing table with xt_batch_id assigned
  4. Transformation steps then process the ingested data
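The discovery-and-tagging sequence above can be sketched in Python. This is an illustrative sketch only; discover_files and tag_batch are hypothetical names, not Qarion ETL's internal API.

```python
import fnmatch
import os
from typing import Iterable

def discover_files(source_path: str, file_pattern: str,
                   names: Iterable[str]) -> list[str]:
    """Return paths under source_path whose basenames match the glob pattern."""
    matched = [n for n in names if fnmatch.fnmatch(n, file_pattern)]
    return [os.path.join(source_path, n) for n in sorted(matched)]

def tag_batch(rows: list[dict], batch_id: int) -> list[dict]:
    """Assign xt_batch_id to every ingested record, as the load plan does."""
    return [{**row, "xt_batch_id": batch_id} for row in rows]

# Only files matching the configured pattern are picked up and tagged.
files = discover_files("data/orders", "orders_*.csv",
                       ["orders_2024-01-01.csv", "readme.txt"])
rows = tag_batch([{"order_id": "A1"}], batch_id=7)
```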

2. Task-Level Ingestion (Standard Flows)

Standard flows support explicit ingestion tasks:

# flows/data_pipeline.toml
id = "data_pipeline"
flow_type = "standard"

[[tasks]]
id = "ingest_orders"
type = "ingestion"
name = "Ingest Orders"
target_dataset = "orders_landing"
[tasks.config]
path = "data/orders_{{ batch_id }}.csv"
format = "csv"
delimiter = ","
header = true

Configuration Options

Source Type Selection

The source_type field determines the ingestion method:

  • file: Load from a single file
  • directory: Load from multiple files in a directory
  • database: Load from a database using SQL queries

File-Based Ingestion

Single File Loading

Load data from a single file:

[properties.load]
source_type = "file"
file_path = "data/orders.csv"
format = "csv"

[properties.load.loader_config]
delimiter = ","
header = true
encoding = "utf-8"

Directory-Based Ingestion

Load data from multiple files in a directory:

[properties.load]
source_type = "directory"
directory_path = "data/orders"
file_pattern = "orders_*.csv"
pattern_type = "glob" # or "regex"
format = "csv"

[properties.load.loader_config]
delimiter = ","
header = true

Pattern Types:

  • glob: Unix-style glob patterns (e.g., *.csv, orders_*.csv)
  • regex: Regular expressions (e.g., ^orders_\d{4}-\d{2}-\d{2}\.csv$)
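The difference between the two pattern types can be seen with Python's standard library (an illustration of the matching semantics; Qarion ETL's matcher may differ in details):

```python
import fnmatch
import re

files = ["orders_2024-01-15.csv", "orders_backup.csv",
         "orders_2024-01-15.csv.tmp"]

# Glob: any name starting with "orders_" and ending in ".csv"
glob_matches = [f for f in files if fnmatch.fnmatch(f, "orders_*.csv")]

# Regex: anchored pattern accepts only date-stamped files
date_re = re.compile(r"^orders_\d{4}-\d{2}-\d{2}\.csv$")
regex_matches = [f for f in files if date_re.match(f)]
```

Note that the glob also picks up orders_backup.csv, while the anchored regex rejects both the backup and the .tmp file.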

Database Ingestion

Load data directly from databases by executing SQL queries:

[properties.load]
source_type = "database"
query = "SELECT * FROM source_table WHERE status = 'active'"

[properties.load.source_engine]
name = "postgresql"

[properties.load.source_engine.config]
host = "source-db.example.com"
port = 5432
database = "source_db"
user = "readonly_user"
password = "${DB_PASSWORD}"

Key Features:

  • Execute any SQL query on source database
  • Support for all Qarion ETL engines (PostgreSQL, MySQL, SQLite, DuckDB, Pandas, Polars, PySpark, SparkSQL)
  • Incremental loading with automatic WHERE clause generation
  • Chunked processing for large datasets
  • Automatic batch_id assignment

Incremental Loading:

[properties.load]
source_type = "database"
query = "SELECT * FROM orders WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'"
source_engine = { ... }

[properties.load.loader_config]
incremental_column = "order_date"
last_value = "2024-01-01"
chunk_size = 10000

For complete database ingestion documentation, see Database Ingestion Guide.
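The automatic WHERE-clause generation and chunked processing can be sketched roughly as follows. This is a simplified illustration; build_incremental_query and chunk_ranges are hypothetical names, and a real implementation would use bound query parameters rather than string interpolation.

```python
def build_incremental_query(base_query: str, incremental_column: str,
                            last_value: str) -> str:
    """Append a filter so only rows newer than last_value are pulled."""
    clause = f"{incremental_column} > '{last_value}'"
    joiner = "AND" if " where " in base_query.lower() else "WHERE"
    return f"{base_query} {joiner} {clause}"

def chunk_ranges(total_rows: int, chunk_size: int) -> list[tuple[int, int]]:
    """(offset, limit) pairs for fetching a large result set in chunks."""
    return [(start, min(chunk_size, total_rows - start))
            for start in range(0, total_rows, chunk_size)]
```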

Supported File Formats

CSV

[properties.load]
file_path = "data/orders.csv"
format = "csv"

[properties.load.loader_config]
delimiter = ","
header = true
encoding = "utf-8"
skip_rows = 0
quote_char = '"'
escape_char = "\\"

JSON

[properties.load]
file_path = "data/orders.json"
format = "json"

[properties.load.loader_config]
orient = "records" # or "values", "index", "table"
lines = false      # true for JSONL format

JSONL (JSON Lines)

[properties.load]
file_path = "data/orders.jsonl"
format = "jsonl"

# Equivalent: use the json format with lines enabled
# format = "json"
# [properties.load.loader_config]
# lines = true

Parquet

[properties.load]
file_path = "data/orders.parquet"
format = "parquet"

[properties.load.loader_config]
columns = ["id", "name", "amount"] # Optional: load specific columns only
use_nullable_dtypes = true

Ingestion in Different Flow Types

Change Feed Flow

Change Feed flows automatically create an ingestion step that loads data into a landing table:

id = "orders_change_feed"
flow_type = "change_feed"

[input]
primary_key = ["order_id"]
columns = ["order_id", "customer_id", "amount", "status"]

[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
format = "csv"

Generated Steps:

  1. Ingestion: Load files → orders_change_feed_landing
  2. Transformation: orders_change_feed_landing → orders_change_feed_change_feed

Delta Publishing Flow

Delta Publishing flows also include automatic ingestion:

id = "transactions_delta"
flow_type = "delta_publishing"

[input]
primary_key = ["transaction_id"]
columns = ["transaction_id", "account_id", "amount", "date"]

[properties.load]
source_path = "data/transactions"
file_pattern = "transactions_*.csv"
format = "csv"

Generated Steps:

  1. Ingestion: Load files → transactions_delta_landing
  2. Transformation: transactions_delta_landing → transactions_delta_delta_transaction

Standard Flow

Standard flows support explicit ingestion tasks with full control:

id = "data_pipeline"
flow_type = "standard"

[[tasks]]
id = "ingest_orders"
type = "ingestion"
name = "Ingest Orders from CSV"
target_dataset = "orders_landing"
[tasks.config]
path = "data/orders_{{ batch_id }}.csv"
format = "csv"
delimiter = ","
header = true

[[tasks]]
id = "process_orders"
type = "sql_processing"
source_dataset = "orders_landing"
target_dataset = "orders_processed"
dependencies = ["ingest_orders"]
sql = "SELECT * FROM {{ source_dataset }} WHERE amount > 100"

Batch Processing and Ingestion

All ingested data is tagged with a batch_id for tracking:

  • Batch ID Assignment: Each ingestion operation assigns a batch_id to all loaded records
  • Batch Tracking: The xt_batch_id column is automatically added to landing tables
  • Reprocessing: You can reprocess specific batches by filtering on xt_batch_id

Example: Batch-Specific File Loading

[[tasks]]
id = "ingest_daily_data"
type = "ingestion"
target_dataset = "daily_data_landing"
[tasks.config]
# Use the execution date in the file path
path = "data/daily/{{ ds_nodash }}.csv"
format = "csv"

When executing with batch_id=5 and execution_date=2024-01-15:

  • File path becomes: data/daily/20240115.csv
  • All loaded records get xt_batch_id = 5
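The template substitution can be sketched as follows. This assumes straightforward string replacement of the built-in variables, consistent with the example above; render_path is a hypothetical name.

```python
from datetime import date

def render_path(template: str, execution_date: date, batch_id: int) -> str:
    """Substitute the built-in template variables used in ingestion paths."""
    variables = {
        "ds": execution_date.isoformat(),                # e.g. 2024-01-15
        "ds_nodash": execution_date.strftime("%Y%m%d"),  # e.g. 20240115
        "batch_id": str(batch_id),
    }
    out = template
    for name, value in variables.items():
        out = out.replace("{{ " + name + " }}", value)
    return out

path = render_path("data/daily/{{ ds_nodash }}.csv", date(2024, 1, 15),
                   batch_id=5)
# path == "data/daily/20240115.csv"
```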

Execution Flow

Automatic Ingestion (Flow-Level)

  1. Flow Definition: Define [properties.load] in your flow
  2. Load Plan Generation: Qarion ETL generates a load plan during execution planning
  3. File Discovery: System finds files matching the pattern
  4. Load Execution: Files are loaded into landing table
  5. Transformation: Subsequent steps process the ingested data

Manual Ingestion (Task-Level)

  1. Task Definition: Define ingestion task in standard flow
  2. Task Execution: Task executor loads data from configured source
  3. Dependency Resolution: Downstream tasks wait for ingestion to complete
  4. Transformation: Subsequent tasks process the ingested data

Best Practices

1. File Organization

Organize source files consistently:

data/
├── orders/
│   ├── orders_2024-01-01.csv
│   ├── orders_2024-01-02.csv
│   └── orders_2024-01-03.csv
└── transactions/
    ├── transactions_2024-01-01.csv
    └── transactions_2024-01-02.csv

2. Pattern Matching

Use specific patterns to avoid loading unintended files:

# Good: Specific pattern
file_pattern = "orders_*.csv"

# Better: Date-based pattern
file_pattern = "orders_\\d{4}-\\d{2}-\\d{2}\\.csv"
pattern_type = "regex"

3. Error Handling

Handle missing files gracefully:

[properties.load]
file_path = "data/orders_{{ batch_id }}.csv"
format = "csv"
# System will log warning if file doesn't exist

4. Large Files

For large files, consider:

  • Using Parquet format for better compression
  • Splitting files into smaller batches
  • Using directory-based loading with pattern matching

5. Schema Validation

Ensure source files match expected schema:

  • Define input columns in flow definition
  • Use schema validation in loader config
  • Handle missing or extra columns appropriately

Troubleshooting

Files Not Found

Problem: Files matching pattern are not found

Solutions:

  • Verify file paths are correct (absolute vs relative)
  • Check file pattern syntax (glob vs regex)
  • Ensure files exist before execution
  • Check file permissions

Schema Mismatches

Problem: Source file columns don't match flow definition

Solutions:

  • Verify input column definitions match source files
  • Use schema evolution modes (forward, normal)
  • Check for column name case sensitivity
  • Validate data types match

Encoding Issues

Problem: Special characters not loading correctly

Solutions:

  • Specify encoding in loader config: encoding = "utf-8"
  • Check source file encoding
  • Use appropriate quote/escape characters

Performance Issues

Problem: Ingestion is slow

Solutions:

  • Use Parquet format for better performance
  • Split large files into smaller batches
  • Optimize file patterns to reduce file discovery time
  • Consider parallel loading for multiple files
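Parallel loading of independent files can be sketched with a thread pool. This is a hedged illustration: load_file is a stand-in for a real loader, not Qarion ETL's implementation.

```python
from concurrent.futures import ThreadPoolExecutor

def load_file(path: str) -> int:
    """Stand-in loader: returns the number of rows ingested from one file."""
    # A real loader would parse the file and insert into the landing table.
    return len(path)  # placeholder row count for illustration

def load_parallel(paths: list[str], max_workers: int = 4) -> int:
    """Load independent files concurrently and sum the row counts."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return sum(pool.map(load_file, paths))
```

This only helps when files are independent; files that must land in order (e.g. incremental deltas) should still be loaded sequentially.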

Complete Configuration Examples

Example 1: Simple CSV Ingestion (Change Feed Flow)

Flow Definition:

# flows/orders_ingestion.toml
id = "orders_ingestion"
name = "Orders Ingestion"
flow_type = "change_feed"
namespace = "production"

[input]
primary_key = ["order_id"]
columns = [
{name = "order_id", schema_type = "string", required = true},
{name = "customer_id", schema_type = "string", required = true},
{name = "amount", schema_type = "float", required = true},
{name = "order_date", schema_type = "date", required = true},
{name = "status", schema_type = "string", required = false}
]

[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
format = "csv"

[properties.load.loader_config]
delimiter = ","
header = true
encoding = "utf-8"

Dataset with Contract Validation:

# datasets/orders_ingestion_landing.toml
id = "orders_ingestion_landing"
name = "Orders Landing Table"
namespace = "production"

[columns]
[columns.order_id]
schema_type = "string"
required = true
primary_key = true

[columns.customer_id]
schema_type = "string"
required = true

[columns.amount]
schema_type = "float"
required = true

[columns.order_date]
schema_type = "date"
required = true

[columns.status]
schema_type = "string"
required = false

[properties]
table_type = "landing"

# Contract validation configuration
[properties.contract]
id = "orders_contract"
mode = "strict"
enabled = true
[[properties.contract.columns]]
name = "order_id"
schema_type = "string"
required = true
nullable = false
[[properties.contract.columns]]
name = "amount"
schema_type = "float"
required = true
nullable = false
min_value = 0
max_value = 1000000

Example 2: Date-Partitioned JSONL Ingestion

Flow Definition:

# flows/events_ingestion.toml
id = "events_ingestion"
name = "Events Ingestion"
flow_type = "change_feed"
namespace = "analytics"

[input]
primary_key = ["event_id"]
columns = [
{name = "event_id", schema_type = "string", required = true},
{name = "user_id", schema_type = "string", required = true},
{name = "event_type", schema_type = "string", required = true},
{name = "timestamp", schema_type = "timestamp", required = true},
{name = "properties", schema_type = "json", required = false}
]

[properties.load]
source_type = "directory"
directory_path = "data/events"
file_pattern = "events_\\d{4}-\\d{2}-\\d{2}\\.jsonl"
pattern_type = "regex"
format = "jsonl"

[properties.load.loader_config]
encoding = "utf-8"

With Template Variables:

[properties.load]
directory_path = "data/events/{{ ds }}/" # Date-based directory
file_pattern = "events_*.jsonl"
format = "jsonl"

Example 3: Standard Flow with Multiple Ingestion Tasks

Complete Flow Definition:

# flows/multi_source_pipeline.toml
id = "multi_source_pipeline"
name = "Multi-Source Pipeline"
flow_type = "standard"
namespace = "production"

[input]
primary_key = ["order_id"]
columns = [
{name = "order_id", schema_type = "string", required = true},
{name = "customer_id", schema_type = "string", required = true},
{name = "amount", schema_type = "float", required = true}
]

[[tasks]]
id = "ingest_orders"
name = "Ingest Orders"
type = "ingestion"
target_dataset_id = "orders_landing"
[tasks.properties]
operation = "ingestion"
target_table_type = "landing"
processing_type = "FULL_REFRESH"
[tasks.config]
path = "data/orders.csv"
format = "csv"
delimiter = ","
header = true
encoding = "utf-8"

[[tasks]]
id = "ingest_customers"
name = "Ingest Customers"
type = "ingestion"
target_dataset_id = "customers_landing"
[tasks.properties]
operation = "ingestion"
target_table_type = "landing"
processing_type = "FULL_REFRESH"
[tasks.config]
path = "data/customers.parquet"
format = "parquet"

[[tasks]]
id = "join_data"
name = "Join Orders and Customers"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_with_customers"
dependencies = ["ingest_orders", "ingest_customers"]
[tasks.properties]
operation = "landing_to_staging"
source_table_type = "landing"
target_table_type = "staging"
processing_type = "FULL_REFRESH"
[tasks.config]
sql = """
SELECT
o.order_id,
o.amount,
c.customer_name,
c.email
FROM {{ source_dataset }} o
JOIN customers_landing c
ON o.customer_id = c.customer_id
"""

Example 4: S3 Ingestion with Credential Store

Flow Definition:

# flows/s3_orders_ingestion.toml
id = "s3_orders_ingestion"
name = "S3 Orders Ingestion"
flow_type = "change_feed"
namespace = "production"

[input]
primary_key = ["order_id"]
columns = [
{name = "order_id", schema_type = "string", required = true},
{name = "customer_id", schema_type = "string", required = true},
{name = "amount", schema_type = "float", required = true},
{name = "order_date", schema_type = "date", required = true}
]

[properties.load]
source_path = "s3://my-bucket/data/orders/"
file_pattern = "orders_*.csv"
format = "csv"
# Use credential store (recommended)
credentials = "${credential:aws_prod_creds}"

[properties.load.loader_config]
delimiter = ","
header = true
encoding = "utf-8"

Configuration File:

# qarion-etl.toml
[credential_store]
type = "local_keystore"

[[credentials]]
id = "aws_prod_creds"
name = "AWS Production Credentials"
credential_type = "aws"

Load Operation Tracking

All load operations (file loads, database loads) are automatically tracked in the metadata system. Each load operation is recorded in xt_load_operations with:

  • Load type (file_load, database_load)
  • Source information (file path or SQL query)
  • Target table and dataset
  • Execution timing
  • Results (rows loaded, success status)
  • Loader configuration

You can query load operation history to monitor ingestion:

-- Get recent load operations
SELECT
load_operation_id,
load_type,
source_path,
target_table,
rows_loaded,
status,
start_time
FROM xt_load_operations
ORDER BY start_time DESC
LIMIT 10;

For more information about accessing and using execution metadata, see the Metadata Tracking Guide.