
Conditional Branching in Flows

Qarion ETL supports conditional branching in flows, letting you execute different tasks depending on evaluated conditions. This enables dynamic execution patterns such as if/else logic, switch-style routing, and conditional task execution.

Overview

Conditional branching allows flows to:

  • Execute different task branches based on task results (success/failure)
  • Branch based on XCom values from previous tasks
  • Branch based on flow variables
  • Branch based on data conditions (SQL queries)
  • Support multiple branches with default fallback

Conditional Task Type

Conditional tasks are a special task type that evaluates conditions and determines which branch of tasks should be executed.

Task Definition

[[tasks]]
id = "check_quality"
name = "Check Data Quality"
type = "conditional"
[tasks.properties]
condition_type = "task_result"
condition = { task_id = "quality_check_orders", result = "success" } # result is "success" or "failure"
branches = [
  { name = "success_branch", condition = { result = "success" }, task_ids = ["export_orders", "send_success_notification"] },
  { name = "failure_branch", condition = { result = "failure" }, task_ids = ["park_errors", "send_failure_alert"] }
]
default_branch = ["log_error"] # Optional: tasks to execute if no branch matches

Condition Types

1. Task Result Conditions

Execute branches based on whether a previous task succeeded or failed.

Use when:

  • You need to handle success/failure scenarios differently
  • You want to trigger different workflows based on task outcomes
  • You need error handling branches

Example:

[[tasks]]
id = "conditional_after_quality"
type = "conditional"
[tasks.properties]
condition_type = "task_result"
condition = { task_id = "quality_check", result = "success" } # evaluate whether quality_check succeeded
branches = [
  { name = "on_success", condition = { result = "success" }, task_ids = ["publish_data", "update_metrics"] },
  { name = "on_failure", condition = { result = "failure" }, task_ids = ["park_errors", "send_alert"] }
]

2. XCom Value Conditions

Execute branches based on values from XCom (inter-task data exchange).

Use when:

  • You need to branch based on data from previous tasks
  • You want to make decisions based on computed values
  • You need dynamic branching based on task outputs

Example:

[[tasks]]
id = "conditional_on_count"
type = "conditional"
[tasks.properties]
condition_type = "xcom_value"
condition = { task_id = "count_records", key = "return_value", operator = "greater_than", value = 1000 } # key: XCom key to pull
branches = [
  { name = "high_volume", condition = { task_id = "count_records", operator = "greater_than", value = 1000 }, task_ids = ["process_in_batches", "use_parallel_processing"] },
  { name = "low_volume", condition = { task_id = "count_records", operator = "less_than_or_equal", value = 1000 }, task_ids = ["process_directly"] }
]

Supported Operators:

  • equals or ==: value equals the condition value
  • not_equals or !=: value does not equal the condition value
  • greater_than or >: value is greater than the condition value
  • greater_than_or_equal or >=: value is greater than or equal to the condition value
  • less_than or <: value is less than the condition value
  • less_than_or_equal or <=: value is less than or equal to the condition value
  • contains: string value contains the substring
  • not_contains: string value does not contain the substring
  • in: value is in the given list
  • not_in: value is not in the given list

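The operator semantics above can be sketched as a small evaluator. This is a hypothetical illustration, not Qarion's actual implementation; only the operator names and aliases come from the table above:

```python
# Hypothetical sketch of the comparison operators listed above;
# Qarion's real evaluator may differ.
OPERATORS = {
    "equals": lambda actual, expected: actual == expected,
    "not_equals": lambda actual, expected: actual != expected,
    "greater_than": lambda actual, expected: actual > expected,
    "greater_than_or_equal": lambda actual, expected: actual >= expected,
    "less_than": lambda actual, expected: actual < expected,
    "less_than_or_equal": lambda actual, expected: actual <= expected,
    "contains": lambda actual, expected: expected in actual,
    "not_contains": lambda actual, expected: expected not in actual,
    "in": lambda actual, expected: actual in expected,
    "not_in": lambda actual, expected: actual not in expected,
}

# The symbolic forms are aliases for the named operators.
ALIASES = {"==": "equals", "!=": "not_equals", ">": "greater_than",
           ">=": "greater_than_or_equal", "<": "less_than",
           "<=": "less_than_or_equal"}

def evaluate(actual, operator, expected):
    """Return True when `actual OPERATOR expected` holds."""
    name = ALIASES.get(operator, operator)
    return OPERATORS[name](actual, expected)
```

For example, an xcom_value condition with `operator = "greater_than"` and `value = 1000` matches when the pulled XCom value exceeds 1000.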
3. Variable Conditions

Execute branches based on flow variables.

Use when:

  • You need to branch based on configuration values
  • You want environment-specific behavior
  • You need to make decisions based on runtime parameters

Example:

[variables]
environment = "production"
threshold = 100

[[tasks]]
id = "conditional_on_env"
type = "conditional"
[tasks.properties]
condition_type = "variable"
condition = { variable = "environment", operator = "equals", value = "production" }
branches = [
  { name = "production_branch", condition = { variable = "environment", operator = "equals", value = "production" }, task_ids = ["use_production_config", "enable_monitoring"] },
  { name = "development_branch", condition = { variable = "environment", operator = "equals", value = "development" }, task_ids = ["use_dev_config", "skip_monitoring"] }
]

4. Data Conditions

Execute branches based on SQL query results.

Use when:

  • You need to branch based on data state
  • You want to check row counts, aggregations, or data values
  • You need complex conditional logic based on data

Example:

[[tasks]]
id = "conditional_on_data"
type = "conditional"
[tasks.properties]
condition_type = "data"
condition = { query = "SELECT COUNT(*) FROM orders WHERE status = 'pending'", operator = "greater_than", value = 100 }
branches = [
  { name = "high_pending_orders", condition = { query = "SELECT COUNT(*) FROM orders WHERE status = 'pending'", operator = "greater_than", value = 100 }, task_ids = ["prioritize_processing", "send_alert"] },
  { name = "normal_processing", condition = { query = "SELECT COUNT(*) FROM orders WHERE status = 'pending'", operator = "less_than_or_equal", value = 100 }, task_ids = ["normal_processing"] }
]

Note: The query must return a single value (single row, single column).
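Because of the single-value requirement, an engine can fetch the result and fail fast on anything else. A minimal sketch of that check (hypothetical; assumes a DB-API 2.0 style cursor, not Qarion's internals):

```python
def fetch_scalar(cursor, query):
    """Run a data-condition query and return its single value.

    Raises ValueError when the result is not exactly one row with
    one column, mirroring the single-value requirement above.
    (Hypothetical sketch; assumes a DB-API 2.0 style cursor.)
    """
    cursor.execute(query)
    rows = cursor.fetchall()
    if len(rows) != 1 or len(rows[0]) != 1:
        raise ValueError(
            f"data condition query must return a single value, got {len(rows)} row(s)"
        )
    return rows[0][0]
```

The returned scalar is then compared against `value` using the condition's `operator`.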

5. Always/Never Conditions

Special condition types for unconditional execution or skipping.

Always:

[tasks.properties]
condition_type = "always"
branches = [
  { name = "always_execute", task_ids = ["task_a", "task_b"] }
]

Never:

[tasks.properties]
condition_type = "never"
# No branches will execute

Complete Example

Here's a complete flow example with conditional branching:

# flows/orders_conditional_processing.toml
id = "orders_conditional_processing"
name = "Orders Conditional Processing"
flow_type = "standard"
namespace = "production"

[input]
primary_key = ["order_id"]
columns = [
{ name = "order_id", schema_type = "integer" },
{ name = "customer_id", schema_type = "integer" },
{ name = "amount", schema_type = "float" },
{ name = "status", schema_type = "string" }
]

[variables]
min_amount = 100
alert_threshold = 1000

[[tasks]]
id = "ingest_orders"
type = "ingestion"
target_dataset_id = "orders_landing"
[tasks.config]
path = "data/orders.csv"
format = "csv"

[[tasks]]
id = "quality_check"
type = "dq_check"
source_dataset_id = "orders_landing"
dependencies = ["ingest_orders"]
[[tasks.properties.quality_checks]]
check_id = "completeness"
check_type = "completeness"
columns = ["order_id", "customer_id", "amount"]

[[tasks]]
id = "conditional_after_quality"
type = "conditional"
dependencies = ["quality_check"]
[tasks.properties]
condition_type = "task_result"
condition = { task_id = "quality_check", result = "success" }
branches = [
  { name = "quality_passed", condition = { result = "success" }, task_ids = ["transform_orders", "calculate_metrics"] },
  { name = "quality_failed", condition = { result = "failure" }, task_ids = ["park_errors", "send_alert"] }
]

[[tasks]]
id = "transform_orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_processed"
dependencies = ["conditional_after_quality"]
[tasks.config]
sql = "SELECT * FROM {{ source_dataset }} WHERE amount > {{ var('min_amount') }}"

[[tasks]]
id = "calculate_metrics"
type = "transformation"
source_dataset_id = "orders_processed"
target_dataset_id = "order_metrics"
dependencies = ["conditional_after_quality"]
[tasks.config]
sql = """
INSERT INTO {{ target_dataset }}
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM {{ source_dataset }}
GROUP BY customer_id
"""

[[tasks]]
id = "conditional_on_volume"
type = "conditional"
dependencies = ["calculate_metrics"]
[tasks.properties]
condition_type = "xcom_value"
condition = { task_id = "calculate_metrics", key = "total_orders", operator = "greater_than", value = 1000 }
branches = [
  { name = "high_volume", condition = { task_id = "calculate_metrics", operator = "greater_than", value = 1000 }, task_ids = ["batch_export", "send_high_volume_alert"] },
  { name = "normal_volume", condition = { task_id = "calculate_metrics", operator = "less_than_or_equal", value = 1000 }, task_ids = ["standard_export"] }
]
default_branch = ["standard_export"]

[[tasks]]
id = "park_errors"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_error_parking"
dependencies = ["conditional_after_quality"]
[tasks.config]
sql = "SELECT * FROM {{ source_dataset }} WHERE order_id IS NULL OR amount IS NULL"

[[tasks]]
id = "send_alert"
type = "alert"
dependencies = ["conditional_after_quality"]
[tasks.properties]
alert_type = "email"
recipients = ["admin@example.com"]
subject = "Data Quality Check Failed"
message = "Orders quality check failed. Records parked in error table."

Execution Flow

When a conditional task is encountered:

  1. Condition Evaluation: The condition is evaluated based on the condition type
  2. Branch Selection: The system finds the matching branch based on the condition result
  3. Task Execution: Only tasks in the selected branch are executed
  4. Default Branch: If no branch matches, the default_branch tasks are executed (if provided)
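The routing in steps 1–4 can be sketched roughly as follows. This is hypothetical pseudologic, not Qarion's internals; `evaluate_condition` stands in for the type-specific evaluators described earlier:

```python
def select_tasks(branches, default_branch, condition_result, evaluate_condition):
    """Pick the task ids to run for a conditional task.

    Walks the branches in order and returns the task_ids of the first
    branch whose condition matches the evaluated result; falls back to
    the default branch (or nothing) when no branch matches.
    (Hypothetical sketch of the routing described above.)
    """
    for branch in branches:
        if evaluate_condition(branch.get("condition"), condition_result):
            return branch["task_ids"]
    return default_branch or []
```

Tasks outside the selected branch that depend on the conditional task are skipped rather than executed.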

Best Practices

  1. Always Provide Default Branch: Include a default_branch for safety when conditions might not match
  2. Use Descriptive Branch Names: Name branches clearly to document their purpose
  3. Validate Task IDs: Ensure all task_ids in branches exist in the flow
  4. Test Conditions: Verify conditions work as expected before deploying
  5. Keep Conditions Simple: Complex conditions can be hard to debug
  6. Document Branch Logic: Add comments explaining why branches are needed
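Practice 3 (validating task IDs) is easy to automate once the flow file is parsed. A hypothetical helper, assuming the flow has been loaded into a dict (e.g. by a TOML parser); the function name and structure are illustrative:

```python
def find_unknown_branch_tasks(flow):
    """Return branch task_ids that reference tasks not defined in the flow.

    `flow` is a parsed flow definition (e.g. from a TOML loader).
    Hypothetical lint helper illustrating best practice 3 above.
    """
    defined = {task["id"] for task in flow.get("tasks", [])}
    unknown = []
    for task in flow.get("tasks", []):
        props = task.get("properties", {})
        referenced = [tid for branch in props.get("branches", [])
                      for tid in branch.get("task_ids", [])]
        referenced += props.get("default_branch", [])
        unknown += [tid for tid in referenced if tid not in defined]
    return unknown
```

Running such a check in CI catches dangling branch references before a flow is deployed.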

Limitations

  • Conditional tasks themselves don't execute any data operations (they only route execution)
  • Task IDs in branches must reference tasks defined in the same flow
  • Conditions are evaluated at runtime, not at flow definition time
  • Data conditions require SQL-capable engines