Alerting Tasks
A comprehensive guide to using alerting tasks in Qarion ETL flows.
Overview
Alerting tasks allow you to send notifications via various channels (email, webhook, Slack, Microsoft Teams, etc.) when certain conditions are met during flow execution. This enables proactive monitoring and notification of important events in your data pipelines.
Quick Start
Basic Email Alert
[[tasks]]
id = "send_alert"
type = "alert"
name = "Send Completion Alert"
description = "Notify team when flow completes"
[tasks.properties]
alert_type = "email"
recipients = ["team@example.com"]
subject = "Flow {{ flow_id }} Completed"
message = "Flow {{ flow_id }} completed successfully at {{ execution_date }}"
Alert with Condition
[[tasks]]
id = "error_alert"
type = "alert"
name = "Error Alert"
description = "Alert on errors"
[tasks.properties]
alert_type = "email"
recipients = ["ops@example.com"]
subject = "Flow Error: {{ flow_id }}"
message = "Flow {{ flow_id }} encountered an error"
condition = { type = "on_error" }
Alert Types
Email Alerts
Send alerts via email using SMTP.
Configuration:
[[tasks]]
id = "email_alert"
type = "alert"
[tasks.properties]
alert_type = "email"
recipients = ["user1@example.com", "user2@example.com"]
subject = "Data Pipeline Alert"
message = "Your data pipeline has completed"
[tasks.properties.alert_config]
smtp_host = "smtp.example.com"
smtp_port = 587
smtp_user = "alerts@example.com"
smtp_password = "${credential:smtp_password}"
smtp_use_tls = true
from_address = "alerts@example.com"
Configuration Options:
smtp_host: SMTP server hostname (default:localhost)smtp_port: SMTP server port (default:25)smtp_user: SMTP username (optional)smtp_password: SMTP password (optional, use credential references)smtp_use_tls: Use TLS encryption (default:false)from_address: From email address (required)
Webhook Alerts
Send alerts via HTTP POST to webhook URLs.
Configuration:
[[tasks]]
id = "webhook_alert"
type = "alert"
[tasks.properties]
alert_type = "webhook"
recipients = ["https://api.example.com/webhook"]
subject = "Pipeline Alert"
message = "Flow completed"
[tasks.properties.alert_config]
method = "POST"
headers = { "Authorization" = "Bearer ${credential:webhook_token}" }
timeout = 10
payload = { "source" = "qarion-etl", "priority" = "high" }
Configuration Options:
method: HTTP method (default:POST)headers: Custom HTTP headers (default:Content-Type: application/json)timeout: Request timeout in seconds (default:10)payload: Additional payload data to include
Slack Alerts
Send alerts to Slack channels via webhook.
Configuration:
[[tasks]]
id = "slack_alert"
type = "alert"
[tasks.properties]
alert_type = "slack"
recipients = ["https://hooks.slack.com/services/YOUR/WEBHOOK/URL"]
subject = "Pipeline Alert"
message = "Flow completed successfully"
[tasks.properties.alert_config]
channel = "#alerts"
username = "Qarion ETL Bot"
icon_emoji = ":bell:"
Configuration Options:
channel: Slack channel (default:#general)username: Bot username (default:Qarion ETL)icon_emoji: Bot icon emoji (default::bell:)
Microsoft Teams Alerts
Send alerts to Microsoft Teams channels via webhook.
Configuration:
[[tasks]]
id = "teams_alert"
type = "alert"
[tasks.properties]
alert_type = "teams"
recipients = ["https://outlook.office.com/webhook/YOUR/WEBHOOK/URL"]
subject = "Pipeline Alert"
message = "Flow completed successfully"
[tasks.properties.alert_config]
theme_color = "0078D4"
Configuration Options:
theme_color: Theme color in hex format (default:0078D4)
Alert Conditions
Alerts can be configured to only send when certain conditions are met.
Always Send
[tasks.properties]
condition = { type = "always" }
Send on Error
[tasks.properties]
condition = { type = "on_error" }
Send Based on Row Count
[tasks.properties]
condition = {
type = "row_count"
dataset_id = "orders"
operator = ">"
threshold = 1000
}
Operators:
>: Greater than<: Less than==: Equal to>=: Greater than or equal<=: Less than or equal
Custom Condition
[tasks.properties]
condition = { type = "custom" }
Message Templating
Alert messages support Jinja2-style templating with context variables:
Available Variables:
flow_id: Current flow IDbatch_id: Current batch IDexecution_date: Execution date/timenode_id: Current node/task IDdatasets: List of datasets in the flow
Example:
[tasks.properties]
subject = "Flow {{ flow_id }} - Batch {{ batch_id }}"
message = """
Flow {{ flow_id }} completed at {{ execution_date }}.
Batch ID: {{ batch_id }}
Node: {{ node_id }}
"""
Examples
Example 1: Success Notification
[[tasks]]
id = "success_notification"
type = "alert"
name = "Success Notification"
description = "Notify team on successful completion"
[tasks.properties]
alert_type = "email"
recipients = ["team@example.com"]
subject = "✅ Flow {{ flow_id }} Completed Successfully"
message = """
Flow {{ flow_id }} completed successfully at {{ execution_date }}.
Batch ID: {{ batch_id }}
"""
[tasks.properties.alert_config]
smtp_host = "smtp.example.com"
smtp_port = 587
smtp_use_tls = true
from_address = "noreply@example.com"
Example 2: Error Alert with Slack
[[tasks]]
id = "error_slack_alert"
type = "alert"
name = "Error Slack Alert"
description = "Send Slack alert on errors"
[tasks.properties]
alert_type = "slack"
recipients = ["${credential:slack_webhook_url}"]
subject = "🚨 Flow Error"
message = "Flow {{ flow_id }} encountered an error at {{ execution_date }}"
condition = { type = "on_error" }
[tasks.properties.alert_config]
channel = "#alerts"
username = "Qarion ETL Alert Bot"
Example 3: Data Quality Alert
[[tasks]]
id = "dq_alert"
type = "alert"
name = "Data Quality Alert"
description = "Alert when data quality issues detected"
[tasks.properties]
alert_type = "email"
recipients = ["dq-team@example.com"]
subject = "Data Quality Issue Detected"
message = "Data quality check failed for flow {{ flow_id }}"
condition = {
type = "row_count"
dataset_id = "quality_results"
operator = ">"
threshold = 0
}
[tasks.properties.alert_config]
smtp_host = "smtp.example.com"
from_address = "dq-alerts@example.com"
Example 4: Webhook Alert with Custom Payload
[[tasks]]
id = "webhook_notification"
type = "alert"
name = "Webhook Notification"
[tasks.properties]
alert_type = "webhook"
recipients = ["https://api.example.com/notifications"]
subject = "Pipeline Event"
message = "Flow {{ flow_id }} completed"
[tasks.properties.alert_config]
method = "POST"
headers = {
"Authorization" = "Bearer ${credential:api_token}"
"X-Source" = "qarion-etl"
}
payload = {
"flow_id" = "{{ flow_id }}"
"batch_id" = "{{ batch_id }}"
"status" = "completed"
"timestamp" = "{{ execution_date }}"
}
Best Practices
-
Use Credential References:
smtp_password = "${credential:smtp_password}"Never hardcode passwords or API keys in flow definitions.
-
Use Descriptive Subjects:
subject = "✅ Flow {{ flow_id }} Completed Successfully"Include emojis or prefixes to make alerts easily identifiable.
-
Include Context in Messages:
message = """
Flow: {{ flow_id }}
Batch: {{ batch_id }}
Time: {{ execution_date }}
Status: Success
""" -
Use Conditions Appropriately:
- Use
on_errorfor error alerts - Use
row_countfor data-driven alerts - Use
alwaysfor informational alerts
- Use
-
Test Alert Configuration:
- Test with a small test flow first
- Verify all recipients receive alerts
- Check message formatting
Troubleshooting
Alert Not Sending
-
Check Recipients:
recipients = ["valid@example.com"] # Must not be empty -
Verify Configuration:
- Check SMTP/webhook credentials
- Verify network connectivity
- Check firewall rules
-
Check Condition:
- Ensure condition is met (if specified)
- Use
alwayscondition for testing
-
Check Logs:
# Look for alert execution messages
qarion-etl run --flow-id my_flow --verbose
Email Not Received
-
Check SMTP Configuration:
- Verify SMTP host and port
- Check TLS/SSL settings
- Verify credentials
-
Check Spam Folder:
- Email alerts may be filtered as spam
- Use proper
from_address
-
Test SMTP Connection:
import smtplib
server = smtplib.SMTP('smtp.example.com', 587)
server.starttls()
server.login('user', 'password')
Webhook Not Working
-
Verify URL:
- Check webhook URL is correct
- Ensure URL is accessible
-
Check Headers:
- Verify authentication headers
- Check Content-Type
-
Test Manually:
curl -X POST https://api.example.com/webhook \
-H "Content-Type: application/json" \
-d '{"subject":"Test","message":"Test"}'