Flow Trigger Tasks

Overview

Flow Trigger Tasks let you trigger the execution of other flows from within a flow, enabling flow composition and the orchestration of complex multi-flow pipelines. This is similar to Airflow's TriggerDagRunOperator.

Key Features

  • Flow Composition: Trigger other flows as part of your pipeline
  • Conditional Execution: Control when to trigger based on conditions
  • Batch ID Passing: Pass batch IDs between flows
  • Variable Passing: Pass variables to triggered flows
  • Wait for Completion: Optionally wait for triggered flow to complete
  • XCom Integration: Results from triggered flows can be accessed via XCom

Configuration

Basic Flow Trigger Task

# flows/parent_flow.toml
id = "parent_flow"
name = "Parent Flow"
flow_type = "standard"
namespace = "production"

[input]
primary_key = ["id"]
columns = [
    {name = "id", schema_type = "integer", required = true},
    {name = "value", schema_type = "float", required = true}
]

[[tasks]]
id = "process_data"
name = "Process Data"
type = "transformation"
source_dataset_id = "data_landing"
target_dataset_id = "data_staging"
[tasks.config]
sql = "SELECT * FROM {{ source_dataset }} WHERE value > 0"

[[tasks]]
id = "trigger_child_flow"
name = "Trigger Child Flow"
type = "flow_trigger"
dependencies = ["process_data"]
[tasks.properties]
target_flow_id = "child_flow"
wait_for_completion = true
pass_batch_id = true
condition = "success"

Advanced Configuration

[[tasks]]
id = "trigger_analytics_flow"
name = "Trigger Analytics Flow"
type = "flow_trigger"
dependencies = ["process_data"]
[tasks.properties]
target_flow_id = "analytics_flow"
trigger_id = "cli_trigger" # Optional: specific trigger to use
wait_for_completion = true
pass_batch_id = true
batch_id = 5 # Optional: specific batch_id (overrides pass_batch_id)
execution_date = "2024-01-15T10:00:00" # Optional: ISO format
condition = "success" # 'success', 'failure', or 'always'
[tasks.properties.variables]
min_value = 100
max_value = 1000
environment = "production"

Task Properties

Required Properties

  • target_flow_id: ID of the flow to trigger

Optional Properties

  • trigger_id: Specific trigger ID to use (default: the flow's default trigger)
  • wait_for_completion: Whether to wait for the triggered flow to complete (default: true)
  • pass_batch_id: Whether to pass the current batch_id to the triggered flow (default: true)
  • batch_id: Specific batch_id to use (overrides pass_batch_id if set)
  • execution_date: Execution date for the triggered flow (ISO-format string)
  • variables: Variables to pass to the triggered flow (dictionary)
  • condition: When to trigger: success, failure, or always (default: success)
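The required and optional properties above map naturally onto a small settings object with the listed defaults. A minimal Python sketch (the dataclass and its name are illustrative, not the tool's actual API):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class FlowTriggerProperties:
    """Illustrative container for flow_trigger task properties."""
    target_flow_id: str                   # required: flow to trigger
    trigger_id: Optional[str] = None      # None -> flow's default trigger
    wait_for_completion: bool = True
    pass_batch_id: bool = True
    batch_id: Optional[int] = None        # overrides pass_batch_id when set
    execution_date: Optional[str] = None  # ISO-format string
    variables: dict = field(default_factory=dict)
    condition: str = "success"            # 'success', 'failure', or 'always'

    def __post_init__(self) -> None:
        if self.condition not in ("success", "failure", "always"):
            raise ValueError(f"invalid condition: {self.condition!r}")
```

Validating condition at construction time surfaces typos before the parent flow ever runs.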

Condition Modes

Success Condition (Default)

Trigger only if upstream tasks succeed:

[tasks.properties]
target_flow_id = "child_flow"
condition = "success" # Only trigger if parent flow succeeds

Failure Condition

Trigger only if upstream tasks fail:

[tasks.properties]
target_flow_id = "error_handling_flow"
condition = "failure" # Only trigger if parent flow fails

Always Condition

Always trigger regardless of upstream result:

[tasks.properties]
target_flow_id = "logging_flow"
condition = "always" # Always trigger
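The three modes reduce to a single predicate over the upstream outcome. A sketch of that decision logic (the function name is illustrative):

```python
def should_trigger(condition: str, upstream_succeeded: bool) -> bool:
    """Decide whether a flow_trigger task fires, given the upstream outcome."""
    if condition == "always":
        return True               # fire regardless of upstream result
    if condition == "success":
        return upstream_succeeded
    if condition == "failure":
        return not upstream_succeeded
    raise ValueError(f"unknown condition: {condition!r}")
```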

Batch ID Handling

Pass Current Batch ID (Default)

[tasks.properties]
target_flow_id = "child_flow"
pass_batch_id = true # Pass current batch_id to child flow

Use Specific Batch ID

[tasks.properties]
target_flow_id = "child_flow"
pass_batch_id = false
batch_id = 42 # Use specific batch_id
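The precedence between the two settings follows from the property descriptions: an explicit batch_id always wins, then pass_batch_id forwards the current batch, otherwise no batch is passed. A minimal sketch:

```python
from typing import Optional

def resolve_batch_id(current_batch_id: Optional[int],
                     pass_batch_id: bool,
                     explicit_batch_id: Optional[int]) -> Optional[int]:
    """Pick the batch_id handed to the child flow."""
    if explicit_batch_id is not None:
        return explicit_batch_id   # batch_id property overrides pass_batch_id
    if pass_batch_id:
        return current_batch_id    # forward the parent's current batch
    return None                    # child flow starts without a batch_id
```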

Variable Passing

Pass variables to the triggered flow:

[[tasks]]
id = "trigger_with_vars"
type = "flow_trigger"
[tasks.properties]
target_flow_id = "child_flow"
[tasks.properties.variables]
min_amount = 1000
max_amount = 10000
environment = "production"
date_filter = "2024-01-15"

The triggered flow can access these variables using {{ var('variable_name') }} in templates.
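Conceptually, resolving {{ var('...') }} placeholders is a simple substitution over the template text. A minimal sketch of how such templating might work (the real template engine may behave differently):

```python
import re

def render_vars(template: str, variables: dict) -> str:
    """Replace {{ var('name') }} placeholders with values from `variables`."""
    pattern = re.compile(r"\{\{\s*var\(\s*'([^']+)'\s*\)\s*\}\}")
    return pattern.sub(lambda m: str(variables[m.group(1)]), template)
```

For example, a child-flow SQL fragment like "WHERE amount >= {{ var('min_amount') }}" would render with the min_amount passed by the parent.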

Wait for Completion

Wait for Completion (Default)

Wait for the triggered flow to complete before continuing:

[tasks.properties]
target_flow_id = "child_flow"
wait_for_completion = true # Wait for child flow to finish

Behavior:

  • Parent flow waits for child flow to complete
  • If child flow fails and condition = "success", parent flow fails
  • If child flow fails and condition = "failure", parent flow continues
  • If child flow fails and condition = "always", parent flow continues

Fire and Forget

Don't wait for triggered flow to complete:

[tasks.properties]
target_flow_id = "async_flow"
wait_for_completion = false # Don't wait, continue immediately

Behavior:

  • Parent flow continues immediately after triggering
  • Child flow runs asynchronously
  • Parent flow doesn't check child flow result
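The two waiting modes behave like a blocking call versus a background thread. A simplified sketch, assuming the child flow is just a callable (this is an illustration of the semantics, not the tool's scheduler):

```python
import threading

def trigger_flow(run_flow, wait_for_completion: bool):
    """Run the child flow inline, or in a background thread (fire and forget)."""
    if wait_for_completion:
        return run_flow()          # parent blocks until the child finishes
    t = threading.Thread(target=run_flow, daemon=True)
    t.start()                      # parent continues immediately
    return None                    # child's result is never checked
```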

Complete Examples

Example 1: Simple Flow Chain

Parent Flow:

# flows/parent_flow.toml
id = "parent_flow"
name = "Parent Flow"
flow_type = "standard"

[input]
primary_key = ["id"]
columns = [
    {name = "id", schema_type = "integer", required = true},
    {name = "value", schema_type = "float", required = true}
]

[[tasks]]
id = "ingest_data"
name = "Ingest Data"
type = "ingestion"
target_dataset_id = "data_landing"
[tasks.config]
path = "data/input.csv"
format = "csv"

[[tasks]]
id = "trigger_processing"
name = "Trigger Processing Flow"
type = "flow_trigger"
dependencies = ["ingest_data"]
[tasks.properties]
target_flow_id = "processing_flow"
wait_for_completion = true
condition = "success"

Child Flow:

# flows/processing_flow.toml
id = "processing_flow"
name = "Processing Flow"
flow_type = "standard"

[input]
primary_key = ["id"]
columns = [
    {name = "id", schema_type = "integer", required = true},
    {name = "value", schema_type = "float", required = true}
]

[[tasks]]
id = "process_data"
name = "Process Data"
type = "transformation"
source_dataset_id = "data_landing"
target_dataset_id = "data_processed"
[tasks.config]
sql = """
SELECT
    id,
    value * 1.1 as adjusted_value,
    CASE
        WHEN value > 1000 THEN 'high'
        ELSE 'low'
    END as category
FROM {{ source_dataset }}
"""

Example 2: Conditional Flow Triggering

Flow with Conditional Triggers:

# flows/main_flow.toml
id = "main_flow"
flow_type = "standard"

[[tasks]]
id = "process_orders"
name = "Process Orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_staging"
[tasks.config]
sql = "SELECT * FROM {{ source_dataset }} WHERE amount > 0"

[[tasks]]
id = "trigger_success_flow"
name = "Trigger Success Handler"
type = "flow_trigger"
dependencies = ["process_orders"]
[tasks.properties]
target_flow_id = "success_handler_flow"
condition = "success" # Only if process_orders succeeds
wait_for_completion = true

[[tasks]]
id = "trigger_error_flow"
name = "Trigger Error Handler"
type = "flow_trigger"
dependencies = ["process_orders"]
[tasks.properties]
target_flow_id = "error_handler_flow"
condition = "failure" # Only if process_orders fails
wait_for_completion = true

Example 3: Parallel Flow Execution

Flow Triggering Multiple Child Flows:

# flows/orchestrator_flow.toml
id = "orchestrator_flow"
flow_type = "standard"

[[tasks]]
id = "prepare_data"
name = "Prepare Data"
type = "transformation"
source_dataset_id = "raw_data"
target_dataset_id = "prepared_data"
[tasks.config]
sql = "SELECT * FROM {{ source_dataset }}"

[[tasks]]
id = "trigger_flow_a"
name = "Trigger Flow A"
type = "flow_trigger"
dependencies = ["prepare_data"]
[tasks.properties]
target_flow_id = "flow_a"
wait_for_completion = false # Fire and forget

[[tasks]]
id = "trigger_flow_b"
name = "Trigger Flow B"
type = "flow_trigger"
dependencies = ["prepare_data"]
[tasks.properties]
target_flow_id = "flow_b"
wait_for_completion = false # Fire and forget

[[tasks]]
id = "trigger_flow_c"
name = "Trigger Flow C"
type = "flow_trigger"
dependencies = ["prepare_data"]
[tasks.properties]
target_flow_id = "flow_c"
wait_for_completion = true # Wait for this one

Example 4: Flow with Variables

Parent Flow:

# flows/parent_flow.toml
id = "parent_flow"
flow_type = "standard"

[[tasks]]
id = "calculate_stats"
name = "Calculate Statistics"
type = "transformation"
source_dataset_id = "data"
target_dataset_id = "stats"
[tasks.config]
sql = """
SELECT
    AVG(value) as avg_value,
    MAX(value) as max_value,
    MIN(value) as min_value
FROM {{ source_dataset }}
"""

[[tasks]]
id = "trigger_reporting"
name = "Trigger Reporting Flow"
type = "flow_trigger"
dependencies = ["calculate_stats"]
[tasks.properties]
target_flow_id = "reporting_flow"
[tasks.properties.variables]
min_threshold = 100
max_threshold = 1000
report_type = "daily"

Child Flow (uses variables):

# flows/reporting_flow.toml
id = "reporting_flow"
flow_type = "standard"

[[tasks]]
id = "generate_report"
name = "Generate Report"
type = "transformation"
source_dataset_id = "stats"
target_dataset_id = "report"
[tasks.config]
sql = """
SELECT *
FROM {{ source_dataset }}
WHERE avg_value >= {{ var('min_threshold') }}
  AND avg_value <= {{ var('max_threshold') }}
"""

Integration with XCom

Flow trigger tasks automatically push results to XCom:

[[tasks]]
id = "trigger_flow"
type = "flow_trigger"
[tasks.properties]
target_flow_id = "child_flow"

Accessing Triggered Flow Results:

[[tasks]]
id = "use_triggered_result"
type = "transformation"
dependencies = ["trigger_flow"]
[tasks.config]
sql = """
SELECT
    *,
    {{ xcom_pull('trigger_flow', key='flow_trigger_result') }} as trigger_result
FROM {{ source_dataset }}
"""
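Conceptually, XCom is a keyed store that the flow_trigger task writes its result into and downstream tasks read from. A minimal in-memory sketch (not the tool's actual implementation; the pushed result dict is hypothetical):

```python
class XCom:
    """Minimal XCom-style store: tasks push results, downstream tasks pull them."""
    def __init__(self):
        self._store = {}

    def push(self, task_id: str, key: str, value) -> None:
        self._store[(task_id, key)] = value

    def pull(self, task_id: str, key: str):
        return self._store[(task_id, key)]

# After the child flow returns, a flow_trigger task would push its result:
xcom = XCom()
xcom.push("trigger_flow", "flow_trigger_result", {"status": "success"})
```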

Best Practices

  1. Use Descriptive Task IDs: Use clear names for flow trigger tasks

    id = "trigger_analytics_flow"  # Good
    id = "trigger1" # Avoid
  2. Set Appropriate Conditions: Choose conditions based on use case

    • success: For dependent flows that must succeed
    • failure: For error handling flows
    • always: For logging/monitoring flows
  3. Batch ID Management: Use pass_batch_id = true for related data processing

    pass_batch_id = true  # Child flow processes same batch
  4. Wait for Critical Flows: Always wait for flows that produce data needed downstream

    wait_for_completion = true  # Wait for critical flows
  5. Fire and Forget for Async: Use wait_for_completion = false for independent async flows

    wait_for_completion = false  # For async/background flows
  6. Variable Naming: Use clear, descriptive variable names

    [tasks.properties.variables]
    min_value = 100  # Clear
    mv = 100         # Avoid

Limitations

  • Flow Repository Access: Currently uses default flow repository locations. In production, repositories should be passed through execution context.
  • Error Handling: Flow trigger failures are handled based on condition setting
  • Circular Dependencies: Ensure no circular dependencies between flows (flow A triggers flow B, which triggers flow A)
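Circular trigger references can be caught before deployment with a standard depth-first cycle check over the flow-trigger graph; a sketch (the mapping of flow IDs to triggered flow IDs would come from your flow configs):

```python
def has_cycle(triggers: dict) -> bool:
    """Detect circular flow_trigger references, e.g. A -> B -> A.
    `triggers` maps each flow id to the flow ids it triggers."""
    WHITE, GRAY, BLACK = 0, 1, 2          # unvisited / on stack / done
    color = {f: WHITE for f in triggers}

    def visit(flow: str) -> bool:
        color[flow] = GRAY
        for child in triggers.get(flow, []):
            c = color.get(child, WHITE)
            if c == GRAY:                  # back edge: cycle found
                return True
            if c == WHITE and visit(child):
                return True
        color[flow] = BLACK
        return False

    return any(color[f] == WHITE and visit(f) for f in triggers)
```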