Skip to main content

Error Parking Flow

A comprehensive guide to using error parking flows in Qarion ETL to automatically park records that fail data quality checks while passing valid records to the normal target table.

Overview

Error parking flows enable you to automatically split incoming data based on data quality check results:

  • Passed records: Records that pass all quality checks are written to the normal target table
  • Failed records: Records that fail any quality check are written to an error/parking table with error metadata for review and correction

This pattern is essential for data pipelines where you want to:

  • Ensure only high-quality data reaches production tables
  • Capture and review data quality issues without blocking the pipeline
  • Enable data correction workflows for failed records

Quick Start

Basic Error Parking Flow

# flows/orders_error_parking.toml
id = "orders_error_parking"
name = "Orders Error Parking"
flow_type = "error_parking"
namespace = "production"

[input]
primary_key = ["order_id"]
columns = [
{name = "order_id", schema_type = "integer", required = true},
{name = "customer_id", schema_type = "integer", required = true},
{name = "order_date", schema_type = "date", required = true},
{name = "total_amount", schema_type = "decimal", required = false}
]

[properties]
quality_checks = [
{
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
columns = ["order_id", "customer_id", "order_date"]
severity = "error"
config = {threshold = 1.0}
},
{
check_id = "uniqueness_check"
check_name = "Uniqueness Check"
check_type = "uniqueness"
columns = ["order_id"]
severity = "error"
}
]

Flow Structure

Error parking flows create three datasets:

  1. Staging Dataset ({flow_id}_staging): Landing table for initial data loading
  2. Target Dataset ({flow_id}_target): Normal target table for passed records
  3. Error Parking Dataset ({flow_id}_error_parking): Parking table for failed records with error metadata

Error Parking Table Schema

The error parking table includes all input columns plus error metadata:

  • xt_error_check_id: ID of the quality check that failed
  • xt_error_check_name: Name of the quality check that failed
  • xt_error_check_type: Type of quality check (completeness, uniqueness, etc.)
  • xt_error_message: Error message describing the failure
  • xt_error_timestamp: Timestamp when the error was detected
  • xt_batch_id: Batch ID for tracking

Configuration

Quality Checks

Define quality checks in properties.quality_checks:

[properties]
quality_checks = [
{
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
columns = ["id", "name", "email"]
severity = "error"
config = {
threshold = 1.0
allow_null = false
}
},
{
check_id = "uniqueness_check"
check_name = "Uniqueness Check"
check_type = "uniqueness"
columns = ["id"]
severity = "error"
},
{
check_id = "range_check"
check_name = "Amount Range Check"
check_type = "range"
columns = ["amount"]
severity = "error"
config = {
min_value = 0
max_value = 1000000
}
},
{
check_id = "pattern_check"
check_name = "Email Pattern Check"
check_type = "pattern"
columns = ["email"]
severity = "error"
config = {
pattern = "%@%.%"
}
}
]

Quality Suites

You can also use quality suites defined in a quality check repository:

[properties]
quality_suites = [
"orders_quality_suite" # Reference to suite in repository
]

Execution Flow

Error parking flows execute the following steps:

  1. Ingestion: Load data into staging table
  2. Quality Checks: Run all defined quality checks on staging data
  3. Split Records:
    • Identify records that fail any quality check
    • Split records into passed and failed sets
  4. Write Passed Records: Write passed records to normal target table
  5. Write Failed Records: Write failed records to error parking table with error metadata

Examples

Example 1: Basic Completeness and Uniqueness Checks

id = "customer_error_parking"
name = "Customer Error Parking"
flow_type = "error_parking"
namespace = "production"

[input]
primary_key = ["customer_id"]
columns = [
{name = "customer_id", schema_type = "integer", required = true},
{name = "name", schema_type = "string", required = true},
{name = "email", schema_type = "string", required = false},
{name = "phone", schema_type = "string", required = false}
]

[properties]
quality_checks = [
{
check_id = "completeness"
check_name = "Required Fields Completeness"
check_type = "completeness"
columns = ["customer_id", "name"]
severity = "error"
config = {threshold = 1.0}
},
{
check_id = "uniqueness"
check_name = "Customer ID Uniqueness"
check_type = "uniqueness"
columns = ["customer_id"]
severity = "error"
}
]

Example 2: Multiple Quality Checks with Range and Pattern

id = "transaction_error_parking"
name = "Transaction Error Parking"
flow_type = "error_parking"
namespace = "production"

[input]
primary_key = ["transaction_id"]
columns = [
{name = "transaction_id", schema_type = "string", required = true},
{name = "account_id", schema_type = "integer", required = true},
{name = "amount", schema_type = "decimal", required = true},
{name = "transaction_date", schema_type = "timestamp", required = true},
{name = "reference", schema_type = "string", required = false}
]

[properties]
quality_checks = [
{
check_id = "completeness"
check_name = "Required Fields"
check_type = "completeness"
columns = ["transaction_id", "account_id", "amount", "transaction_date"]
severity = "error"
config = {threshold = 1.0}
},
{
check_id = "uniqueness"
check_name = "Transaction ID Uniqueness"
check_type = "uniqueness"
columns = ["transaction_id"]
severity = "error"
},
{
check_id = "amount_range"
check_name = "Amount Range"
check_type = "range"
columns = ["amount"]
severity = "error"
config = {
min_value = 0.01
max_value = 1000000.00
}
},
{
check_id = "reference_pattern"
check_name = "Reference Pattern"
check_type = "pattern"
columns = ["reference"]
severity = "warning"
config = {
pattern = "REF-%"
}
}
]

Example 3: Using Quality Suites

id = "orders_error_parking"
name = "Orders Error Parking"
flow_type = "error_parking"
namespace = "production"

[input]
primary_key = ["order_id"]
columns = [
{name = "order_id", schema_type = "integer", required = true},
{name = "customer_id", schema_type = "integer", required = true},
{name = "order_date", schema_type = "date", required = true},
{name = "status", schema_type = "string", required = true},
{name = "total", schema_type = "decimal", required = true}
]

[properties]
# Use quality suite from repository
quality_suites = [
"orders_quality_suite"
]

Querying Error Parking Table

Find All Failed Records

SELECT *
FROM orders_error_parking_error_parking
ORDER BY xt_error_timestamp DESC;

Find Records by Check Type

SELECT *
FROM orders_error_parking_error_parking
WHERE xt_error_check_type = 'completeness';

Count Failed Records by Check

SELECT
xt_error_check_id,
xt_error_check_name,
COUNT(*) as failed_count
FROM orders_error_parking_error_parking
GROUP BY xt_error_check_id, xt_error_check_name
ORDER BY failed_count DESC;

Find Records That Failed Multiple Checks

SELECT
order_id,
customer_id,
COUNT(DISTINCT xt_error_check_id) as failed_checks_count,
STRING_AGG(xt_error_check_name, ', ') as failed_checks
FROM orders_error_parking_error_parking
GROUP BY order_id, customer_id
HAVING COUNT(DISTINCT xt_error_check_id) > 1;

Correcting and Reprocessing Failed Records

Step 1: Correct Records in Error Parking Table

-- Update records to fix issues
UPDATE orders_error_parking_error_parking
SET customer_id = 12345
WHERE customer_id IS NULL
AND xt_error_check_type = 'completeness';

Step 2: Extract Corrected Records

-- Extract corrected records (without error metadata columns)
SELECT
order_id,
customer_id,
order_date,
total
FROM orders_error_parking_error_parking
WHERE xt_error_check_id = 'completeness_check'
AND customer_id IS NOT NULL;

Step 3: Insert into Target Table

-- Insert corrected records into target table
INSERT INTO orders_error_parking_target
SELECT
order_id,
customer_id,
order_date,
total
FROM orders_error_parking_error_parking
WHERE xt_error_check_id = 'completeness_check'
AND customer_id IS NOT NULL;

Step 4: Remove from Error Parking Table

-- Remove processed records from error parking table
DELETE FROM orders_error_parking_error_parking
WHERE xt_error_check_id = 'completeness_check'
AND customer_id IS NOT NULL;

Best Practices

  1. Define Clear Quality Rules:

    • Use specific, measurable quality checks
    • Set appropriate severity levels (error vs warning)
  2. Monitor Error Parking Table:

    • Set up alerts for high failure rates
    • Regularly review and correct failed records
    • Track error trends over time
  3. Error Metadata:

    • Use descriptive check names and IDs
    • Include helpful error messages
    • Track batch IDs for reprocessing
  4. Performance:

    • Index error parking table on primary key and error check ID
    • Consider partitioning by batch_id for large volumes
    • Archive old error records periodically
  5. Data Correction Workflow:

    • Establish a process for reviewing and correcting failed records
    • Document common error patterns and fixes
    • Automate reprocessing where possible

Troubleshooting

No Records in Target Table

  • Check if quality checks are too strict
  • Verify staging table has data
  • Review quality check configuration

All Records Going to Error Parking

  • Review quality check thresholds
  • Check for data type mismatches
  • Verify column names in quality checks match input columns

Performance Issues

  • Ensure proper indexing on staging and error parking tables
  • Consider batch size for large datasets
  • Review quality check query performance