Export Flow

Overview

The export flow (export_flow) is a specialized flow type that exports data from existing datasets (such as those produced by change_feed or delta_publishing flows) to external destinations, with support for several export modes.

Key Features

  • Multiple Export Modes: Full export, batch-specific export, incremental export, or changes-only export
  • Integration with Change Feed: Export only new/changed records from change_feed datasets
  • Integration with Delta Publishing: Export delta transactions
  • Flexible Configuration: Support for different file formats (CSV, JSON, Parquet)
  • Query-Based Filtering: Automatic query generation based on export mode and dataset type

Export Modes

1. Full Export (export_mode: "full")

Exports all data from the source dataset.

Use When:

  • Initial data export
  • Complete dataset snapshot
  • Full refresh scenarios

Example:

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_full.csv"
export_mode = "full"
format = "csv"

2. Batch Export (export_mode: "batch")

Exports only data from a specific batch_id.

Use When:

  • Exporting specific batch data
  • Reprocessing a particular batch
  • Batch-level data sharing

Example:

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_batch_{batch_id}.csv"
export_mode = "batch"
batch_id = 42
format = "csv"

3. Incremental Export (export_mode: "incremental")

Exports only new and changed records.

For Change Feed Datasets:

  • Exports records where xt_record_type IN ('new', 'changed')

For Other Datasets:

  • Exports records from the latest batch

Use When:

  • Daily incremental exports
  • Syncing only new/changed data
  • Reducing export volume

Example:

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_incremental_{batch_id}.csv"
export_mode = "incremental"
format = "csv"

4. Changes Only Export (export_mode: "changes_only")

Exports only changed records (new, changed, deleted).

For Change Feed Datasets:

  • Exports records where xt_record_type IN ('new', 'changed', 'deleted')

For Delta Transaction Datasets:

  • Exports all records (they're all changes by definition)

Use When:

  • Exporting only data that changed
  • Minimizing export size
  • Change data capture (CDC) scenarios

Example:

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_changes_{batch_id}.csv"
export_mode = "changes_only"
format = "csv"

Flow Definition

Basic Export Flow

id = "export_orders"
name = "Export Orders"
flow_type = "export_flow"
namespace = "public"

[input]
# Input is not used for export flows
columns = []

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders.csv"
export_mode = "full"
format = "csv"

Export from Change Feed

Export only new and changed records from a change_feed dataset:

id = "export_orders_changes"
name = "Export Order Changes"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_changes.csv"
export_mode = "incremental"
format = "csv"
[properties.export.exporter_config]
delimiter = ","
include_header = true

Export Specific Batch

Export data from a specific batch:

id = "export_orders_batch_42"
name = "Export Orders Batch 42"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_batch_42.csv"
export_mode = "batch"
batch_id = 42
format = "csv"

Export Delta Transactions

Export delta transactions from a delta_publishing flow:

id = "export_delta_transactions"
name = "Export Delta Transactions"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "delta_transaction_orders"
file_path = "exports/delta_transactions.parquet"
export_mode = "full"
format = "parquet"
[properties.export.exporter_config]
compression = "snappy"

Integration with Other Flows

Export from Change Feed Flow

  1. Create Change Feed Flow:
id = "orders_change_feed"
name = "Orders Change Feed"
flow_type = "change_feed"
namespace = "public"

[input]
columns = [
{ name = "order_id", type = "integer" },
{ name = "customer_id", type = "integer" },
{ name = "total", type = "decimal" }
]
primary_key = "order_id"
  2. Create Export Flow:
id = "export_orders_changes"
name = "Export Order Changes"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "orders_change_feed_change_feed" # Change feed dataset ID
file_path = "exports/orders_changes_{batch_id}.csv"
export_mode = "incremental"
format = "csv"

Export from Delta Publishing Flow

  1. Create Delta Publishing Flow:
id = "orders_delta_publishing"
name = "Orders Delta Publishing"
flow_type = "delta_publishing"
namespace = "public"

[input]
columns = [
{ name = "order_id", type = "integer" },
{ name = "customer_id", type = "integer" },
{ name = "total", type = "decimal" }
]
primary_key = "order_id"
  2. Create Export Flow:
id = "export_delta_transactions"
name = "Export Delta Transactions"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "orders_delta_publishing_delta_transaction" # Delta transaction dataset ID
file_path = "exports/delta_transactions.parquet"
export_mode = "full"
format = "parquet"

Configuration Options

Required Fields

  • source_dataset_id: ID of the source dataset to export from
  • file_path or destination: Path to the export file

Optional Fields

  • export_mode: Export mode (full, batch, incremental, changes_only) - default: full
  • batch_id: Batch ID for batch mode exports
  • format: Export format (csv, json, parquet) - auto-detected from file extension if not specified
  • exporter_config: Format-specific configuration (delimiter, encoding, compression, etc.)
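
As a rough illustration of the auto-detection rule above, a Python sketch (the helper name and extension map are assumptions for illustration, not the flow engine's actual code):

```python
from pathlib import Path

# Hypothetical extension-to-format map mirroring the documented auto-detection.
EXTENSION_FORMATS = {
    ".csv": "csv",
    ".json": "json",
    ".jsonl": "json",      # JSONL uses the JSON exporter with jsonl = true
    ".parquet": "parquet",
}

def detect_format(file_path, explicit_format=None):
    """Return the explicit format if given, otherwise infer it from the extension."""
    if explicit_format:
        return explicit_format
    suffix = Path(file_path).suffix.lower()
    if suffix not in EXTENSION_FORMATS:
        raise ValueError(f"cannot auto-detect format for {file_path!r}")
    return EXTENSION_FORMATS[suffix]
```

An explicit format always wins, so a non-standard extension can still be exported by setting format directly.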

Exporter Configuration

CSV Configuration:

# flows/export_orders_csv.toml
[properties.export]
source_dataset_id = "orders_staging"
file_path = "exports/orders.csv"
export_mode = "full"
format = "csv"
[properties.export.exporter_config]
delimiter = ","
include_header = true
encoding = "utf-8"
columns = ["order_id", "customer_id", "total"]

JSON Configuration:

# flows/export_orders_json.toml
[properties.export]
source_dataset_id = "orders_staging"
file_path = "exports/orders.json"
export_mode = "full"
format = "json"
[properties.export.exporter_config]
indent = 2
jsonl = false
ensure_ascii = false
encoding = "utf-8"

JSONL Configuration:

# flows/export_orders_jsonl.toml
[properties.export]
source_dataset_id = "orders_staging"
file_path = "exports/orders.jsonl"
export_mode = "full"
format = "json"
[properties.export.exporter_config]
jsonl = true
encoding = "utf-8"
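
The difference between the JSON and JSONL settings can be sketched in Python (the writer name and signature are illustrative assumptions, not the actual exporter):

```python
import json

def write_json(records, path, jsonl=False, indent=2, ensure_ascii=False):
    """Write records as one JSON array (default) or one object per line (JSONL)."""
    with open(path, "w", encoding="utf-8") as f:
        if jsonl:
            # jsonl = true: newline-delimited objects, no indentation
            for rec in records:
                f.write(json.dumps(rec, ensure_ascii=ensure_ascii) + "\n")
        else:
            # jsonl = false: a single pretty-printed array
            json.dump(records, f, indent=indent, ensure_ascii=ensure_ascii)
```

JSONL is usually the better choice for large exports, since consumers can stream it line by line instead of parsing one large array.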

Parquet Configuration:

# flows/export_orders_parquet.toml
[properties.export]
source_dataset_id = "orders_staging"
file_path = "exports/orders.parquet"
export_mode = "full"
format = "parquet"
[properties.export.exporter_config]
compression = "snappy"
columns = ["order_id", "customer_id", "total", "order_date"]
index = false

Complete Configuration Examples

Example 1: Daily Incremental Export

Export only new and changed orders daily:

Flow Definition:

# flows/daily_orders_export.toml
id = "daily_orders_export"
name = "Daily Orders Export"
flow_type = "export_flow"
namespace = "exports"

[input]
columns = [] # Export flows don't require input columns

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_incremental_{batch_id}.csv"
export_mode = "incremental"
format = "csv"
[properties.export.exporter_config]
delimiter = ","
include_header = true
encoding = "utf-8"

# Optional: Schedule trigger
[[triggers]]
id = "daily_export_trigger"
type = "schedule"
schedule = "0 1 * * *" # Daily at 1 AM
timezone = "UTC"
enabled = true
description = "Daily incremental export"

Example 2: Complete Export Flow with All Options

Full Configuration:

# flows/export_orders_complete.toml
id = "export_orders_complete"
name = "Complete Orders Export"
flow_type = "export_flow"
namespace = "exports"
description = "Export orders with comprehensive configuration"

[input]
columns = []

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/orders_{batch_id}_{ds_nodash}.csv"
export_mode = "incremental"
format = "csv"
[properties.export.exporter_config]
delimiter = ","
include_header = true
encoding = "utf-8-sig" # Excel-compatible encoding
columns = ["order_id", "customer_id", "amount", "order_date", "status"]

[[triggers]]
id = "daily_export"
type = "schedule"
schedule = "0 2 * * *"
timezone = "UTC"
enabled = true

Usage Examples

Example 1: Daily Incremental Export (Basic)

Export only new and changed orders daily:

id = "daily_orders_export"
name = "Daily Orders Export"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/daily/orders_{batch_id}.csv"
export_mode = "incremental"
format = "csv"
[properties.export.exporter_config]
delimiter = ","
include_header = true

Example 2: Export Changes Only to Parquet

Export only changed records to Parquet for analytical processing:

id = "export_changes_parquet"
name = "Export Changes to Parquet"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/changes/changes_{batch_id}.parquet"
export_mode = "changes_only"
format = "parquet"
[properties.export.exporter_config]
compression = "snappy"

Example 3: Export Specific Batch for Reprocessing

Export a specific batch for reprocessing:

id = "export_batch_42"
name = "Export Batch 42"
flow_type = "export_flow"
namespace = "public"

[properties.export]
source_dataset_id = "change_feed_orders"
file_path = "exports/reprocess/batch_42.csv"
export_mode = "batch"
batch_id = 42
format = "csv"

Query Generation

The export flow automatically generates SQL queries based on the export mode and source dataset type:

Change Feed Datasets

  • incremental: WHERE xt_record_type IN ('new', 'changed')
  • changes_only: WHERE xt_record_type IN ('new', 'changed', 'deleted')
  • batch: WHERE xt_batch_id = {batch_id}

Delta Transaction Datasets

  • full: No filter (all records are changes)
  • changes_only: No filter (all records are changes)
  • batch: WHERE xt_batch_id = {batch_id}

Other Datasets

  • incremental: WHERE xt_batch_id = (SELECT MAX(xt_batch_id) FROM {table})
  • batch: WHERE xt_batch_id = {batch_id}
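
The filter rules above can be condensed into a single function. This is a simplified sketch of how mode and dataset type might map to a WHERE clause; the function and parameter names are assumptions, not the engine's internals:

```python
def build_filter(dataset_type, export_mode, batch_id=None):
    """Return the WHERE clause for a given dataset type and export mode."""
    if export_mode == "batch":
        return f"WHERE xt_batch_id = {batch_id}"
    if dataset_type == "change_feed":
        if export_mode == "incremental":
            return "WHERE xt_record_type IN ('new', 'changed')"
        if export_mode == "changes_only":
            return "WHERE xt_record_type IN ('new', 'changed', 'deleted')"
    elif dataset_type == "delta_transaction":
        return ""  # full / changes_only: every record is already a change
    elif export_mode == "incremental":
        # other datasets: restrict to the most recent batch
        return "WHERE xt_batch_id = (SELECT MAX(xt_batch_id) FROM {table})"
    return ""  # full export: no filter
```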

Best Practices

  1. Use Incremental Mode for Regular Exports: Reduces export volume and processing time
  2. Use Changes Only for CDC: When you only need to sync changed data
  3. Use Batch Mode for Reprocessing: When you need to re-export specific batches
  4. Choose Appropriate Formats: CSV for Excel compatibility, Parquet for analytics, JSON for APIs
  5. Use Template Variables: Use {batch_id} in file paths for batch-specific exports
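
A minimal sketch of how the {batch_id} placeholder might be filled, assuming Python str.format-style substitution (the engine's actual template mechanism may differ):

```python
def render_file_path(template, **variables):
    """Substitute template variables such as {batch_id} or {ds_nodash} into a path."""
    return template.format(**variables)

# e.g. render_file_path("exports/orders_{batch_id}_{ds_nodash}.csv",
#                       batch_id=42, ds_nodash="20240101")
# -> "exports/orders_42_20240101.csv"
```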