Data Ingestion
Data ingestion is the process of loading data from external sources (files, APIs, databases) into Qarion ETL datasets. This guide covers how ingestion works in Qarion ETL and how to configure it for different flow types.
Overview
Ingestion is typically the first step in a data pipeline. It loads raw data from external sources into landing tables, which are then processed by transformation steps.
Ingestion Flow
External Source (Files/API/DB) → Ingestion Step → Landing Table → Transformation Steps
Key Concepts
- Landing Tables: Initial destination for ingested data. These are typically landing-type tables that receive raw data.
- Load Plans: Execution plans that define how data should be loaded from sources.
- Batch Processing: All ingested data is tagged with a batch_id for tracking and reprocessing.
- Source Types: Supports file-based ingestion (CSV, JSON, JSONL, Parquet) and database ingestion (SQL queries).
Supported Source Types
Qarion ETL supports multiple ingestion source types:
- File-Based Ingestion: Load data from files (CSV, JSON, JSONL, Parquet)
  - Single file loading
  - Directory-based loading with pattern matching
  - Remote file loading (S3, FTP, SFTP, etc.)
- Database Ingestion: Load data directly from databases using SQL queries
  - Execute SQL queries on source databases
  - Support for all Qarion ETL engines (PostgreSQL, MySQL, SQLite, DuckDB, etc.)
  - Incremental loading support
  - See the Database Ingestion Guide for details
How Ingestion Works
1. Flow-Level Ingestion (Automatic)
Most flow types (Change Feed, Delta Publishing, SCD2, etc.) automatically generate ingestion steps when you configure [properties.load]:
# flows/orders_change_feed.toml
id = "orders_change_feed"
flow_type = "change_feed"
[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
format = "csv"
When you execute this flow:
- Qarion ETL generates a load plan from the flow definition
- The load plan identifies files matching the pattern
- Files are loaded into the landing table with xt_batch_id assigned
- Transformation steps then process the ingested data
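Conceptually, flow-level ingestion boils down to file discovery plus batch tagging. The sketch below illustrates those two steps in plain Python; it is not Qarion ETL's actual implementation, and the names `build_load_plan` and `load_rows` are hypothetical.

```python
# Illustrative sketch of a load plan: discover files matching a glob
# pattern, then tag every ingested row with xt_batch_id.
from dataclasses import dataclass
from fnmatch import fnmatch

@dataclass
class LoadPlan:
    source_files: list
    batch_id: int

def build_load_plan(available_files, file_pattern, batch_id):
    """Select files matching the configured pattern and wrap them in a plan."""
    matched = [f for f in available_files if fnmatch(f, file_pattern)]
    return LoadPlan(source_files=sorted(matched), batch_id=batch_id)

def load_rows(plan, rows_by_file):
    """Copy rows into the landing set, stamping each with the batch id."""
    landing = []
    for path in plan.source_files:
        for row in rows_by_file[path]:
            landing.append({**row, "xt_batch_id": plan.batch_id})
    return landing

plan = build_load_plan(
    ["orders_2024-01-01.csv", "orders_2024-01-02.csv", "readme.txt"],
    "orders_*.csv",
    batch_id=5,
)
rows = load_rows(plan, {
    "orders_2024-01-01.csv": [{"order_id": "A1"}],
    "orders_2024-01-02.csv": [{"order_id": "A2"}],
})
# plan.source_files contains only the two matching CSV files;
# every row in rows carries xt_batch_id = 5
```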
2. Task-Level Ingestion (Standard Flows)
Standard flows support explicit ingestion tasks:
# flows/data_pipeline.toml
id = "data_pipeline"
flow_type = "standard"
[[tasks]]
id = "ingest_orders"
type = "ingestion"
name = "Ingest Orders"
target_dataset = "orders_landing"
[tasks.config]
path = "data/orders_{{ batch_id }}.csv"
format = "csv"
delimiter = ","
header = true
Configuration Options
Source Type Selection
The source_type field determines the ingestion method:
- file: Load from a single file
- directory: Load from multiple files in a directory
- database: Load from a database using SQL queries
File-Based Ingestion
Single File Loading
Load data from a single file:
[properties.load]
source_type = "file"
file_path = "data/orders.csv"
format = "csv"
[properties.load.loader_config]
delimiter = ","
header = true
encoding = "utf-8"
Directory-Based Ingestion
Load data from multiple files in a directory:
[properties.load]
source_type = "directory"
directory_path = "data/orders"
file_pattern = "orders_*.csv"
pattern_type = "glob" # or "regex"
format = "csv"
[properties.load.loader_config]
delimiter = ","
header = true
Pattern Types:
- glob: Unix-style glob patterns (e.g., *.csv, orders_*.csv)
- regex: Regular expressions (e.g., ^orders_\d{4}-\d{2}-\d{2}\.csv$)
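The practical difference between the two pattern types can be shown with Python's standard `fnmatch` and `re` modules (an illustration of the matching semantics, not Qarion ETL's own matcher):

```python
# "glob" patterns use Unix-style wildcards; "regex" patterns are full
# regular expressions, so anchors can reject near-misses.
import re
from fnmatch import fnmatch

files = [
    "orders_2024-01-01.csv",
    "orders_backup.csv",
    "orders_2024-01-01.csv.tmp",
]

# pattern_type = "glob": matches any file named orders_*.csv,
# including the backup file
glob_matches = [f for f in files if fnmatch(f, "orders_*.csv")]

# pattern_type = "regex": the ^...$ anchors and \d{...} groups
# accept only properly date-stamped files
pattern = re.compile(r"^orders_\d{4}-\d{2}-\d{2}\.csv$")
regex_matches = [f for f in files if pattern.match(f)]
# glob_matches → two files; regex_matches → only orders_2024-01-01.csv
```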
Database Ingestion
Load data directly from databases by executing SQL queries:
[properties.load]
source_type = "database"
query = "SELECT * FROM source_table WHERE status = 'active'"
[properties.load.source_engine]
name = "postgresql"

[properties.load.source_engine.config]
host = "source-db.example.com"
port = 5432
database = "source_db"
user = "readonly_user"
password = "${DB_PASSWORD}"
Key Features:
- Execute any SQL query on the source database
- Support for all Qarion ETL engines (PostgreSQL, MySQL, SQLite, DuckDB, Pandas, Polars, PySpark, SparkSQL)
- Incremental loading with automatic WHERE clause generation
- Chunked processing for large datasets
- Automatic batch_id assignment
Incremental Loading:
[properties.load]
source_type = "database"
query = "SELECT * FROM orders WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'"
source_engine = { ... }
[properties.load.loader_config]
incremental_column = "order_date"
last_value = "2024-01-01"
chunk_size = 10000
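The incremental settings above imply a filter derived from `incremental_column` and `last_value`. The following is a sketch of how such a WHERE clause could be generated (assumed behaviour, not the verbatim Qarion ETL implementation; a production version should use bound query parameters rather than string interpolation):

```python
# Hypothetical helper: wrap the configured query so only rows newer
# than last_value are fetched, ordered so the high-water mark can be
# advanced after the load.
def build_incremental_query(base_query, incremental_column, last_value):
    """Append an incremental filter to the configured source query."""
    # NOTE: interpolating last_value directly is for illustration only;
    # real code should pass it as a bound parameter.
    return (
        f"SELECT * FROM ({base_query}) AS src "
        f"WHERE {incremental_column} > '{last_value}' "
        f"ORDER BY {incremental_column}"
    )

query = build_incremental_query(
    "SELECT * FROM orders",
    incremental_column="order_date",
    last_value="2024-01-01",
)
# query == "SELECT * FROM (SELECT * FROM orders) AS src "
#          "WHERE order_date > '2024-01-01' ORDER BY order_date"
```

Chunked processing then reads the filtered result set in batches of `chunk_size` rows rather than materializing it all at once.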
For complete database ingestion documentation, see Database Ingestion Guide.
Supported File Formats
CSV
[properties.load]
file_path = "data/orders.csv"
format = "csv"
[properties.load.loader_config]
delimiter = ","
header = true
encoding = "utf-8"
skip_rows = 0
quote_char = '"'
escape_char = "\\"
JSON
[properties.load]
file_path = "data/orders.json"
format = "json"
[properties.load.loader_config]
orient = "records" # or "values", "index", "table"
lines = false # true for JSONL format
JSONL (JSON Lines)
[properties.load]
file_path = "data/orders.jsonl"
format = "jsonl"
# Alternatively, use the json format with lines = true:
# format = "json"
# [properties.load.loader_config]
# lines = true
Parquet
[properties.load]
file_path = "data/orders.parquet"
format = "parquet"
[properties.load.loader_config]
columns = ["id", "name", "amount"] # Optional: load specific columns
use_nullable_dtypes = true
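A loader that dispatches on the configured format might look like the sketch below. The pandas readers (`read_csv`, `read_json`, `read_parquet`) are real pandas APIs; the `load_file` dispatcher itself is a hypothetical illustration of how the format and loader_config options map onto reader arguments.

```python
# Illustrative format dispatcher: pick a pandas reader based on the
# configured format, translating loader_config keys to reader arguments.
import pandas as pd

def load_file(path, fmt, loader_config=None):
    """Load one file into a DataFrame according to format + loader_config."""
    cfg = loader_config or {}
    if fmt == "csv":
        return pd.read_csv(
            path,
            sep=cfg.get("delimiter", ","),
            header=0 if cfg.get("header", True) else None,
            encoding=cfg.get("encoding", "utf-8"),
            skiprows=cfg.get("skip_rows", 0),
        )
    if fmt == "json":
        return pd.read_json(
            path,
            orient=cfg.get("orient", "records"),
            lines=cfg.get("lines", False),
        )
    if fmt == "jsonl":
        # JSONL is JSON with one record per line
        return pd.read_json(path, lines=True)
    if fmt == "parquet":
        return pd.read_parquet(path, columns=cfg.get("columns"))
    raise ValueError(f"Unsupported format: {fmt}")
```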
Ingestion in Different Flow Types
Change Feed Flow
Change Feed flows automatically create an ingestion step that loads data into a landing table:
id = "orders_change_feed"
flow_type = "change_feed"
[input]
primary_key = ["order_id"]
columns = ["order_id", "customer_id", "amount", "status"]
[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
format = "csv"
Generated Steps:
- Ingestion: Load files → orders_change_feed_landing
- Transformation: orders_change_feed_landing → orders_change_feed_change_feed
Delta Publishing Flow
Delta Publishing flows also include automatic ingestion:
id = "transactions_delta"
flow_type = "delta_publishing"
[input]
primary_key = ["transaction_id"]
columns = ["transaction_id", "account_id", "amount", "date"]
[properties.load]
source_path = "data/transactions"
file_pattern = "transactions_*.csv"
format = "csv"
Generated Steps:
- Ingestion: Load files → transactions_delta_landing
- Transformation: transactions_delta_landing → transactions_delta_delta_transaction
Standard Flow
Standard flows support explicit ingestion tasks with full control:
id = "data_pipeline"
flow_type = "standard"
[[tasks]]
id = "ingest_orders"
type = "ingestion"
name = "Ingest Orders from CSV"
target_dataset = "orders_landing"
[tasks.config]
path = "data/orders_{{ batch_id }}.csv"
format = "csv"
delimiter = ","
header = true
[[tasks]]
id = "process_orders"
type = "sql_processing"
source_dataset = "orders_landing"
target_dataset = "orders_processed"
dependencies = ["ingest_orders"]
sql = "SELECT * FROM {{ source_dataset }} WHERE amount > 100"
Batch Processing and Ingestion
All ingested data is tagged with a batch_id for tracking:
- Batch ID Assignment: Each ingestion operation assigns a batch_id to all loaded records
- Batch Tracking: The xt_batch_id column is automatically added to landing tables
- Reprocessing: You can reprocess specific batches by filtering on xt_batch_id
Example: Batch-Specific File Loading
[[tasks]]
id = "ingest_daily_data"
type = "ingestion"
target_dataset = "daily_data_landing"
[tasks.config]
# Use template variables in the file path
path = "data/daily/{{ ds_nodash }}.csv"
format = "csv"
When executing with batch_id=5 and execution_date=2024-01-15:
- File path becomes: data/daily/20240115.csv
- All loaded records get xt_batch_id = 5
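The template substitution above can be sketched in a few lines of Python. The variable semantics here are assumptions modeled on common scheduler conventions (ds is the execution date, ds_nodash the same date without dashes); `render_path` is a hypothetical helper, not a Qarion ETL API.

```python
# Illustrative template rendering for ingestion paths: substitute
# {{ ds }}, {{ ds_nodash }}, and {{ batch_id }} placeholders.
from datetime import date

def render_path(template, execution_date, batch_id):
    """Replace the supported template variables in a path string."""
    values = {
        "ds": execution_date.isoformat(),            # 2024-01-15
        "ds_nodash": execution_date.strftime("%Y%m%d"),  # 20240115
        "batch_id": str(batch_id),
    }
    rendered = template
    for name, value in values.items():
        rendered = rendered.replace("{{ " + name + " }}", value)
    return rendered

path = render_path("data/daily/{{ ds_nodash }}.csv", date(2024, 1, 15), 5)
# path == "data/daily/20240115.csv"
```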
Execution Flow
Automatic Ingestion (Flow-Level)
- Flow Definition: Define [properties.load] in your flow
- Load Plan Generation: Qarion ETL generates a load plan during execution planning
- File Discovery: System finds files matching the pattern
- Load Execution: Files are loaded into landing table
- Transformation: Subsequent steps process the ingested data
Manual Ingestion (Task-Level)
- Task Definition: Define ingestion task in standard flow
- Task Execution: Task executor loads data from configured source
- Dependency Resolution: Downstream tasks wait for ingestion to complete
- Transformation: Subsequent tasks process the ingested data
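The dependency resolution step above amounts to ordering tasks topologically so downstream tasks run only after their declared dependencies. Python's standard `graphlib` module can illustrate this (a sketch of the concept, not Qarion ETL's scheduler):

```python
# Illustrative dependency resolution: a task runs only after all of
# its declared dependencies have completed.
from graphlib import TopologicalSorter

# Map each task id to the tasks it depends on, as in the flow TOML
tasks = {
    "ingest_orders": [],
    "ingest_customers": [],
    "join_data": ["ingest_orders", "ingest_customers"],
}

# static_order() yields dependencies before their dependents
order = list(TopologicalSorter(tasks).static_order())
# Both ingestion tasks appear before join_data
```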
Best Practices
1. File Organization
Organize source files consistently:
data/
├── orders/
│ ├── orders_2024-01-01.csv
│ ├── orders_2024-01-02.csv
│ └── orders_2024-01-03.csv
└── transactions/
├── transactions_2024-01-01.csv
└── transactions_2024-01-02.csv
2. Pattern Matching
Use specific patterns to avoid loading unintended files:
# Good: Specific pattern
file_pattern = "orders_*.csv"
# Better: Date-based pattern
file_pattern = "orders_\\d{4}-\\d{2}-\\d{2}\\.csv"
pattern_type = "regex"
3. Error Handling
Handle missing files gracefully:
[properties.load]
file_path = "data/orders_{{ batch_id }}.csv"
format = "csv"
# System will log warning if file doesn't exist
4. Large Files
For large files, consider:
- Using Parquet format for better compression
- Splitting files into smaller batches
- Using directory-based loading with pattern matching
5. Schema Validation
Ensure source files match expected schema:
- Define input columns in flow definition
- Use schema validation in loader config
- Handle missing or extra columns appropriately
Troubleshooting
Files Not Found
Problem: Files matching pattern are not found
Solutions:
- Verify file paths are correct (absolute vs relative)
- Check file pattern syntax (glob vs regex)
- Ensure files exist before execution
- Check file permissions
Schema Mismatches
Problem: Source file columns don't match flow definition
Solutions:
- Verify input column definitions match source files
- Use schema evolution modes (forward, normal)
- Check for column name case sensitivity
- Validate data types match
Encoding Issues
Problem: Special characters not loading correctly
Solutions:
- Specify encoding in loader config: encoding = "utf-8"
- Check source file encoding
- Use appropriate quote/escape characters
Performance Issues
Problem: Ingestion is slow
Solutions:
- Use Parquet format for better performance
- Split large files into smaller batches
- Optimize file patterns to reduce file discovery time
- Consider parallel loading for multiple files
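Because files in a directory load are independent, they can be read concurrently. The sketch below shows one generic way to do that with a thread pool; `load_one` is a stand-in for a real per-file loader, and this is not a description of Qarion ETL's internal parallelism.

```python
# Illustrative parallel file ingestion: load independent files
# concurrently, then flatten the per-file results.
from concurrent.futures import ThreadPoolExecutor

def load_one(path):
    """Stand-in for a real file loader; returns the rows of one file."""
    return [{"source_file": path}]

files = ["orders_1.csv", "orders_2.csv", "orders_3.csv"]

with ThreadPoolExecutor(max_workers=4) as pool:
    # map preserves input order even though loads run concurrently
    results = list(pool.map(load_one, files))

rows = [row for chunk in results for row in chunk]
```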
Complete Configuration Examples
Example 1: Simple CSV Ingestion (Change Feed Flow)
Flow Definition:
# flows/orders_ingestion.toml
id = "orders_ingestion"
name = "Orders Ingestion"
flow_type = "change_feed"
namespace = "production"
[input]
primary_key = ["order_id"]
columns = [
{name = "order_id", schema_type = "string", required = true},
{name = "customer_id", schema_type = "string", required = true},
{name = "amount", schema_type = "float", required = true},
{name = "order_date", schema_type = "date", required = true},
{name = "status", schema_type = "string", required = false}
]
[properties.load]
source_path = "data/orders"
file_pattern = "orders_*.csv"
format = "csv"
[properties.load.loader_config]
delimiter = ","
header = true
encoding = "utf-8"
Dataset with Contract Validation:
# datasets/orders_ingestion_landing.toml
id = "orders_ingestion_landing"
name = "Orders Landing Table"
namespace = "production"
[columns]
[columns.order_id]
schema_type = "string"
required = true
primary_key = true
[columns.customer_id]
schema_type = "string"
required = true
[columns.amount]
schema_type = "float"
required = true
[columns.order_date]
schema_type = "date"
required = true
[columns.status]
schema_type = "string"
required = false
[properties]
table_type = "landing"
# Contract validation configuration
[properties.contract]
id = "orders_contract"
mode = "strict"
enabled = true
[[properties.contract.columns]]
name = "order_id"
schema_type = "string"
required = true
nullable = false
[[properties.contract.columns]]
name = "amount"
schema_type = "float"
required = true
nullable = false
min_value = 0
max_value = 1000000
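To make the contract above concrete, here is a sketch of the kinds of checks strict mode implies for each record (hypothetical checks in plain Python; the real validation belongs to Qarion ETL's contract engine and may differ):

```python
# Illustrative per-row contract validation: nullability plus
# min_value / max_value bounds, as declared in the contract columns.
def validate_row(row, contract_columns):
    """Return a list of violation messages for one record."""
    errors = []
    for col in contract_columns:
        name = col["name"]
        value = row.get(name)
        if value is None:
            if col.get("required") and not col.get("nullable", True):
                errors.append(f"{name}: null not allowed")
            continue
        if "min_value" in col and value < col["min_value"]:
            errors.append(f"{name}: {value} below min_value")
        if "max_value" in col and value > col["max_value"]:
            errors.append(f"{name}: {value} above max_value")
    return errors

contract = [
    {"name": "order_id", "required": True, "nullable": False},
    {"name": "amount", "required": True, "nullable": False,
     "min_value": 0, "max_value": 1_000_000},
]

ok = validate_row({"order_id": "A1", "amount": 250.0}, contract)
bad = validate_row({"order_id": None, "amount": -5.0}, contract)
# ok → no violations; bad → null order_id and negative amount flagged
```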
Example 2: Date-Partitioned JSONL Ingestion
Flow Definition:
# flows/events_ingestion.toml
id = "events_ingestion"
name = "Events Ingestion"
flow_type = "change_feed"
namespace = "analytics"
[input]
primary_key = ["event_id"]
columns = [
{name = "event_id", schema_type = "string", required = true},
{name = "user_id", schema_type = "string", required = true},
{name = "event_type", schema_type = "string", required = true},
{name = "timestamp", schema_type = "timestamp", required = true},
{name = "properties", schema_type = "json", required = false}
]
[properties.load]
source_type = "directory"
directory_path = "data/events"
file_pattern = "events_\\d{4}-\\d{2}-\\d{2}\\.jsonl"
pattern_type = "regex"
format = "jsonl"
[properties.load.loader_config]
encoding = "utf-8"
With Template Variables:
[properties.load]
directory_path = "data/events/{{ ds }}/" # Date-based directory
file_pattern = "events_*.jsonl"
format = "jsonl"
Example 3: Standard Flow with Multiple Ingestion Tasks
Complete Flow Definition:
# flows/multi_source_pipeline.toml
id = "multi_source_pipeline"
name = "Multi-Source Pipeline"
flow_type = "standard"
namespace = "production"
[input]
primary_key = ["order_id"]
columns = [
{name = "order_id", schema_type = "string", required = true},
{name = "customer_id", schema_type = "string", required = true},
{name = "amount", schema_type = "float", required = true}
]
[[tasks]]
id = "ingest_orders"
name = "Ingest Orders"
type = "ingestion"
target_dataset_id = "orders_landing"
[tasks.properties]
operation = "ingestion"
target_table_type = "landing"
processing_type = "FULL_REFRESH"
[tasks.config]
path = "data/orders.csv"
format = "csv"
delimiter = ","
header = true
encoding = "utf-8"
[[tasks]]
id = "ingest_customers"
name = "Ingest Customers"
type = "ingestion"
target_dataset_id = "customers_landing"
[tasks.properties]
operation = "ingestion"
target_table_type = "landing"
processing_type = "FULL_REFRESH"
[tasks.config]
path = "data/customers.parquet"
format = "parquet"
[[tasks]]
id = "join_data"
name = "Join Orders and Customers"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_with_customers"
dependencies = ["ingest_orders", "ingest_customers"]
[tasks.properties]
operation = "landing_to_staging"
source_table_type = "landing"
target_table_type = "staging"
processing_type = "FULL_REFRESH"
[tasks.config]
sql = """
SELECT
o.order_id,
o.amount,
c.customer_name,
c.email
FROM {{ source_dataset }} o
JOIN customers_landing c
ON o.customer_id = c.customer_id
"""
Example 4: S3 Ingestion with Credential Store
Flow Definition:
# flows/s3_orders_ingestion.toml
id = "s3_orders_ingestion"
name = "S3 Orders Ingestion"
flow_type = "change_feed"
namespace = "production"
[input]
primary_key = ["order_id"]
columns = [
{name = "order_id", schema_type = "string", required = true},
{name = "customer_id", schema_type = "string", required = true},
{name = "amount", schema_type = "float", required = true},
{name = "order_date", schema_type = "date", required = true}
]
[properties.load]
source_path = "s3://my-bucket/data/orders/"
file_pattern = "orders_*.csv"
format = "csv"
# Use credential store (recommended)
credentials = "${credential:aws_prod_creds}"
[properties.load.loader_config]
delimiter = ","
header = true
encoding = "utf-8"
Configuration File:
# qarion-etl.toml
[credential_store]
type = "local_keystore"
[[credentials]]
id = "aws_prod_creds"
name = "AWS Production Credentials"
credential_type = "aws"
Load Operation Tracking
All load operations (file loads, database loads) are automatically tracked in the metadata system. Each load operation is recorded in xt_load_operations with:
- Load type (file_load, database_load)
- Source information (file path or SQL query)
- Target table and dataset
- Execution timing
- Results (rows loaded, success status)
- Loader configuration
You can query load operation history to monitor ingestion:
-- Get recent load operations
SELECT
load_operation_id,
load_type,
source_path,
target_table,
rows_loaded,
status,
start_time
FROM xt_load_operations
ORDER BY start_time DESC
LIMIT 10;
For more information about accessing and using execution metadata, see the Metadata Tracking Guide.
Related Documentation
- Database Ingestion Guide - Complete guide to database ingestion with SQL queries
- Metadata Tracking - Tracking and monitoring all operations and executions
- Flows Guide - Understanding flow types and configuration
- Standard Flow Tasks - Task-based ingestion
- Batch Processing - Batch processing concepts
- Core Concepts - Fundamental Qarion ETL concepts
- Configuration - Configuration options