Data Export Guide

Overview

Qarion ETL provides a comprehensive system for exporting data from tables to files in a variety of formats. This complements the file loading system, allowing you to export processed data for downstream consumption, reporting, or integration with other systems.

There are two ways to export data in Qarion ETL:

  1. Export Flow: Declarative flow-based export with multiple export modes (full, batch, incremental, changes_only)
  2. Export Executor API: Programmatic export using Python API

Quick Start

The easiest way to export data is using an Export Flow:

# flows/export_orders.toml
id = "export_orders"
name = "Export Orders"
flow_type = "export_flow"
namespace = "public"

[input]
columns = [] # Export flows don't require input columns

[properties.export]
source_dataset_id = "orders_staging"
file_path = "exports/orders.csv"
export_mode = "full"
format = "csv"
[properties.export.exporter_config]
delimiter = ","
include_header = true
encoding = "utf-8"

Then run the export flow:

qarion-etl run --flow export_orders

For complete Export Flow documentation, see Export Flow Reference.

Using Export Executor API

Export a table to a CSV file programmatically:

from exporters import ExportExecutor
from engines import EngineLoader

# Load and connect to engine
engine = EngineLoader.load({'name': 'sqlite', 'config': {'path': 'data.db'}})
engine.connect()

# Create export executor
executor = ExportExecutor(engine)

# Export table
result = executor.export_table(
    source_table='orders',
    file_path='exports/orders.csv'
)

print(f"Exported {result.rows_exported} rows")

Supported Formats

CSV

Export to CSV format (comma-separated values):

result = executor.export_table(
    source_table='orders',
    file_path='exports/orders.csv',
    format='csv',
    config={
        'delimiter': ',',
        'include_header': True,
        'encoding': 'utf-8'
    }
)

Configuration Options:

  • delimiter: Field delimiter (default: ',')
  • include_header: Include header row (default: True)
  • encoding: File encoding (default: 'utf-8')
  • columns: List of columns to export (default: all columns)

JSON

Export to JSON format:

# Standard JSON (array of objects)
result = executor.export_table(
    source_table='orders',
    file_path='exports/orders.json',
    format='json',
    config={
        'indent': 2,  # pretty printing
        'jsonl': False
    }
)

# JSONL format (one object per line)
result = executor.export_table(
    source_table='orders',
    file_path='exports/orders.jsonl',
    format='json',
    config={'jsonl': True}
)

Configuration Options:

  • indent: Indentation level for pretty printing (default: None)
  • jsonl: Use JSONL format (default: False)
  • columns: List of columns to export (default: all columns)
  • encoding: File encoding (default: 'utf-8')
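The difference between the two JSON shapes is easiest to see in plain Python. Note that to_json below is a hypothetical helper for illustration, not part of the Qarion ETL API:

```python
import json

def to_json(rows, jsonl=False, indent=None):
    """Serialize a list of row dicts as a JSON array or as JSONL."""
    if jsonl:
        # One self-contained JSON object per line
        return '\n'.join(json.dumps(r) for r in rows) + '\n'
    # A single JSON array of objects
    return json.dumps(rows, indent=indent)

rows = [{'id': 1}, {'id': 2}]
print(to_json(rows, jsonl=True))
print(to_json(rows, indent=2))
```

JSONL suits streaming consumers that process one record at a time; the array form is simpler for clients that load the whole file at once.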

Parquet

Export to Parquet format (columnar storage):

result = executor.export_table(
    source_table='analytics_data',
    file_path='exports/analytics.parquet',
    format='parquet',
    config={
        'compression': 'snappy',
        'columns': ['id', 'value', 'timestamp']
    }
)

Configuration Options:

  • compression: Compression codec (default: 'snappy')
  • columns: List of columns to export (default: all columns)
  • index: Include index (default: False)

Note: Parquet export requires pandas and pyarrow:

pip install pandas pyarrow

Exporting Filtered Data

Use SQL queries to export filtered or transformed data:

# Export recent orders
result = executor.export_table(
    source_table='orders',
    file_path='exports/recent_orders.csv',
    query="SELECT * FROM orders WHERE order_date > '2024-01-01'"
)

# Export aggregated data
result = executor.export_table(
    source_table='orders',
    file_path='exports/daily_totals.csv',
    query="""
        SELECT
            DATE(order_date) AS date,
            COUNT(*) AS order_count,
            SUM(total) AS total_revenue
        FROM orders
        GROUP BY DATE(order_date)
        ORDER BY date
    """
)

Exporting Specific Columns

Export only the columns you need:

result = executor.export_table(
    source_table='orders',
    file_path='exports/orders_summary.csv',
    config={
        'columns': ['id', 'order_date', 'total', 'status']
    }
)

Format Auto-Detection

The export format is automatically detected from the file extension:

# Format detected from extension
executor.export_table('orders', 'exports/orders.csv') # CSV
executor.export_table('orders', 'exports/orders.json') # JSON
executor.export_table('orders', 'exports/orders.parquet') # Parquet
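A minimal sketch of how such extension-based detection can work (the mapping and function here are illustrative assumptions, not Qarion ETL's internal logic):

```python
from pathlib import Path

# Illustrative extension-to-format mapping
EXTENSION_FORMATS = {
    '.csv': 'csv',
    '.json': 'json',
    '.jsonl': 'json',
    '.parquet': 'parquet',
}

def detect_format(file_path):
    """Return the export format implied by the file extension."""
    suffix = Path(file_path).suffix.lower()
    if suffix not in EXTENSION_FORMATS:
        raise ValueError(f"Cannot detect export format for {file_path!r}")
    return EXTENSION_FORMATS[suffix]

print(detect_format('exports/orders.parquet'))
```

Passing an explicit format argument, as in the earlier examples, avoids relying on detection for unusual file names.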

Error Handling

Export operations return results with success status:

result = executor.export_table(
    source_table='orders',
    file_path='exports/orders.csv'
)

if result.success:
    print(f"Successfully exported {result.rows_exported} rows")
    print(f"File: {result.file_path}")
else:
    print(f"Export failed: {result.error}")

Use Cases

1. Data Sharing

Export processed data for sharing with other teams or systems:

# Export to CSV for Excel users
executor.export_table(
    source_table='monthly_report',
    file_path='reports/monthly_report.csv',
    config={'include_header': True, 'encoding': 'utf-8-sig'}
)

2. API Integration

Export to JSON for API consumption:

executor.export_table(
    source_table='api_data',
    file_path='api/export.json',
    config={'indent': 2, 'ensure_ascii': False}
)

3. Analytical Processing

Export to Parquet for analytical workloads:

executor.export_table(
    source_table='analytics_data',
    file_path='analytics/data.parquet',
    config={'compression': 'snappy'}
)

4. Incremental Exports

Export only new or changed data:

executor.export_table(
    source_table='orders',
    file_path='exports/new_orders.csv',
    query="SELECT * FROM orders WHERE export_status IS NULL"
)

Best Practices

  1. Use Appropriate Formats: Choose formats based on your use case

    • CSV for Excel compatibility
    • JSON for API/web integration
    • Parquet for analytical processing
  2. Filter Before Export: Use queries to filter data rather than exporting entire tables

  3. Select Columns: Export only the columns you need to reduce file size

  4. Handle Errors: Always check result.success and handle errors appropriately

  5. Use Descriptive Paths: Use clear, descriptive file paths for exports

Export Flow Configuration Examples

Full Export

Export all data from a dataset:

# flows/export_orders_full.toml
id = "export_orders_full"
name = "Export Orders (Full)"
flow_type = "export_flow"

[properties.export]
source_dataset_id = "orders_staging"
file_path = "exports/orders_full.csv"
export_mode = "full"
format = "csv"
[properties.export.exporter_config]
delimiter = ","
include_header = true
encoding = "utf-8"

Incremental Export

Export only new/changed records:

# flows/export_orders_incremental.toml
id = "export_orders_incremental"
name = "Export Orders (Incremental)"
flow_type = "export_flow"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_incremental_{batch_id}.csv"
export_mode = "incremental"
format = "csv"

Batch Export

Export specific batch:

# flows/export_orders_batch.toml
id = "export_orders_batch"
name = "Export Orders (Batch)"
flow_type = "export_flow"

[properties.export]
source_dataset_id = "orders_staging"
file_path = "exports/orders_batch_{batch_id}.csv"
export_mode = "batch"
batch_id = 42
format = "csv"

Changes Only Export

Export only changed records from change feed:

# flows/export_orders_changes.toml
id = "export_orders_changes"
name = "Export Order Changes"
flow_type = "export_flow"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_changes_{batch_id}.csv"
export_mode = "changes_only"
format = "csv"

Parquet Export

Export to Parquet format:

# flows/export_orders_parquet.toml
id = "export_orders_parquet"
name = "Export Orders to Parquet"
flow_type = "export_flow"

[properties.export]
source_dataset_id = "orders_staging"
file_path = "exports/orders.parquet"
export_mode = "full"
format = "parquet"
[properties.export.exporter_config]
compression = "snappy"
columns = ["id", "customer_id", "amount", "order_date"]

JSON Export

Export to JSON format:

# flows/export_orders_json.toml
id = "export_orders_json"
name = "Export Orders to JSON"
flow_type = "export_flow"

[properties.export]
source_dataset_id = "orders_staging"
file_path = "exports/orders.json"
export_mode = "full"
format = "json"
[properties.export.exporter_config]
indent = 2
jsonl = false
encoding = "utf-8"

Export Operation Tracking

All export operations are automatically tracked in the metadata system. Each export operation is recorded in xt_export_operations with:

  • Export type and mode
  • Source dataset and destination path
  • Execution timing
  • Results (rows exported, success status)
  • Export configuration

You can query export history to monitor export operations:

-- Get recent export operations
SELECT
    export_operation_id,
    export_type,
    source_dataset_id,
    destination_path,
    rows_exported,
    status,
    start_time
FROM xt_export_operations
ORDER BY start_time DESC
LIMIT 10;

For more information about accessing and using execution metadata, see the Metadata Tracking Guide.