Skip to main content

Alerting Tasks

A comprehensive guide to using alerting tasks in Qarion ETL flows.

Overview

Alerting tasks allow you to send notifications via various channels (email, webhook, Slack, Microsoft Teams, etc.) when certain conditions are met during flow execution. This enables proactive monitoring and notification of important events in your data pipelines.

Quick Start

Basic Email Alert

[[tasks]]
id = "send_alert"
type = "alert"
name = "Send Completion Alert"
description = "Notify team when flow completes"

[tasks.properties]
alert_type = "email"
recipients = ["team@example.com"]
subject = "Flow {{ flow_id }} Completed"
message = "Flow {{ flow_id }} completed successfully at {{ execution_date }}"

Alert with Condition

[[tasks]]
id = "error_alert"
type = "alert"
name = "Error Alert"
description = "Alert on errors"

[tasks.properties]
alert_type = "email"
recipients = ["ops@example.com"]
subject = "Flow Error: {{ flow_id }}"
message = "Flow {{ flow_id }} encountered an error"
condition = { type = "on_error" }

Alert Types

Email Alerts

Send alerts via email using SMTP.

Configuration:

[[tasks]]
id = "email_alert"
type = "alert"

[tasks.properties]
alert_type = "email"
recipients = ["user1@example.com", "user2@example.com"]
subject = "Data Pipeline Alert"
message = "Your data pipeline has completed"

[tasks.properties.alert_config]
smtp_host = "smtp.example.com"
smtp_port = 587
smtp_user = "alerts@example.com"
smtp_password = "${credential:smtp_password}"
smtp_use_tls = true
from_address = "alerts@example.com"

Configuration Options:

  • smtp_host: SMTP server hostname (default: localhost)
  • smtp_port: SMTP server port (default: 25)
  • smtp_user: SMTP username (optional)
  • smtp_password: SMTP password (optional, use credential references)
  • smtp_use_tls: Use TLS encryption (default: false)
  • from_address: From email address (required)

Webhook Alerts

Send alerts via HTTP POST to webhook URLs.

Configuration:

[[tasks]]
id = "webhook_alert"
type = "alert"

[tasks.properties]
alert_type = "webhook"
recipients = ["https://api.example.com/webhook"]
subject = "Pipeline Alert"
message = "Flow completed"

[tasks.properties.alert_config]
method = "POST"
headers = { "Authorization" = "Bearer ${credential:webhook_token}" }
timeout = 10
payload = { "source" = "qarion-etl", "priority" = "high" }

Configuration Options:

  • method: HTTP method (default: POST)
  • headers: Custom HTTP headers (default: Content-Type: application/json)
  • timeout: Request timeout in seconds (default: 10)
  • payload: Additional payload data to include

Slack Alerts

Send alerts to Slack channels via webhook.

Configuration:

[[tasks]]
id = "slack_alert"
type = "alert"

[tasks.properties]
alert_type = "slack"
recipients = ["https://hooks.slack.com/services/YOUR/WEBHOOK/URL"]
subject = "Pipeline Alert"
message = "Flow completed successfully"

[tasks.properties.alert_config]
channel = "#alerts"
username = "Qarion ETL Bot"
icon_emoji = ":bell:"

Configuration Options:

  • channel: Slack channel (default: #general)
  • username: Bot username (default: Qarion ETL)
  • icon_emoji: Bot icon emoji (default: :bell:)

Microsoft Teams Alerts

Send alerts to Microsoft Teams channels via webhook.

Configuration:

[[tasks]]
id = "teams_alert"
type = "alert"

[tasks.properties]
alert_type = "teams"
recipients = ["https://outlook.office.com/webhook/YOUR/WEBHOOK/URL"]
subject = "Pipeline Alert"
message = "Flow completed successfully"

[tasks.properties.alert_config]
theme_color = "0078D4"

Configuration Options:

  • theme_color: Theme color in hex format (default: 0078D4)

Alert Conditions

Alerts can be configured to only send when certain conditions are met.

Always Send

[tasks.properties]
condition = { type = "always" }

Send on Error

[tasks.properties]
condition = { type = "on_error" }

Send Based on Row Count

[tasks.properties]
condition = {
type = "row_count"
dataset_id = "orders"
operator = ">"
threshold = 1000
}

Operators:

  • >: Greater than
  • <: Less than
  • ==: Equal to
  • >=: Greater than or equal
  • <=: Less than or equal

Custom Condition

[tasks.properties]
condition = { type = "custom" }

Message Templating

Alert messages support Jinja2-style templating with context variables:

Available Variables:

  • flow_id: Current flow ID
  • batch_id: Current batch ID
  • execution_date: Execution date/time
  • node_id: Current node/task ID
  • datasets: List of datasets in the flow

Example:

[tasks.properties]
subject = "Flow {{ flow_id }} - Batch {{ batch_id }}"
message = """
Flow {{ flow_id }} completed at {{ execution_date }}.

Batch ID: {{ batch_id }}
Node: {{ node_id }}
"""

Examples

Example 1: Success Notification

[[tasks]]
id = "success_notification"
type = "alert"
name = "Success Notification"
description = "Notify team on successful completion"

[tasks.properties]
alert_type = "email"
recipients = ["team@example.com"]
subject = "✅ Flow {{ flow_id }} Completed Successfully"
message = """
Flow {{ flow_id }} completed successfully at {{ execution_date }}.

Batch ID: {{ batch_id }}
"""

[tasks.properties.alert_config]
smtp_host = "smtp.example.com"
smtp_port = 587
smtp_use_tls = true
from_address = "noreply@example.com"

Example 2: Error Alert with Slack

[[tasks]]
id = "error_slack_alert"
type = "alert"
name = "Error Slack Alert"
description = "Send Slack alert on errors"

[tasks.properties]
alert_type = "slack"
recipients = ["${credential:slack_webhook_url}"]
subject = "🚨 Flow Error"
message = "Flow {{ flow_id }} encountered an error at {{ execution_date }}"
condition = { type = "on_error" }

[tasks.properties.alert_config]
channel = "#alerts"
username = "Qarion ETL Alert Bot"

Example 3: Data Quality Alert

[[tasks]]
id = "dq_alert"
type = "alert"
name = "Data Quality Alert"
description = "Alert when data quality issues detected"

[tasks.properties]
alert_type = "email"
recipients = ["dq-team@example.com"]
subject = "Data Quality Issue Detected"
message = "Data quality check failed for flow {{ flow_id }}"
condition = {
type = "row_count"
dataset_id = "quality_results"
operator = ">"
threshold = 0
}

[tasks.properties.alert_config]
smtp_host = "smtp.example.com"
from_address = "dq-alerts@example.com"

Example 4: Webhook Alert with Custom Payload

[[tasks]]
id = "webhook_notification"
type = "alert"
name = "Webhook Notification"

[tasks.properties]
alert_type = "webhook"
recipients = ["https://api.example.com/notifications"]
subject = "Pipeline Event"
message = "Flow {{ flow_id }} completed"

[tasks.properties.alert_config]
method = "POST"
headers = {
"Authorization" = "Bearer ${credential:api_token}"
"X-Source" = "qarion-etl"
}
payload = {
"flow_id" = "{{ flow_id }}"
"batch_id" = "{{ batch_id }}"
"status" = "completed"
"timestamp" = "{{ execution_date }}"
}

Best Practices

  1. Use Credential References:

    smtp_password = "${credential:smtp_password}"

    Never hardcode passwords or API keys in flow definitions.

  2. Use Descriptive Subjects:

    subject = "✅ Flow {{ flow_id }} Completed Successfully"

    Include emojis or prefixes to make alerts easily identifiable.

  3. Include Context in Messages:

    message = """
    Flow: {{ flow_id }}
    Batch: {{ batch_id }}
    Time: {{ execution_date }}
    Status: Success
    """
  4. Use Conditions Appropriately:

    • Use on_error for error alerts
    • Use row_count for data-driven alerts
    • Use always for informational alerts
  5. Test Alert Configuration:

    • Test with a small test flow first
    • Verify all recipients receive alerts
    • Check message formatting

Troubleshooting

Alert Not Sending

  1. Check Recipients:

    recipients = ["valid@example.com"]  # Must not be empty
  2. Verify Configuration:

    • Check SMTP/webhook credentials
    • Verify network connectivity
    • Check firewall rules
  3. Check Condition:

    • Ensure condition is met (if specified)
    • Use always condition for testing
  4. Check Logs:

    # Look for alert execution messages
    qarion-etl run --flow-id my_flow --verbose

Email Not Received

  1. Check SMTP Configuration:

    • Verify SMTP host and port
    • Check TLS/SSL settings
    • Verify credentials
  2. Check Spam Folder:

    • Email alerts may be filtered as spam
    • Use proper from_address
  3. Test SMTP Connection:

    import smtplib
    server = smtplib.SMTP('smtp.example.com', 587)
    server.starttls()
    server.login('user', 'password')

Webhook Not Working

  1. Verify URL:

    • Check webhook URL is correct
    • Ensure URL is accessible
  2. Check Headers:

    • Verify authentication headers
    • Check Content-Type
  3. Test Manually:

    curl -X POST https://api.example.com/webhook \
    -H "Content-Type: application/json" \
    -d '{"subject":"Test","message":"Test"}'