Database Ingestion
A comprehensive guide to ingesting data into Qarion ETL by running SQL queries directly against source databases.
Overview
Database ingestion allows you to load data directly from source databases by executing SQL queries. This is useful for:
- Extracting data from operational databases
- Loading data from data warehouses
- Incremental data synchronization
- Cross-database data migration
Quick Start
Basic Database Ingestion
# flows/my_flow.toml
id = "database_ingestion_flow"
name = "Database Ingestion Flow"
flow_type = "standard"
[properties.load]
source_type = "database"
query = "SELECT * FROM source_table WHERE status = 'active'"
[properties.load.source_engine]
name = "postgresql"

[properties.load.source_engine.config]
host = "source-db.example.com"
port = 5432
database = "source_db"
user = "readonly_user"
password = "${DB_PASSWORD}"
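The ${DB_PASSWORD} placeholder implies that secrets are injected from the environment rather than stored in the flow file. A minimal sketch of how such substitution could work (the function name is illustrative, not part of Qarion's API):

```python
import os
import re

def expand_env_placeholders(value: str) -> str:
    """Replace ${VAR} placeholders with values from the environment."""
    return re.sub(r"\$\{(\w+)\}", lambda m: os.environ.get(m.group(1), ""), value)

os.environ["DB_PASSWORD"] = "s3cret"
print(expand_env_placeholders("password = ${DB_PASSWORD}"))  # password = s3cret
```

Missing variables resolve to an empty string here; a production resolver would more likely raise an error.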
Configuration
Source Engine Configuration
The source_engine configuration defines how to connect to the source database. It follows the same structure as the main engine configuration:
[properties.load.source_engine]
# TOML inline tables cannot span multiple lines, so source_engine is a sub-table
name = "postgresql"  # Engine type: postgresql, mysql, sqlite, duckdb, etc.

[properties.load.source_engine.config]
# Connection parameters depend on engine type
host = "localhost"
port = 5432
database = "source_db"
user = "username"
password = "password"
Supported Source Engines
All engines supported by Qarion ETL can be used as source engines:
- PostgreSQL: postgresql
- MySQL: mysql
- SQLite: sqlite
- DuckDB: duckdb
- Pandas: pandas (in-memory or local)
- Polars: polars
- PySpark: pyspark
- SparkSQL: sparksql
Query Configuration
The query field contains the SQL query to execute on the source database:
query = """
SELECT
id,
name,
email,
created_at,
updated_at
FROM users
WHERE status = 'active'
ORDER BY created_at DESC
"""
Incremental Loading
Database ingestion supports incremental loading using the incremental_column and last_value configuration:
[properties.load]
source_type = "database"
query = "SELECT * FROM orders WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'"
source_engine = { ... }
[properties.load.loader_config]
incremental_column = "order_date"
last_value = "2024-01-01" # Will be updated automatically
chunk_size = 10000
The loader will automatically append a WHERE clause to filter records where incremental_column > last_value.
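How that clause is appended is loader-specific; one safe pattern (a sketch, not Qarion's actual implementation) is to wrap the configured query as a subquery so any existing WHERE clause is left untouched:

```python
def with_incremental_filter(query: str, column: str, last_value: str) -> str:
    """Wrap the source query so an existing WHERE clause stays intact.

    Note: a real loader should bind last_value as a query parameter
    instead of interpolating it into the SQL string.
    """
    return f"SELECT * FROM ({query}) AS src WHERE src.{column} > '{last_value}'"

sql = with_incremental_filter("SELECT * FROM orders", "order_date", "2024-01-01")
print(sql)
```

After a successful run, the loader records the maximum value seen in the incremental column as the new last_value.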
Examples
Example 1: Full Table Load
id = "full_table_load"
flow_type = "standard"
[properties.load]
source_type = "database"
query = "SELECT * FROM products"
[properties.load.source_engine]
name = "postgresql"

[properties.load.source_engine.config]
host = "prod-db.example.com"
database = "production"
user = "etl_user"
password = "${PROD_DB_PASSWORD}"
Example 2: Filtered Load
[properties.load]
source_type = "database"
query = """
SELECT
customer_id,
order_id,
order_date,
total_amount,
status
FROM orders
WHERE order_date >= '2024-01-01'
AND status IN ('completed', 'shipped')
"""
source_engine = { ... }
Example 3: Incremental Load
[properties.load]
source_type = "database"
query = "SELECT * FROM transactions WHERE transaction_date > '2024-01-01'"
source_engine = { ... }
[properties.load.loader_config]
incremental_column = "transaction_date"
last_value = "2024-01-01"
chunk_size = 5000
Example 4: Joined Tables
[properties.load]
source_type = "database"
query = """
SELECT
o.order_id,
o.order_date,
o.total,
c.customer_name,
c.email
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '30 days'
"""
source_engine = { ... }
Example 5: Aggregated Data
[properties.load]
source_type = "database"
query = """
SELECT
DATE_TRUNC('day', order_date) as order_day,
COUNT(*) as order_count,
SUM(total_amount) as daily_revenue,
AVG(total_amount) as avg_order_value
FROM orders
WHERE order_date >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY DATE_TRUNC('day', order_date)
ORDER BY order_day DESC
"""
source_engine = { ... }
Advanced Configuration
Chunk Size
Control the number of rows inserted per batch:
[properties.load.loader_config]
chunk_size = 10000 # Insert 10,000 rows per batch
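Chunked reading keeps memory bounded regardless of result size. A self-contained sketch using sqlite3 (any DB-API driver's fetchmany behaves the same way):

```python
import sqlite3

def load_in_chunks(conn, query, chunk_size):
    """Yield query results in batches of at most chunk_size rows."""
    cursor = conn.execute(query)
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            break
        yield rows

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(25)])

batches = list(load_in_chunks(conn, "SELECT id FROM t", chunk_size=10))
print([len(b) for b in batches])  # [10, 10, 5]
```

Larger chunks mean fewer round trips but more memory per batch; 5,000-10,000 rows is a common starting point.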
Batch ID
The batch_id is automatically assigned to all loaded records:
# In flow execution
batch_id = 123 # All records will have batch_id = 123
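Conceptually, this amounts to stamping the same batch_id onto every record in the load; a sketch (illustrative, not the internal implementation):

```python
def tag_with_batch_id(records, batch_id):
    """Return copies of the records with a batch_id column attached."""
    return [{**record, "batch_id": batch_id} for record in records]

rows = tag_with_batch_id([{"id": 1}, {"id": 2}], batch_id=123)
print(rows)  # [{'id': 1, 'batch_id': 123}, {'id': 2, 'batch_id': 123}]
```

The batch_id makes it possible to trace, audit, or roll back everything loaded in a single run.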
Custom Loader Configuration
Additional configuration options:
[properties.load.loader_config]
chunk_size = 5000
incremental_column = "updated_at"
last_value = "2024-01-01 00:00:00"
Flow Definition Example
Complete flow definition with database ingestion:
id = "customer_data_ingestion"
name = "Customer Data Ingestion"
flow_type = "standard"
namespace = "customer"
[input]
columns = [
{ name = "customer_id", type = "integer" },
{ name = "customer_name", type = "string" },
{ name = "email", type = "string" },
{ name = "created_at", type = "timestamp" },
{ name = "updated_at", type = "timestamp" }
]
primary_key = "customer_id"
[properties.load]
source_type = "database"
query = """
SELECT
customer_id,
customer_name,
email,
created_at,
updated_at
FROM customers
WHERE updated_at >= CURRENT_DATE - INTERVAL '1 day'
"""
[properties.load.source_engine]
name = "postgresql"

[properties.load.source_engine.config]
host = "source-db.example.com"
port = 5432
database = "source_db"
user = "readonly_user"
password = "${SOURCE_DB_PASSWORD}"
[properties.load.loader_config]
incremental_column = "updated_at"
chunk_size = 10000
Best Practices
- Use read-only credentials:
  user = "readonly_user"  # Use a read-only database user
- Filter at the source:
  -- Good: filter in the query
  SELECT * FROM orders WHERE order_date >= '2024-01-01'
  -- Avoid: loading all data and filtering later
  SELECT * FROM orders
- Use incremental loading:
  incremental_column = "updated_at"
  last_value = "2024-01-01"
- Optimize query performance:
  - Add appropriate WHERE clauses
  - Use indexes on filter columns
  - Limit the result set size when possible
- Handle large datasets:
  chunk_size = 10000  # Process in chunks
- Use connection pooling:
  - Configure the connection pool size in the source_engine config
  - Reuse connections when possible
Troubleshooting
Connection Errors
Problem: Cannot connect to source database.
Solution:
- Verify connection parameters (host, port, database, user, password)
- Check network connectivity
- Verify credentials
- Check firewall rules
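To separate network problems from credential problems, it helps to check plain TCP reachability first. A small standalone helper (not part of Qarion):

```python
import socket

def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Demo against a throwaway local listener
server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen(1)
port = server.getsockname()[1]
print(can_reach("127.0.0.1", port))  # True
server.close()
```

If the port is reachable but the connection still fails, the problem is most likely credentials or database-level permissions rather than the network.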
Query Errors
Problem: SQL query fails on source database.
Solution:
- Test query directly on source database
- Verify table/column names
- Check SQL syntax compatibility with source engine
- Ensure user has SELECT permissions
Performance Issues
Problem: Ingestion is slow.
Solution:
- Increase chunk_size
- Optimize the source query (add indexes, filters)
- Use incremental loading
- Consider parallel processing for large datasets
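For the last point, a common pattern (a sketch under the assumption that the table has a dense numeric key; not a Qarion feature) is to partition the key range and read the partitions concurrently, each worker on its own connection:

```python
import os
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor

# Build a throwaway on-disk table to read from
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
conn = sqlite3.connect(path)
conn.execute("CREATE TABLE t (id INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(100)])
conn.commit()
conn.close()

def read_partition(bounds):
    """Read one id range on a dedicated connection
    (sqlite3 connections must not be shared across threads)."""
    lo, hi = bounds
    c = sqlite3.connect(path)
    try:
        return c.execute(
            "SELECT id FROM t WHERE id >= ? AND id < ?", (lo, hi)
        ).fetchall()
    finally:
        c.close()

partitions = [(0, 25), (25, 50), (50, 75), (75, 100)]
with ThreadPoolExecutor(max_workers=4) as pool:
    parts = list(pool.map(read_partition, partitions))
rows = [r for part in parts for r in part]
print(len(rows))  # 100
os.unlink(path)
```

Non-overlapping ranges guarantee each row is read exactly once; skewed partitions will leave some workers idle, so pick boundaries from the key distribution when possible.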
Data Type Mismatches
Problem: Data types don't match target schema.
Solution:
- Cast columns in the source query (PostgreSQL-style :: casts; use CAST(col AS type) on other engines):
  SELECT
    id::INTEGER,
    name::TEXT,
    created_at::TIMESTAMP
  FROM source_table
- Update the target dataset schema to match the source