Drift Detection Guide

This guide shows how to implement continuous monitoring and drift detection for AI systems using Qarion's quality check infrastructure. You'll learn how to define drift checks as YAML config, push model metrics from your ML pipeline, and integrate drift gates into CI/CD.


Architecture Overview

Qarion's drift detection leverages four existing check types:

Check Type       Use Case                                     Input Method
sql_metric       Feature distribution, volume, null rates    SQL against data source
sql_condition    Schema validation, unexpected categories    SQL against data source
reconciliation   Cross-stage or cross-source parity          Dual SQL queries
custom           Model metrics, latency, error rates         External push via API/SDK

All check types support thresholds, scheduled execution, and alert triggering.


YAML Templates

Data Drift Monitoring

Track feature distributions, volume, and completeness:

version: "1.0"
space: ml-production

defaults:
connector: warehouse-snowflake
schedule: "0 6 * * *" # Daily at 6 AM

checks:
# Feature distribution — mean shift detection
- slug: feature-purchase-amount-mean
name: Purchase Amount Mean Drift
type: sql_metric
description: "Alert if mean purchase amount drifts beyond training baseline (μ=45.2)"
product: customer-features
query: "SELECT AVG(purchase_amount) FROM features.customer_transactions WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'"
thresholds:
operator: between
min: 35.0 # ~2σ below baseline
max: 55.0 # ~2σ above baseline

# Feature distribution — stddev shift
- slug: feature-purchase-amount-stddev
name: Purchase Amount StdDev Drift
type: sql_metric
product: customer-features
query: "SELECT STDDEV(purchase_amount) FROM features.customer_transactions WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'"
thresholds:
operator: lte
value: 30.0 # Alert if variance explodes

# Volume anomaly — row count bounds
- slug: daily-transaction-volume
name: Daily Transaction Volume
type: sql_metric
product: customer-features
query: "SELECT COUNT(*) FROM features.customer_transactions WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'"
thresholds:
operator: between
min: 8000 # 20% below normal
max: 15000 # 50% above normal

# Null rate monitoring
- slug: feature-age-null-rate
name: Customer Age Null Rate
type: sql_metric
product: customer-features
query: >
SELECT CAST(SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END) AS FLOAT)
* 100.0 / NULLIF(COUNT(*), 0)
FROM features.customer_transactions
WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
thresholds:
operator: lte
value: 5.0 # Max 5% nulls
warn: 2.0 # Warn above 2%

# New category detection
- slug: no-unknown-categories
name: No Unknown Product Categories
type: sql_condition
product: customer-features
query: >
SELECT DISTINCT category
FROM features.customer_transactions
WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
AND category NOT IN ('electronics', 'clothing', 'food', 'services', 'other')

# Cross-stage reconciliation
- slug: staging-prod-feature-parity
name: Staging vs Production Feature Parity
type: reconciliation
configuration:
source_query: "SELECT COUNT(*) FROM staging.customer_transactions WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'"
target_query: "SELECT COUNT(*) FROM prod.customer_transactions WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'"
comparison_mode: percentage
tolerance: 0.05 # 5% tolerance
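
Once the config is saved (here as drift-checks.yaml, a hypothetical path), it can be run on demand from Python. This is a minimal sketch using the same SDK calls as the Airflow example later in this guide:

from qarion import QarionSyncClient
from qarion.models.dq_config import DqConfig

# Load the data-drift config defined above
config = DqConfig.from_yaml("drift-checks.yaml")
client = QarionSyncClient(api_key="qk_...")

# Run every check in the config and print any failures
results = client.quality.run_config(config)
for r in results:
    if not r.is_passed:
        print(f"FAILED: {r.check_slug} = {r.value}")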

Model Performance Monitoring

Model metrics are pushed from your ML pipeline using custom checks:

version: "1.0"
space: ml-production

checks:
# Accuracy tracking
- slug: churn-model-accuracy
name: Churn Model Accuracy
type: custom
description: "Pushed from evaluation pipeline after each batch prediction"
product: churn-prediction-model
thresholds:
operator: gte
value: 0.85 # Fail below 85%
warn: 0.90 # Warn below 90%

# F1 Score
- slug: churn-model-f1
name: Churn Model F1 Score
type: custom
product: churn-prediction-model
thresholds:
operator: gte
value: 0.80

# Inference latency (p95)
- slug: churn-model-latency-p95
name: Inference Latency P95
type: custom
product: churn-prediction-model
schedule: "*/15 * * * *" # Every 15 min
thresholds:
operator: lte
value: 200 # Max 200ms p95
warn: 150

# Error rate
- slug: churn-model-error-rate
name: Inference Error Rate
type: custom
product: churn-prediction-model
schedule: "*/15 * * * *"
thresholds:
operator: lte
value: 1.0 # Max 1% error rate
warn: 0.5

Concept Drift Monitoring

Track prediction-vs-actual divergence using SQL metrics:

version: "1.0"
space: ml-production

defaults:
connector: warehouse-snowflake
schedule: "0 0 * * 1" # Weekly on Monday

checks:
# Prediction error trend
- slug: churn-prediction-error
name: Churn Prediction Error Rate
type: sql_metric
product: churn-prediction-model
query: >
SELECT AVG(ABS(CAST(predicted_churn AS FLOAT) - CAST(actual_churn AS FLOAT)))
FROM ml.churn_predictions
WHERE prediction_date >= CURRENT_DATE - INTERVAL '7 days'
AND actual_churn IS NOT NULL
thresholds:
operator: lte
value: 0.15 # Max 15% average error
warn: 0.10

# Prediction distribution shift
- slug: churn-prediction-rate
name: Predicted Churn Rate
type: sql_metric
product: churn-prediction-model
query: >
SELECT AVG(CAST(predicted_churn AS FLOAT))
FROM ml.churn_predictions
WHERE prediction_date >= CURRENT_DATE - INTERVAL '7 days'
thresholds:
operator: between
min: 0.05 # Baseline churn rate ± bounds
max: 0.25
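
If you want to sanity-check the error metric before wiring the check up, here is a rough pandas equivalent of the prediction-error query above, assuming a dataframe with predicted_churn and actual_churn columns (the sample values are made up):

import pandas as pd

# Hypothetical frame: one row per scored customer over the last 7 days
df = pd.DataFrame({
    "predicted_churn": [1, 0, 1, 0],
    "actual_churn":    [1, 0, 0, None],  # None = outcome not yet known
})

# Mirrors the SQL: mean absolute difference, restricted to rows with actuals
scored = df.dropna(subset=["actual_churn"])
error = (scored["predicted_churn"].astype(float)
         - scored["actual_churn"].astype(float)).abs().mean()
print(f"7-day avg prediction error: {error:.3f}")  # compare to the 0.15 threshold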

SDK Integration

Pushing Model Metrics

Use the Qarion SDK to push metrics from your ML pipeline:

from qarion import QarionSyncClient

client = QarionSyncClient(api_key="qk_...")

# After model evaluation
metrics = evaluate_model(model, test_data)

# Push each metric to Qarion
client.quality.push_result(
    space="ml-production",
    check="churn-model-accuracy",
    status="pass" if metrics["accuracy"] >= 0.85 else "fail",
    value=metrics["accuracy"],
)

client.quality.push_result(
    space="ml-production",
    check="churn-model-f1",
    status="pass" if metrics["f1"] >= 0.80 else "fail",
    value=metrics["f1"],
)
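
As the number of metrics grows, the repeated push_result calls can be collapsed into a loop. A sketch that continues from the block above; note the thresholds are duplicated client-side from the YAML config here, so keeping the two in sync is up to you:

# Slug -> (metric value, minimum passing threshold); thresholds mirror the YAML
checks = {
    "churn-model-accuracy": (metrics["accuracy"], 0.85),
    "churn-model-f1": (metrics["f1"], 0.80),
}

for slug, (value, minimum) in checks.items():
    client.quality.push_result(
        space="ml-production",
        check=slug,
        status="pass" if value >= minimum else "fail",
        value=value,
    )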

Pushing Performance Metrics

Push operational metrics from your inference service:

from qarion import QarionSyncClient

client = QarionSyncClient(api_key="qk_...")

# From your monitoring stack
latency_p95 = get_latency_percentile(95)
error_rate = get_error_rate()

client.quality.push_result(
    space="ml-production",
    check="churn-model-latency-p95",
    status="pass" if latency_p95 <= 200 else "fail",
    value=latency_p95,
)

client.quality.push_result(
    space="ml-production",
    check="churn-model-error-rate",
    status="pass" if error_rate <= 1.0 else "fail",
    value=error_rate,
)

Pipeline Integration

Airflow

Add a drift gate as a downstream task in your ML pipeline:

from airflow.operators.python import PythonOperator

def check_drift():
    from qarion import QarionSyncClient
    from qarion.models.dq_config import DqConfig

    config = DqConfig.from_yaml("/opt/airflow/dags/drift-checks.yaml")
    client = QarionSyncClient(api_key="qk_...")

    results = client.quality.run_config(config)
    failed = [r for r in results if not r.is_passed]

    if failed:
        # Log failures but don't block pipeline
        for f in failed:
            print(f"DRIFT DETECTED: {f.check_slug} = {f.value}")
        # Optionally raise to block pipeline
        # raise Exception(f"{len(failed)} drift check(s) failed")

drift_gate = PythonOperator(
    task_id="drift_detection_gate",
    python_callable=check_drift,
)

# train >> evaluate >> drift_gate >> deploy
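
The snippet above defines only the operator; it needs a DAG context to run. One way to wire it up (Airflow 2.4+ syntax, with a hypothetical DAG id and sibling tasks assumed to exist elsewhere):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="ml_training_pipeline",  # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,   # triggered manually or by an upstream sensor
    catchup=False,
) as dag:
    drift_gate = PythonOperator(
        task_id="drift_detection_gate",
        python_callable=check_drift,
    )
    # train, evaluate, and deploy tasks are assumed to be defined in this DAG:
    # train >> evaluate >> drift_gate >> deploy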

CI/CD Quality Gate

Block deployments when drift exceeds thresholds:

name: ML Deployment Gate

on:
  push:
    branches: [main]
    paths:
      - "models/**"

jobs:
  drift-check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - run: pip install qarion-cli

      - name: Run drift checks
        run: qarion quality run-config -f drift-checks.yaml
        env:
          QARION_API_KEY: ${{ secrets.QARION_API_KEY }}
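
This gate assumes that qarion quality run-config exits nonzero when any check fails; that nonzero exit is what fails the job and blocks the deployment. Verify the exit-code behavior in your CLI version before relying on it as a gate.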

Re-assessment Triggers

When drift monitoring detects sustained issues, it should trigger a formal risk re-assessment:

from qarion import QarionSyncClient

client = QarionSyncClient(api_key="qk_...")

# After detecting critical drift
assessment = client.request(
    "POST",
    f"/catalog/spaces/ml-production/products/{product_id}/risk-assessments",
    json={
        "assessment_type": "automated_drift",
        "lifecycle_stage": "monitoring",
        "evaluations": [
            {
                "category": "data_drift",
                "severity": 4,
                "likelihood": 4,
                "impact": 3,
                "description": "Feature distribution shift detected in purchase_amount (2.3σ from baseline)",
                "evidence": "Mean shifted from 45.2 to 62.1 over 7 days",
            }
        ],
    },
)

This creates a formal risk record that is tracked in the product's governance history and can trigger review workflows.
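
Putting the pieces together, a monitoring job might run the drift config and escalate failures into a re-assessment automatically. A sketch assuming the same SDK calls used above, a hypothetical config path, and a product_id obtained from your catalog:

from qarion import QarionSyncClient
from qarion.models.dq_config import DqConfig

client = QarionSyncClient(api_key="qk_...")
config = DqConfig.from_yaml("drift-checks.yaml")  # hypothetical path

results = client.quality.run_config(config)
failed = [r for r in results if not r.is_passed]

if failed:
    # Escalate sustained drift into a formal risk record.
    # product_id comes from your catalog (not shown); severity values
    # mirror the example above and should reflect your own rubric.
    client.request(
        "POST",
        f"/catalog/spaces/ml-production/products/{product_id}/risk-assessments",
        json={
            "assessment_type": "automated_drift",
            "lifecycle_stage": "monitoring",
            "evaluations": [
                {
                    "category": "data_drift",
                    "severity": 4,
                    "likelihood": 4,
                    "impact": 3,
                    "description": f"{len(failed)} drift check(s) failing",
                    "evidence": ", ".join(f"{r.check_slug}={r.value}" for r in failed),
                }
            ],
        },
    )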