Skip to main content

Airflow Integration

This tutorial shows how to integrate Qarion into your Apache Airflow DAGs to register data products, track refreshes, run quality checks (blocking and non-blocking), and enforce data governance policies as part of your pipeline orchestration.

By the end, you'll have a production-ready DAG that registers products in the catalog, validates data quality after each pipeline run, and tracks refresh events — all wired together with blocking and non-blocking quality gates.

Prerequisites

Before you start, make sure you have:

  • Qarion SDK installed in your Airflow environment:
    pip install qarion-sdk
  • An API key — generate one from the Qarion platform (Settings → API Keys).
  • Airflow 2.x+ with the TaskFlow API.

Store your API key as an Airflow variable or connection:

from airflow.models import Variable

QARION_API_KEY = Variable.get("qarion_api_key")

Register or Update a Data Product

Use register_product to create or update a data product definition in the Qarion catalog. This method uses upsert logic: if a product with the given slug already exists, it updates the fields you provide; otherwise, it creates a new product.

This is the recommended entrypoint for pipelines that own their product definitions — call it at the start of each DAG run to keep the catalog in sync with your pipeline code.

Basic Registration

@task()
def register_product(space_slug: str) -> str:
    """Register (or update) a data product in the Qarion catalog.

    Uses upsert semantics: the first run creates the product, subsequent
    runs update it in place.

    Args:
        space_slug: Slug of the Qarion space that owns the product.

    Returns:
        The product ID as a string, suitable for XCom passing downstream.
    """
    from qarion import QarionSyncClient
    from airflow.models import Variable

    client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
    try:
        product = client.products.register_product(
            space_slug,
            name="Orders Table",
            slug="orders-table",
            description_markdown="Aggregated order data refreshed daily by Airflow.",
            product_type="Table",
            tags=["airflow-managed", "finance"],
        )
    finally:
        # Release the HTTP session even if registration raises; otherwise
        # a failed API call leaks the connection for the task's lifetime.
        client.close()
    return str(product.id)

How it works: On the first run, register_product creates the product. On subsequent runs, it updates the description, type, and tags to match the values in your DAG code — keeping the catalog definition aligned with your pipeline without manual intervention.

Tip: Always provide an explicit slug to ensure stability across runs. If omitted, the slug is derived from the name, which can break if the name changes.

Register Multiple Products

For pipelines that produce several outputs, register all products in a single task:

@task()
def register_products(space_slug: str) -> dict[str, str]:
    """Register multiple data products and return their IDs.

    Args:
        space_slug: Slug of the Qarion space that owns the products.

    Returns:
        Mapping of product slug -> product ID (as strings).
    """
    from qarion import QarionSyncClient
    from airflow.models import Variable

    client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))

    products_config = [
        {
            "name": "Orders Table",
            "slug": "orders-table",
            "product_type": "Table",
            "tags": ["airflow-managed", "finance"],
            "description_markdown": "Aggregated order data refreshed daily.",
        },
        {
            "name": "Customer Profiles",
            "slug": "customer-profiles",
            "product_type": "Table",
            "tags": ["airflow-managed", "marketing"],
            "description_markdown": "Enriched customer profiles from CRM + warehouse.",
        },
    ]

    ids = {}
    try:
        for config in products_config:
            product = client.products.register_product(space_slug, **config)
            ids[config["slug"]] = str(product.id)
    finally:
        # Close the client even if a registration mid-loop raises,
        # so the HTTP session is never leaked.
        client.close()
    return ids

Register Schema Fields

After registering a product, you can push its field definitions so the catalog reflects the current table schema:

@task()
def register_schema(space_slug: str, product_id: str):
    """Push the current schema to the Qarion catalog.

    Args:
        space_slug: Slug of the Qarion space.
        product_id: ID of the product whose fields are being registered.

    Raises:
        requests.HTTPError: If the API rejects any field registration.
    """
    import requests
    from airflow.models import Variable

    api_base = Variable.get("qarion_api_url")
    headers = {"Authorization": f"Bearer {Variable.get('qarion_api_key')}"}

    # Define fields to register (e.g., from a schema introspection step)
    fields = [
        {"name": "order_id", "field_type": "INTEGER", "description": "Primary key"},
        {"name": "customer_id", "field_type": "INTEGER", "description": "FK to customers"},
        {"name": "total_amount", "field_type": "DECIMAL", "description": "Order total in USD"},
        {"name": "created_at", "field_type": "TIMESTAMP", "description": "Order creation time"},
    ]

    for field in fields:
        # Bound each call so a hung request can't stall the worker, and
        # fail the task on HTTP errors instead of silently dropping fields.
        response = requests.post(
            f"{api_base}/catalog/spaces/{space_slug}/products/{product_id}/fields",
            headers=headers,
            json=field,
            timeout=30,
        )
        response.raise_for_status()

Register a Data Refresh

After your ETL pipeline completes, register the refresh event in Qarion so the catalog tracks when each product was last updated, who triggered it, and whether it succeeded. This powers freshness monitoring and visibility in the data catalog.

@task()
def register_refresh(space_slug: str, product_id: str, trigger_source: str = "airflow"):
    """Register a data product refresh event in Qarion.

    Args:
        space_slug: Slug of the Qarion space.
        product_id: ID of the refreshed product.
        trigger_source: Label recorded as the refresh trigger (default "airflow").
    """
    from qarion import QarionSyncClient
    from airflow.models import Variable

    client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
    try:
        refresh = client.products.register_refresh(
            space_slug,
            product_id,
            trigger_source=trigger_source,
            status="success",
        )
    finally:
        # Ensure the HTTP session is released even when the API call fails.
        client.close()

    print(f"Refresh registered: {refresh.id}")

Handling Failures

If your ETL fails, register the failure so the catalog reflects the broken state:

@task(trigger_rule="all_failed")
def register_failed_refresh(space_slug: str, product_id: str):
    """Register a failed refresh when ETL fails.

    Runs only when all upstream tasks failed (trigger_rule="all_failed"),
    so the catalog reflects the broken state of the product.

    Args:
        space_slug: Slug of the Qarion space.
        product_id: ID of the product whose refresh failed.
    """
    from qarion import QarionSyncClient
    from airflow.models import Variable

    client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
    try:
        client.products.register_refresh(
            space_slug,
            product_id,
            trigger_source="airflow",
            status="failed",
        )
    finally:
        # Close even on error so the failure-reporting path can't leak
        # the HTTP session on top of the original pipeline failure.
        client.close()

With Row Count and Duration

The integration resource provides additional metadata fields for richer observability:

@task()
def register_refresh_with_metadata(space_slug: str, product_id: str, etl_result: dict):
    """Register a refresh with row count and duration metadata.

    Args:
        space_slug: Slug of the Qarion space.
        product_id: ID of the refreshed product.
        etl_result: Dict from the ETL task; "rows_processed" and
            "duration_seconds" keys are forwarded when present.
    """
    from qarion import QarionSyncClient
    from airflow.models import Variable

    client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
    try:
        client.integrations.register_refresh(
            space_slug,
            product_id,
            source="airflow",
            status="success",
            # .get() tolerates ETL results that omit the optional metrics.
            row_count=etl_result.get("rows_processed"),
            duration_seconds=etl_result.get("duration_seconds"),
        )
    finally:
        # Guarantee session cleanup even if the API call raises.
        client.close()

Wiring It into Your DAG

Combine refresh registration with quality checks so they run in sequence:

@dag(schedule="0 6 * * *", start_date=datetime(2026, 1, 1), catchup=False)
def etl_with_qarion():
    """Daily ETL followed by refresh registration and quality checks."""
    etl = run_etl()
    refresh = register_refresh("warehouse-prod", "product-uuid")
    quality = run_quality_checks("warehouse-prod")

    # Linear chain: ETL first, then record the refresh, then validate.
    etl >> refresh >> quality


etl_with_qarion()

Quality Checks

Qarion supports three approaches to running data quality checks from Airflow — each suited to different pipeline architectures. Choose the one that fits your setup:

| Approach         | When to use                                                           |
|------------------|-----------------------------------------------------------------------|
| API-Driven       | Checks are defined in the Qarion UI; Airflow triggers them             |
| Config File      | Checks are defined as YAML in your repo; Airflow applies and runs them |
| External Results | Your pipeline runs its own validation; Airflow pushes results to Qarion |

Pattern 1: API-Driven Quality Checks

Use this when your quality checks are already defined in the Qarion platform (through the UI or SDK) and you want Airflow to fetch their configuration and trigger execution after your ETL completes.

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["data-quality", "qarion"],
)
def qarion_api_driven_dq():
    """Fetch quality checks from Qarion and run them after ETL."""

    @task()
    def run_quality_checks(space_slug: str) -> dict:
        """Trigger all active checks for the space; fail if any check fails.

        Returns:
            Dict with "passed", "failed", and "errors" lists of check slugs.

        Raises:
            ValueError: If one or more checks did not pass.
        """
        from qarion import QarionSyncClient
        from airflow.models import Variable

        client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
        results = {"passed": [], "failed": [], "errors": []}
        try:
            # Fetch all active checks for this space
            checks = client.quality.list(space_slug)

            for check in checks:
                try:
                    execution = client.quality.trigger(
                        space_slug,
                        check.slug,
                    )
                    if execution.is_passed:
                        results["passed"].append(check.slug)
                    else:
                        results["failed"].append(check.slug)
                except Exception as e:
                    # Record per-check errors without aborting the sweep.
                    results["errors"].append({"check": check.slug, "error": str(e)})
        finally:
            # Close even if list()/trigger() raise, so the session never leaks.
            client.close()

        if results["failed"]:
            raise ValueError(
                f"{len(results['failed'])} check(s) failed: "
                f"{', '.join(results['failed'])}"
            )

        return results

    run_quality_checks(space_slug="warehouse-prod")


qarion_api_driven_dq()

How it works: Airflow fetches the current check definitions from the Qarion API, triggers each check against your data source, and fails the task if any check doesn't pass. Because check definitions live in Qarion, data stewards can update thresholds, add new checks, or change schedules through the UI without touching the DAG code.

Tip: For product-scoped checks, use client.quality.list_for_product(space_slug, product_slug) to trigger only the checks linked to the product your DAG just refreshed.


Pattern 2: Config File Quality Checks

Use this when you define quality checks in a YAML configuration file alongside your transformation code. This is ideal for checks-as-code workflows where rules are version-controlled and reviewed through pull requests.

@task()
def apply_and_run_dq_config(config_path: str) -> dict:
    """Sync check definitions from YAML and execute them.

    Validates the config, upserts the check definitions (idempotent),
    runs all checks, and fails the task if any check does not pass.

    Args:
        config_path: Filesystem path to the YAML DQ config.

    Returns:
        Dict with "total" and "passed" check counts.

    Raises:
        ValueError: If the config is structurally invalid or any check fails.
    """
    from qarion import QarionSyncClient
    from qarion.models.dq_config import DqConfig
    from airflow.models import Variable

    client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))

    # Parse the YAML config
    config = DqConfig.from_yaml(config_path)

    try:
        # Step 1: Validate — catch structural errors early
        errors = client.quality.validate_config(config)
        if errors:
            raise ValueError(f"Config validation failed: {errors}")

        # Step 2: Apply — upsert check definitions (idempotent)
        summary = client.quality.apply_config(config)
        print(f"Created: {len(summary['created'])}, "
              f"Updated: {len(summary['updated'])}, "
              f"Unchanged: {len(summary['unchanged'])}")

        # Step 3: Run — execute all checks and record results
        results = client.quality.run_config(config)
    finally:
        # Without this, a validation failure raised above would leave the
        # client (and its HTTP session) open — close on every exit path.
        client.close()

    failed = [r for r in results if not r.is_passed]
    if failed:
        raise ValueError(f"{len(failed)} quality check(s) failed")

    return {"total": len(results), "passed": len(results) - len(failed)}

Your YAML config file (qarion-dq.yaml) lives in your repository:

version: "1.0"
space: warehouse-prod

defaults:
  connector: warehouse-snowflake
  schedule: "0 6 * * *"

checks:
  - slug: orders-row-count
    name: Orders Row Count
    type: sql_metric
    query: "SELECT COUNT(*) FROM analytics.orders"
    product: orders-table
    thresholds:
      operator: gte
      value: 1000

  - slug: orders-freshness
    name: Orders Freshness
    type: freshness_check
    product: orders-table
    configuration:
      field_name: updated_at
      table_name: analytics.orders
      max_age_hours: 24

  - slug: users-email-not-null
    name: Users Email Not Null
    type: null_check
    product: users-table
    configuration:
      field_name: email
      table_name: analytics.users
See the full DQ Config reference for all supported check types, thresholds, and parameters.


Pattern 3: Push External Results

Use this when your pipeline already performs its own data validation (e.g., Great Expectations, dbt tests, custom scripts) and you want to register the results in Qarion for centralized visibility, alerting, and trend tracking.

There are two methods depending on whether the check already exists in Qarion:

Push to an existing check

If the check is already registered in Qarion (via UI or config file), use push_result:

@task()
def push_dbt_test_results(space_slug: str, dbt_results: dict):
    """Push dbt test results to Qarion as quality check executions.

    Args:
        space_slug: Slug of the Qarion space.
        dbt_results: Parsed dbt run_results payload; each entry in
            dbt_results["results"] is mapped to an existing Qarion check.
    """
    from qarion import QarionSyncClient
    from airflow.models import Variable

    client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
    try:
        for test in dbt_results["results"]:
            # Map dbt test name to a Qarion check slug
            check_slug = test["unique_id"].replace(".", "-")

            client.quality.push_result(
                space_slug,
                check_slug,
                status="passed" if test["status"] == "pass" else "failed",
                value=test.get("failures", 0),
                metadata={
                    "source": "dbt",
                    "execution_time": test.get("execution_time"),
                    "compiled_sql": test.get("compiled_sql"),
                },
            )
    finally:
        # A mid-loop push failure must not leak the HTTP session.
        client.close()

Register and push in one call

If the check might not exist yet, use register_external_check — it creates the check on first call and pushes the result in a single operation:

@task()
def register_custom_validation(space_slug: str, product_id: str):
    """Run custom validation and register results in Qarion.

    Uses register_external_check, which creates the check definition on
    first call and pushes a new execution result on every call.

    Args:
        space_slug: Slug of the Qarion space.
        product_id: ID of the product the check is linked to.

    Raises:
        ValueError: If the validation (row count >= 1000) fails.
    """
    from qarion import QarionSyncClient
    from airflow.models import Variable

    client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))

    # Your custom validation logic
    row_count = run_row_count_query()
    is_valid = row_count >= 1000

    try:
        # Upsert check + push result in a single call
        execution = client.quality.register_external_check(
            space_slug,
            "airflow-orders-row-count",
            status="passed" if is_valid else "failed",
            value=float(row_count),
            name="Airflow Orders Row Count",
            description="Row count validated by Airflow DAG",
            product_ids=[product_id],
        )
    finally:
        # Close on every path so a failed API call can't leak the session.
        client.close()

    if not execution.is_passed:
        raise ValueError(f"Validation failed: row_count={row_count}")

register_external_check is safe to call repeatedly with the same slug: the check definition is created only on the first call (subsequent calls leave it unchanged), while every call appends a new execution result.


Blocking vs Non-Blocking Quality Checks

Not every quality check failure should stop your pipeline. Qarion supports two patterns for wiring checks into your DAG:

| Mode         | Behavior                                           | When to use                                           |
|--------------|----------------------------------------------------|-------------------------------------------------------|
| Blocking     | Task fails → downstream tasks are skipped          | Critical checks (row count, schema, freshness)        |
| Non-blocking | Results are recorded but the pipeline continues    | Advisory checks (distribution drift, naming conventions) |

Blocking Checks (Pipeline-Stopping)

A blocking check raises an exception when it fails, which makes the Airflow task fail and prevents downstream tasks from running:

@task()
def run_blocking_checks(space_slug: str, product_slug: str):
    """Run critical quality checks — fail the pipeline if any check fails.

    Args:
        space_slug: Slug of the Qarion space.
        product_slug: Slug of the product whose linked checks are triggered.

    Raises:
        ValueError: If one or more checks did not pass (blocks downstream tasks).
    """
    from qarion import QarionSyncClient
    from airflow.models import Variable

    client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
    failed = []
    try:
        checks = client.quality.list_for_product(space_slug, product_slug)

        for check in checks:
            execution = client.quality.trigger(space_slug, check.slug)
            if not execution.is_passed:
                failed.append(f"{check.slug} (value={execution.value})")
    finally:
        # Close even if list/trigger raise — a broken gate must not also
        # leak the HTTP session.
        client.close()

    if failed:
        raise ValueError(
            f"Blocking quality gate failed — {len(failed)} check(s): "
            + ", ".join(failed)
        )

Non-Blocking Checks (Warning-Only)

A non-blocking check records results in Qarion for visibility and alerting, but never raises an exception — the pipeline always continues:

@task()
def run_non_blocking_checks(space_slug: str, product_slug: str) -> dict:
    """Run advisory quality checks — log warnings but don't fail the pipeline.

    Args:
        space_slug: Slug of the Qarion space.
        product_slug: Slug of the product whose linked checks are triggered.

    Returns:
        Dict with "passed" and "warned" lists of check slugs. Errored
        checks are counted as "warned"; this task never raises on
        check failures by design.
    """
    from qarion import QarionSyncClient
    from airflow.models import Variable
    import logging

    log = logging.getLogger(__name__)
    client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))

    results = {"passed": [], "warned": []}
    try:
        checks = client.quality.list_for_product(space_slug, product_slug)

        for check in checks:
            try:
                execution = client.quality.trigger(space_slug, check.slug)
                if execution.is_passed:
                    results["passed"].append(check.slug)
                else:
                    results["warned"].append(check.slug)
                    log.warning(
                        "Non-blocking check '%s' did not pass (value=%s)",
                        check.slug,
                        execution.value,
                    )
            except Exception as e:
                # Advisory checks never stop the pipeline — record and continue.
                log.error("Check '%s' errored: %s", check.slug, e)
                results["warned"].append(check.slug)
    finally:
        # Close even if listing the checks raises before the loop starts.
        client.close()
    return results

Quality Gate with Threshold

Instead of checking individual checks, use check_quality_gate to evaluate the overall pass rate for a product. This is useful when you want to allow a small number of advisory checks to fail while still enforcing an overall quality bar:

@task()
def evaluate_quality_gate(space_slug: str, product_slug: str, threshold: float = 90.0):
    """Fail the pipeline if the product's quality score drops below the threshold.

    Args:
        space_slug: Slug of the Qarion space.
        product_slug: Slug of the product being gated.
        threshold: Minimum pass-rate percentage required (default 90.0).

    Raises:
        ValueError: If the gate result is not "pass".
    """
    from qarion import QarionSyncClient
    from airflow.models import Variable

    client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
    try:
        gate = client.integrations.check_quality_gate(
            space_slug,
            product_slug,
            threshold=threshold,
        )
    finally:
        # Release the session even when the gate call itself errors.
        client.close()

    print(f"Quality gate: {gate['gate']} "
          f"(score={gate['score']:.1f}%, threshold={gate['threshold']}%, "
          f"checks={gate['checks_passed']}/{gate['checks_total']})")

    if gate["gate"] != "pass":
        raise ValueError(
            f"Quality gate failed: score {gate['score']:.1f}% "
            f"< threshold {gate['threshold']}%"
        )

Quality Gate Sensor

The qarion-airflow package includes a QarionQualityGateSensor that polls the quality gate until it passes. This is useful when quality checks are run asynchronously (e.g., by another DAG or an external system) and your pipeline needs to wait for them to complete:

from qarion_airflow.sensors import QarionQualityGateSensor

# Poll the product's quality gate until it passes (or the timeout expires).
wait_for_quality = QarionQualityGateSensor(
    task_id="wait_for_quality_gate",
    space_slug="warehouse-prod",
    product_slug="orders-table",
    threshold=100.0,
    poke_interval=60,     # poll once per minute
    timeout=600,          # abandon the wait after 10 minutes
    mode="reschedule",    # release the worker slot between polls
)

Combining Blocking and Non-Blocking in a DAG

Wire both check types into a single DAG using Airflow's >> dependency syntax. Blocking checks gate the pipeline; non-blocking checks run in parallel but never stop downstream tasks:

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="0 6 * * *", start_date=datetime(2026, 1, 1), catchup=False)
def etl_with_quality_gates():
    """Daily ETL with a blocking gate before publish and an advisory branch."""
    etl_task = run_etl()
    refresh_task = register_refresh("warehouse-prod", "orders-table")

    gate_task = run_blocking_checks("warehouse-prod", "orders-table")
    advisory_task = run_non_blocking_checks("warehouse-prod", "orders-table")

    publish_task = publish_to_downstream()

    # publish_task runs only when the blocking gate passes;
    # advisory_task branches off after the refresh and gates nothing.
    etl_task >> refresh_task >> gate_task >> publish_task
    refresh_task >> advisory_task


etl_with_quality_gates()

Tip: Tag your checks in Qarion with blocking or non-blocking labels, then filter by tag in your task to dynamically decide which checks stop the pipeline.


PII Anonymization and Restricted Access

Qarion's metadata and governance APIs let you automate the classification of sensitive fields and enforce access restrictions directly from your Airflow DAGs. This is useful for pipelines that discover or produce datasets containing personally identifiable information (PII).

Classify Sensitive Fields

Use the Metadata API to tag individual fields with a sensitivity level (e.g., PII, Confidential, Internal) and the Tags API to apply product-level sensitivity labels.

@task()
def classify_pii_fields(space_slug: str, product_slug: str, pii_columns: list[str]):
    """Tag PII fields with sensitivity classification and apply product-level tags.

    Args:
        space_slug: Slug of the Qarion space.
        product_slug: Slug of the product to classify.
        pii_columns: Field names that should be marked as PII.

    Raises:
        requests.HTTPError: If any API call fails.
    """
    import requests
    from airflow.models import Variable

    api_base = Variable.get("qarion_api_url")
    headers = {"Authorization": f"Bearer {Variable.get('qarion_api_key')}"}

    # Step 1: Resolve the product and its fields.
    # Each request gets a timeout (so a hung API can't stall the worker)
    # and raise_for_status (so HTTP errors fail the task instead of
    # producing confusing .json() parse errors downstream).
    product_resp = requests.get(
        f"{api_base}/catalog/spaces/{space_slug}/products/by-slug/{product_slug}",
        headers=headers,
        timeout=30,
    )
    product_resp.raise_for_status()
    product = product_resp.json()

    fields_resp = requests.get(
        f"{api_base}/catalog/spaces/{space_slug}/products/{product['id']}/fields",
        headers=headers,
        timeout=30,
    )
    fields_resp.raise_for_status()
    fields = fields_resp.json()

    # Step 2: Set field-level sensitivity for each PII column
    for field in fields:
        if field["name"] in pii_columns:
            patch_resp = requests.patch(
                f"{api_base}/fields/{field['id']}/sensitivity",
                headers=headers,
                params={"sensitivity": "PII"},
                timeout=30,
            )
            patch_resp.raise_for_status()
            print(f" Classified {field['name']} as PII")

    # Step 3: Tag the product as containing PII
    tag_resp = requests.post(
        f"{api_base}/catalog/spaces/{space_slug}/products/{product['id']}/tags",
        headers=headers,
        json={"slug": "pii"},
        timeout=30,
    )
    tag_resp.raise_for_status()
    print(f"Tagged {product_slug} with 'pii' tag")

Example: Auto-Detect and Classify PII

Combine column-name heuristics with Qarion classification in a single task:

# Column-name substrings that suggest a field holds PII.
PII_PATTERNS = {
    "address",
    "credit_card",
    "date_of_birth",
    "email",
    "ip_address",
    "national_id",
    "passport",
    "phone",
    "social_security",
    "ssn",
}

@task()
def auto_classify_pii(space_slug: str, product_slug: str):
    """Auto-detect PII columns by name pattern and classify them.

    Args:
        space_slug: Slug of the Qarion space.
        product_slug: Slug of the product to scan.

    Returns:
        List of field names that were classified as PII.

    Raises:
        requests.HTTPError: If any API call fails.
    """
    # NOTE: the original imported QarionSyncClient here but never used it —
    # this task talks to the REST API directly via requests.
    from airflow.models import Variable
    import requests

    api_base = Variable.get("qarion_api_url")
    headers = {"Authorization": f"Bearer {Variable.get('qarion_api_key')}"}

    # Get product fields — with timeouts and explicit HTTP-error handling
    # so failures surface in the task log instead of being swallowed.
    product_resp = requests.get(
        f"{api_base}/catalog/spaces/{space_slug}/products/by-slug/{product_slug}",
        headers=headers,
        timeout=30,
    )
    product_resp.raise_for_status()
    product = product_resp.json()

    fields_resp = requests.get(
        f"{api_base}/catalog/spaces/{space_slug}/products/{product['id']}/fields",
        headers=headers,
        timeout=30,
    )
    fields_resp.raise_for_status()
    fields = fields_resp.json()

    # Detect PII columns by name
    pii_fields = [
        f for f in fields
        if any(pattern in f["name"].lower() for pattern in PII_PATTERNS)
    ]

    # Classify detected fields
    for field in pii_fields:
        patch_resp = requests.patch(
            f"{api_base}/fields/{field['id']}/sensitivity",
            headers=headers,
            params={"sensitivity": "PII"},
            timeout=30,
        )
        patch_resp.raise_for_status()

    print(f"Auto-classified {len(pii_fields)} PII field(s) in {product_slug}")
    return [f["name"] for f in pii_fields]

Define Restricted Access

Use permission rules to restrict who can access PII-tagged products. Combined with the access request workflow, this ensures that PII data requires explicit approval:

@task()
def enforce_pii_access_policy(space_slug: str):
    """Ensure PII-tagged products require explicit access approval.

    Sets sensitivity to "Confidential" and criticality to "Critical" on
    every product tagged "pii" in the space.

    Args:
        space_slug: Slug of the Qarion space to sweep.

    Raises:
        requests.HTTPError: If any API call fails.
    """
    import requests
    from airflow.models import Variable

    api_base = Variable.get("qarion_api_url")
    headers = {"Authorization": f"Bearer {Variable.get('qarion_api_key')}"}

    # Get all products tagged as PII. Timeout + raise_for_status make
    # API failures fail the task instead of silently skipping enforcement.
    list_resp = requests.get(
        f"{api_base}/catalog/spaces/{space_slug}/products",
        headers=headers,
        params={"tags": "pii"},
        timeout=30,
    )
    list_resp.raise_for_status()
    products = list_resp.json()

    for product in products.get("items", []):
        # Set product sensitivity to ensure it's classified
        sens_resp = requests.patch(
            f"{api_base}/products/{product['id']}/sensitivity",
            headers=headers,
            params={"sensitivity": "Confidential"},
            timeout=30,
        )
        sens_resp.raise_for_status()

        # Set high criticality to surface it in governance dashboards
        crit_resp = requests.patch(
            f"{api_base}/products/{product['id']}/criticality",
            headers=headers,
            params={"criticality": "Critical"},
            timeout=30,
        )
        crit_resp.raise_for_status()

    print(f"Enforced access policy on {len(products.get('items', []))} PII products")

Tip: Combine this with the access automation tutorial to build a complete workflow where PII products are auto-classified, access is restricted, and requests are routed to the data owner for approval.


Full Example DAG

Here's a complete DAG that combines all patterns — product registration, refresh tracking, blocking and non-blocking quality checks, and PII classification — into a single pipeline:

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="qarion_full_integration",
    schedule="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "data-quality", "qarion"],
)
def qarion_full_integration():
    """Complete Airflow + Qarion integration pipeline.

    Combines product registration, refresh tracking, blocking and
    non-blocking quality checks, and PII classification.
    """

    SPACE = "warehouse-prod"
    PRODUCT_SLUG = "orders-table"

    @task()
    def run_etl():
        """Your ETL logic here."""
        print("Running ETL pipeline...")
        return {"status": "success", "rows_processed": 50000}

    @task()
    def upsert_product():
        """Register or update the product definition in the catalog."""
        from qarion import QarionSyncClient
        from airflow.models import Variable

        client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
        try:
            product = client.products.register_product(
                SPACE,
                name="Orders Table",
                slug=PRODUCT_SLUG,
                description_markdown="Aggregated order data refreshed daily by Airflow.",
                product_type="Table",
                tags=["airflow-managed", "finance"],
            )
        finally:
            # Close on every path so a failed API call can't leak the session.
            client.close()
        return str(product.id)

    @task()
    def register_refresh_event(etl_result: dict, product_id: str):
        """Record a successful refresh in the catalog."""
        from qarion import QarionSyncClient
        from airflow.models import Variable

        client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
        try:
            client.products.register_refresh(
                SPACE, product_id, trigger_source="airflow", status="success",
            )
        finally:
            client.close()

    @task()
    def blocking_quality_gate(product_id: str):
        """Run critical checks — fail the pipeline on any failure."""
        from qarion import QarionSyncClient
        from airflow.models import Variable

        client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
        try:
            gate = client.integrations.check_quality_gate(
                SPACE, PRODUCT_SLUG, threshold=100.0,
            )
        finally:
            client.close()

        if gate["gate"] != "pass":
            raise ValueError(
                f"Blocking quality gate failed: "
                f"score {gate['score']:.1f}% < threshold {gate['threshold']}%"
            )

    @task()
    def non_blocking_quality_checks():
        """Run advisory checks — log warnings but don't fail."""
        from qarion import QarionSyncClient
        from airflow.models import Variable
        import logging

        log = logging.getLogger(__name__)
        client = QarionSyncClient(api_key=Variable.get("qarion_api_key"))
        try:
            checks = client.quality.list_for_product(SPACE, PRODUCT_SLUG)
            for check in checks:
                try:
                    execution = client.quality.trigger(SPACE, check.slug)
                    if not execution.is_passed:
                        log.warning("Advisory check '%s' did not pass", check.slug)
                except Exception as e:
                    # Advisory checks never fail the task — log and continue.
                    log.error("Check '%s' errored: %s", check.slug, e)
        finally:
            # Close even when list_for_product itself raises.
            client.close()

    @task()
    def classify_sensitive_fields(product_id: str):
        """Auto-classify PII fields by column name."""
        import requests
        from airflow.models import Variable

        api_base = Variable.get("qarion_api_url")
        headers = {"Authorization": f"Bearer {Variable.get('qarion_api_key')}"}
        pii_patterns = {"email", "phone", "ssn", "address", "date_of_birth"}

        # Timeouts + raise_for_status: surface API failures in the task log
        # instead of hanging the worker or mis-parsing an error body.
        fields_resp = requests.get(
            f"{api_base}/catalog/spaces/{SPACE}/products/{product_id}/fields",
            headers=headers,
            timeout=30,
        )
        fields_resp.raise_for_status()
        fields = fields_resp.json()

        for field in fields:
            if any(p in field["name"].lower() for p in pii_patterns):
                patch_resp = requests.patch(
                    f"{api_base}/fields/{field['id']}/sensitivity",
                    headers=headers,
                    params={"sensitivity": "PII"},
                    timeout=30,
                )
                patch_resp.raise_for_status()

    # DAG dependency chain
    product_id = upsert_product()
    etl_result = run_etl()
    refresh = register_refresh_event(etl_result, product_id)

    blocking = blocking_quality_gate(product_id)
    non_blocking = non_blocking_quality_checks()
    classify = classify_sensitive_fields(product_id)

    # Blocking gate must pass before PII classification.
    # Non-blocking checks run independently.
    refresh >> blocking >> classify
    refresh >> non_blocking


qarion_full_integration()