Quality Check Flow and Quality Check Suites
Overview
The Quality Check Flow enables you to run data quality checks as a flow, providing a systematic way to validate data quality across datasets, queries, and data subsets. The flow introduces the concept of Quality Check Suites - reusable collections of quality checks that can be executed against various data sources.
What is a Quality Check Suite?
A Quality Check Suite is a reusable unit that defines:
- A data source to check (dataset, query, subset, table, or view)
- Multiple quality checks to execute against that source
- Execution configuration (order, failure handling, etc.)
Suites allow you to:
- Define quality checks once and reuse them
- Run checks against different data sources
- Group related checks together
- Track quality check execution results
Quality Check Flow Structure
Flow Definition
id = "my_quality_check_flow"
name = "My Quality Check Flow"
flow_type = "quality_check"
namespace = "production"
[input]
columns = [] # Quality check flows don't require input columns
[properties]
# One or more quality check suites, defined as [[properties.quality_suites]] tables
Quality Check Suite Definition
[[properties.quality_suites]]
id = "suite_id"
name = "Suite Name"
description = "Optional description"
execution_order = "sequential"
stop_on_first_failure = false
enabled = true

[properties.quality_suites.data_source]
source_type = "dataset" # or "dataset_subset", "query", "table", "view"
dataset_id = "my_dataset" # For dataset/dataset_subset
table_name = "my_table" # For table/view
query = "SELECT * FROM ..." # For query
filters = {} # For dataset_subset

[[properties.quality_suites.checks]]
check_id = "check_1"
check_name = "Check Name"
check_type = "completeness"
columns = ["col1", "col2"]
severity = "error"
config = {}
enabled = true
Note: suite-level keys (execution_order, stop_on_first_failure, enabled) must appear before the data_source and checks sub-tables so that TOML scopes them to the suite rather than to the last check.
Data Source Types
1. Dataset Source
Run checks against a full dataset by ID.
[properties.quality_suites.data_source]
source_type = "dataset"
dataset_id = "orders_landing"
description = "Full orders landing dataset"
Use when:
- You want to check an entire dataset
- The dataset is already defined in your project
- You need to check all records in the dataset
2. Dataset Subset Source
Run checks against a filtered subset of a dataset.
[properties.quality_suites.data_source]
source_type = "dataset_subset"
dataset_id = "orders_landing"
description = "Pending orders from 2024 with amount >= 100"

[properties.quality_suites.data_source.filters]
status = "pending"
order_date = { min = "2024-01-01", max = "2024-12-31" }
amount = { min = 100 }
Use when:
- You want to check a specific subset of data
- You need to validate data matching certain criteria
- You want to focus quality checks on specific data segments
Filter Syntax:
- Simple equality: field = "value"
- Range: field = { min = 0, max = 100 }
- Multiple conditions: all listed conditions must match (AND logic)
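The AND-combination of filters can be illustrated with a small hypothetical helper (not part of the actual engine) that renders a filters table into a SQL WHERE clause:

```python
def filters_to_where(filters):
    """Render subset filters into a SQL WHERE clause (illustrative sketch).

    Scalar values become equality tests; {min=..., max=...} tables become
    inclusive range predicates; all conditions are joined with AND.
    """
    clauses = []
    for field, cond in filters.items():
        if isinstance(cond, dict):
            if "min" in cond:
                clauses.append(f"{field} >= {cond['min']!r}")
            if "max" in cond:
                clauses.append(f"{field} <= {cond['max']!r}")
        else:
            clauses.append(f"{field} = {cond!r}")
    return " AND ".join(clauses)

where = filters_to_where({"status": "pending", "amount": {"min": 100}})
# where == "status = 'pending' AND amount >= 100"
```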
3. Query Source
Run checks against SQL query results.
[properties.quality_suites.data_source]
source_type = "query"
description = "High-value pending orders"
query = """
SELECT
    order_id,
    customer_id,
    order_date,
    amount,
    status
FROM orders
WHERE status = 'pending'
  AND amount > 1000
ORDER BY order_date DESC
"""
Use when:
- You need complex filtering or joins
- You want to check aggregated data
- You need to validate query results before using them
- You're doing ad-hoc quality analysis
4. Table Source
Run checks against a direct table reference.
[properties.quality_suites.data_source]
source_type = "table"
table_name = "orders_staging"
description = "Staging orders table"
Use when:
- You want to check a table that's not a dataset
- You're checking temporary or staging tables
- The table exists in the database but isn't defined as a dataset
5. View Source
Run checks against a database view.
[properties.quality_suites.data_source]
source_type = "view"
table_name = "orders_summary_view"
description = "Orders summary view"
Use when:
- You want to check a database view
- You're validating aggregated or transformed data in views
- The view represents a specific data perspective
Quality Check Types
All quality check types are implemented using a plugin-based architecture. Each check type has a corresponding converter plugin that handles the conversion to transformation operations.
Completeness Check
Checks for missing/null values in specified columns. Implemented by CompletenessConverterPlugin.
[[properties.quality_suites.checks]]
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
columns = ["order_id", "customer_id", "order_date"]
severity = "error"
enabled = true

[properties.quality_suites.checks.config]
threshold = 0.95
allow_null = false
description = "At least 95% of records must have non-null values"
Configuration:
- threshold: Minimum percentage of non-null values required (0.0 to 1.0)
- allow_null: Whether NULL values are allowed (default: false)
- description: Human-readable description
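The threshold semantics can be sketched in a few lines of Python (illustrative only; the real check runs as SQL generated by the converter plugin):

```python
def completeness_ratio(rows, columns):
    """Fraction of rows where every checked column is non-null."""
    if not rows:
        return 1.0
    ok = sum(all(row.get(col) is not None for col in columns) for row in rows)
    return ok / len(rows)

rows = [
    {"order_id": 1, "customer_id": 10},
    {"order_id": 2, "customer_id": None},  # incomplete row
    {"order_id": 3, "customer_id": 30},
    {"order_id": 4, "customer_id": 40},
]
ratio = completeness_ratio(rows, ["order_id", "customer_id"])
passed = ratio >= 0.95  # 0.75 < 0.95, so this check fails
```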
Uniqueness Check
Verifies that specified columns have unique values. Implemented by UniquenessConverterPlugin.
[[properties.quality_suites.checks]]
check_id = "uniqueness_check"
check_name = "Uniqueness Check"
check_type = "uniqueness"
columns = ["order_id"]
severity = "error"
config = { description = "Primary key must be unique" }
enabled = true
Configuration:
- description: Human-readable description
Note: The columns specified form a composite unique key. All columns together must be unique.
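Conceptually (a hypothetical Python sketch, not the UniquenessConverterPlugin implementation), a composite-key uniqueness check flags key tuples that occur more than once:

```python
from collections import Counter

def duplicate_keys(rows, columns):
    """Return composite-key tuples that appear more than once."""
    counts = Counter(tuple(row[col] for col in columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]

rows = [
    {"order_id": 1, "line": 1},
    {"order_id": 1, "line": 2},
    {"order_id": 1, "line": 2},  # duplicate (order_id, line) pair
]
dupes = duplicate_keys(rows, ["order_id", "line"])
# dupes == [(1, 2)]
```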
Range Check
Validates that numeric values are within expected ranges. Implemented by RangeConverterPlugin.
[[properties.quality_suites.checks]]
check_id = "range_check"
check_name = "Range Check"
check_type = "range"
columns = ["amount"]
severity = "warning"
enabled = true

[properties.quality_suites.checks.config]
min_value = 0
max_value = 1000000
description = "Amount must be between 0 and 1,000,000"
Configuration:
- min_value: Minimum allowed value (inclusive, optional)
- max_value: Maximum allowed value (inclusive, optional)
- description: Human-readable description
Note: At least one of min_value or max_value must be specified.
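An illustrative sketch of the inclusive-bounds semantics (hypothetical helper, not the plugin itself):

```python
def range_violations(values, min_value=None, max_value=None):
    """Values outside the inclusive [min_value, max_value] range."""
    if min_value is None and max_value is None:
        raise ValueError("at least one of min_value or max_value is required")
    return [
        v for v in values
        if (min_value is not None and v < min_value)
        or (max_value is not None and v > max_value)
    ]

bad = range_violations([50, -10, 2_000_000], min_value=0, max_value=1_000_000)
# bad == [-10, 2000000]; 50 is within bounds
```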
Pattern Check
Validates data patterns using regex or other pattern matching. Implemented by PatternConverterPlugin.
[[properties.quality_suites.checks]]
check_id = "pattern_check"
check_name = "Email Pattern Check"
check_type = "pattern"
columns = ["email"]
severity = "error"
enabled = true

[properties.quality_suites.checks.config]
pattern = "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
description = "Email must match standard email pattern"
Configuration:
- pattern: Pattern to match (SQL LIKE pattern or regex, depending on engine)
- description: Human-readable description
Note: Pattern matching capabilities depend on the database engine. SQLite uses LIKE patterns, while DuckDB supports regex.
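Since pattern support is engine-dependent, the intended regex semantics can be illustrated with a small Python sketch using the email pattern from the example above:

```python
import re

# Email pattern from the example config; this sketch only illustrates the
# intended semantics, not how a specific database engine evaluates it.
EMAIL = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")

def pattern_failures(values, pattern=EMAIL):
    """Values that do not match the configured pattern."""
    return [v for v in values if not pattern.match(v)]

failures = pattern_failures(["a@example.com", "not-an-email"])
# failures == ["not-an-email"]
```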
Referential Integrity Check
Checks foreign key relationships. Implemented by ReferentialIntegrityConverterPlugin.
[[properties.quality_suites.checks]]
check_id = "referential_integrity"
check_name = "Referential Integrity Check"
check_type = "referential_integrity"
columns = ["customer_id"]
severity = "error"
enabled = true

[properties.quality_suites.checks.config]
foreign_key_table = "orders"
foreign_key_columns = ["customer_id"]
referenced_table = "customers"
referenced_columns = ["customer_id"]
join_condition = {} # Optional join condition
description = "All customer_id values must exist in customers table"
Configuration:
- foreign_key_table: Table containing the foreign key
- foreign_key_columns: Foreign key column(s)
- referenced_table: Table being referenced
- referenced_columns: Referenced column(s)
- join_condition: Optional join condition dictionary
- description: Human-readable description
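A referential integrity check is essentially an anti-join. A minimal sqlite3 sketch (hypothetical tables, not the plugin's generated SQL) that finds orphaned foreign-key values:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE customers (customer_id INTEGER PRIMARY KEY);
CREATE TABLE orders (order_id INTEGER, customer_id INTEGER);
INSERT INTO customers VALUES (1), (2);
INSERT INTO orders VALUES (100, 1), (101, 2), (102, 3);
""")

# Anti-join: foreign-key values with no match in the referenced table.
orphans = con.execute("""
    SELECT o.customer_id
    FROM orders o
    LEFT JOIN customers c ON o.customer_id = c.customer_id
    WHERE c.customer_id IS NULL
""").fetchall()
# orphans == [(3,)] -- order 102 references a missing customer
```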
Execution Configuration
Execution Order
Control how quality checks are executed:
Sequential (default):
execution_order = "sequential"
- Checks run one after another
- Each check waits for the previous to complete
- Use when checks have dependencies
Parallel:
execution_order = "parallel"
- Checks run simultaneously (if supported by engine)
- Faster execution for independent checks
- Use when checks are independent
Dependency Ordered:
execution_order = "dependency_ordered"
- Checks run based on dependency relationships
- Use when checks have explicit dependencies
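As a rough illustration of the parallel mode (hypothetical sketch, not the engine's actual scheduler), independent check callables could be fanned out with a thread pool:

```python
from concurrent.futures import ThreadPoolExecutor

def run_checks_parallel(checks):
    """Dispatch independent check callables concurrently; results keep input order."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(lambda check: check(), checks))

results = run_checks_parallel([lambda: True, lambda: False, lambda: True])
# results == [True, False, True]
```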
Stop on First Failure
Control whether execution stops when a check fails:
stop_on_first_failure = true # Stop on first failure
stop_on_first_failure = false # Continue even if checks fail
When to stop:
- Critical quality checks that must pass
- Early failure detection for performance
- When subsequent checks depend on previous checks
When to continue:
- Collecting comprehensive quality metrics
- Non-critical checks
- When you want to see all failures
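The two modes can be sketched as a simple runner loop (hypothetical; the real executor is part of the flow engine):

```python
def run_suite(checks, stop_on_first_failure=False):
    """Run (name, callable) checks in order; optionally stop at the first failure."""
    results = []
    for name, check in checks:
        passed = check()
        results.append((name, passed))
        if not passed and stop_on_first_failure:
            break  # remaining checks are skipped
    return results

checks = [("a", lambda: True), ("b", lambda: False), ("c", lambda: True)]
stopped = run_suite(checks, stop_on_first_failure=True)   # "c" never runs
full = run_suite(checks, stop_on_first_failure=False)     # all failures collected
```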
Quality Check Results
Quality check results are stored in a quality results dataset with the following structure:
- suite_id: ID of the quality check suite
- suite_name: Name of the quality check suite
- check_id: ID of the quality check
- check_name: Name of the quality check
- check_type: Type of check (completeness, uniqueness, etc.)
- table_name: Table that was checked
- passed: Whether the check passed (true/false)
- severity: Severity level (error, warning)
- message: Result message
- records_checked: Number of records checked
- records_failed: Number of records that failed
- execution_time_seconds: Time taken to execute
- execution_timestamp: When the check was executed
- details: Additional details (JSON string)
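Because results are plain records, downstream aggregation is straightforward. A small sketch with hypothetical values for a few of the fields above:

```python
results = [
    {"check_id": "orders_completeness", "passed": True,
     "records_checked": 1000, "records_failed": 0},
    {"check_id": "orders_uniqueness", "passed": False,
     "records_checked": 1000, "records_failed": 12},
]

# Which checks failed, and what fraction of inspected records failed overall.
failed_checks = [r["check_id"] for r in results if not r["passed"]]
failure_rate = (sum(r["records_failed"] for r in results)
                / sum(r["records_checked"] for r in results))
# failed_checks == ["orders_uniqueness"]; failure_rate == 0.006
```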
Best Practices
Suite Design
- Group Related Checks: Put related checks in the same suite
- Descriptive Names: Use clear, descriptive names for suites and checks
- Documentation: Add descriptions to explain what each suite validates
- Enable/Disable: Use the enabled flag to temporarily disable suites or checks
Data Source Selection
- Use Datasets: Prefer dataset sources when possible for consistency
- Use Queries: Use query sources for complex validation scenarios
- Use Subsets: Use subset sources to focus on specific data segments
- Document Sources: Always add descriptions to data sources
Check Configuration
- Appropriate Severity: Use error for critical checks, warning for informational checks
- Reasonable Thresholds: Set thresholds based on business requirements
- Clear Messages: Provide clear descriptions in check configurations
- Test Checks: Test checks with sample data before production
Execution Configuration
- Sequential for Dependencies: Use sequential order when checks depend on each other
- Parallel for Independence: Use parallel order for independent checks
- Stop on Critical Failures: Set stop_on_first_failure = true for critical checks
- Continue for Metrics: Set stop_on_first_failure = false to collect all metrics
Integration with Other Flows
Quality check flows can be integrated with other flow types:
After Transformation Flows
Run quality checks after transformations to validate results:
# Run quality check flow after change_feed flow
# This validates the change_feed output
Standalone Quality Monitoring
Run quality check flows independently for data quality monitoring:
# Schedule quality check flow to run daily
# Monitor data quality trends over time
Pipeline Integration
Include quality check flows in larger pipelines:
# Pipeline: Ingest → Transform → Quality Check → Export
Complete Configuration Examples
Example 1: Complete Quality Check Flow
Flow Definition:
# flows/orders_quality_check.toml
id = "orders_quality_check"
name = "Orders Quality Check Flow"
flow_type = "quality_check"
namespace = "production"
description = "Comprehensive quality checks for orders data"
[input]
columns = [] # Quality check flows don't require input columns
[properties]

[[properties.quality_suites]]
id = "orders_completeness_suite"
name = "Orders Completeness Suite"
description = "Validates completeness of orders data"
execution_order = "sequential"
stop_on_first_failure = false
enabled = true

[properties.quality_suites.data_source]
source_type = "dataset"
dataset_id = "orders_landing"
description = "Full orders landing dataset"

[[properties.quality_suites.checks]]
check_id = "orders_completeness"
check_name = "Orders Completeness Check"
check_type = "completeness"
columns = ["order_id", "customer_id", "order_date", "amount"]
severity = "error"
enabled = true

[properties.quality_suites.checks.config]
threshold = 0.95
allow_null = false
description = "At least 95% of records must have non-null values"

[[properties.quality_suites.checks]]
check_id = "orders_uniqueness"
check_name = "Orders Uniqueness Check"
check_type = "uniqueness"
columns = ["order_id"]
severity = "error"
config = { description = "Primary key must be unique" }
enabled = true

[[properties.quality_suites.checks]]
check_id = "orders_amount_range"
check_name = "Orders Amount Range Check"
check_type = "range"
columns = ["amount"]
severity = "warning"
enabled = true

[properties.quality_suites.checks.config]
min_value = 0
max_value = 1000000
description = "Amount must be between 0 and 1,000,000"
Example 2: Multiple Suites with Different Data Sources
Flow Definition:
# flows/comprehensive_quality_check.toml
id = "comprehensive_quality_check"
name = "Comprehensive Quality Check"
flow_type = "quality_check"
[properties]

# Suite 1: Full dataset check
[[properties.quality_suites]]
id = "full_dataset_suite"
name = "Full Dataset Suite"
execution_order = "sequential"
stop_on_first_failure = false
enabled = true

[properties.quality_suites.data_source]
source_type = "dataset"
dataset_id = "orders_staging"

[[properties.quality_suites.checks]]
check_id = "completeness"
check_type = "completeness"
columns = ["order_id", "customer_id", "amount"]
severity = "error"
config = { threshold = 0.95 }
enabled = true

# Suite 2: Subset check
[[properties.quality_suites]]
id = "pending_orders_suite"
name = "Pending Orders Suite"
execution_order = "sequential"
stop_on_first_failure = true
enabled = true

[properties.quality_suites.data_source]
source_type = "dataset_subset"
dataset_id = "orders_staging"

[properties.quality_suites.data_source.filters]
status = "pending"
amount = { min = 1000 }

[[properties.quality_suites.checks]]
check_id = "pending_completeness"
check_type = "completeness"
columns = ["order_id", "customer_id", "amount", "status"]
severity = "error"
config = { threshold = 1.0 }
enabled = true

# Suite 3: Query-based check
[[properties.quality_suites]]
id = "high_value_orders_suite"
name = "High Value Orders Suite"
execution_order = "sequential"
stop_on_first_failure = true
enabled = true

[properties.quality_suites.data_source]
source_type = "query"
query = """
SELECT
    order_id,
    customer_id,
    amount,
    order_date
FROM orders_staging
WHERE amount > 10000
ORDER BY amount DESC
"""

[[properties.quality_suites.checks]]
check_id = "high_value_completeness"
check_type = "completeness"
columns = ["order_id", "customer_id", "amount"]
severity = "error"
config = { threshold = 1.0 }
enabled = true

[[properties.quality_suites.checks]]
check_id = "high_value_range"
check_type = "range"
columns = ["amount"]
severity = "error"
config = { min_value = 10000, max_value = 10000000 }
enabled = true
Quick Reference
Creating a Quality Check Flow
- Create Flow Definition:
qarion-etl new-flow
# Select 'quality_check' as flow type
- Define Quality Suites:
# flows/my_quality_check.toml
id = "my_quality_check"
flow_type = "quality_check"
[properties]
[[properties.quality_suites]]
id = "my_suite"
name = "My Suite"
data_source = { source_type = "dataset", dataset_id = "my_dataset" }
execution_order = "sequential"
stop_on_first_failure = false
enabled = true

[[properties.quality_suites.checks]]
check_id = "completeness_check"
check_type = "completeness"
columns = ["id", "name"]
severity = "error"
config = { threshold = 0.95 }
enabled = true
- Run Quality Check Flow:
qarion-etl run --flow my_quality_check
Common Patterns
Pattern 1: Dataset Quality Monitoring
data_source = { source_type = "dataset", dataset_id = "production_table" }
Pattern 2: Subset Validation
[properties.quality_suites.data_source]
source_type = "dataset_subset"
dataset_id = "orders"
filters = { status = "pending" }
Pattern 3: Query Result Validation
[properties.quality_suites.data_source]
source_type = "query"
query = "SELECT * FROM orders WHERE amount > 1000"
Pattern 4: Multiple Suites in One Flow
# One [[properties.quality_suites]] entry per suite:
[[properties.quality_suites]]
id = "suite1"
# ...

[[properties.quality_suites]]
id = "suite2"
# ...

[[properties.quality_suites]]
id = "suite3"
# ...
Query Consolidation and Optimization
The quality check system automatically consolidates and optimizes queries for better performance:
Automatic Consolidation
Multiple compatible checks on the same table are automatically consolidated into a single query:
- Completeness checks on the same table are consolidated
- Range and Pattern checks can be consolidated with Completeness checks
- Uniqueness checks are typically kept separate
This reduces database round-trips and improves execution performance.
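For illustration, a consolidated completeness query might fold several per-column null counts into a single table scan. This is a hypothetical sketch, not the optimizer's actual output:

```python
import sqlite3

def consolidated_completeness_sql(table, columns):
    """Build one query that counts nulls for several columns in a single scan."""
    null_counts = ", ".join(
        f"SUM(CASE WHEN {col} IS NULL THEN 1 ELSE 0 END) AS {col}_nulls"
        for col in columns
    )
    return f"SELECT COUNT(*) AS total, {null_counts} FROM {table}"

# Demonstrate against an in-memory table with one null customer_id.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE orders (order_id INTEGER, customer_id INTEGER)")
con.executemany("INSERT INTO orders VALUES (?, ?)",
                [(1, 10), (2, None), (3, 30)])
total, id_nulls, cust_nulls = con.execute(
    consolidated_completeness_sql("orders", ["order_id", "customer_id"])
).fetchone()
# total == 3, id_nulls == 0, cust_nulls == 1
```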
Optimization
The system uses multi-step optimization to:
- Combine compatible check types into efficient queries
- Optimize transformation instructions
- Generate optimized SQL queries
Related Documentation
- Data Quality Checks - General data quality system documentation
- Flow Types - Overview of all flow types
- Quality Check Integration - How quality checks integrate with flows and plugin development