Database Ingestion

A guide to ingesting data directly from source databases in Qarion ETL by executing SQL queries against them.

Overview

Database ingestion allows you to load data directly from source databases by executing SQL queries. This is useful for:

  • Extracting data from operational databases
  • Loading data from data warehouses
  • Incremental data synchronization
  • Cross-database data migration

Quick Start

Basic Database Ingestion

# flows/my_flow.toml
id = "database_ingestion_flow"
name = "Database Ingestion Flow"
flow_type = "standard"

[properties.load]
source_type = "database"
query = "SELECT * FROM source_table WHERE status = 'active'"
[properties.load.source_engine]
name = "postgresql"

[properties.load.source_engine.config]
host = "source-db.example.com"
port = 5432
database = "source_db"
user = "readonly_user"
password = "${DB_PASSWORD}"
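The `${DB_PASSWORD}` placeholder is resolved from the environment at load time. As a rough sketch of how such substitution typically works (the `expand_env` helper is illustrative, not part of the Qarion API):

```python
import os
import re

def expand_env(value: str) -> str:
    # Replace ${VAR} placeholders with values from the environment.
    # Hypothetical helper showing the substitution pattern; the real
    # mechanism inside Qarion ETL may differ.
    return re.sub(r"\$\{(\w+)\}", lambda m: os.environ.get(m.group(1), ""), value)

os.environ["DB_PASSWORD"] = "s3cret"
print(expand_env("${DB_PASSWORD}"))  # prints "s3cret"
```

Keeping secrets in environment variables rather than in the flow file means the TOML can be committed to version control safely.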

Configuration

Source Engine Configuration

The source_engine configuration defines how to connect to the source database. It follows the same structure as the main engine configuration:

[properties.load.source_engine]
name = "postgresql"  # Engine type: postgresql, mysql, sqlite, duckdb, etc.

# Connection parameters depend on the engine type
[properties.load.source_engine.config]
host = "localhost"
port = 5432
database = "source_db"
user = "username"
password = "password"
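At runtime the engine name selects a driver and the config mapping is handed to it. A minimal sketch of that dispatch using the stdlib sqlite3 driver (the `connect_source` helper is illustrative, not Qarion's actual API, and only the sqlite case is shown):

```python
import sqlite3

def connect_source(engine: dict):
    # Illustrative dispatcher: pick a driver by engine name and pass it
    # the config mapping. A real implementation would cover every
    # supported engine; only sqlite is handled here.
    if engine["name"] == "sqlite":
        return sqlite3.connect(engine["config"]["database"])
    raise ValueError(f"unsupported source engine: {engine['name']}")

conn = connect_source({"name": "sqlite", "config": {"database": ":memory:"}})
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'ada')")
rows = conn.execute("SELECT * FROM users").fetchall()
print(rows)  # [(1, 'ada')]
```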

Supported Source Engines

All engines supported by Qarion ETL can be used as source engines:

  • PostgreSQL: postgresql
  • MySQL: mysql
  • SQLite: sqlite
  • DuckDB: duckdb
  • Pandas: pandas (in-memory or local)
  • Polars: polars
  • PySpark: pyspark
  • SparkSQL: sparksql

Query Configuration

The query field contains the SQL query to execute on the source database:

query = """
SELECT
    id,
    name,
    email,
    created_at,
    updated_at
FROM users
WHERE status = 'active'
ORDER BY created_at DESC
"""

Incremental Loading

Database ingestion supports incremental loading using the incremental_column and last_value configuration:

[properties.load]
source_type = "database"
query = "SELECT * FROM orders WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'"
source_engine = { ... }

[properties.load.loader_config]
incremental_column = "order_date"
last_value = "2024-01-01" # Will be updated automatically
chunk_size = 10000

The loader will automatically append a WHERE clause to filter records where incremental_column > last_value.
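One plausible way to implement that rewrite (Qarion's internals may differ) is to wrap the configured query in a subselect, so the incremental predicate composes safely with any WHERE clause already in the query:

```python
import sqlite3

def apply_incremental_filter(query: str, column: str, last_value: str) -> str:
    # Wrap the user query in a subselect so the incremental predicate
    # cannot clash with an existing WHERE clause.
    # NOTE: a real loader should bind last_value as a query parameter
    # rather than interpolating it, to avoid SQL injection.
    return f"SELECT * FROM ({query}) AS src WHERE src.{column} > '{last_value}'"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, order_date TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2023-12-30"), (2, "2024-01-05"), (3, "2024-02-01")],
)
sql = apply_incremental_filter("SELECT * FROM orders", "order_date", "2024-01-01")
print(conn.execute(sql).fetchall())  # [(2, '2024-01-05'), (3, '2024-02-01')]
```

Only the two rows newer than the stored last_value are fetched; the row from 2023 is filtered out at the source.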

Examples

Example 1: Full Table Load

id = "full_table_load"
flow_type = "standard"

[properties.load]
source_type = "database"
query = "SELECT * FROM products"
[properties.load.source_engine]
name = "postgresql"

[properties.load.source_engine.config]
host = "prod-db.example.com"
database = "production"
user = "etl_user"
password = "${PROD_DB_PASSWORD}"

Example 2: Filtered Load

[properties.load]
source_type = "database"
query = """
SELECT
    customer_id,
    order_id,
    order_date,
    total_amount,
    status
FROM orders
WHERE order_date >= '2024-01-01'
  AND status IN ('completed', 'shipped')
"""
source_engine = { ... }

Example 3: Incremental Load

[properties.load]
source_type = "database"
query = "SELECT * FROM transactions WHERE transaction_date > '2024-01-01'"
source_engine = { ... }

[properties.load.loader_config]
incremental_column = "transaction_date"
last_value = "2024-01-01"
chunk_size = 5000

Example 4: Joined Tables

[properties.load]
source_type = "database"
query = """
SELECT
    o.order_id,
    o.order_date,
    o.total,
    c.customer_name,
    c.email
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '30 days'
"""
source_engine = { ... }

Example 5: Aggregated Data

[properties.load]
source_type = "database"
query = """
SELECT
    DATE_TRUNC('day', order_date) AS order_day,
    COUNT(*) AS order_count,
    SUM(total_amount) AS daily_revenue,
    AVG(total_amount) AS avg_order_value
FROM orders
WHERE order_date >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY DATE_TRUNC('day', order_date)
ORDER BY order_day DESC
"""
source_engine = { ... }

Advanced Configuration

Chunk Size

Control the number of rows inserted per batch:

[properties.load.loader_config]
chunk_size = 10000 # Insert 10,000 rows per batch
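The effect of chunk_size can be sketched with the stdlib sqlite3 driver: rows are drained from the result iterator in fixed-size batches instead of being materialized all at once (the `load_in_chunks` helper is illustrative, not Qarion's API):

```python
import sqlite3
from itertools import islice

def load_in_chunks(rows, conn, chunk_size):
    # Drain the row iterator in fixed-size batches so memory stays
    # bounded regardless of the total result size. Returns the number
    # of batches written.
    batches = 0
    it = iter(rows)
    while True:
        batch = list(islice(it, chunk_size))
        if not batch:
            break
        conn.executemany("INSERT INTO target (id, value) VALUES (?, ?)", batch)
        conn.commit()
        batches += 1
    return batches

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (id INTEGER, value TEXT)")
n = load_in_chunks(((i, f"row-{i}") for i in range(25)), conn, chunk_size=10)
print(n)  # 3 batches: 10 + 10 + 5
```

Larger chunks mean fewer round trips and commits but more memory per batch; tune the value to the row width and available memory.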

Batch ID

The batch_id is automatically assigned to all loaded records:

# In flow execution
batch_id = 123 # All records will have batch_id = 123
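Conceptually, the loader stamps each record with the current batch identifier before insertion, so every load run can be traced or rolled back later. A minimal sketch (the helper name is illustrative):

```python
def tag_with_batch_id(rows, batch_id):
    # Append the batch identifier to every record before insertion.
    return [row + (batch_id,) for row in rows]

tagged = tag_with_batch_id([(1, "ada"), (2, "grace")], batch_id=123)
print(tagged)  # [(1, 'ada', 123), (2, 'grace', 123)]
```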

Custom Loader Configuration

Additional configuration options:

[properties.load.loader_config]
chunk_size = 5000
incremental_column = "updated_at"
last_value = "2024-01-01 00:00:00"

Flow Definition Example

Complete flow definition with database ingestion:

id = "customer_data_ingestion"
name = "Customer Data Ingestion"
flow_type = "standard"
namespace = "customer"

[input]
columns = [
    { name = "customer_id", type = "integer" },
    { name = "customer_name", type = "string" },
    { name = "email", type = "string" },
    { name = "created_at", type = "timestamp" },
    { name = "updated_at", type = "timestamp" }
]
primary_key = "customer_id"

[properties.load]
source_type = "database"
query = """
SELECT
    customer_id,
    customer_name,
    email,
    created_at,
    updated_at
FROM customers
WHERE updated_at >= CURRENT_DATE - INTERVAL '1 day'
"""
[properties.load.source_engine]
name = "postgresql"

[properties.load.source_engine.config]
host = "source-db.example.com"
port = 5432
database = "source_db"
user = "readonly_user"
password = "${SOURCE_DB_PASSWORD}"

[properties.load.loader_config]
incremental_column = "updated_at"
chunk_size = 10000

Best Practices

  1. Use Read-Only Credentials:

    user = "readonly_user"  # Use read-only database user
  2. Filter at Source:

    -- Good: Filter in query
    SELECT * FROM orders WHERE order_date >= '2024-01-01'

    -- Avoid: Loading all data and filtering later
    SELECT * FROM orders
  3. Use Incremental Loading:

    incremental_column = "updated_at"
    last_value = "2024-01-01"
  4. Optimize Query Performance:

    • Add appropriate WHERE clauses
    • Use indexes on filter columns
    • Limit result set size when possible
  5. Handle Large Datasets:

    chunk_size = 10000  # Process in chunks
  6. Use Connection Pooling:

    • Configure connection pool size in source_engine config
    • Reuse connections when possible

Troubleshooting

Connection Errors

Problem: Cannot connect to source database.

Solution:

  • Verify connection parameters (host, port, database, user, password)
  • Check network connectivity
  • Verify credentials
  • Check firewall rules

Query Errors

Problem: SQL query fails on source database.

Solution:

  • Test query directly on source database
  • Verify table/column names
  • Check SQL syntax compatibility with source engine
  • Ensure user has SELECT permissions

Performance Issues

Problem: Ingestion is slow.

Solution:

  • Increase chunk_size
  • Optimize source query (add indexes, filters)
  • Use incremental loading
  • Consider parallel processing for large datasets

Data Type Mismatches

Problem: Data types don't match target schema.

Solution:

  • Cast columns in source query:
    SELECT
        id::INTEGER,
        name::TEXT,
        created_at::TIMESTAMP
    FROM source_table
  • Update target dataset schema to match source