
Quality Check Flow and Quality Check Suites

Overview

The Quality Check Flow lets you run data quality checks as a flow, providing a systematic way to validate data quality across datasets, queries, and data subsets. The flow introduces Quality Check Suites: reusable collections of quality checks that can be executed against various data sources.

What is a Quality Check Suite?

A Quality Check Suite is a reusable unit that defines:

  • A data source to check (dataset, query, subset, table, or view)
  • Multiple quality checks to execute against that source
  • Execution configuration (order, failure handling, etc.)

Suites allow you to:

  • Define quality checks once and reuse them
  • Run checks against different data sources
  • Group related checks together
  • Track quality check execution results

Quality Check Flow Structure

Flow Definition

id = "my_quality_check_flow"
name = "My Quality Check Flow"
flow_type = "quality_check"
namespace = "production"

[input]
columns = [] # Quality check flows don't require input columns

[properties]
quality_suites = [
# One or more quality check suites
]

Quality Check Suite Definition

[[properties.quality_suites]]
id = "suite_id"
name = "Suite Name"
description = "Optional description"
execution_order = "sequential"
stop_on_first_failure = false
enabled = true

[properties.quality_suites.data_source]
source_type = "dataset" # or "dataset_subset", "query", "table", "view"
dataset_id = "my_dataset" # For dataset/dataset_subset
table_name = "my_table" # For table/view
query = "SELECT * FROM ..." # For query
filters = {} # For dataset_subset

[[properties.quality_suites.checks]]
check_id = "check_1"
check_name = "Check Name"
check_type = "completeness"
columns = ["col1", "col2"]
severity = "error"
config = {}
enabled = true

Data Source Types

1. Dataset Source

Run checks against a full dataset by ID.

data_source = {
source_type = "dataset"
dataset_id = "orders_landing"
description = "Full orders landing dataset"
}

Use when:

  • You want to check an entire dataset
  • The dataset is already defined in your project
  • You need to check all records in the dataset

2. Dataset Subset Source

Run checks against a filtered subset of a dataset.

data_source = {
source_type = "dataset_subset"
dataset_id = "orders_landing"
filters = {
status = "pending"
order_date = { min = "2024-01-01", max = "2024-12-31" }
amount = { min = 100 }
}
description = "Pending orders from 2024 with amount > 100"
}

Use when:

  • You want to check a specific subset of data
  • You need to validate data matching certain criteria
  • You want to focus quality checks on specific data segments

Filter Syntax:

  • Simple equality: field = "value"
  • Range: field = { min = 0, max = 100 }
  • Multiple conditions: All conditions must match (AND logic)
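Under these rules, a subset's filters could be rendered into a SQL WHERE clause along the following lines. This is a hedged sketch: `build_where` is illustrative and not part of the qarion-etl API.

```python
# Hypothetical sketch: translate subset filters into a SQL WHERE clause.
# Simple values become equality tests; {min, max} dicts become range
# tests; all conditions are combined with AND, as documented above.
def build_where(filters: dict) -> str:
    clauses = []
    for field, cond in filters.items():
        if isinstance(cond, dict):  # range condition: { min = ..., max = ... }
            if "min" in cond:
                clauses.append(f"{field} >= {cond['min']!r}")
            if "max" in cond:
                clauses.append(f"{field} <= {cond['max']!r}")
        else:  # simple equality
            clauses.append(f"{field} = {cond!r}")
    return " AND ".join(clauses)

print(build_where({"status": "pending", "amount": {"min": 100}}))
# → status = 'pending' AND amount >= 100
```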

3. Query Source

Run checks against SQL query results.

data_source = {
source_type = "query"
query = """
SELECT
order_id,
customer_id,
order_date,
amount,
status
FROM orders
WHERE status = 'pending'
AND amount > 1000
ORDER BY order_date DESC
"""
description = "High-value pending orders"
}

Use when:

  • You need complex filtering or joins
  • You want to check aggregated data
  • You need to validate query results before using them
  • You're doing ad-hoc quality analysis

4. Table Source

Run checks against a direct table reference.

data_source = {
source_type = "table"
table_name = "orders_staging"
description = "Staging orders table"
}

Use when:

  • You want to check a table that's not a dataset
  • You're checking temporary or staging tables
  • The table exists in the database but isn't defined as a dataset

5. View Source

Run checks against a database view.

data_source = {
source_type = "view"
table_name = "orders_summary_view"
description = "Orders summary view"
}

Use when:

  • You want to check a database view
  • You're validating aggregated or transformed data in views
  • The view represents a specific data perspective

Quality Check Types

All quality check types are implemented using a plugin-based architecture. Each check type has a corresponding converter plugin that handles the conversion to transformation operations.

Completeness Check

Checks for missing/null values in specified columns. Implemented by CompletenessConverterPlugin.

{
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
columns = ["order_id", "customer_id", "order_date"]
severity = "error"
config = {
threshold = 0.95
allow_null = false
description = "At least 95% of records must have non-null values"
}
enabled = true
}

Configuration:

  • threshold: Minimum percentage of non-null values required (0.0 to 1.0)
  • allow_null: Whether NULL values are allowed (default: false)
  • description: Human-readable description
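As a sketch of the rule, the check passes when each listed column's non-null ratio meets the threshold. The function and sample rows below are illustrative, not the engine's implementation; the empty-source behavior is an assumption.

```python
# Sketch of the completeness rule: pass when each column's non-null
# ratio meets the threshold. Assumes an empty source trivially passes;
# the real engine may handle that case differently.
def completeness_passes(rows, columns, threshold=0.95):
    if not rows:
        return True
    for col in columns:
        non_null = sum(1 for r in rows if r.get(col) is not None)
        if non_null / len(rows) < threshold:
            return False
    return True

rows = [{"order_id": 1, "customer_id": 10},
        {"order_id": 2, "customer_id": None},
        {"order_id": 3, "customer_id": 11},
        {"order_id": 4, "customer_id": 12}]
completeness_passes(rows, ["customer_id"])  # 3/4 = 0.75 < 0.95 → False
```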

Uniqueness Check

Verifies that specified columns have unique values. Implemented by UniquenessConverterPlugin.

{
check_id = "uniqueness_check"
check_name = "Uniqueness Check"
check_type = "uniqueness"
columns = ["order_id"]
severity = "error"
config = {
description = "Primary key must be unique"
}
enabled = true
}

Configuration:

  • description: Human-readable description

Note: The columns specified form a composite unique key. All columns together must be unique.
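The composite-key semantics can be illustrated with a plain SQL duplicate query; the table and data below are made up for the example and do not reflect the engine's generated SQL.

```python
import sqlite3

# Sketch: a composite uniqueness check amounts to grouping on all key
# columns and flagging any combination that occurs more than once.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE orders (order_id INTEGER, line_no INTEGER)")
con.executemany("INSERT INTO orders VALUES (?, ?)",
                [(1, 1), (1, 2), (2, 1), (2, 1)])  # (2, 1) appears twice
dupes = con.execute("""
    SELECT order_id, line_no, COUNT(*) AS n
    FROM orders
    GROUP BY order_id, line_no
    HAVING COUNT(*) > 1
""").fetchall()
print(dupes)  # → [(2, 1, 2)]
```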

Range Check

Validates that numeric values are within expected ranges. Implemented by RangeConverterPlugin.

{
check_id = "range_check"
check_name = "Range Check"
check_type = "range"
columns = ["amount"]
severity = "warning"
config = {
min_value = 0
max_value = 1000000
description = "Amount must be between 0 and 1,000,000"
}
enabled = true
}

Configuration:

  • min_value: Minimum allowed value (inclusive, optional)
  • max_value: Maximum allowed value (inclusive, optional)
  • description: Human-readable description

Note: At least one of min_value or max_value must be specified.
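A minimal sketch of the rule, assuming inclusive bounds as described above; `range_violations` is illustrative, not the engine's implementation.

```python
# Count values outside the inclusive [min_value, max_value] bounds;
# at least one bound must be given, matching the note above.
def range_violations(values, min_value=None, max_value=None):
    assert min_value is not None or max_value is not None
    bad = 0
    for v in values:
        if min_value is not None and v < min_value:
            bad += 1
        elif max_value is not None and v > max_value:
            bad += 1
    return bad

range_violations([50, -10, 2_000_000, 300], min_value=0, max_value=1_000_000)
# → 2 (the -10 and the 2,000,000)
```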

Pattern Check

Validates data patterns using regex or other pattern matching. Implemented by PatternConverterPlugin.

{
check_id = "pattern_check"
check_name = "Email Pattern Check"
check_type = "pattern"
columns = ["email"]
severity = "error"
config = {
pattern = "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
description = "Email must match standard email pattern"
}
enabled = true
}

Configuration:

  • pattern: Pattern to match (SQL LIKE pattern or regex, depending on engine)
  • description: Human-readable description

Note: Pattern matching capabilities depend on the database engine. SQLite uses LIKE patterns, while DuckDB supports regex.
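For engines with regex support, the email pattern above behaves roughly like this sketch using Python's `re`; it is not the engine itself, and LIKE-based engines would match differently.

```python
import re

# Apply the documented email regex and collect non-matching values.
pattern = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
emails = ["alice@example.com", "not-an-email", "bob@test.org"]
failed = [e for e in emails if not pattern.match(e)]
print(failed)  # → ['not-an-email']
```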

Referential Integrity Check

Checks foreign key relationships. Implemented by ReferentialIntegrityConverterPlugin.

{
check_id = "referential_integrity"
check_name = "Referential Integrity Check"
check_type = "referential_integrity"
columns = ["customer_id"]
severity = "error"
config = {
foreign_key_table = "orders"
foreign_key_columns = ["customer_id"]
referenced_table = "customers"
referenced_columns = ["customer_id"]
join_condition = {} # Optional join condition
description = "All customer_id values must exist in customers table"
}
enabled = true
}

Configuration:

  • foreign_key_table: Table containing the foreign key
  • foreign_key_columns: Foreign key column(s)
  • referenced_table: Table being referenced
  • referenced_columns: Referenced column(s)
  • join_condition: Optional join condition dictionary
  • description: Human-readable description
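The underlying test can be pictured as a LEFT JOIN that surfaces orphaned foreign-key values; the schema and data below are illustrative, not the generated SQL.

```python
import sqlite3

# Sketch: referential integrity fails for any foreign-key value with no
# matching row in the referenced table.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE customers (customer_id INTEGER PRIMARY KEY)")
con.execute("CREATE TABLE orders (order_id INTEGER, customer_id INTEGER)")
con.executemany("INSERT INTO customers VALUES (?)", [(1,), (2,)])
con.executemany("INSERT INTO orders VALUES (?, ?)",
                [(100, 1), (101, 2), (102, 9)])  # customer 9 does not exist
orphans = con.execute("""
    SELECT o.order_id, o.customer_id
    FROM orders o
    LEFT JOIN customers c ON o.customer_id = c.customer_id
    WHERE c.customer_id IS NULL
""").fetchall()
print(orphans)  # → [(102, 9)]
```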

Execution Configuration

Execution Order

Control how quality checks are executed:

Sequential (default):

execution_order = "sequential"
  • Checks run one after another
  • Each check waits for the previous to complete
  • Use when checks have dependencies

Parallel:

execution_order = "parallel"
  • Checks run simultaneously (if supported by engine)
  • Faster execution for independent checks
  • Use when checks are independent

Dependency Ordered:

execution_order = "dependency_ordered"
  • Checks run based on dependency relationships
  • Use when checks have explicit dependencies

Stop on First Failure

Control whether execution stops when a check fails:

stop_on_first_failure = true  # Stop on first failure
stop_on_first_failure = false # Continue even if checks fail

When to stop:

  • Critical quality checks that must pass
  • Early failure detection for performance
  • When subsequent checks depend on previous checks

When to continue:

  • Collecting comprehensive quality metrics
  • Non-critical checks
  • When you want to see all failures
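The interaction of sequential order and stop_on_first_failure can be sketched with a toy executor; the check functions and result shape are illustrative, not the real engine.

```python
# Run checks in order; with stop_on_first_failure, later checks are
# skipped as soon as one fails.
def run_suite(checks, stop_on_first_failure=False):
    results = []
    for name, fn in checks:
        passed = fn()
        results.append((name, passed))
        if not passed and stop_on_first_failure:
            break  # skip the remaining checks
    return results

checks = [("completeness", lambda: True),
          ("uniqueness", lambda: False),
          ("range", lambda: True)]
run_suite(checks, stop_on_first_failure=True)
# → [('completeness', True), ('uniqueness', False)]; 'range' never runs
```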

Quality Check Results

Quality check results are stored in a quality results dataset with the following structure:

  • suite_id: ID of the quality check suite
  • suite_name: Name of the quality check suite
  • check_id: ID of the quality check
  • check_name: Name of the quality check
  • check_type: Type of check (completeness, uniqueness, etc.)
  • table_name: Table that was checked
  • passed: Whether the check passed (true/false)
  • severity: Severity level (error, warning)
  • message: Result message
  • records_checked: Number of records checked
  • records_failed: Number of records that failed
  • execution_time_seconds: Time taken to execute
  • execution_timestamp: When the check was executed
  • details: Additional details (JSON string)
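Because results land in a regular dataset, they can be summarized downstream. The rows below carry only a subset of the fields listed above and are purely illustrative.

```python
# Summarize quality results into a simple pass count using the `passed`
# field; a real results dataset would carry all the documented fields.
results = [
    {"suite_id": "orders_completeness_suite",
     "check_id": "orders_completeness", "passed": True, "severity": "error"},
    {"suite_id": "orders_completeness_suite",
     "check_id": "orders_uniqueness", "passed": False, "severity": "error"},
]
passed = sum(1 for r in results if r["passed"])
print(f"{passed}/{len(results)} checks passed")  # → 1/2 checks passed
```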

Best Practices

Suite Design

  1. Group Related Checks: Put related checks in the same suite
  2. Descriptive Names: Use clear, descriptive names for suites and checks
  3. Documentation: Add descriptions to explain what each suite validates
  4. Enable/Disable: Use the enabled flag to temporarily disable suites or checks

Data Source Selection

  1. Use Datasets: Prefer dataset sources when possible for consistency
  2. Use Queries: Use query sources for complex validation scenarios
  3. Use Subsets: Use subset sources to focus on specific data segments
  4. Document Sources: Always add descriptions to data sources

Check Configuration

  1. Appropriate Severity: Use error for critical checks, warning for informational ones
  2. Reasonable Thresholds: Set thresholds based on business requirements
  3. Clear Messages: Provide clear descriptions in check configurations
  4. Test Checks: Test checks with sample data before production

Execution Configuration

  1. Sequential for Dependencies: Use sequential order when checks depend on each other
  2. Parallel for Independence: Use parallel order for independent checks
  3. Stop on Critical Failures: Set stop_on_first_failure = true for critical checks
  4. Continue for Metrics: Set stop_on_first_failure = false to collect all metrics

Integration with Other Flows

Quality check flows can be integrated with other flow types:

After Transformation Flows

Run quality checks after transformations to validate results:

# Run quality check flow after change_feed flow
# This validates the change_feed output

Standalone Quality Monitoring

Run quality check flows independently for data quality monitoring:

# Schedule quality check flow to run daily
# Monitor data quality trends over time

Pipeline Integration

Include quality check flows in larger pipelines:

# Pipeline: Ingest → Transform → Quality Check → Export

Complete Configuration Examples

Example 1: Complete Quality Check Flow

Flow Definition:

# flows/orders_quality_check.toml
id = "orders_quality_check"
name = "Orders Quality Check Flow"
flow_type = "quality_check"
namespace = "production"
description = "Comprehensive quality checks for orders data"

[input]
columns = [] # Quality check flows don't require input columns

[properties]
quality_suites = [
{
id = "orders_completeness_suite"
name = "Orders Completeness Suite"
description = "Validates completeness of orders data"

data_source = {
source_type = "dataset"
dataset_id = "orders_landing"
description = "Full orders landing dataset"
}

checks = [
{
check_id = "orders_completeness"
check_name = "Orders Completeness Check"
check_type = "completeness"
columns = ["order_id", "customer_id", "order_date", "amount"]
severity = "error"
config = {
threshold = 0.95
allow_null = false
description = "At least 95% of records must have non-null values"
}
enabled = true
},
{
check_id = "orders_uniqueness"
check_name = "Orders Uniqueness Check"
check_type = "uniqueness"
columns = ["order_id"]
severity = "error"
config = {
description = "Primary key must be unique"
}
enabled = true
},
{
check_id = "orders_amount_range"
check_name = "Orders Amount Range Check"
check_type = "range"
columns = ["amount"]
severity = "warning"
config = {
min_value = 0
max_value = 1000000
description = "Amount must be between 0 and 1,000,000"
}
enabled = true
}
]

execution_order = "sequential"
stop_on_first_failure = false
enabled = true
}
]

Example 2: Multiple Suites with Different Data Sources

Flow Definition:

# flows/comprehensive_quality_check.toml
id = "comprehensive_quality_check"
name = "Comprehensive Quality Check"
flow_type = "quality_check"

[properties]
quality_suites = [
# Suite 1: Full dataset check
{
id = "full_dataset_suite"
name = "Full Dataset Suite"
data_source = {
source_type = "dataset"
dataset_id = "orders_staging"
}
checks = [
{
check_id = "completeness"
check_type = "completeness"
columns = ["order_id", "customer_id", "amount"]
severity = "error"
config = { threshold = 0.95 }
enabled = true
}
]
execution_order = "sequential"
stop_on_first_failure = false
enabled = true
},

# Suite 2: Subset check
{
id = "pending_orders_suite"
name = "Pending Orders Suite"
data_source = {
source_type = "dataset_subset"
dataset_id = "orders_staging"
filters = {
status = "pending"
amount = { min = 1000 }
}
}
checks = [
{
check_id = "pending_completeness"
check_type = "completeness"
columns = ["order_id", "customer_id", "amount", "status"]
severity = "error"
config = { threshold = 1.0 }
enabled = true
}
]
execution_order = "sequential"
stop_on_first_failure = true
enabled = true
},

# Suite 3: Query-based check
{
id = "high_value_orders_suite"
name = "High Value Orders Suite"
data_source = {
source_type = "query"
query = """
SELECT
order_id,
customer_id,
amount,
order_date
FROM orders_staging
WHERE amount > 10000
ORDER BY amount DESC
"""
}
checks = [
{
check_id = "high_value_completeness"
check_type = "completeness"
columns = ["order_id", "customer_id", "amount"]
severity = "error"
config = { threshold = 1.0 }
enabled = true
},
{
check_id = "high_value_range"
check_type = "range"
columns = ["amount"]
severity = "error"
config = {
min_value = 10000
max_value = 10000000
}
enabled = true
}
]
execution_order = "sequential"
stop_on_first_failure = true
enabled = true
}
]

Quick Reference

Creating a Quality Check Flow

  1. Create Flow Definition:
qarion-etl new-flow
# Select 'quality_check' as flow type
  2. Define Quality Suites:
# flows/my_quality_check.toml
id = "my_quality_check"
flow_type = "quality_check"

[properties]
quality_suites = [
{
id = "my_suite"
name = "My Suite"
data_source = { source_type = "dataset", dataset_id = "my_dataset" }
checks = [
{
check_id = "completeness_check"
check_type = "completeness"
columns = ["id", "name"]
severity = "error"
config = { threshold = 0.95 }
enabled = true
}
]
execution_order = "sequential"
stop_on_first_failure = false
enabled = true
}
]
  3. Run Quality Check Flow:
qarion-etl run --flow my_quality_check

Common Patterns

Pattern 1: Dataset Quality Monitoring

data_source = { source_type = "dataset", dataset_id = "production_table" }

Pattern 2: Subset Validation

data_source = {
source_type = "dataset_subset"
dataset_id = "orders"
filters = { status = "pending" }
}

Pattern 3: Query Result Validation

data_source = {
source_type = "query"
query = "SELECT * FROM orders WHERE amount > 1000"
}

Pattern 4: Multiple Suites in One Flow

quality_suites = [
{ id = "suite1", ... },
{ id = "suite2", ... },
{ id = "suite3", ... }
]

Query Consolidation and Optimization

The quality check system automatically consolidates and optimizes queries for better performance:

Automatic Consolidation

Multiple compatible checks on the same table are automatically consolidated into a single query:

  • Completeness checks on the same table are consolidated
  • Range and Pattern checks can be consolidated with Completeness checks
  • Uniqueness checks are typically kept separate

This reduces database round-trips and improves execution performance.
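As an illustration of the idea (not the actual optimizer), several completeness checks against one table could collapse into a single aggregate query instead of one query per column; `consolidated_completeness_sql` is a hypothetical helper.

```python
# Fold completeness checks for several columns into one aggregate query:
# COUNT(col) counts non-null values, COUNT(*) counts all rows, so each
# ratio is that column's non-null fraction.
def consolidated_completeness_sql(table, columns):
    metrics = ", ".join(
        f"COUNT({c}) * 1.0 / COUNT(*) AS {c}_ratio" for c in columns
    )
    return f"SELECT {metrics} FROM {table}"

print(consolidated_completeness_sql("orders", ["order_id", "customer_id"]))
```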

Optimization

The system uses multi-step optimization to:

  • Combine compatible check types into efficient queries
  • Optimize transformation instructions
  • Generate optimized SQL queries