Drift Detection Guide
This guide shows how to implement continuous monitoring and drift detection for AI systems using Qarion's quality check infrastructure. You'll learn how to define drift checks as YAML config, push model metrics from your ML pipeline, and integrate drift gates into CI/CD.
Architecture Overview
Qarion's drift detection leverages four existing check types:
| Check Type | Use Case | Input Method |
|---|---|---|
| sql_metric | Feature distribution, volume, null rates | SQL against data source |
| sql_condition | Schema validation, unexpected categories | SQL against data source |
| reconciliation | Cross-stage or cross-source parity | Dual SQL queries |
| custom | Model metrics, latency, error rates | External push via API/SDK |
All check types support thresholds, scheduled execution, and alert triggering.
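Regardless of type, every check entry shares the same basic shape. The condensed sketch below summarizes the fields used throughout the templates that follow (the values are placeholders, not a full field reference):

```yaml
- slug: my-check                 # identifier referenced by the SDK/CLI
  name: Human Readable Name
  type: sql_metric               # sql_metric | sql_condition | reconciliation | custom
  product: my-product            # catalog product the check is attached to
  schedule: "0 6 * * *"          # per-check cron; templates also set a shared defaults.schedule
  query: "SELECT ..."            # sql_metric and sql_condition only (see below for reconciliation and custom)
  thresholds:
    operator: lte                # lte | gte | between
    value: 5.0                   # fail boundary
    warn: 2.0                    # optional warning boundary
```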
YAML Templates
Data Drift Monitoring
Track feature distributions, volume, and completeness:
version: "1.0"
space: ml-production
defaults:
connector: warehouse-snowflake
schedule: "0 6 * * *" # Daily at 6 AM
checks:
# Feature distribution — mean shift detection
- slug: feature-purchase-amount-mean
name: Purchase Amount Mean Drift
type: sql_metric
description: "Alert if mean purchase amount drifts beyond training baseline (μ=45.2)"
product: customer-features
query: "SELECT AVG(purchase_amount) FROM features.customer_transactions WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'"
thresholds:
operator: between
min: 35.0 # ~2σ below baseline
max: 55.0 # ~2σ above baseline
# Feature distribution — stddev shift
- slug: feature-purchase-amount-stddev
name: Purchase Amount StdDev Drift
type: sql_metric
product: customer-features
query: "SELECT STDDEV(purchase_amount) FROM features.customer_transactions WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'"
thresholds:
operator: lte
value: 30.0 # Alert if variance explodes
# Volume anomaly — row count bounds
- slug: daily-transaction-volume
name: Daily Transaction Volume
type: sql_metric
product: customer-features
query: "SELECT COUNT(*) FROM features.customer_transactions WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'"
thresholds:
operator: between
min: 8000 # 20% below normal
max: 15000 # 50% above normal
# Null rate monitoring
- slug: feature-age-null-rate
name: Customer Age Null Rate
type: sql_metric
product: customer-features
query: >
SELECT CAST(SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END) AS FLOAT)
* 100.0 / NULLIF(COUNT(*), 0)
FROM features.customer_transactions
WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
thresholds:
operator: lte
value: 5.0 # Max 5% nulls
warn: 2.0 # Warn above 2%
# New category detection
- slug: no-unknown-categories
name: No Unknown Product Categories
type: sql_condition
product: customer-features
query: >
SELECT DISTINCT category
FROM features.customer_transactions
WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
AND category NOT IN ('electronics', 'clothing', 'food', 'services', 'other')
# Cross-stage reconciliation
- slug: staging-prod-feature-parity
name: Staging vs Production Feature Parity
type: reconciliation
configuration:
source_query: "SELECT COUNT(*) FROM staging.customer_transactions WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'"
target_query: "SELECT COUNT(*) FROM prod.customer_transactions WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'"
comparison_mode: percentage
tolerance: 0.05 # 5% tolerance
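The fixed min/max bounds in the mean-drift check above encode roughly a ±2σ band around the training baseline (μ = 45.2, so σ ≈ 5 given the comments). If you persist baseline statistics at training time, those bounds can be derived instead of hand-typed. A minimal sketch; the drift_bounds helper and the baseline dict are illustrative, not part of the Qarion SDK:

```python
# Derive "between" thresholds from training-time baseline statistics.
# Illustrative helper; the baseline numbers are examples only.

def drift_bounds(mu: float, sigma: float, k: float = 2.0) -> tuple[float, float]:
    """Return (min, max) thresholds k standard deviations around the baseline mean."""
    return mu - k * sigma, mu + k * sigma

baseline = {"purchase_amount": {"mu": 45.2, "sigma": 5.0}}  # captured during training

lo, hi = drift_bounds(**baseline["purchase_amount"])
print(f"thresholds: min={lo:.1f}, max={hi:.1f}")  # 35.2 / 55.2, close to the 35.0 / 55.0 used above
```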
Model Performance Monitoring
Model metrics use custom checks: the thresholds are defined in YAML here, and the measured values are pushed from your ML pipeline (see SDK Integration below):
version: "1.0"
space: ml-production
checks:
# Accuracy tracking
- slug: churn-model-accuracy
name: Churn Model Accuracy
type: custom
description: "Pushed from evaluation pipeline after each batch prediction"
product: churn-prediction-model
thresholds:
operator: gte
value: 0.85 # Fail below 85%
warn: 0.90 # Warn below 90%
# F1 Score
- slug: churn-model-f1
name: Churn Model F1 Score
type: custom
product: churn-prediction-model
thresholds:
operator: gte
value: 0.80
# Inference latency (p95)
- slug: churn-model-latency-p95
name: Inference Latency P95
type: custom
product: churn-prediction-model
schedule: "*/15 * * * *" # Every 15 min
thresholds:
operator: lte
value: 200 # Max 200ms p95
warn: 150
# Error rate
- slug: churn-model-error-rate
name: Inference Error Rate
type: custom
product: churn-prediction-model
schedule: "*/15 * * * *"
thresholds:
operator: lte
value: 1.0 # Max 1% error rate
warn: 0.5
Concept Drift Monitoring
Track prediction-vs-actual divergence using SQL metrics:
version: "1.0"
space: ml-production
defaults:
connector: warehouse-snowflake
schedule: "0 0 * * 1" # Weekly on Monday
checks:
# Prediction error trend
- slug: churn-prediction-error
name: Churn Prediction Error Rate
type: sql_metric
product: churn-prediction-model
query: >
SELECT AVG(ABS(CAST(predicted_churn AS FLOAT) - CAST(actual_churn AS FLOAT)))
FROM ml.churn_predictions
WHERE prediction_date >= CURRENT_DATE - INTERVAL '7 days'
AND actual_churn IS NOT NULL
thresholds:
operator: lte
value: 0.15 # Max 15% average error
warn: 0.10
# Prediction distribution shift
- slug: churn-prediction-rate
name: Predicted Churn Rate
type: sql_metric
product: churn-prediction-model
query: >
SELECT AVG(CAST(predicted_churn AS FLOAT))
FROM ml.churn_predictions
WHERE prediction_date >= CURRENT_DATE - INTERVAL '7 days'
thresholds:
operator: between
min: 0.05 # Baseline churn rate ± bounds
max: 0.25
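If predicted_churn and actual_churn are stored as 0/1 labels, the error metric above is simply the share of mismatched predictions among rows whose outcome is already known, i.e. 1 − accuracy over the labelled window: for example, 160 wrong predictions out of 1,000 labelled rows gives 0.16, which breaches the 0.15 fail threshold.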
SDK Integration
Pushing Model Metrics
Use the Qarion SDK to push metrics from your ML pipeline:
from qarion import QarionSyncClient
client = QarionSyncClient(api_key="qk_...")
# After model evaluation
metrics = evaluate_model(model, test_data)
# Push each metric to Qarion
client.quality.push_result(
space="ml-production",
check="churn-model-accuracy",
status="pass" if metrics["accuracy"] >= 0.85 else "fail",
value=metrics["accuracy"],
)
client.quality.push_result(
space="ml-production",
check="churn-model-f1",
status="pass" if metrics["f1"] >= 0.80 else "fail",
value=metrics["f1"],
)
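If you track more than these two metrics, the same calls can be driven from a small mapping of check slugs to measured values and minimum acceptable scores, reusing client and metrics from the snippet above (the mapping itself is just an illustration):

```python
# Map each custom check slug to (measured value, minimum acceptable value).
# Slugs match the model performance YAML; the dict is illustrative.
evaluation_checks = {
    "churn-model-accuracy": (metrics["accuracy"], 0.85),
    "churn-model-f1": (metrics["f1"], 0.80),
}

for check_slug, (value, minimum) in evaluation_checks.items():
    client.quality.push_result(
        space="ml-production",
        check=check_slug,
        status="pass" if value >= minimum else "fail",
        value=value,
    )
```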
Pushing Performance Metrics
Push operational metrics from your inference service:
from qarion import QarionSyncClient
client = QarionSyncClient(api_key="qk_...")
# From your monitoring stack
latency_p95 = get_latency_percentile(95)
error_rate = get_error_rate()
client.quality.push_result(
space="ml-production",
check="churn-model-latency-p95",
status="pass" if latency_p95 <= 200 else "fail",
value=latency_p95,
)
client.quality.push_result(
space="ml-production",
check="churn-model-error-rate",
status="pass" if error_rate <= 1.0 else "fail",
value=error_rate,
)
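Note the units: churn-model-error-rate is compared against 1.0, which the YAML above annotates as a 1% ceiling, so get_error_rate() here is expected to return a percentage rather than a fraction. A service returning 0.01 for a 1% error rate would effectively never fail this check.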
Pipeline Integration
Airflow
Add a drift gate as a downstream task in your ML pipeline:
from airflow.operators.python import PythonOperator
def check_drift():
from qarion import QarionSyncClient
from qarion.models.dq_config import DqConfig
config = DqConfig.from_yaml("/opt/airflow/dags/drift-checks.yaml")
client = QarionSyncClient(api_key="qk_...")
results = client.quality.run_config(config)
failed = [r for r in results if not r.is_passed]
if failed:
# Log failures but don't block pipeline
for f in failed:
print(f"DRIFT DETECTED: {f.check_slug} = {f.value}")
# Optionally raise to block pipeline
# raise Exception(f"{len(failed)} drift check(s) failed")
drift_gate = PythonOperator(
task_id="drift_detection_gate",
python_callable=check_drift,
)
# train >> evaluate >> drift_gate >> deploy
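The commented dependency line assumes the gate sits inside a larger training DAG. A sketch of that wiring, reusing check_drift from above; the DAG id and the train/evaluate/deploy callables are placeholders for your own logic:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def train_model():
    ...  # your training logic

def evaluate_and_push_metrics():
    ...  # evaluation plus the push_result calls from the SDK section

def deploy_model():
    ...  # promotion logic; skipped if an upstream task fails

with DAG(dag_id="churn_model_training", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    train = PythonOperator(task_id="train", python_callable=train_model)
    evaluate = PythonOperator(task_id="evaluate", python_callable=evaluate_and_push_metrics)
    drift_gate = PythonOperator(task_id="drift_detection_gate", python_callable=check_drift)
    deploy = PythonOperator(task_id="deploy", python_callable=deploy_model)

    train >> evaluate >> drift_gate >> deploy
```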
CI/CD Quality Gate
Block deployments when drift exceeds thresholds:
name: ML Deployment Gate
on:
push:
branches: [main]
paths:
- "models/**"
jobs:
drift-check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- run: pip install qarion-cli
- name: Run drift checks
run: qarion quality run-config -f drift-checks.yaml
env:
QARION_API_KEY: ${{ secrets.QARION_API_KEY }}
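This gate blocks the deployment on the assumption that qarion quality run-config exits non-zero when any check fails. If you would rather surface drift as a warning without stopping the pipeline, standard GitHub Actions behaviour lets you mark the step as non-fatal:

```yaml
      - name: Run drift checks
        continue-on-error: true   # record the failure but let the job proceed
        run: qarion quality run-config -f drift-checks.yaml
        env:
          QARION_API_KEY: ${{ secrets.QARION_API_KEY }}
```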
Re-assessment Triggers
When drift monitoring detects sustained issues, it should trigger a formal risk re-assessment:
from qarion import QarionSyncClient
client = QarionSyncClient(api_key="qk_...")
# After detecting critical drift
product_id = "..."  # catalog ID of the drifting product, e.g. the churn-prediction-model entry
assessment = client.request(
"POST",
f"/catalog/spaces/ml-production/products/{product_id}/risk-assessments",
json={
"assessment_type": "automated_drift",
"lifecycle_stage": "monitoring",
"evaluations": [
{
"category": "data_drift",
"severity": 4,
"likelihood": 4,
"impact": 3,
"description": "Feature distribution shift detected in purchase_amount (2.3σ from baseline)",
"evidence": "Mean shifted from 45.2 to 62.1 over 7 days",
}
],
},
)
This creates a formal risk record that is tracked in the product's governance history and can trigger review workflows.
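One way to implement the "sustained" qualifier is to escalate only after several consecutive failing runs rather than on the first breach. A minimal sketch, assuming a local JSON file keeps the failure streak between runs (the file path and the three-run policy are illustrative):

```python
import json
from pathlib import Path

from qarion import QarionSyncClient
from qarion.models.dq_config import DqConfig

STATE_FILE = Path("/var/lib/drift/failure_streak.json")  # illustrative location
RUNS_BEFORE_ESCALATION = 3                                # illustrative policy

client = QarionSyncClient(api_key="qk_...")
results = client.quality.run_config(DqConfig.from_yaml("drift-checks.yaml"))
failed = [r for r in results if not r.is_passed]

# Count consecutive failing runs across invocations.
streak = json.loads(STATE_FILE.read_text())["streak"] if STATE_FILE.exists() else 0
streak = streak + 1 if failed else 0
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
STATE_FILE.write_text(json.dumps({"streak": streak}))

if streak >= RUNS_BEFORE_ESCALATION:
    # Drift has persisted: file the risk re-assessment exactly as in the snippet above.
    ...
```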
Related
- DQ Config (YAML) — Full YAML config specification
- Quality Framework — Quality dimensions and check types
- SDK Quality Resource — Python SDK method reference
- CLI Quality Commands — CLI command reference