Error Parking Flow
A comprehensive guide to using error parking flows in Qarion ETL to automatically park records that fail data quality checks while passing valid records to the normal target table.
Overview
Error parking flows enable you to automatically split incoming data based on data quality check results:
- Passed records: Records that pass all quality checks are written to the normal target table
- Failed records: Records that fail any quality check are written to an error/parking table with error metadata for review and correction
This pattern is essential for data pipelines where you want to:
- Ensure only high-quality data reaches production tables
- Capture and review data quality issues without blocking the pipeline
- Enable data correction workflows for failed records
Quick Start
Basic Error Parking Flow
# flows/orders_error_parking.toml
id = "orders_error_parking"
name = "Orders Error Parking"
flow_type = "error_parking"
namespace = "production"
[input]
primary_key = ["order_id"]
columns = [
{name = "order_id", schema_type = "integer", required = true},
{name = "customer_id", schema_type = "integer", required = true},
{name = "order_date", schema_type = "date", required = true},
{name = "total_amount", schema_type = "decimal", required = false}
]
[properties]
[[properties.quality_checks]]
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
columns = ["order_id", "customer_id", "order_date"]
severity = "error"
config = {threshold = 1.0}
[[properties.quality_checks]]
check_id = "uniqueness_check"
check_name = "Uniqueness Check"
check_type = "uniqueness"
columns = ["order_id"]
severity = "error"
Flow Structure
Error parking flows create three datasets:
- Staging Dataset ({flow_id}_staging): Landing table for initial data loading
- Target Dataset ({flow_id}_target): Normal target table for passed records
- Error Parking Dataset ({flow_id}_error_parking): Parking table for failed records with error metadata
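The naming convention explains why a flow whose ID is orders_error_parking ends up with a parking table called orders_error_parking_error_parking. A minimal sketch of the convention follows; `dataset_names` is a hypothetical helper written for illustration, not a Qarion ETL function.

```python
def dataset_names(flow_id: str) -> dict[str, str]:
    """Derive the three dataset names from a flow ID (hypothetical helper)."""
    return {
        "staging": f"{flow_id}_staging",
        "target": f"{flow_id}_target",
        "error_parking": f"{flow_id}_error_parking",
    }


names = dataset_names("orders_error_parking")
print(names["target"])         # orders_error_parking_target
print(names["error_parking"])  # orders_error_parking_error_parking
```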
Error Parking Table Schema
The error parking table includes all input columns plus error metadata:
- xt_error_check_id: ID of the quality check that failed
- xt_error_check_name: Name of the quality check that failed
- xt_error_check_type: Type of quality check (completeness, uniqueness, etc.)
- xt_error_message: Error message describing the failure
- xt_error_timestamp: Timestamp when the error was detected
- xt_batch_id: Batch ID for tracking
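To make the resulting layout concrete, the sketch below appends the metadata columns to a list of input column names. The column order (input columns first, metadata last) is an assumption, and `error_parking_columns` is a hypothetical helper rather than part of Qarion ETL.

```python
# Metadata columns as listed in this section.
ERROR_METADATA_COLUMNS = [
    "xt_error_check_id",
    "xt_error_check_name",
    "xt_error_check_type",
    "xt_error_message",
    "xt_error_timestamp",
    "xt_batch_id",
]


def error_parking_columns(input_columns: list[str]) -> list[str]:
    """Return input columns followed by error metadata (ordering is assumed)."""
    return list(input_columns) + ERROR_METADATA_COLUMNS


columns = error_parking_columns(
    ["order_id", "customer_id", "order_date", "total_amount"]
)
print(len(columns))  # 4 input columns + 6 metadata columns
```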
Configuration
Quality Checks
Define quality checks in properties.quality_checks:
[properties]
[[properties.quality_checks]]
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
columns = ["id", "name", "email"]
severity = "error"
config = {threshold = 1.0, allow_null = false}
[[properties.quality_checks]]
check_id = "uniqueness_check"
check_name = "Uniqueness Check"
check_type = "uniqueness"
columns = ["id"]
severity = "error"
[[properties.quality_checks]]
check_id = "range_check"
check_name = "Amount Range Check"
check_type = "range"
columns = ["amount"]
severity = "error"
config = {min_value = 0, max_value = 1000000}
[[properties.quality_checks]]
check_id = "pattern_check"
check_name = "Email Pattern Check"
check_type = "pattern"
columns = ["email"]
severity = "error"
config = {pattern = "%@%.%"}
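To illustrate what these checks mean for an individual record, here is a rough Python sketch of row-level evaluation. It is not how Qarion ETL evaluates checks internally. It assumes that pattern values use SQL LIKE syntax (% and _), that range bounds are inclusive, and that NULL values are only flagged by completeness checks. Uniqueness is left out because it needs the whole dataset, not a single row. `like_to_regex` and `failed_columns` are hypothetical names.

```python
import re


def like_to_regex(pattern: str) -> re.Pattern:
    """Translate a SQL LIKE pattern into an anchored regex (assumed semantics)."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")   # % matches any run of characters
        elif ch == "_":
            parts.append(".")    # _ matches exactly one character
        else:
            parts.append(re.escape(ch))
    return re.compile("^" + "".join(parts) + "$", re.DOTALL)


def failed_columns(row: dict, check: dict) -> list[str]:
    """Return the columns of `row` that violate `check`."""
    config = check.get("config", {})
    bad = []
    for col in check["columns"]:
        value = row.get(col)
        if check["check_type"] == "completeness":
            if value is None:
                bad.append(col)
        elif value is None:
            continue  # assumption: only completeness checks flag NULLs
        elif check["check_type"] == "range":
            if not (config["min_value"] <= value <= config["max_value"]):
                bad.append(col)
        elif check["check_type"] == "pattern":
            if not like_to_regex(config["pattern"]).match(value):
                bad.append(col)
    return bad


range_check = {"check_type": "range", "columns": ["amount"],
               "config": {"min_value": 0, "max_value": 1000000}}
pattern_check = {"check_type": "pattern", "columns": ["email"],
                 "config": {"pattern": "%@%.%"}}
row = {"id": 1, "name": "Ada", "email": "ada-at-example", "amount": -5}

print(failed_columns(row, range_check))    # ['amount']
print(failed_columns(row, pattern_check))  # ['email']
```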
Quality Suites
You can also use quality suites defined in a quality check repository:
[properties]
quality_suites = [
"orders_quality_suite" # Reference to suite in repository
]
Execution Flow
Error parking flows execute the following steps:
- Ingestion: Load data into staging table
- Quality Checks: Run all defined quality checks on staging data
- Split Records:
  - Identify records that fail any quality check
  - Split records into passed and failed sets
- Write Passed Records: Write passed records to normal target table
- Write Failed Records: Write failed records to error parking table with error metadata
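The split and write steps above can be sketched in plain Python. This is an illustration under stated assumptions, not the engine's implementation: it hard-codes one completeness and one uniqueness check, parks every row that shares a duplicated key, and writes one parked row per failed check. `split_records` and its parameters are hypothetical names.

```python
from collections import Counter
from datetime import datetime, timezone


def split_records(staging_rows, required, unique_key, batch_id):
    """Split staging rows into passed rows and parked rows with error metadata."""
    key_counts = Counter(
        row[unique_key] for row in staging_rows if row.get(unique_key) is not None
    )
    detected_at = datetime.now(timezone.utc).isoformat()
    passed, failed = [], []
    for row in staging_rows:
        errors = []  # tuples of (check_id, check_name, check_type, message)
        missing = [col for col in required if row.get(col) is None]
        if missing:
            errors.append(("completeness_check", "Completeness Check",
                           "completeness", "missing values in: " + ", ".join(missing)))
        if key_counts[row.get(unique_key)] > 1:
            errors.append(("uniqueness_check", "Uniqueness Check",
                           "uniqueness", f"duplicate {unique_key}: {row[unique_key]}"))
        if not errors:
            passed.append(row)
        for check_id, check_name, check_type, message in errors:
            # Assumption: one parked row per failed check.
            failed.append({
                **row,
                "xt_error_check_id": check_id,
                "xt_error_check_name": check_name,
                "xt_error_check_type": check_type,
                "xt_error_message": message,
                "xt_error_timestamp": detected_at,
                "xt_batch_id": batch_id,
            })
    return passed, failed


staging = [
    {"order_id": 1, "customer_id": 10, "order_date": "2024-01-01"},
    {"order_id": 2, "customer_id": None, "order_date": "2024-01-02"},
    {"order_id": 3, "customer_id": 11, "order_date": "2024-01-03"},
    {"order_id": 3, "customer_id": None, "order_date": "2024-01-03"},
]
passed, failed = split_records(
    staging, ["order_id", "customer_id", "order_date"], "order_id", "batch-001"
)
print([r["order_id"] for r in passed])                            # [1]
print([(r["order_id"], r["xt_error_check_id"]) for r in failed])
```

Only order 1 passes here. Order 2 is parked once for completeness, and the two rows sharing order_id 3 are parked for uniqueness, with the second also parked for completeness.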
Examples
Example 1: Basic Completeness and Uniqueness Checks
id = "customer_error_parking"
name = "Customer Error Parking"
flow_type = "error_parking"
namespace = "production"
[input]
primary_key = ["customer_id"]
columns = [
{name = "customer_id", schema_type = "integer", required = true},
{name = "name", schema_type = "string", required = true},
{name = "email", schema_type = "string", required = false},
{name = "phone", schema_type = "string", required = false}
]
[properties]
[[properties.quality_checks]]
check_id = "completeness"
check_name = "Required Fields Completeness"
check_type = "completeness"
columns = ["customer_id", "name"]
severity = "error"
config = {threshold = 1.0}
[[properties.quality_checks]]
check_id = "uniqueness"
check_name = "Customer ID Uniqueness"
check_type = "uniqueness"
columns = ["customer_id"]
severity = "error"
Example 2: Multiple Quality Checks with Range and Pattern
id = "transaction_error_parking"
name = "Transaction Error Parking"
flow_type = "error_parking"
namespace = "production"
[input]
primary_key = ["transaction_id"]
columns = [
{name = "transaction_id", schema_type = "string", required = true},
{name = "account_id", schema_type = "integer", required = true},
{name = "amount", schema_type = "decimal", required = true},
{name = "transaction_date", schema_type = "timestamp", required = true},
{name = "reference", schema_type = "string", required = false}
]
[properties]
[[properties.quality_checks]]
check_id = "completeness"
check_name = "Required Fields"
check_type = "completeness"
columns = ["transaction_id", "account_id", "amount", "transaction_date"]
severity = "error"
config = {threshold = 1.0}
[[properties.quality_checks]]
check_id = "uniqueness"
check_name = "Transaction ID Uniqueness"
check_type = "uniqueness"
columns = ["transaction_id"]
severity = "error"
[[properties.quality_checks]]
check_id = "amount_range"
check_name = "Amount Range"
check_type = "range"
columns = ["amount"]
severity = "error"
config = {min_value = 0.01, max_value = 1000000.00}
[[properties.quality_checks]]
check_id = "reference_pattern"
check_name = "Reference Pattern"
check_type = "pattern"
columns = ["reference"]
severity = "warning"
config = {pattern = "REF-%"}
Example 3: Using Quality Suites
id = "orders_error_parking"
name = "Orders Error Parking"
flow_type = "error_parking"
namespace = "production"
[input]
primary_key = ["order_id"]
columns = [
{name = "order_id", schema_type = "integer", required = true},
{name = "customer_id", schema_type = "integer", required = true},
{name = "order_date", schema_type = "date", required = true},
{name = "status", schema_type = "string", required = true},
{name = "total", schema_type = "decimal", required = true}
]
[properties]
# Use quality suite from repository
quality_suites = [
"orders_quality_suite"
]
Querying Error Parking Table
Find All Failed Records
SELECT *
FROM orders_error_parking_error_parking
ORDER BY xt_error_timestamp DESC;
Find Records by Check Type
SELECT *
FROM orders_error_parking_error_parking
WHERE xt_error_check_type = 'completeness';
Count Failed Records by Check
SELECT
xt_error_check_id,
xt_error_check_name,
COUNT(*) as failed_count
FROM orders_error_parking_error_parking
GROUP BY xt_error_check_id, xt_error_check_name
ORDER BY failed_count DESC;
Find Records That Failed Multiple Checks
SELECT
order_id,
customer_id,
COUNT(DISTINCT xt_error_check_id) as failed_checks_count,
STRING_AGG(xt_error_check_name, ', ') as failed_checks
FROM orders_error_parking_error_parking
GROUP BY order_id, customer_id
HAVING COUNT(DISTINCT xt_error_check_id) > 1;
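For trying these queries without a warehouse, the sketch below runs two of them against an in-memory SQLite database from Python. The table is reduced to four columns and the rows are made-up sample data. Because older SQLite versions do not provide STRING_AGG, the sketch substitutes SQLite's group_concat, and it assumes the parking table holds one row per failed check.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE orders_error_parking_error_parking (
        order_id INTEGER,
        customer_id INTEGER,
        xt_error_check_id TEXT,
        xt_error_check_name TEXT
    )
    """
)
# Made-up sample rows: one row per (record, failed check).
conn.executemany(
    "INSERT INTO orders_error_parking_error_parking VALUES (?, ?, ?, ?)",
    [
        (2, None, "completeness_check", "Completeness Check"),
        (3, 11, "uniqueness_check", "Uniqueness Check"),
        (3, None, "completeness_check", "Completeness Check"),
        (3, None, "uniqueness_check", "Uniqueness Check"),
        (4, None, "completeness_check", "Completeness Check"),
    ],
)

# Count failed records by check.
counts_by_check = conn.execute(
    """
    SELECT xt_error_check_id, COUNT(*) AS failed_count
    FROM orders_error_parking_error_parking
    GROUP BY xt_error_check_id
    ORDER BY failed_count DESC
    """
).fetchall()

# Records that failed more than one check (group_concat replaces STRING_AGG).
multi_check = conn.execute(
    """
    SELECT order_id, customer_id,
           COUNT(DISTINCT xt_error_check_id) AS failed_checks_count,
           group_concat(xt_error_check_name, ', ') AS failed_checks
    FROM orders_error_parking_error_parking
    GROUP BY order_id, customer_id
    HAVING COUNT(DISTINCT xt_error_check_id) > 1
    """
).fetchall()

print(counts_by_check)  # [('completeness_check', 3), ('uniqueness_check', 2)]
print(multi_check)
```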
Correcting and Reprocessing Failed Records
Step 1: Correct Records in Error Parking Table
-- Update records to fix issues (example value; restrict the WHERE clause to the records you are actually correcting)
UPDATE orders_error_parking_error_parking
SET customer_id = 12345
WHERE customer_id IS NULL
AND xt_error_check_type = 'completeness';
Step 2: Extract Corrected Records
-- Extract corrected records (without error metadata columns)
SELECT
order_id,
customer_id,
order_date,
total
FROM orders_error_parking_error_parking
WHERE xt_error_check_id = 'completeness_check'
AND customer_id IS NOT NULL;
Step 3: Insert into Target Table
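-- Insert corrected records into target table (columns named explicitly so the error metadata columns are left out)
INSERT INTO orders_error_parking_target (order_id, customer_id, order_date, total)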
SELECT
order_id,
customer_id,
order_date,
total
FROM orders_error_parking_error_parking
WHERE xt_error_check_id = 'completeness_check'
AND customer_id IS NOT NULL;
Step 4: Remove from Error Parking Table
-- Remove processed records from error parking table
DELETE FROM orders_error_parking_error_parking
WHERE xt_error_check_id = 'completeness_check'
AND customer_id IS NOT NULL;
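Steps 3 and 4 are safest when they succeed or fail together, so that a record is never inserted into the target without being removed from parking, or removed without being inserted. A minimal sketch of that idea, using Python's sqlite3 module as a stand-in for the real warehouse connection; the table layouts and sample rows are simplified assumptions, and `promote_corrected` is a hypothetical helper.

```python
import sqlite3

COLUMNS = "order_id, customer_id, order_date, total"
CORRECTED = "xt_error_check_id = 'completeness_check' AND customer_id IS NOT NULL"


def promote_corrected(conn: sqlite3.Connection) -> None:
    """Move corrected rows from parking to target in a single transaction."""
    with conn:  # commits if both statements succeed, rolls back on any exception
        conn.execute(
            f"INSERT INTO orders_error_parking_target ({COLUMNS}) "
            f"SELECT {COLUMNS} FROM orders_error_parking_error_parking "
            f"WHERE {CORRECTED}"
        )
        conn.execute(
            f"DELETE FROM orders_error_parking_error_parking WHERE {CORRECTED}"
        )


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE orders_error_parking_target (
        order_id INTEGER PRIMARY KEY, customer_id INTEGER,
        order_date TEXT, total REAL
    );
    CREATE TABLE orders_error_parking_error_parking (
        order_id INTEGER, customer_id INTEGER, order_date TEXT, total REAL,
        xt_error_check_id TEXT
    );
    -- Order 2 has been corrected; order 5 still has no customer_id.
    INSERT INTO orders_error_parking_error_parking VALUES
        (2, 12345, '2024-01-02', 50.0, 'completeness_check'),
        (5, NULL, '2024-01-03', 20.0, 'completeness_check');
    """
)
promote_corrected(conn)

target_ids = [r[0] for r in conn.execute(
    "SELECT order_id FROM orders_error_parking_target")]
parked_ids = [r[0] for r in conn.execute(
    "SELECT order_id FROM orders_error_parking_error_parking")]
print(target_ids, parked_ids)  # [2] [5]
```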
Best Practices
- Define Clear Quality Rules:
  - Use specific, measurable quality checks
  - Set appropriate severity levels (error vs. warning)
- Monitor Error Parking Table:
  - Set up alerts for high failure rates
  - Regularly review and correct failed records
  - Track error trends over time
- Error Metadata:
  - Use descriptive check names and IDs
  - Include helpful error messages
  - Track batch IDs for reprocessing
- Performance:
  - Index error parking table on primary key and error check ID
  - Consider partitioning by xt_batch_id for large volumes
  - Archive old error records periodically
- Data Correction Workflow:
  - Establish a process for reviewing and correcting failed records
  - Document common error patterns and fixes
  - Automate reprocessing where possible
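As one way to act on the monitoring advice, a failure-rate alert can be reduced to a small calculation over per-batch counts. The sketch below is illustrative only: the 5% threshold is an arbitrary assumption, both function names are hypothetical, and `failed_count` should count distinct records rather than parked rows, since a record that fails several checks may be parked more than once.

```python
def failure_rate(passed_count: int, failed_count: int) -> float:
    """Share of records in a batch that were parked (0.0 for an empty batch)."""
    total = passed_count + failed_count
    return failed_count / total if total else 0.0


def should_alert(passed_count: int, failed_count: int, threshold: float = 0.05) -> bool:
    """Return True when the failure rate exceeds the (assumed) threshold."""
    return failure_rate(passed_count, failed_count) > threshold


print(failure_rate(900, 100))  # 0.1
print(should_alert(900, 100))  # True
print(should_alert(990, 10))   # False
```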
Troubleshooting
No Records in Target Table
- Check if quality checks are too strict
- Verify staging table has data
- Review quality check configuration
All Records Going to Error Parking
- Review quality check thresholds
- Check for data type mismatches
- Verify column names in quality checks match input columns
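The last point can be checked mechanically before a flow is deployed. The sketch below assumes the flow definition has already been parsed into Python dictionaries shaped like the [input] and quality check examples in this guide; `unknown_check_columns` is a hypothetical helper, not a Qarion ETL command.

```python
def unknown_check_columns(input_columns: list[dict], quality_checks: list[dict]) -> dict:
    """Map each check_id to the columns it references that are not input columns."""
    known = {col["name"] for col in input_columns}
    problems = {}
    for check in quality_checks:
        missing = [col for col in check["columns"] if col not in known]
        if missing:
            problems[check["check_id"]] = missing
    return problems


input_columns = [
    {"name": "order_id", "schema_type": "integer", "required": True},
    {"name": "customer_id", "schema_type": "integer", "required": True},
]
quality_checks = [
    # "customer_idd" is a deliberate typo for demonstration.
    {"check_id": "completeness_check", "columns": ["order_id", "customer_idd"]},
    {"check_id": "uniqueness_check", "columns": ["order_id"]},
]

print(unknown_check_columns(input_columns, quality_checks))
# {'completeness_check': ['customer_idd']}
```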
Performance Issues
- Ensure proper indexing on staging and error parking tables
- Consider batch size for large datasets
- Review quality check query performance