
Risk Remediation Guide

This guide covers the API and SDK patterns for creating risk assessments with structured mitigation actions, tracking remediation progress, and integrating risk gates into CI/CD pipelines.

Structured Mitigations

Risk assessments now support structured mitigation actions stored as JSONB. Each action has an owner, deadline, status, priority, and evidence references.

Creating an Assessment with Mitigations

curl -X POST \
  "${API_URL}/catalog/spaces/${SPACE}/products/${PRODUCT_ID}/risk-assessments/" \
  -H "Authorization: Bearer ${TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "assessment_type": "initial",
    "lifecycle_stage": "design",
    "evaluations": [
      {
        "category": "bias",
        "severity": 4,
        "likelihood": 3,
        "impact": 4,
        "notes": "Bias detected in training data distribution"
      },
      {
        "category": "robustness",
        "severity": 3,
        "likelihood": 2,
        "impact": 3,
        "notes": "Adversarial testing not yet performed"
      }
    ],
    "summary": "Initial risk assessment for recommendation model",
    "mitigations": [
      {
        "title": "Retrain with balanced dataset",
        "category": "retraining",
        "description": "Apply SMOTE oversampling and re-evaluate fairness metrics",
        "owner_id": "USER_UUID",
        "owner_name": "Jane Smith",
        "deadline": "2026-03-15",
        "priority": "high"
      },
      {
        "title": "Add adversarial robustness testing",
        "category": "robustness_testing",
        "description": "Implement FGSM and PGD attacks in test suite",
        "owner_id": "USER_UUID",
        "owner_name": "Alex Chen",
        "deadline": "2026-04-01",
        "priority": "medium"
      }
    ]
  }'

Each mitigation action is automatically assigned a unique id, along with created_at and updated_at timestamps.

Mitigation Action Schema

| Field | Type | Required | Description |
|---|---|---|---|
| title | string | | Short description |
| category | string | | retraining, bias_mitigation, robustness_testing, privacy_enhancement, explainability, monitoring_improvement, process_change, other |
| description | string | | Detailed action description |
| owner_id | UUID | | Responsible user |
| owner_name | string | | Display name (denormalised) |
| deadline | string | | ISO 8601 date |
| status | string | | pending (default), in_progress, completed, cancelled |
| priority | string | | low, medium (default), high, critical |
| evidence | string[] | | Free-form references |
| linked_issue_id | UUID | | Linked remediation ticket |
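
The enumerated values in the schema can be checked on the client before a request is sent. The following is a minimal sketch, not part of the SDK: validate_mitigation is a hypothetical helper, and it only checks the value sets, defaults, and date format listed in the table (it makes no assumption about which fields are required).

```python
from datetime import date

# Allowed values copied from the schema table.
CATEGORIES = {
    "retraining", "bias_mitigation", "robustness_testing", "privacy_enhancement",
    "explainability", "monitoring_improvement", "process_change", "other",
}
STATUSES = {"pending", "in_progress", "completed", "cancelled"}
PRIORITIES = {"low", "medium", "high", "critical"}


def validate_mitigation(action: dict) -> list[str]:
    """Hypothetical helper: return a list of problems (empty if none found)."""
    problems = []
    if "category" in action and action["category"] not in CATEGORIES:
        problems.append(f"unknown category: {action['category']!r}")
    # status and priority fall back to the documented defaults when omitted
    status = action.get("status", "pending")
    if status not in STATUSES:
        problems.append(f"unknown status: {status!r}")
    priority = action.get("priority", "medium")
    if priority not in PRIORITIES:
        problems.append(f"unknown priority: {priority!r}")
    deadline = action.get("deadline")
    if deadline is not None:
        try:
            date.fromisoformat(deadline)  # expects an ISO 8601 date such as 2026-03-15
        except (TypeError, ValueError):
            problems.append(f"deadline is not an ISO 8601 date: {deadline!r}")
    return problems


ok = validate_mitigation({
    "title": "Retrain with balanced dataset",
    "category": "retraining",
    "deadline": "2026-03-15",
    "priority": "high",
})
bad = validate_mitigation({"category": "nope", "priority": "urgent", "deadline": "15/03/2026"})
print(ok)
print(bad)
```

Running a check like this before the POST gives faster feedback than waiting for the API to reject the payload; the server remains the source of truth for validation.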

Updating Individual Mitigations

Update a single mitigation action without modifying the full assessment:

# PATCH .../risk-assessments/{assessment_id}/mitigations/{mitigation_id}

curl -X PATCH \
  "${API_URL}/catalog/spaces/${SPACE}/products/${PRODUCT_ID}/risk-assessments/${ASSESSMENT_ID}/mitigations/${MITIGATION_ID}" \
  -H "Authorization: Bearer ${TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "status": "in_progress",
    "evidence": [
      "Fairness report v2: demographic parity delta reduced from 0.15 to 0.04",
      "PR #142: SMOTE resampling implementation"
    ],
    "linked_issue_id": "ISSUE_UUID"
  }'

Status Transitions

pending → in_progress → completed
→ cancelled

SDK Usage

Creating Assessments with Mitigations

from qarion import QarionClient

client = QarionClient(api_key="...")

assessment = client.risk_assessments.create(
    space="my-space",
    product_id="product-uuid",
    data={
        "assessment_type": "pre_deployment",
        "evaluations": [
            {
                "category": "bias",
                "severity": 3,
                "likelihood": 2,
                "impact": 4,
            },
        ],
        "mitigations": [
            {
                "title": "Retrain with balanced dataset",
                "category": "retraining",
                "owner_name": "Jane Smith",
                "deadline": "2026-03-15",
                "priority": "high",
            },
        ],
    },
)

# Access mitigation IDs for subsequent updates
for mitigation in assessment.mitigations:
    print(f"{mitigation['id']}: {mitigation['title']} [{mitigation['status']}]")

Updating Mitigation Status

# Mark a mitigation as completed with evidence
client.risk_assessments.update_mitigation(
    space="my-space",
    product_id="product-uuid",
    assessment_id="assessment-uuid",
    mitigation_id="mitigation-uuid",
    data={
        "status": "completed",
        "evidence": [
            "Fairness audit passed: demographic_parity_delta=0.02",
            "Model retrained on balanced dataset v3",
        ],
    },
)

CI/CD Integration

Risk Gate in GitHub Actions

Use risk assessments as deployment gates: block a deployment when the latest assessment has a risk tier of high or unacceptable and still has open mitigations.

name: Risk Gate Check

on:
  push:
    branches: [main]

jobs:
  risk-gate:
    runs-on: ubuntu-latest
    steps:
      - name: Check open mitigations
        env:
          API_URL: ${{ secrets.QARION_API_URL }}
          TOKEN: ${{ secrets.QARION_TOKEN }}
          # SPACE and PRODUCT_ID are used in the URL below and must be set;
          # the repository variable names here are placeholders
          SPACE: ${{ vars.QARION_SPACE }}
          PRODUCT_ID: ${{ vars.QARION_PRODUCT_ID }}
        run: |
          # Fetch the latest assessment; -f makes curl fail on HTTP errors
          ASSESSMENT=$(curl -sf -H "Authorization: Bearer $TOKEN" \
            "$API_URL/catalog/spaces/$SPACE/products/$PRODUCT_ID/risk-assessments/?limit=1")

          # Count mitigations that are neither completed nor cancelled
          OPEN_COUNT=$(echo "$ASSESSMENT" | \
            jq '.[0].mitigations // [] | map(select(.status != "completed" and .status != "cancelled")) | length')

          RISK_TIER=$(echo "$ASSESSMENT" | jq -r '.[0].risk_tier')

          if [ "$RISK_TIER" = "high" ] || [ "$RISK_TIER" = "unacceptable" ]; then
            if [ "$OPEN_COUNT" -gt 0 ]; then
              echo "❌ BLOCKED: $OPEN_COUNT open mitigations on $RISK_TIER risk system"
              exit 1
            fi
          fi

          echo "✅ Risk gate passed"
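
The same gate decision can be expressed as a small pure function, which is easier to unit-test than shell embedded in a workflow. This is a sketch only: gate_blocks and open_mitigation_count are hypothetical names, and the input is assumed to be one assessment object with the risk_tier and mitigations fields that the jq expressions above read.

```python
# Tiers that block a deployment and statuses that count as closed,
# mirroring the shell logic in the workflow above.
BLOCKING_TIERS = {"high", "unacceptable"}
CLOSED_STATUSES = {"completed", "cancelled"}


def open_mitigation_count(assessment: dict) -> int:
    """Count mitigations that are neither completed nor cancelled."""
    mitigations = assessment.get("mitigations") or []  # tolerate a missing or null list
    return sum(1 for m in mitigations if m.get("status") not in CLOSED_STATUSES)


def gate_blocks(assessment: dict) -> bool:
    """Return True when the deployment should be blocked."""
    return (
        assessment.get("risk_tier") in BLOCKING_TIERS
        and open_mitigation_count(assessment) > 0
    )


high_with_open = {
    "risk_tier": "high",
    "mitigations": [{"status": "in_progress"}, {"status": "completed"}],
}
high_all_closed = {
    "risk_tier": "high",
    "mitigations": [{"status": "completed"}, {"status": "cancelled"}],
}
print(gate_blocks(high_with_open))
print(gate_blocks(high_all_closed))
```

As in the workflow, a mitigation with no status field is counted as open, so missing data errs on the side of blocking.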

Airflow Remediation Pipeline

Trigger automated remediation workflows from monitoring alerts:

import os
from datetime import datetime

from airflow.decorators import dag, task

# Connection settings. Reading them from environment variables is an
# assumption; the variable names are placeholders.
API_URL = os.environ.get("QARION_API_URL", "")
TOKEN = os.environ.get("QARION_TOKEN", "")
SPACE = os.environ.get("QARION_SPACE", "")
PRODUCT_ID = os.environ.get("QARION_PRODUCT_ID", "")


@dag(schedule="@daily", start_date=datetime(2026, 1, 1))
def remediation_pipeline():

    @task
    def check_drift_alerts():
        """Check for unresolved drift alerts."""
        import requests

        response = requests.get(
            f"{API_URL}/quality/spaces/{SPACE}/alerts",
            headers={"Authorization": f"Bearer {TOKEN}"},
            params={"severity": "critical,warning", "status": "open"},
            timeout=30,
        )
        response.raise_for_status()  # fail the task on HTTP errors
        return response.json()

    @task
    def create_remediation_assessment(alerts: list):
        """Auto-create assessment with mitigations for drift alerts."""
        if not alerts:
            return None

        import requests

        # One mitigation per alert; critical alerts get high priority
        mitigations = []
        for alert in alerts:
            mitigations.append({
                "title": f"Investigate: {alert['title']}",
                "category": "monitoring_improvement",
                "priority": "high" if alert["severity"] == "critical" else "medium",
            })

        response = requests.post(
            f"{API_URL}/catalog/spaces/{SPACE}/products/{PRODUCT_ID}"
            f"/risk-assessments/",
            headers={"Authorization": f"Bearer {TOKEN}"},
            json={
                "assessment_type": "incident",
                "evaluations": [{
                    "category": "monitoring",
                    "severity": 4,
                    "likelihood": 3,
                    "impact": 3,
                }],
                "mitigations": mitigations,
            },
            timeout=30,
        )
        response.raise_for_status()
        return response.json()

    alerts = check_drift_alerts()
    create_remediation_assessment(alerts)


remediation_pipeline()

Automation Configuration

Quality checks support an automation_config JSONB field that enables fully automated alert response.

Config Schema

{
  "auto_create_ticket": true,
  "auto_risk_reassessment": true,
  "reassessment_severity_threshold": "high",
  "remediation_workflow_id": "uuid-of-workflow-definition"
}

| Field | Type | Default | Description |
|---|---|---|---|
| auto_create_ticket | boolean | false | Auto-create remediation ticket on alert |
| auto_risk_reassessment | boolean | false | Auto-create incident risk assessment |
| reassessment_severity_threshold | string | "high" | Minimum severity: warning, high, critical |
| remediation_workflow_id | UUID | null | Workflow to trigger for approval routing |
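
To illustrate how a minimum-severity threshold might be evaluated, here is a minimal sketch. It is not the platform's implementation. It assumes the severities are ordered warning < high < critical (the order in which the table lists them) and that the threshold gates auto_risk_reassessment; meets_threshold and should_reassess are hypothetical names.

```python
# Assumed ordering, lowest to highest, taken from the order in which the
# table lists the values; the source does not state it explicitly.
SEVERITY_ORDER = ["warning", "high", "critical"]


def meets_threshold(alert_severity: str, threshold: str = "high") -> bool:
    """True if alert_severity is at or above threshold.

    Raises ValueError if either value is not a known severity.
    """
    return SEVERITY_ORDER.index(alert_severity) >= SEVERITY_ORDER.index(threshold)


def should_reassess(automation_config: dict, alert_severity: str) -> bool:
    """Hypothetical helper: combine the flag and threshold using the documented defaults."""
    if not automation_config.get("auto_risk_reassessment", False):
        return False
    threshold = automation_config.get("reassessment_severity_threshold", "high")
    return meets_threshold(alert_severity, threshold)


config = {"auto_risk_reassessment": True, "reassessment_severity_threshold": "warning"}
print(should_reassess(config, "warning"))                            # True
print(should_reassess({"auto_risk_reassessment": True}, "warning"))  # False: default threshold is "high"
```

Under these assumptions, lowering the threshold to warning makes every alert severity eligible for reassessment, while the default of high ignores warning-level alerts.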

Setting via API

# Update a quality check's automation config
curl -X PATCH \
  "${API_URL}/quality/spaces/${SPACE}/checks/${CHECK_ID}" \
  -H "Authorization: Bearer ${TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "automation_config": {
      "auto_create_ticket": true,
      "auto_risk_reassessment": true,
      "reassessment_severity_threshold": "warning",
      "remediation_workflow_id": null
    }
  }'

YAML Configuration (CLI)

# quality-checks.yaml
checks:
  - name: model_drift_detection
    type: sql_metric
    configuration:
      sql: "SELECT drift_score FROM model_metrics WHERE model_id = '...'"
      threshold: 0.1
    automation_config:
      auto_create_ticket: true
      auto_risk_reassessment: true
      reassessment_severity_threshold: high

Pipeline Flow

Check Execution
  → FAIL/ERROR status
    → DQAlert created
      → Stakeholder notifications
      → process_automated_response()
          ├── auto_create_ticket? → AlertTicketService
          ├── auto_risk_reassessment? → RiskAssessmentService (incident type)
          └── remediation_workflow_id? → WorkflowExecutionService

Note: Automated responses are intentionally non-blocking. If any automation step fails, it is logged but does not prevent subsequent steps or the core alert pipeline from completing.
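
The non-blocking behaviour described in the note can be sketched as a loop that isolates each step in its own try/except. This is an illustration under assumptions, not the actual process_automated_response() implementation: run_automated_response and the step functions are hypothetical stand-ins for the services named in the flow.

```python
import logging

logger = logging.getLogger("automation")


def run_automated_response(steps):
    """Run each (name, callable) step; log failures and continue with the next step.

    Returns a dict mapping each step name to True (succeeded) or False (failed).
    """
    results = {}
    for name, step in steps:
        try:
            step()
            results[name] = True
        except Exception:
            # Log with traceback, but never let one step stop the others.
            logger.exception("Automation step %s failed", name)
            results[name] = False
    return results


created = []


def create_ticket():
    # Stand-in for the ticket step; fails to show that later steps still run.
    raise RuntimeError("ticket service unavailable")


def create_reassessment():
    # Stand-in for the incident risk assessment step.
    created.append("incident-assessment")


results = run_automated_response([
    ("auto_create_ticket", create_ticket),
    ("auto_risk_reassessment", create_reassessment),
])
print(results)
print(created)
```

Returning a per-step result rather than raising keeps the caller (here, the alert pipeline) free to complete while still leaving a record of which steps failed.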