Conditional Branching in Flows
Qarion ETL supports conditional branching in flows, allowing you to execute different tasks based on evaluated conditions. This enables dynamic flow execution patterns like if/else logic, switch statements, and conditional task execution.
Overview
Conditional branching allows flows to:
- Execute different task branches based on task results (success/failure)
- Branch based on XCom values from previous tasks
- Branch based on flow variables
- Branch based on data conditions (SQL queries)
- Support multiple branches with default fallback
Conditional Task Type
Conditional tasks are a special task type that evaluates conditions and determines which branch of tasks should be executed.
Task Definition
[[tasks]]
id = "check_quality"
name = "Check Data Quality"
type = "conditional"
[tasks.properties]
condition_type = "task_result"
# result can be "success" or "failure"
condition = { task_id = "quality_check_orders", result = "success" }
branches = [
  { name = "success_branch", condition = { result = "success" }, task_ids = ["export_orders", "send_success_notification"] },
  { name = "failure_branch", condition = { result = "failure" }, task_ids = ["park_errors", "send_failure_alert"] },
]
default_branch = ["log_error"] # Optional: tasks to execute if no branch matches
Condition Types
1. Task Result Conditions
Execute branches based on whether a previous task succeeded or failed.
Use when:
- You need to handle success/failure scenarios differently
- You want to trigger different workflows based on task outcomes
- You need error handling branches
Example:
[[tasks]]
id = "conditional_after_quality"
type = "conditional"
[tasks.properties]
condition_type = "task_result"
# Evaluate whether quality_check succeeded
condition = { task_id = "quality_check", result = "success" }
branches = [
  { name = "on_success", condition = { result = "success" }, task_ids = ["publish_data", "update_metrics"] },
  { name = "on_failure", condition = { result = "failure" }, task_ids = ["park_errors", "send_alert"] },
]
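The selection step for a task_result condition can be sketched as follows. This is not Qarion ETL's implementation: it assumes the runner records each finished task as "success" or "failure" in a dictionary (the hypothetical task_states), that the branch conditions decide the outcome, and that the first matching branch wins.

```python
def select_task_result_branch(task_states: dict, properties: dict) -> list:
    """Return the task IDs to run for a task_result conditional (sketch only)."""
    watched = properties["condition"]["task_id"]
    # Assumed: task_states maps task_id -> "success" | "failure" once a task finishes.
    outcome = task_states.get(watched)
    if outcome is not None:
        for branch in properties.get("branches", []):
            # The first branch whose expected result equals the observed outcome wins.
            if branch["condition"].get("result") == outcome:
                return branch["task_ids"]
    # No match (for example, the watched task has not finished): use the fallback.
    return properties.get("default_branch", [])


props = {
    "condition": {"task_id": "quality_check", "result": "success"},
    "branches": [
        {"name": "on_success", "condition": {"result": "success"}, "task_ids": ["publish_data", "update_metrics"]},
        {"name": "on_failure", "condition": {"result": "failure"}, "task_ids": ["park_errors", "send_alert"]},
    ],
}
print(select_task_result_branch({"quality_check": "failure"}, props))  # ['park_errors', 'send_alert']
```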
2. XCom Value Conditions
Execute branches based on values from XCom (inter-task data exchange).
Use when:
- You need to branch based on data from previous tasks
- You want to make decisions based on computed values
- You need dynamic branching based on task outputs
Example:
[[tasks]]
id = "conditional_on_count"
type = "conditional"
[tasks.properties]
condition_type = "xcom_value"
# key is the XCom key to pull from count_records
condition = { task_id = "count_records", key = "return_value", operator = "greater_than", value = 1000 }
branches = [
  { name = "high_volume", condition = { task_id = "count_records", operator = "greater_than", value = 1000 }, task_ids = ["process_in_batches", "use_parallel_processing"] },
  { name = "low_volume", condition = { task_id = "count_records", operator = "less_than_or_equal", value = 1000 }, task_ids = ["process_directly"] },
]
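A minimal sketch of XCom-based selection, assuming a hypothetical XCom store keyed by (task_id, key) and assuming the branch conditions compare against the value pulled through the top-level condition's task_id and key. Only the two operators this example uses are wired up; the full operator list is given under Supported Operators.

```python
import operator

# Hypothetical XCom store: (task_id, key) -> value pushed by that task.
XComStore = dict[tuple[str, str], object]

# Only the operators used in this example.
OPS = {"greater_than": operator.gt, "less_than_or_equal": operator.le}


def select_xcom_branch(xcoms: XComStore, properties: dict) -> list:
    """Pull the XCom value once, then return the first branch whose condition holds."""
    cond = properties["condition"]
    value = xcoms.get((cond["task_id"], cond.get("key", "return_value")))
    if value is None:
        # Nothing was pushed under that key: fall back.
        return properties.get("default_branch", [])
    for branch in properties["branches"]:
        bc = branch["condition"]
        if OPS[bc["operator"]](value, bc["value"]):
            return branch["task_ids"]
    return properties.get("default_branch", [])


props = {
    "condition": {"task_id": "count_records", "key": "return_value", "operator": "greater_than", "value": 1000},
    "branches": [
        {"name": "high_volume", "condition": {"task_id": "count_records", "operator": "greater_than", "value": 1000},
         "task_ids": ["process_in_batches", "use_parallel_processing"]},
        {"name": "low_volume", "condition": {"task_id": "count_records", "operator": "less_than_or_equal", "value": 1000},
         "task_ids": ["process_directly"]},
    ],
}
print(select_xcom_branch({("count_records", "return_value"): 2500}, props))
# ['process_in_batches', 'use_parallel_processing']
```

Because the two branch conditions are complementary (greater_than 1000 and less_than_or_equal 1000), exactly one of them matches any numeric value.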
Supported Operators:
- equals or ==: Value equals the condition value
- not_equals or !=: Value does not equal the condition value
- greater_than or >: Value is greater than the condition value
- greater_than_or_equal or >=: Value is greater than or equal to the condition value
- less_than or <: Value is less than the condition value
- less_than_or_equal or <=: Value is less than or equal to the condition value
- contains: String contains the substring
- not_contains: String does not contain the substring
- in: Value is in the list
- not_in: Value is not in the list
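The operator list maps naturally onto a lookup table. This is a sketch, not the library's code: the function name compare is hypothetical, and the argument order for contains and in (searching the observed value for a substring, and searching the condition value's list for the observed value) is inferred from the descriptions above.

```python
import operator
from typing import Any, Callable

# Canonical operator names mapped to predicates called as (actual, expected).
OPERATORS: dict[str, Callable[[Any, Any], bool]] = {
    "equals": operator.eq,
    "not_equals": operator.ne,
    "greater_than": operator.gt,
    "greater_than_or_equal": operator.ge,
    "less_than": operator.lt,
    "less_than_or_equal": operator.le,
    # Assumed orientation: the observed string is searched for the substring.
    "contains": operator.contains,
    "not_contains": lambda actual, expected: expected not in actual,
    # Assumed orientation: the condition value is the list being searched.
    "in": lambda actual, expected: actual in expected,
    "not_in": lambda actual, expected: actual not in expected,
}

# Symbolic spellings accepted alongside the canonical names.
ALIASES = {"==": "equals", "!=": "not_equals", ">": "greater_than",
           ">=": "greater_than_or_equal", "<": "less_than", "<=": "less_than_or_equal"}


def compare(actual: Any, op: str, expected: Any) -> bool:
    """Evaluate `actual <op> expected`, accepting either spelling of an operator."""
    name = ALIASES.get(op, op)
    if name not in OPERATORS:
        raise ValueError(f"unsupported operator: {op!r}")
    return bool(OPERATORS[name](actual, expected))


print(compare(1500, ">", 1000))                          # True
print(compare("production", "in", ["production", "staging"]))  # True
```

Normalizing aliases to one canonical name keeps a single implementation per operator and lets unknown operators fail loudly instead of silently never matching.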
3. Variable Conditions
Execute branches based on flow variables.
Use when:
- You need to branch based on configuration values
- You want environment-specific behavior
- You need to make decisions based on runtime parameters
Example:
[variables]
environment = "production"
threshold = 100
[[tasks]]
id = "conditional_on_env"
type = "conditional"
[tasks.properties]
condition_type = "variable"
condition = { variable = "environment", operator = "equals", value = "production" }
branches = [
  { name = "production_branch", condition = { variable = "environment", operator = "equals", value = "production" }, task_ids = ["use_production_config", "enable_monitoring"] },
  { name = "development_branch", condition = { variable = "environment", operator = "equals", value = "development" }, task_ids = ["use_dev_config", "skip_monitoring"] },
]
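A sketch of variable-based selection, with the flow's [variables] table represented as a plain dictionary. The function name is hypothetical, only the equals and not_equals operators are wired up, and treating a variable that is absent from [variables] as a non-match is an assumption. The last call shows why a default branch matters: with environment set to "staging", neither branch in the example matches and, since no default_branch is defined, nothing runs.

```python
def select_variable_branch(variables: dict, properties: dict) -> list:
    """Return the task IDs of the first branch whose variable condition holds."""
    ops = {
        "equals": lambda a, b: a == b,
        "not_equals": lambda a, b: a != b,
    }
    for branch in properties["branches"]:
        cond = branch["condition"]
        # Assumption: a variable missing from [variables] never matches.
        if cond["variable"] not in variables:
            continue
        if ops[cond["operator"]](variables[cond["variable"]], cond["value"]):
            return branch["task_ids"]
    return properties.get("default_branch", [])


props = {
    "condition_type": "variable",
    "branches": [
        {"name": "production_branch",
         "condition": {"variable": "environment", "operator": "equals", "value": "production"},
         "task_ids": ["use_production_config", "enable_monitoring"]},
        {"name": "development_branch",
         "condition": {"variable": "environment", "operator": "equals", "value": "development"},
         "task_ids": ["use_dev_config", "skip_monitoring"]},
    ],
}
print(select_variable_branch({"environment": "production", "threshold": 100}, props))
# ['use_production_config', 'enable_monitoring']
print(select_variable_branch({"environment": "staging"}, props))
# []
```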
4. Data Conditions
Execute branches based on SQL query results.
Use when:
- You need to branch based on data state
- You want to check row counts, aggregations, or data values
- You need complex conditional logic based on data
Example:
[[tasks]]
id = "conditional_on_data"
type = "conditional"
[tasks.properties]
condition_type = "data"
condition = { query = "SELECT COUNT(*) FROM orders WHERE status = 'pending'", operator = "greater_than", value = 100 }
branches = [
  { name = "high_pending_orders", condition = { query = "SELECT COUNT(*) FROM orders WHERE status = 'pending'", operator = "greater_than", value = 100 }, task_ids = ["prioritize_processing", "send_alert"] },
  { name = "normal_processing", condition = { query = "SELECT COUNT(*) FROM orders WHERE status = 'pending'", operator = "less_than_or_equal", value = 100 }, task_ids = ["normal_processing"] },
]
Note: The query must return a single value (single row, single column).
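The single-value rule can be enforced by checking the result shape before comparing. In this sketch, Python's built-in sqlite3 module stands in for whatever SQL-capable engine the flow uses; scalar_query and select_data_branch are hypothetical names, and only the two operators from the example are wired up.

```python
import sqlite3


def scalar_query(conn: sqlite3.Connection, sql: str):
    """Run a condition query and enforce the single-row, single-column rule."""
    cur = conn.execute(sql)
    # description is None for statements that return no columns.
    if cur.description is None or len(cur.description) != 1:
        raise ValueError("condition query must return exactly one column")
    rows = cur.fetchall()
    if len(rows) != 1:
        raise ValueError(f"condition query must return exactly one row, got {len(rows)}")
    return rows[0][0]


def select_data_branch(conn: sqlite3.Connection, properties: dict) -> list:
    """Return the task IDs of the first branch whose query result satisfies its operator."""
    ops = {"greater_than": lambda a, b: a > b,
           "less_than_or_equal": lambda a, b: a <= b}
    for branch in properties["branches"]:
        cond = branch["condition"]
        if ops[cond["operator"]](scalar_query(conn, cond["query"]), cond["value"]):
            return branch["task_ids"]
    return properties.get("default_branch", [])


# Demo data: 150 pending orders in an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, status TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(i, "pending") for i in range(150)])

PENDING = "SELECT COUNT(*) FROM orders WHERE status = 'pending'"
props = {"branches": [
    {"name": "high_pending_orders",
     "condition": {"query": PENDING, "operator": "greater_than", "value": 100},
     "task_ids": ["prioritize_processing", "send_alert"]},
    {"name": "normal_processing",
     "condition": {"query": PENDING, "operator": "less_than_or_equal", "value": 100},
     "task_ids": ["normal_processing"]},
]}
print(select_data_branch(conn, props))  # ['prioritize_processing', 'send_alert']
```

A query such as SELECT order_id FROM orders would be rejected here, because it returns one row per order rather than a single value.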
5. Always/Never Conditions
Special condition types for unconditional execution or skipping.
Always:
[tasks.properties]
condition_type = "always"
branches = [
  { name = "always_execute", task_ids = ["task_a", "task_b"] },
]
Never:
[tasks.properties]
condition_type = "never"
# No branches will execute
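These two types need no evaluation at all. The sketch below shows one reading of them; the function name is hypothetical, and running every listed branch in order for "always" is an assumption, since the examples only show a single branch.

```python
def select_static_branch(properties: dict) -> list:
    """Resolve the 'always' and 'never' condition types without evaluating anything."""
    kind = properties["condition_type"]
    if kind == "never":
        return []  # nothing is routed to
    if kind == "always":
        # Assumption: every listed branch runs, in declaration order.
        tasks: list[str] = []
        for branch in properties.get("branches", []):
            tasks.extend(branch["task_ids"])
        return tasks
    raise ValueError(f"not a static condition type: {kind!r}")


always = {"condition_type": "always",
          "branches": [{"name": "always_execute", "task_ids": ["task_a", "task_b"]}]}
print(select_static_branch(always))                       # ['task_a', 'task_b']
print(select_static_branch({"condition_type": "never"}))  # []
```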
Complete Example
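Here is a fuller flow example with conditional branching. It omits the export tasks referenced by conditional_on_volume (batch_export, send_high_volume_alert, and standard_export); a real flow must define them, because branch task IDs must reference tasks in the same flow.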
# flows/orders_conditional_processing.toml
id = "orders_conditional_processing"
name = "Orders Conditional Processing"
flow_type = "standard"
namespace = "production"
[input]
primary_key = ["order_id"]
columns = [
{ name = "order_id", schema_type = "integer" },
{ name = "customer_id", schema_type = "integer" },
{ name = "amount", schema_type = "float" },
{ name = "status", schema_type = "string" }
]
[variables]
min_amount = 100
alert_threshold = 1000
[[tasks]]
id = "ingest_orders"
type = "ingestion"
target_dataset_id = "orders_landing"
[tasks.config]
path = "data/orders.csv"
format = "csv"
[[tasks]]
id = "quality_check"
type = "dq_check"
source_dataset_id = "orders_landing"
dependencies = ["ingest_orders"]
[[tasks.properties.quality_checks]]
check_id = "completeness"
check_type = "completeness"
columns = ["order_id", "customer_id", "amount"]
[[tasks]]
id = "conditional_after_quality"
type = "conditional"
dependencies = ["quality_check"]
[tasks.properties]
condition_type = "task_result"
condition = { task_id = "quality_check", result = "success" }
branches = [
  { name = "quality_passed", condition = { result = "success" }, task_ids = ["transform_orders", "calculate_metrics"] },
  { name = "quality_failed", condition = { result = "failure" }, task_ids = ["park_errors", "send_alert"] },
]
[[tasks]]
id = "transform_orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_processed"
dependencies = ["conditional_after_quality"]
[tasks.config]
sql = "SELECT * FROM {{ source_dataset }} WHERE amount > {{ var('min_amount') }}"
[[tasks]]
id = "calculate_metrics"
type = "transformation"
source_dataset_id = "orders_processed"
target_dataset_id = "order_metrics"
dependencies = ["conditional_after_quality", "transform_orders"]  # reads orders_processed, written by transform_orders
[tasks.config]
sql = """
INSERT INTO {{ target_dataset }}
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM {{ source_dataset }}
GROUP BY customer_id
"""
[[tasks]]
id = "conditional_on_volume"
type = "conditional"
dependencies = ["calculate_metrics"]
[tasks.properties]
condition_type = "xcom_value"
condition = { task_id = "calculate_metrics", key = "total_orders", operator = "greater_than", value = 1000 }
branches = [
  { name = "high_volume", condition = { task_id = "calculate_metrics", operator = "greater_than", value = 1000 }, task_ids = ["batch_export", "send_high_volume_alert"] },
  { name = "normal_volume", condition = { task_id = "calculate_metrics", operator = "less_than_or_equal", value = 1000 }, task_ids = ["standard_export"] },
]
default_branch = ["standard_export"]
[[tasks]]
id = "park_errors"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_error_parking"
dependencies = ["conditional_after_quality"]
[tasks.config]
sql = "SELECT * FROM {{ source_dataset }} WHERE order_id IS NULL OR amount IS NULL"
[[tasks]]
id = "send_alert"
type = "alert"
dependencies = ["conditional_after_quality"]
[tasks.properties]
alert_type = "email"
recipients = ["admin@example.com"]
subject = "Data Quality Check Failed"
message = "Orders quality check failed. Records parked in error table."
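The dependencies in this flow form a directed acyclic graph, so a valid run order can be derived with Python's standard-library graphlib. This sketch copies the task_id to dependencies mapping from the example (the export tasks are left out because the flow does not define them). Note that calculate_metrics reads orders_processed, which transform_orders writes, so the sketch lists transform_orders among its dependencies. How Qarion ETL actually schedules tasks is not described here; the grouping only shows which tasks have no dependencies on each other.

```python
from graphlib import TopologicalSorter

# task_id -> tasks it depends on, taken from the example flow.
DEPENDENCIES = {
    "ingest_orders": [],
    "quality_check": ["ingest_orders"],
    "conditional_after_quality": ["quality_check"],
    "transform_orders": ["conditional_after_quality"],
    "calculate_metrics": ["conditional_after_quality", "transform_orders"],
    "conditional_on_volume": ["calculate_metrics"],
    "park_errors": ["conditional_after_quality"],
    "send_alert": ["conditional_after_quality"],
}


def generations(deps: dict) -> list:
    """Group tasks into waves; every task in a wave depends only on earlier waves."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # also raises CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for wave in generations(DEPENDENCIES):
    print(wave)
# ['ingest_orders']
# ['quality_check']
# ['conditional_after_quality']
# ['park_errors', 'send_alert', 'transform_orders']
# ['calculate_metrics']
# ['conditional_on_volume']
```

Only one of the two branches behind conditional_after_quality is actually executed at runtime; the graph simply orders every task the flow defines.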
Execution Flow
When a conditional task is encountered:
- Condition Evaluation: The condition is evaluated based on the condition type
- Branch Selection: The system finds the matching branch based on the condition result
- Task Execution: Only tasks in the selected branch are executed
- Default Branch: If no branch matches, the default_branch tasks are executed (if provided)
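The four steps can be sketched as a routing function that takes already-evaluated branch conditions and splits the controlled tasks into selected and skipped. The names Branch, RoutingDecision, and route are hypothetical; "first matching branch wins" and labeling unselected tasks as skipped are assumptions, since the steps above only state that tasks outside the selected branch are not executed.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Branch:
    name: str
    task_ids: list[str]
    matches: bool  # outcome of evaluating this branch's condition (step 1)


@dataclass
class RoutingDecision:
    selected: list[str]
    skipped: list[str] = field(default_factory=list)


def route(branches: list[Branch], default_branch: Optional[list[str]] = None) -> RoutingDecision:
    default_branch = default_branch or []
    # Step 2: take the first branch whose condition matched.
    chosen = next((b for b in branches if b.matches), None)
    # Step 4: fall back to default_branch (possibly empty) when nothing matched.
    selected = list(chosen.task_ids) if chosen else list(default_branch)
    # Every task this conditional controls, de-duplicated in declaration order.
    controlled = dict.fromkeys([t for b in branches for t in b.task_ids] + default_branch)
    # Step 3: only selected tasks run; the rest are treated as skipped.
    skipped = [t for t in controlled if t not in selected]
    return RoutingDecision(selected=selected, skipped=skipped)


decision = route(
    [Branch("high_volume", ["batch_export", "send_high_volume_alert"], matches=True),
     Branch("normal_volume", ["standard_export"], matches=False)],
    default_branch=["standard_export"],
)
print(decision.selected)  # ['batch_export', 'send_high_volume_alert']
print(decision.skipped)   # ['standard_export']
```

Deduplicating with dict.fromkeys keeps a task such as standard_export, which appears both in a branch and in default_branch, from being reported twice.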
Best Practices
- Always Provide a Default Branch: Include a default_branch for safety when conditions might not match
- Use Descriptive Branch Names: Name branches clearly to document their purpose
- Validate Task IDs: Ensure all task_ids in branches exist in the flow
- Test Conditions: Verify conditions work as expected before deploying
- Keep Conditions Simple: Complex conditions can be hard to debug
- Document Branch Logic: Add comments explaining why branches are needed
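Two of these practices can be checked mechanically before deployment. The sketch below runs over the tasks list as a TOML loader would return it (a list of dicts) and reports unknown task IDs and missing default branches. The function validate_conditionals is hypothetical and not part of Qarion ETL.

```python
def validate_conditionals(tasks: list[dict]) -> list[str]:
    """Report branch task IDs that do not exist and conditionals without a default branch."""
    known = {t["id"] for t in tasks}
    problems = []
    for task in tasks:
        if task.get("type") != "conditional":
            continue
        props = task.get("properties", {})
        referenced = [tid for b in props.get("branches", []) for tid in b.get("task_ids", [])]
        referenced += props.get("default_branch", [])
        # De-duplicate so a task listed in a branch and in default_branch is reported once.
        for tid in dict.fromkeys(referenced):
            if tid not in known:
                problems.append(f"{task['id']}: unknown task_id {tid!r}")
        if props.get("condition_type") not in ("always", "never") and "default_branch" not in props:
            problems.append(f"{task['id']}: no default_branch")
    return problems


tasks = [
    {"id": "quality_check", "type": "dq_check"},
    {"id": "conditional_after_quality", "type": "conditional", "properties": {
        "condition_type": "task_result",
        "branches": [
            {"name": "quality_passed", "task_ids": ["transform_orders", "calculate_metrics"]},
            {"name": "quality_failed", "task_ids": ["park_errors", "send_alert"]},
        ]}},
    {"id": "transform_orders", "type": "transformation"},
    {"id": "calculate_metrics", "type": "transformation"},
    {"id": "park_errors", "type": "transformation"},
    {"id": "send_alert", "type": "alert"},
]
for problem in validate_conditionals(tasks):
    print(problem)
# conditional_after_quality: no default_branch
```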
Limitations
- Conditional tasks themselves don't execute any data operations (they only route execution)
- Task IDs in branches must reference tasks defined in the same flow
- Conditions are evaluated at runtime, not at flow definition time
- Data conditions require SQL-capable engines
Related Documentation
- Task System - Understanding task types
- XCom Guide - Inter-task data exchange
- Flow Variables - Using variables in flows
- Flows Guide - Complete flow documentation