Data Quality Validation
Overview
Qarion ETL provides a comprehensive data quality checking system that allows you to define and execute quality checks on your data. The system supports both individual quality checks and reconciliation checks for comparing datasets.
The quality check system uses a plugin-based architecture where each check type is implemented as a converter plugin that transforms quality check instructions into transformation operations. This provides flexibility, extensibility, and consistency with the transformation framework.
Note: For running quality checks as a flow, see Quality Check Flow. This document covers the general data quality system and inline quality checks.
Architecture
The data quality system follows the same abstraction pattern as transformations:
- Quality Check Instructions: Engine-agnostic descriptions of quality checks
- Quality Check Planning: Plan quality checks before execution
- DQ Check Converter Plugins: Plugin system that converts instructions to transformation operations
- Quality Check Executors: Engine-specific implementations (SQL, Pandas, etc.)
- Quality Service: Orchestrates quality check execution
- Quality Check Pipeline: Complete pipeline with consolidation and optimization
Plugin System
Each quality check type has a corresponding DQ Check Converter Plugin that handles:
- Converting quality check instructions to transformation operations
- Generating appropriate SQL/operations for the check type
- Defining output columns for the check results
Built-in plugins are automatically registered and can be extended with custom plugins.
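The registration mechanism follows a simple type-keyed registry pattern. The sketch below illustrates that pattern in plain Python; it is not Qarion ETL's actual internal registry, and the names `_PLUGINS`, `register`, and `get_plugin` are illustrative:

```python
# Illustrative type-keyed plugin registry; not Qarion ETL's internal code.
_PLUGINS = {}

def register(plugin):
    """Store a plugin under its declared check type."""
    _PLUGINS[plugin.check_type] = plugin

def get_plugin(check_type):
    """Look up the converter plugin for a check type."""
    return _PLUGINS[check_type]

class CompletenessPlugin:
    check_type = "completeness"

register(CompletenessPlugin())
print(get_plugin("completeness").check_type)  # → completeness
```

Registering a second plugin for the same check type would simply replace the first, which is the usual behavior for registries of this shape.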
Quality Check Types
Built-in Check Types
All check types are implemented as plugins in quality/dq_check/:
- Completeness (CompletenessConverterPlugin): Check for missing/null values
- Uniqueness (UniquenessConverterPlugin): Verify unique constraints
- Referential Integrity (ReferentialIntegrityConverterPlugin): Check foreign key relationships
- Range (RangeConverterPlugin): Validate value ranges
- Pattern (PatternConverterPlugin): Validate data patterns (regex, etc.)
Reconciliation Checks
Reconciliation checks compare two datasets for consistency:
- Row Count: Compare record counts
- Column Sum: Compare aggregated values
- Exact Match: Compare all values
- Key Match: Verify all keys exist in both datasets
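Conceptually, a row-count reconciliation reduces to comparing two record counts. A minimal engine-agnostic sketch of that idea (illustrative only, not Qarion ETL internals):

```python
def row_count_reconciliation(source_rows, target_rows):
    """Pass when both datasets contain the same number of records."""
    return {
        "source_records": len(source_rows),
        "target_records": len(target_rows),
        "passed": len(source_rows) == len(target_rows),
    }

result = row_count_reconciliation([{"id": 1}, {"id": 2}], [{"id": 1}, {"id": 2}])
print(result["passed"])  # → True
```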
Usage
Defining Quality Checks
Quality checks are defined using QualityCheckInstruction objects:
from qarion_etl.quality import (
QualityCheckInstruction,
QualityCheckType,
QualityCheckSeverity
)
instruction = QualityCheckInstruction(
table_name='orders',
check_id='orders_completeness',
check_name='Orders Completeness Check',
check_type=QualityCheckType.COMPLETENESS,
columns=['order_id', 'customer_id', 'amount'],
severity=QualityCheckSeverity.ERROR,
config={
'threshold': 0.95, # 95% completeness required
'allow_null': False
}
)
Planning Quality Checks
Create a quality check plan to organize and execute multiple checks:
from qarion_etl.quality import (
QualityCheckPlan,
QualityCheckOperation,
QualityCheckType
)
plan = QualityCheckPlan(
plan_id='orders_quality_plan',
plan_name='Orders Quality Plan'
)
# Add checks to the plan
plan.add_check(QualityCheckOperation(
check_id='orders_completeness',
check_name='Orders Completeness Check',
table_name='orders',
check_type=QualityCheckType.COMPLETENESS,
config={
'columns': ['order_id', 'customer_id'],
'threshold': 0.95
}
))
plan.add_check(QualityCheckOperation(
check_id='orders_uniqueness',
check_name='Orders Uniqueness Check',
table_name='orders',
check_type=QualityCheckType.UNIQUENESS,
config={
'columns': ['order_id']
}
))
Executing Quality Checks
Execute quality checks using the QualityService:
from qarion_etl.quality import QualityService
from qarion_etl.engines import SQLiteEngine
engine = SQLiteEngine(config={'path': ':memory:'})
engine.connect()
service = QualityService(engine, enable_query_consolidation=True)
result = service.execute_plan(plan)
print(f"All checks passed: {result['all_passed']}")
print(f"Checks executed: {result['checks_executed']}")
print(f"Checks passed: {result['checks_passed']}")
print(f"Checks failed: {result['checks_failed']}")
# Access individual check results
for check_result in result['results']:
print(f"Check {check_result['check_id']}: {check_result['message']}")
Using the Quality Check Pipeline
For advanced use cases, use the QualityCheckPipeline, which provides:
- Query consolidation (combining compatible checks)
- Multi-step optimization
- SQL query generation
from qarion_etl.quality import QualityCheckPipeline
from qarion_etl.db_service import DatabaseService
db_service = DatabaseService(engine)
pipeline = QualityCheckPipeline(
engine=engine,
db_service=db_service,
enable_consolidation=True,
enable_optimization=True
)
# Process plan through complete pipeline
result = pipeline.process_plan(plan)
# Access consolidated plan
consolidated_plan = result['consolidated_plan']
# Access transformation instructions
transformation_instructions = result['transformation_instructions']
# Access optimized instructions
optimized_instructions = result['optimized_instructions']
# Access generated SQL queries
sql_queries = result['sql_queries']
Reconciliation Checks
Reconciliation checks compare two datasets:
from qarion_etl.quality import ReconciliationCheck, ReconciliationType
reconciliation = ReconciliationCheck(
reconciliation_id='orders_reconciliation',
reconciliation_name='Orders Reconciliation',
reconciliation_type=ReconciliationType.ROW_COUNT,
source_table='orders_staging',
target_table='orders_production'
)
result = service.execute_reconciliation(reconciliation)
print(f"Reconciliation passed: {result.passed}")
print(f"Source records: {result.source_records}")
print(f"Target records: {result.target_records}")
Check Type Configuration
Completeness Check
QualityCheckInstruction(
table_name='orders',
check_id='completeness_check',
check_name='Completeness Check',
check_type=QualityCheckType.COMPLETENESS,
columns=['order_id', 'customer_id', 'amount'],
config={
'threshold': 0.95, # Minimum completeness ratio (0.0 to 1.0)
'allow_null': False # Whether NULL values are allowed
}
)
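As an illustration of what a completeness check computes, here is a minimal plain-Python sketch of the non-null ratio compared against the threshold (not the actual plugin implementation):

```python
def completeness_ratio(values):
    """Fraction of non-null values in a column."""
    if not values:
        return 0.0
    non_null = sum(1 for v in values if v is not None)
    return non_null / len(values)

def completeness_passes(values, threshold=0.95):
    """A check passes when the non-null ratio meets the threshold."""
    return completeness_ratio(values) >= threshold

print(completeness_passes([1, 2, None, 4], threshold=0.5))  # → True
```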
Uniqueness Check
QualityCheckInstruction(
table_name='orders',
check_id='uniqueness_check',
check_name='Uniqueness Check',
check_type=QualityCheckType.UNIQUENESS,
columns=['order_id'], # Columns that should be unique together
config={}
)
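Conceptually, a uniqueness check looks for key tuples that occur more than once across the listed columns. A plain-Python sketch of that logic (illustrative only):

```python
from collections import Counter

def duplicate_keys(rows, columns):
    """Return key tuples that appear more than once."""
    counts = Counter(tuple(row[c] for c in columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]

rows = [{"order_id": 1}, {"order_id": 2}, {"order_id": 1}]
print(duplicate_keys(rows, ["order_id"]))  # → [(1,)]
```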
Range Check
QualityCheckInstruction(
table_name='orders',
check_id='range_check',
check_name='Range Check',
check_type=QualityCheckType.RANGE,
columns=['amount'],
config={
'min_value': 0, # Minimum allowed value (inclusive)
'max_value': 1000000 # Maximum allowed value (inclusive)
}
)
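A range check flags values outside the inclusive bounds. A plain-Python sketch of the logic (not the plugin implementation):

```python
def out_of_range(values, min_value, max_value):
    """Values outside the inclusive [min_value, max_value] bounds."""
    return [v for v in values if not (min_value <= v <= max_value)]

print(out_of_range([10, -5, 2_000_000, 500], 0, 1_000_000))  # → [-5, 2000000]
```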
Pattern Check
QualityCheckInstruction(
table_name='users',
check_id='pattern_check',
check_name='Email Pattern Check',
check_type=QualityCheckType.PATTERN,
columns=['email'],
config={
'pattern': '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'
}
)
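The pattern is a standard regular expression, so the same check can be reproduced in plain Python to verify a pattern before configuring it:

```python
import re

EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

print(bool(EMAIL_PATTERN.match("alice@example.com")))  # → True
print(bool(EMAIL_PATTERN.match("not-an-email")))       # → False
```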
Referential Integrity Check
QualityCheckInstruction(
table_name='orders',
check_id='referential_integrity',
check_name='Referential Integrity Check',
check_type=QualityCheckType.REFERENTIAL_INTEGRITY,
columns=['customer_id'],
config={
'foreign_key_table': 'orders',
'foreign_key_columns': ['customer_id'],
'referenced_table': 'customers',
'referenced_columns': ['customer_id'],
'join_condition': {} # Optional join condition
}
)
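Conceptually, referential integrity means every foreign-key value has a match in the referenced table; orphaned keys are the ones that fail. A plain-Python sketch (illustrative only):

```python
def orphaned_keys(fk_values, referenced_values):
    """Foreign-key values that do not exist in the referenced column."""
    return sorted(set(fk_values) - set(referenced_values))

orders_customer_ids = [1, 2, 3, 99]
customers_ids = [1, 2, 3]
print(orphaned_keys(orders_customer_ids, customers_ids))  # → [99]
```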
Query Consolidation and Optimization
The quality check system automatically consolidates and optimizes queries:
Query Consolidation
Multiple compatible checks on the same table are automatically consolidated into a single query:
# These checks will be consolidated into one query
plan.add_check(QualityCheckOperation(
check_id='completeness_1',
table_name='orders',
check_type=QualityCheckType.COMPLETENESS,
config={'columns': ['order_id']}
))
plan.add_check(QualityCheckOperation(
check_id='completeness_2',
table_name='orders',
check_type=QualityCheckType.COMPLETENESS,
config={'columns': ['customer_id']}
))
plan.add_check(QualityCheckOperation(
check_id='range_check',
table_name='orders',
check_type=QualityCheckType.RANGE,
config={'columns': ['amount'], 'min_value': 0, 'max_value': 1000000}
))
# All three checks can be consolidated into a single query
Optimization
The pipeline uses multi-step optimization to:
- Combine compatible check types
- Optimize transformation instructions
- Generate efficient SQL queries
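To illustrate the idea behind consolidation (this is a sketch of the concept, not the SQL Qarion ETL actually generates), several completeness checks on one table can be folded into a single table scan:

```python
def consolidated_completeness_sql(table, columns):
    """Illustrative only: build one aggregate query that measures
    completeness for several columns in a single table scan."""
    exprs = ", ".join(
        f"AVG(CASE WHEN {col} IS NOT NULL THEN 1.0 ELSE 0.0 END) AS {col}_completeness"
        for col in columns
    )
    return f"SELECT {exprs} FROM {table}"

print(consolidated_completeness_sql("orders", ["order_id", "customer_id"]))
```

One scan replaces N separate queries, which is where the performance benefit of consolidation comes from.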
Extending with Custom Plugins
You can create custom DQ check converter plugins:
from qarion_etl.quality.dq_check import DQCheckConverterPlugin, register_converter_plugin
from qarion_etl.quality.base import QualityCheckType
from qarion_etl.quality.instructions import QualityCheckInstruction
from typing import List, Dict, Any
class CustomCheckConverterPlugin(DQCheckConverterPlugin):
"""Custom converter plugin for a new check type."""
@property
def check_type(self) -> QualityCheckType:
return QualityCheckType.CUSTOM
def generate_operations(
self,
instruction: QualityCheckInstruction
) -> List[Dict[str, Any]]:
"""Generate operations for custom check."""
# Implement your custom logic here
return [
{
'type': 'filter',
'condition': instruction.config.get('custom_condition', '1=1')
}
]
def get_output_columns(
self,
instruction: QualityCheckInstruction
) -> List[str]:
"""Define output columns."""
return ['custom_check_result']
# Register the plugin
register_converter_plugin(CustomCheckConverterPlugin())
Quality Results and Metrics Storage
Qarion ETL provides a comprehensive system for storing and tracking quality check results and metrics over time. This enables historical analysis, trend monitoring, and quality dashboards.
Quality Results Store
The QualityResultsStore automatically persists all quality check execution results, including:
- Check execution details (check_id, check_name, check_type)
- Execution results (passed/failed, severity, message)
- Performance metrics (records_checked, records_failed, execution_time)
- Context information (suite_id, table_name, flow_id, batch_id)
- Execution timestamps for historical tracking
Automatic Storage: Quality check results are automatically stored when executed through:
- Quality check flows
- Quality check nodes in standard flows
- Automatic quality checks after transformations
Example: Querying Results
from datetime import datetime

from qarion_etl.quality.stores import QualityResultsStore
from qarion_etl.engines import create_engine
engine = create_engine('sqlite', connection_string=':memory:')
results_store = QualityResultsStore(engine)
# Get recent results for a specific check
results = results_store.get_results(
check_id='orders_completeness',
start_date=datetime(2024, 1, 1),
limit=100
)
# Get failed checks
failed_results = results_store.get_results(
passed=False,
start_date=datetime(2024, 1, 1)
)
Quality Metrics Store
The QualityMetricsStore stores aggregated quality metrics for trend analysis and monitoring:
Metric Types:
- Pass Rate: Percentage of checks that passed
- Failure Count: Number of failed checks
- Average Execution Time: Average time to execute checks
- Total Records Checked: Total number of records validated
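As a sketch of how a pass-rate metric can be aggregated from stored results (the real store's record schema may differ; the `passed` key here is an assumption):

```python
def pass_rate(results):
    """Percentage of check results whose 'passed' flag is True."""
    if not results:
        return 0.0
    passed = sum(1 for r in results if r["passed"])
    return 100.0 * passed / len(results)

history = [{"passed": True}, {"passed": True}, {"passed": False}, {"passed": True}]
print(pass_rate(history))  # → 75.0
```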
Aggregation Periods:
- hourly: Metrics aggregated by hour
- daily: Metrics aggregated by day (default)
- weekly: Metrics aggregated by week
- monthly: Metrics aggregated by month
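Bucketing timestamps into these periods can be sketched as follows (illustrative only; the store's actual period keys may differ):

```python
from datetime import datetime

def period_key(ts, period="daily"):
    """Illustrative bucketing of a timestamp into an aggregation period."""
    if period == "hourly":
        return ts.strftime("%Y-%m-%d %H:00")
    if period == "daily":
        return ts.strftime("%Y-%m-%d")
    if period == "weekly":
        return ts.strftime("%Y-W%W")
    if period == "monthly":
        return ts.strftime("%Y-%m")
    raise ValueError(f"unknown period: {period}")

print(period_key(datetime(2024, 3, 15, 14, 30)))  # → 2024-03-15
```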
Automatic Calculation: Metrics are automatically calculated and stored when quality checks are executed (if auto_calculate_metrics=True).
Example: Querying Metrics
from datetime import datetime

from qarion_etl.quality.stores import QualityMetricsStore
metrics_store = QualityMetricsStore(engine)
# Get pass rate trends
pass_rate_metrics = metrics_store.get_metrics(
metric_type='pass_rate',
check_id='orders_completeness',
days=30,
aggregation_period='daily'
)
# Get all metrics for a table
table_metrics = metrics_store.get_metrics(
table_name='orders',
start_date=datetime(2024, 1, 1)
)
Quality Store Service
The QualityStoreService provides a unified interface for managing both results and metrics:
Features:
- Store execution results and automatically calculate metrics
- Get quality summaries with results and metrics
- Track quality trends over time
- Generate quality reports
Example: Using Quality Store Service
from datetime import datetime

from qarion_etl.quality.stores import QualityStoreService
store_service = QualityStoreService(
engine=engine,
auto_calculate_metrics=True
)
# Store results (metrics calculated automatically)
store_result = store_service.store_execution_results(
results=quality_results, # List of QualityCheckResult
suite_id='orders_suite',
suite_name='Orders Quality Suite',
table_name='orders',
flow_id='orders_flow',
batch_id=123
)
# Get quality summary
summary = store_service.get_quality_summary(
check_id='orders_completeness',
start_date=datetime(2024, 1, 1),
end_date=datetime.now()
)
# Returns: {
# 'summary': {'total_checks': 100, 'passed_checks': 95, ...},
# 'recent_results': [...],
# 'latest_metrics': {...},
# 'all_metrics': [...]
# }
# Get trends
trends = store_service.get_trends(
metric_type='pass_rate',
check_id='orders_completeness',
days=30,
aggregation_period='daily'
)
Integration with Quality Service
The QualityService can be configured to automatically store results:
from qarion_etl.quality import QualityService
from qarion_etl.quality.stores import QualityStoreService
# Create store service
store_service = QualityStoreService(engine, auto_calculate_metrics=True)
# Create quality service with store service
quality_service = QualityService(
engine=engine,
enable_query_consolidation=True,
store_service=store_service
)
# Execute plan - results automatically stored
result = quality_service.execute_plan(plan)
Database Tables
The stores create the following tables in your metadata engine:
_quality_results: Stores individual quality check execution results
- Indexed by: suite_id, check_id, table_name, flow_id, execution_timestamp, passed
_quality_metrics: Stores aggregated quality metrics
- Indexed by: check_id, suite_id, table_name, flow_id, metric_type, period_start
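Because these tables live in the metadata engine, they can also be queried directly with SQL. The sketch below uses a throwaway SQLite database with an assumed minimal schema; the real _quality_results table has more columns (see the list above):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed minimal schema for illustration only; the real table stores
# additional fields such as suite_id, flow_id, and execution_timestamp.
conn.execute(
    "CREATE TABLE _quality_results (check_id TEXT, table_name TEXT, passed INTEGER)"
)
conn.execute("INSERT INTO _quality_results VALUES ('orders_completeness', 'orders', 1)")
conn.execute("INSERT INTO _quality_results VALUES ('orders_uniqueness', 'orders', 0)")

failed = conn.execute(
    "SELECT check_id FROM _quality_results WHERE passed = 0"
).fetchall()
print(failed)  # → [('orders_uniqueness',)]
```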
Configuration
Quality results and metrics storage can be configured in your qarion-etl.toml file:
[quality_store]
enabled = true # Enable/disable automatic storage (default: true)
auto_calculate_metrics = true # Automatically calculate metrics when storing results (default: true)
results_table_name = "_quality_results" # Table name for storing results (default: "_quality_results")
metrics_table_name = "_quality_metrics" # Table name for storing metrics (default: "_quality_metrics")
Configuration Options:
- enabled (boolean, default: true): Whether to enable automatic storage of quality check results and metrics. When disabled, results are not persisted.
- auto_calculate_metrics (boolean, default: true): Whether to automatically calculate and store metrics when storing results. When enabled, metrics are calculated for each execution.
- results_table_name (string, default: "_quality_results"): Name of the table to store quality check results in the metadata engine.
- metrics_table_name (string, default: "_quality_metrics"): Name of the table to store aggregated quality metrics in the metadata engine.
Example Configuration:
# qarion-etl.toml
app = "Qarion ETL"
type = "project"
project_name = "MyProject"
[engine]
name = "sqlite"
[engine.config]
path = "data/db.sqlite"
[quality_store]
enabled = true
auto_calculate_metrics = true
results_table_name = "_quality_results"
metrics_table_name = "_quality_metrics"
Quality Store Integration with Flows
Quality results are automatically stored when quality checks are executed through:
- Quality Check Flows: Results are automatically stored when quality check flows execute
- Quality Check Tasks: Results are automatically stored when quality check tasks execute in standard flows
- Quality Check Nodes: Results are automatically stored when quality check nodes execute
- Automatic Quality Checks: Results are automatically stored for automatic quality checks after transformations
The quality store service is automatically created from configuration and passed to all quality check executions.
Task Type Integration
Quality check tasks in standard flows automatically use the quality store service:
[[tasks]]
id = "check_orders_quality"
type = "quality_check"
name = "Check Orders Quality"
source_dataset_id = "orders"
target_dataset_id = "orders" # Same as source for quality checks
[tasks.properties]
quality_checks = [
{
check_id = "orders_completeness"
check_type = "completeness"
columns = ["order_id", "customer_id", "amount"]
}
]
Results are automatically stored when the task executes, using the configured quality store settings.
Storage Best Practices
- Enable Automatic Storage: Configure quality_store.enabled = true in qarion-etl.toml for automatic tracking
- Automatic Metrics: Set auto_calculate_metrics = true to automatically track trends
- Regular Metrics Review: Query metrics regularly to identify quality trends
- Historical Analysis: Use date ranges to analyze quality improvements over time
- Alert on Trends: Monitor pass rate trends to catch quality degradation early
- Batch Tracking: Use batch_id to correlate quality results with data loads
- Configure Table Names: Use descriptive table names if you need multiple quality stores
- Disable if Not Needed: Set enabled = false if you don't need historical tracking
Integration with Flows
Quality checks can be integrated into flow execution in three ways:
1. Quality Check Flow
Use the quality_check flow type to run quality checks as a dedicated flow. See Quality Check Flow Documentation for complete details.
2. Automatic Quality Checks in Dataset Properties
Configure automatic quality checks in dataset properties. These checks run automatically after transformations complete:
Dataset Configuration Example:
# datasets/orders_staging.toml
id = "orders_staging"
name = "Orders Staging Dataset"
namespace = "staging"
[columns]
[columns.id]
schema_type = "integer"
required = true
[columns.customer_id]
schema_type = "integer"
required = true
[columns.amount]
schema_type = "float"
required = true
[properties]
# Configure automatic quality checks
quality_stop_on_first_failure = false
quality_fail_on_error = false
[[properties.quality_checks]]
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
enabled = true
[properties.quality_checks.config]
columns = ["id", "customer_id", "amount"]
threshold = 0.95
allow_null = false
[[properties.quality_checks]]
check_id = "uniqueness_check"
check_name = "Uniqueness Check"
check_type = "uniqueness"
enabled = true
[properties.quality_checks.config]
columns = ["id"]
[[properties.quality_checks]]
check_id = "range_check"
check_name = "Amount Range Check"
check_type = "range"
enabled = true
[properties.quality_checks.config]
columns = ["amount"]
min_value = 0
max_value = 1000000
[[properties.quality_checks]]
check_id = "pattern_check"
check_name = "Email Pattern Check"
check_type = "pattern"
enabled = true
[properties.quality_checks.config]
columns = ["email"]
pattern = "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
Configuration Options:
- quality_stop_on_first_failure: If true, stops executing checks after the first failure
- quality_fail_on_error: If true, fails the entire flow when quality checks fail
- Each quality check can be enabled/disabled individually
- Check configurations are specific to each check type
3. Inline Quality Checks in Standard Flows
Quality checks can be added as tasks in Standard flows:
# flows/my_flow.toml
flow_type = "standard"
[[tasks]]
id = "quality_check"
type = "dq_check"
source_dataset_id = "orders_staging"
dependencies = ["transform_orders"]
[tasks.properties]
operation = "quality_check"
table_type = "staging"
[[tasks.properties.quality_checks]]
check_id = "completeness"
check_type = "completeness"
enabled = true
[tasks.properties.quality_checks.config]
columns = ["id", "customer_id"]
threshold = 0.95
Python API Example:
from qarion_etl.flows.execution import FlowExecutionService
# Quality checks can be added to flow execution plans
execution_plan = service.plan_flow_execution(flow_definition, datasets)
# Quality checks would be executed after transformations
Best Practices
- Use Quality Check Instructions: Always use QualityCheckInstruction to define checks (not deprecated concrete classes)
- Plan Before Execution: Always create a quality check plan before execution
- Use Appropriate Severity: Use ERROR for critical checks, WARNING for informational checks
- Enable Consolidation: Use query consolidation to optimize performance
- Stop on First Failure: Configure stop_on_first_failure for critical pipelines
- Use Reconciliation: Use reconciliation checks to compare staging and production datasets
- Leverage Plugins: Create custom plugins for domain-specific quality checks
API Reference
Core Classes
- QualityCheckInstruction: Engine-agnostic quality check definition
- QualityCheckPlan: Collection of quality checks to execute
- QualityCheckOperation: Single check operation in a plan
- QualityService: Service for executing quality checks
- QualityCheckPipeline: Complete pipeline with consolidation and optimization
Plugin System
- DQCheckConverterPlugin: Base class for converter plugins
- register_converter_plugin(): Register a custom plugin
- get_converter_plugin(): Get plugin for a check type
- list_converter_plugins(): List all registered plugins
Check Types
- QualityCheckType: Enum of available check types
- QualityCheckSeverity: Enum of severity levels (ERROR, WARNING, INFO)
Quality Check Execution Tracking
All quality check executions are automatically tracked in the metadata system. Each quality check execution is recorded in xt_quality_check_executions with:
- Check type and configuration
- Execution timing
- Results (passed/failed, rows checked, rows failed, failure rate)
- Detailed check results (JSON)
You can query quality check history to monitor data quality trends over time:
-- Get quality check trends
SELECT
DATE(execution_time) as check_date,
check_type,
COUNT(*) as check_count,
SUM(CASE WHEN passed = 1 THEN 1 ELSE 0 END) as passed_count,
AVG(failure_rate) as avg_failure_rate
FROM xt_quality_check_executions
WHERE execution_time > datetime('now', '-30 days')
GROUP BY DATE(execution_time), check_type
ORDER BY check_date DESC;
For more information about accessing and using execution metadata, see the Metadata Tracking Guide.
Related Documentation
- Quality Check Flow - Running quality checks as flows
- Metadata Tracking - Tracking and monitoring all operations and executions
- Quality Check Integration - Developer integration guide