Automating Quality Checks

This tutorial walks through setting up quality monitoring programmatically — from defining reusable check templates to integrating quality gates into CI/CD pipelines and building dashboards that aggregate results across your catalog.

By the end, you'll have a complete quality automation pipeline that creates checks for new products, triggers them as part of your deployment process, monitors results, and handles alerts programmatically.

Step 1: Define Check Templates

Rather than configuring each quality check from scratch, define templates that encode your organization's quality standards. Templates make it easy to apply a consistent set of checks to every new product:

CHECK_TEMPLATES = {
    "freshness_24h": {
        "check_type": "freshness",
        "config": {
            "timestamp_column": "updated_at",
            "max_age_hours": 24
        },
        "schedule": "0 */6 * * *",
        "severity": "critical"
    },
    "row_count_minimum": {
        "check_type": "row_count",
        "config": {
            "min_rows": 100,
            "max_rows": None
        },
        "schedule": "0 8 * * *",
        "severity": "warning"
    },
    "id_uniqueness": {
        "check_type": "uniqueness",
        "config": {
            "column_name": "id"
        },
        "schedule": "0 8 * * *",
        "severity": "critical"
    },
    "id_not_null": {
        "check_type": "not_null",
        "config": {
            "column_name": "id"
        },
        "schedule": "0 8 * * *",
        "severity": "critical"
    }
}

Each template captures the check type, configuration parameters, schedule, and severity level. The freshness check runs every 6 hours (because detecting stale data quickly is important), while the structural checks run daily at 8 AM (since schema issues are typically introduced by deployments rather than runtime events).
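If a particular product needs a slightly different configuration, one option (a minimal sketch, not part of the tutorial's API) is to copy a template and override selected fields before creating the check. `template_with_overrides` is a hypothetical helper name:

```python
from copy import deepcopy

# Assumes CHECK_TEMPLATES from Step 1 is in scope; shown here with one entry.
CHECK_TEMPLATES = {
    "freshness_24h": {
        "check_type": "freshness",
        "config": {"timestamp_column": "updated_at", "max_age_hours": 24},
        "schedule": "0 */6 * * *",
        "severity": "critical"
    },
}

def template_with_overrides(name, **overrides):
    """Copy a template and override selected top-level fields."""
    template = deepcopy(CHECK_TEMPLATES[name])  # deepcopy keeps the original intact
    template.update(overrides)
    return template

# A relaxed freshness check for a less critical product
relaxed = template_with_overrides("freshness_24h", severity="warning")
```

The `deepcopy` matters: mutating a shallow copy's nested `config` dict would silently change the shared template.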

Step 2: Create Checks for Products

With templates defined, writing a function that applies them to any product becomes straightforward. The function below creates all checks from a set of selected templates for a given product:

import requests

API_BASE = "https://api.qarion.com"
API_KEY = "your-api-key"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}

def create_checks_for_product(product_id, templates=None):
    """Apply quality check templates to a product."""
    if templates is None:
        templates = CHECK_TEMPLATES.keys()

    created_checks = []

    for template_name in templates:
        template = CHECK_TEMPLATES[template_name]

        check_data = {
            "name": template_name,
            "product_id": product_id,
            **template
        }

        response = requests.post(
            f"{API_BASE}/quality/checks",
            headers=HEADERS,
            json=check_data
        )

        if response.status_code == 201:
            created_checks.append(response.json())
            print(f"  Created: {template_name}")
        else:
            print(f"  Failed: {template_name} - {response.text}")

    return created_checks

You can apply all templates to a product, or select specific ones. For example, a staging table might only need freshness and row count checks, while a production-critical dimension table should have the full suite.
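One way to encode that policy is a pair of per-tier template lists (the tier names and commented-out product IDs below are illustrative, not part of the API):

```python
# Template subsets per table tier; the names match CHECK_TEMPLATES in Step 1.
STAGING_TEMPLATES = ["freshness_24h", "row_count_minimum"]
PRODUCTION_TEMPLATES = [
    "freshness_24h",
    "row_count_minimum",
    "id_uniqueness",
    "id_not_null",
]

def templates_for(tier):
    """Pick the template subset for a product tier."""
    return PRODUCTION_TEMPLATES if tier == "production" else STAGING_TEMPLATES

# create_checks_for_product("stg-events", templates_for("staging"))
# create_checks_for_product("dim-customers", templates_for("production"))
```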

Step 3: Bulk Apply to All Products

To bootstrap quality monitoring across your entire catalog, iterate through all products in a space and apply the template checks to each one:

def setup_quality_for_space(space_slug):
    """Apply standard quality checks to all products in a space."""
    response = requests.get(
        f"{API_BASE}/catalog/spaces/{space_slug}/products?size=100",
        headers=HEADERS
    )
    products = response.json()["items"]

    for product in products:
        print(f"\nSetting up checks for: {product['name']}")
        create_checks_for_product(product["id"])

    print(f"\nComplete: {len(products)} products configured")

For large catalogs, consider adding pagination handling (see the Pagination guide) and rate limit awareness to this loop.
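A minimal sketch of such pagination handling, assuming the endpoint accepts `page` and `size` query parameters and returns an `items` list (consult the Pagination guide for the exact scheme your API version uses). The injectable `get` parameter is a testing convenience, not part of the tutorial's code:

```python
API_BASE = "https://api.qarion.com"
HEADERS = {"Authorization": "Bearer your-api-key"}

def fetch_all_products(space_slug, page_size=100, get=None):
    """Yield every product in a space, following page-based pagination."""
    if get is None:  # lazy import so a stub client can be injected in tests
        import requests
        get = requests.get

    page = 1
    while True:
        response = get(
            f"{API_BASE}/catalog/spaces/{space_slug}/products",
            headers=HEADERS,
            params={"page": page, "size": page_size},
        )
        items = response.json().get("items", [])
        yield from items
        if len(items) < page_size:
            break  # last (possibly partial) page reached
        page += 1
```

`setup_quality_for_space` could then iterate `fetch_all_products(space_slug)` instead of issuing a single `size=100` request, with a `time.sleep` between pages if you hit rate limits.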

Step 4: CI/CD Quality Gates

One of the most powerful uses of quality automation is integrating checks into your deployment pipeline. By triggering quality checks after each deployment and gating promotion on the results, you can catch data issues before they reach production:

import time

def run_quality_gate(product_id, timeout=300):
    """Run all checks for a product and wait for results."""
    # Get all checks for this product
    checks_response = requests.get(
        f"{API_BASE}/quality/checks?product_id={product_id}",
        headers=HEADERS
    )
    checks = checks_response.json()["items"]

    # Trigger all checks
    executions = []
    for check in checks:
        response = requests.post(
            f"{API_BASE}/quality/checks/{check['id']}/run",
            headers=HEADERS
        )
        executions.append({
            "check_id": check["id"],
            "check_name": check["name"]
        })

    # Wait for all checks to complete
    start_time = time.time()
    while time.time() - start_time < timeout:
        all_complete = True
        for execution in executions:
            status_response = requests.get(
                f"{API_BASE}/quality/checks/{execution['check_id']}/latest",
                headers=HEADERS
            )
            result = status_response.json()
            execution["status"] = result.get("status")
            execution["passed"] = result.get("passed")

            if result.get("status") == "running":
                all_complete = False

        if all_complete:
            break

        time.sleep(10)

    # Return results
    passed = all(e.get("passed") for e in executions)
    return {"passed": passed, "checks": executions}

In a CI/CD pipeline, you'd call run_quality_gate after deploying your data transformations and fail the pipeline when the gate reports a failure. Note that as written the gate fails if any check doesn't pass; filter the check list by severity if you only want critical checks to block promotion:

# GitHub Actions example
- name: Run Quality Gate
  run: |
    result=$(python scripts/quality_gate.py --product-id $PRODUCT_ID)
    if [ "$result" != "passed" ]; then
      echo "Quality gate failed!"
      exit 1
    fi
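The `scripts/quality_gate.py` referenced above isn't shown in this tutorial; a minimal sketch might look like the following, where the `--product-id` flag and the `passed` output string are assumptions chosen to match the shell snippet:

```python
# scripts/quality_gate.py (hypothetical) — wires run_quality_gate() from
# Step 4 into the CI step above. Assumes run_quality_gate is importable.
import argparse

def gate_outcome(result):
    """Reduce a run_quality_gate() result to the word the CI step checks."""
    return "passed" if result["passed"] else "failed"

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--product-id", required=True)
    args = parser.parse_args(argv)

    result = run_quality_gate(args.product_id)  # defined in Step 4
    print(gate_outcome(result))
    return 0 if result["passed"] else 1
```

When run as a script (`sys.exit(main())` under an `if __name__ == "__main__":` guard), the pipeline step captures the printed word and the nonzero exit code on failure.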

Step 5: Monitor Results

Beyond CI/CD gates, you'll want ongoing visibility into quality trends. The following function fetches recent check results and summarizes them by status:

def get_quality_summary(space_slug):
    """Get a summary of quality check results for a space."""
    response = requests.get(
        f"{API_BASE}/quality/checks?space={space_slug}&size=100",
        headers=HEADERS
    )
    checks = response.json()["items"]

    summary = {"passing": 0, "failing": 0, "no_data": 0}

    for check in checks:
        latest = requests.get(
            f"{API_BASE}/quality/checks/{check['id']}/latest",
            headers=HEADERS
        ).json()

        if latest.get("passed") is True:
            summary["passing"] += 1
        elif latest.get("passed") is False:
            summary["failing"] += 1
        else:
            summary["no_data"] += 1

    summary["total"] = len(checks)
    summary["health"] = (
        summary["passing"] / summary["total"] * 100
        if summary["total"] > 0 else 0
    )

    return summary
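An illustrative helper (not part of the tutorial's API) that renders one summary as a single line, suitable for a report or chat digest:

```python
def format_summary_line(space_slug, summary):
    """Render one line of a quality report from get_quality_summary() output."""
    return (
        f"{space_slug}: {summary['passing']}/{summary['total']} passing "
        f"({summary['health']:.1f}% healthy, {summary['failing']} failing)"
    )
```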

Step 6: Handle Alerts Programmatically

When a quality check fails, it creates an alert in Qarion. For automated pipelines, you may want to handle these alerts programmatically — acknowledging them, creating issues for investigation, or notifying external systems:

def process_quality_alerts(space_slug):
    """Process open quality alerts and take action."""
    response = requests.get(
        f"{API_BASE}/alerts?space={space_slug}&status=open",
        headers=HEADERS
    )
    alerts = response.json()["items"]

    for alert in alerts:
        # Auto-acknowledge
        requests.patch(
            f"{API_BASE}/alerts/{alert['id']}",
            headers=HEADERS,
            json={"status": "acknowledged"}
        )

        # Create issue for critical alerts
        if alert["severity"] == "critical":
            requests.post(
                f"{API_BASE}/issues",
                headers=HEADERS,
                json={
                    "title": f"Quality Alert: {alert['message']}",
                    "description": f"Auto-created from alert {alert['id']}",
                    "priority": "high",
                    "space_id": alert["space_id"]
                }
            )

        # Notify external system
        send_notification(alert)

This pattern is useful for connecting Qarion's quality monitoring to your existing incident management workflow — automatically creating tickets in Jira, sending PagerDuty alerts for critical failures, or posting summaries to Slack.
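The send_notification call above is left for you to implement. One possible sketch posts to a Slack incoming webhook using only the standard library; the webhook URL is a placeholder and the message format is an assumption:

```python
import json
import urllib.request

# Placeholder — replace with your actual Slack incoming webhook URL
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

def format_alert_message(alert):
    """Build the Slack message text for a quality alert."""
    return f"[{alert['severity'].upper()}] {alert['message']}"

def send_notification(alert):
    """Post a one-line alert summary to a Slack incoming webhook."""
    payload = {"text": format_alert_message(alert)}
    request = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request)
```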

Step 7: Quality Dashboard

Combining the summary and alert processing into a periodic job gives you a living quality dashboard that runs continuously and surfaces problems proactively:

def quality_dashboard_job():
    """Periodic job to generate quality dashboards."""
    spaces = requests.get(
        f"{API_BASE}/spaces",
        headers=HEADERS
    ).json()["items"]  # list endpoints return {"items": [...]}

    for space in spaces:
        summary = get_quality_summary(space["slug"])

        print(f"\n{'='*50}")
        print(f"Space: {space['name']}")
        print(f"Health: {summary['health']:.1f}%")
        print(f"Passing: {summary['passing']}/{summary['total']}")
        print(f"Failing: {summary['failing']}")

        if summary["health"] < 80:
            print("⚠️ Health below threshold!")
            process_quality_alerts(space["slug"])
Schedule this job to run hourly or daily (for example, via cron or a workflow orchestrator) to maintain continuous visibility across all your spaces.
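For example, a crontab entry for an hourly run might look like this (the script path and log location are hypothetical):

```
0 * * * * /usr/bin/python3 /opt/qarion/quality_dashboard.py >> /var/log/quality_dashboard.log 2>&1
```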