Flow Trigger Tasks
Overview
Flow Trigger Tasks let one flow trigger the execution of another, enabling flow composition and orchestration of multi-flow pipelines. This is similar to Airflow's TriggerDagRunOperator (and, for composing pipelines, to the older SubDagOperator).
Key Features
- Flow Composition: Trigger other flows as part of your pipeline
- Conditional Execution: Control when to trigger based on conditions
- Batch ID Passing: Pass batch IDs between flows
- Variable Passing: Pass variables to triggered flows
- Wait for Completion: Optionally wait for triggered flow to complete
- XCom Integration: Results from triggered flows can be accessed via XCom
Configuration
Basic Flow Trigger Task
# flows/parent_flow.toml
id = "parent_flow"
name = "Parent Flow"
flow_type = "standard"
namespace = "production"
[input]
primary_key = ["id"]
columns = [
{name = "id", schema_type = "integer", required = true},
{name = "value", schema_type = "float", required = true}
]
[[tasks]]
id = "process_data"
name = "Process Data"
type = "transformation"
source_dataset_id = "data_landing"
target_dataset_id = "data_staging"
[tasks.config]
sql = "SELECT * FROM {{ source_dataset }} WHERE value > 0"
[[tasks]]
id = "trigger_child_flow"
name = "Trigger Child Flow"
type = "flow_trigger"
dependencies = ["process_data"]
[tasks.properties]
target_flow_id = "child_flow"
wait_for_completion = true
pass_batch_id = true
condition = "success"
Advanced Configuration
[[tasks]]
id = "trigger_analytics_flow"
name = "Trigger Analytics Flow"
type = "flow_trigger"
dependencies = ["process_data"]
[tasks.properties]
target_flow_id = "analytics_flow"
trigger_id = "cli_trigger" # Optional: specific trigger to use
wait_for_completion = true
pass_batch_id = true
batch_id = 5 # Optional: specific batch_id (overrides pass_batch_id)
execution_date = "2024-01-15T10:00:00" # Optional: ISO format
condition = "success" # 'success', 'failure', or 'always'
variables = { min_value = 100, max_value = 1000, environment = "production" } # Inline tables need commas and must stay on one line
Task Properties
Required Properties
target_flow_id: ID of the flow to trigger (required)
Optional Properties
- trigger_id: Specific trigger ID to use (default: uses flow's default trigger)
- wait_for_completion: Whether to wait for triggered flow to complete (default: true)
- pass_batch_id: Whether to pass current batch_id to triggered flow (default: true)
- batch_id: Specific batch_id to use (overrides pass_batch_id if set)
- execution_date: Execution date for triggered flow (ISO format string)
- variables: Variables to pass to triggered flow (dictionary)
- condition: Condition for triggering (success, failure, always) (default: success)
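The defaults listed above can be summarized in a small Python sketch. This is an illustration only: FlowTriggerProperties and parse_properties are hypothetical names, not part of the documented API, and the dictionary passed in is assumed to be a parsed [tasks.properties] table.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

VALID_CONDITIONS = {"success", "failure", "always"}


@dataclass
class FlowTriggerProperties:
    """Hypothetical container mirroring the documented properties and defaults."""

    target_flow_id: str                    # required
    trigger_id: Optional[str] = None       # None means "use the flow's default trigger"
    wait_for_completion: bool = True
    pass_batch_id: bool = True
    batch_id: Optional[int] = None         # overrides pass_batch_id when set
    execution_date: Optional[str] = None   # ISO format string
    variables: Dict[str, Any] = field(default_factory=dict)
    condition: str = "success"

    def __post_init__(self) -> None:
        if not self.target_flow_id:
            raise ValueError("target_flow_id is required")
        if self.condition not in VALID_CONDITIONS:
            raise ValueError(f"unknown condition: {self.condition!r}")


def parse_properties(raw: Dict[str, Any]) -> FlowTriggerProperties:
    """Build the container from a parsed [tasks.properties] table (assumed shape)."""
    if "target_flow_id" not in raw:
        raise ValueError("target_flow_id is required")
    return FlowTriggerProperties(**raw)
```

With only target_flow_id set, the resulting object waits for completion, passes the current batch ID, and uses the success condition, matching the defaults in the list.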
Condition Modes
Success Condition (Default)
Trigger only if upstream tasks succeed:
[tasks.properties]
target_flow_id = "child_flow"
condition = "success" # Only trigger if upstream tasks succeed
Failure Condition
Trigger only if upstream tasks fail:
[tasks.properties]
target_flow_id = "error_handling_flow"
condition = "failure" # Only trigger if upstream tasks fail
Always Condition
Always trigger regardless of upstream result:
[tasks.properties]
target_flow_id = "logging_flow"
condition = "always" # Always trigger
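Taken together, the three condition modes amount to a simple decision on the upstream result. The Python sketch below illustrates that decision; should_trigger is a hypothetical helper, not a function from the engine, and it assumes the upstream result has already been reduced to a single boolean.

```python
def should_trigger(condition: str, upstream_succeeded: bool) -> bool:
    """Decide whether a flow_trigger task fires (illustrative sketch only)."""
    if condition == "success":
        return upstream_succeeded
    if condition == "failure":
        return not upstream_succeeded
    if condition == "always":
        return True
    raise ValueError(f"unknown condition: {condition!r}")
```

For example, should_trigger("failure", upstream_succeeded=False) returns True, which is the case an error handling flow relies on.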
Batch ID Handling
Pass Current Batch ID (Default)
[tasks.properties]
target_flow_id = "child_flow"
pass_batch_id = true # Pass current batch_id to child flow
Use Specific Batch ID
[tasks.properties]
target_flow_id = "child_flow"
pass_batch_id = false
batch_id = 42 # Use specific batch_id
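The precedence between batch_id and pass_batch_id can be sketched in Python as follows. resolve_batch_id is a hypothetical helper, and returning None when neither setting applies is an assumption: the documentation above does not say what the engine does in that case.

```python
from typing import Any, Dict, Optional


def resolve_batch_id(properties: Dict[str, Any],
                     current_batch_id: Optional[int]) -> Optional[int]:
    """Pick the batch ID handed to the triggered flow (illustrative sketch)."""
    explicit = properties.get("batch_id")
    if explicit is not None:
        # An explicit batch_id overrides pass_batch_id.
        return explicit
    if properties.get("pass_batch_id", True):
        # Default behavior: forward the parent's current batch ID.
        return current_batch_id
    # Assumption: nothing is forwarded when pass_batch_id is false
    # and no explicit batch_id is given.
    return None
```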
Variable Passing
Pass variables to the triggered flow:
[[tasks]]
id = "trigger_with_vars"
type = "flow_trigger"
[tasks.properties]
target_flow_id = "child_flow"
variables = { min_amount = 1000, max_amount = 10000, environment = "production", date_filter = "2024-01-15" }
The triggered flow can access these variables using {{ var('variable_name') }} in templates.
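To make the substitution concrete, here is a minimal Python sketch of how a {{ var('variable_name') }} placeholder could be filled from the passed variables. It is a regex-based stand-in for whatever templating engine the product actually uses, and render_vars is a hypothetical name; raising an error for a missing variable is also an assumption.

```python
import re
from typing import Any, Dict

# Matches {{ var('name') }} with optional whitespace around the parts.
_VAR_PATTERN = re.compile(r"\{\{\s*var\(\s*'([^']+)'\s*\)\s*\}\}")


def render_vars(template: str, variables: Dict[str, Any]) -> str:
    """Replace each var('name') placeholder with the passed value (sketch)."""

    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"variable {name!r} was not passed to this flow")
        return str(variables[name])

    return _VAR_PATTERN.sub(replace, template)


sql = render_vars(
    "SELECT * FROM orders WHERE amount >= {{ var('min_amount') }}",
    {"min_amount": 1000},
)
```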
Wait for Completion
Wait for Completion (Default)
Wait for triggered flow to complete before continuing:
[tasks.properties]
target_flow_id = "child_flow"
wait_for_completion = true # Wait for child flow to finish
Behavior:
- Parent flow waits for child flow to complete
- If child flow fails and condition = "success", parent flow fails
- If child flow fails and condition = "failure", parent flow continues
- If child flow fails and condition = "always", parent flow continues
Fire and Forget
Don't wait for triggered flow to complete:
[tasks.properties]
target_flow_id = "async_flow"
wait_for_completion = false # Don't wait, continue immediately
Behavior:
- Parent flow continues immediately after triggering
- Child flow runs asynchronously
- Parent flow doesn't check child flow result
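The two behavior lists in this section can be condensed into one Python function. This is a sketch of the documented outcomes, not engine code: parent_continues is a hypothetical name, and the case where the child flow succeeds is assumed to let the parent continue, which the lists above imply but do not state.

```python
def parent_continues(wait_for_completion: bool,
                     condition: str,
                     child_succeeded: bool) -> bool:
    """Return True if the parent flow carries on after the trigger task (sketch)."""
    if condition not in ("success", "failure", "always"):
        raise ValueError(f"unknown condition: {condition!r}")
    if not wait_for_completion:
        # Fire and forget: the child result is never checked.
        return True
    if child_succeeded:
        # Assumption: a successful child never fails the parent.
        return True
    # Child failed while the parent was waiting:
    # only condition = "success" makes the parent fail.
    return condition != "success"
```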
Complete Examples
Example 1: Simple Flow Chain
Parent Flow:
# flows/parent_flow.toml
id = "parent_flow"
name = "Parent Flow"
flow_type = "standard"
[input]
primary_key = ["id"]
columns = [
{name = "id", schema_type = "integer", required = true},
{name = "value", schema_type = "float", required = true}
]
[[tasks]]
id = "ingest_data"
name = "Ingest Data"
type = "ingestion"
target_dataset_id = "data_landing"
[tasks.config]
path = "data/input.csv"
format = "csv"
[[tasks]]
id = "trigger_processing"
name = "Trigger Processing Flow"
type = "flow_trigger"
dependencies = ["ingest_data"]
[tasks.properties]
target_flow_id = "processing_flow"
wait_for_completion = true
condition = "success"
Child Flow:
# flows/processing_flow.toml
id = "processing_flow"
name = "Processing Flow"
flow_type = "standard"
[input]
primary_key = ["id"]
columns = [
{name = "id", schema_type = "integer", required = true},
{name = "value", schema_type = "float", required = true}
]
[[tasks]]
id = "process_data"
name = "Process Data"
type = "transformation"
source_dataset_id = "data_landing"
target_dataset_id = "data_processed"
[tasks.config]
sql = """
SELECT
id,
value * 1.1 as adjusted_value,
CASE
WHEN value > 1000 THEN 'high'
ELSE 'low'
END as category
FROM {{ source_dataset }}
"""
Example 2: Conditional Flow Triggering
Flow with Conditional Triggers:
# flows/main_flow.toml
id = "main_flow"
flow_type = "standard"
[[tasks]]
id = "process_orders"
name = "Process Orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_staging"
[tasks.config]
sql = "SELECT * FROM {{ source_dataset }} WHERE amount > 0"
[[tasks]]
id = "trigger_success_flow"
name = "Trigger Success Handler"
type = "flow_trigger"
dependencies = ["process_orders"]
[tasks.properties]
target_flow_id = "success_handler_flow"
condition = "success" # Only if process_orders succeeds
wait_for_completion = true
[[tasks]]
id = "trigger_error_flow"
name = "Trigger Error Handler"
type = "flow_trigger"
dependencies = ["process_orders"]
[tasks.properties]
target_flow_id = "error_handler_flow"
condition = "failure" # Only if process_orders fails
wait_for_completion = true
Example 3: Parallel Flow Execution
Flow Triggering Multiple Child Flows:
# flows/orchestrator_flow.toml
id = "orchestrator_flow"
flow_type = "standard"
[[tasks]]
id = "prepare_data"
name = "Prepare Data"
type = "transformation"
source_dataset_id = "raw_data"
target_dataset_id = "prepared_data"
[tasks.config]
sql = "SELECT * FROM {{ source_dataset }}"
[[tasks]]
id = "trigger_flow_a"
name = "Trigger Flow A"
type = "flow_trigger"
dependencies = ["prepare_data"]
[tasks.properties]
target_flow_id = "flow_a"
wait_for_completion = false # Fire and forget
[[tasks]]
id = "trigger_flow_b"
name = "Trigger Flow B"
type = "flow_trigger"
dependencies = ["prepare_data"]
[tasks.properties]
target_flow_id = "flow_b"
wait_for_completion = false # Fire and forget
[[tasks]]
id = "trigger_flow_c"
name = "Trigger Flow C"
type = "flow_trigger"
dependencies = ["prepare_data"]
[tasks.properties]
target_flow_id = "flow_c"
wait_for_completion = true # Wait for this one
Example 4: Flow with Variables
Parent Flow:
# flows/parent_flow.toml
id = "parent_flow"
flow_type = "standard"
[[tasks]]
id = "calculate_stats"
name = "Calculate Statistics"
type = "transformation"
source_dataset_id = "data"
target_dataset_id = "stats"
[tasks.config]
sql = """
SELECT
AVG(value) as avg_value,
MAX(value) as max_value,
MIN(value) as min_value
FROM {{ source_dataset }}
"""
[[tasks]]
id = "trigger_reporting"
name = "Trigger Reporting Flow"
type = "flow_trigger"
dependencies = ["calculate_stats"]
[tasks.properties]
target_flow_id = "reporting_flow"
variables = { min_threshold = 100, max_threshold = 1000, report_type = "daily" }
Child Flow (uses variables):
# flows/reporting_flow.toml
id = "reporting_flow"
flow_type = "standard"
[[tasks]]
id = "generate_report"
name = "Generate Report"
type = "transformation"
source_dataset_id = "stats"
target_dataset_id = "report"
[tasks.config]
sql = """
SELECT *
FROM {{ source_dataset }}
WHERE avg_value >= {{ var('min_threshold') }}
AND avg_value <= {{ var('max_threshold') }}
"""
Integration with XCom
Flow trigger tasks automatically push results to XCom:
[[tasks]]
id = "trigger_flow"
type = "flow_trigger"
[tasks.properties]
target_flow_id = "child_flow"
Accessing Triggered Flow Results:
[[tasks]]
id = "use_triggered_result"
type = "transformation"
dependencies = ["trigger_flow"]
[tasks.config]
sql = """
SELECT
*,
{{ xcom_pull('trigger_flow', key='flow_trigger_result') }} as trigger_result
FROM {{ source_dataset }}
"""
Best Practices
- Use Descriptive Task IDs: Use clear names for flow trigger tasks
  id = "trigger_analytics_flow" # Good
  id = "trigger1" # Avoid
- Set Appropriate Conditions: Choose conditions based on use case
  - success: For dependent flows that must succeed
  - failure: For error handling flows
  - always: For logging/monitoring flows
- Batch ID Management: Use pass_batch_id = true for related data processing
  pass_batch_id = true # Child flow processes same batch
- Wait for Critical Flows: Always wait for flows that produce data needed downstream
  wait_for_completion = true # Wait for critical flows
- Fire and Forget for Async: Use wait_for_completion = false for independent async flows
  wait_for_completion = false # For async/background flows
- Variable Naming: Use clear, descriptive variable names
  variables = { min_value = 100 } # Clear
  variables = { mv = 100 } # Avoid
Limitations
- Flow Repository Access: Flow trigger tasks currently use the default flow repository locations. In production, repositories should be passed through the execution context.
- Error Handling: Flow trigger failures are handled according to the condition setting.
- Circular Dependencies: Ensure there are no circular dependencies between flows (for example, flow A triggers flow B, which triggers flow A).
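Because circular triggers are not described as being detected automatically, a check before deployment can help. The Python sketch below is one possible approach, not a feature of the product: trigger_edges and find_trigger_cycle are hypothetical helpers that operate on flow definitions already parsed into dictionaries (for example with Python 3.11's tomllib), keyed by flow ID.

```python
from typing import Any, Dict, List, Optional


def trigger_edges(flows: Dict[str, Dict[str, Any]]) -> Dict[str, List[str]]:
    """Map each flow ID to the flow IDs its flow_trigger tasks target."""
    edges: Dict[str, List[str]] = {}
    for flow_id, flow in flows.items():
        targets = []
        for task in flow.get("tasks", []):
            if task.get("type") == "flow_trigger":
                target = task.get("properties", {}).get("target_flow_id")
                if target:
                    targets.append(target)
        edges[flow_id] = targets
    return edges


def find_trigger_cycle(flows: Dict[str, Dict[str, Any]]) -> Optional[List[str]]:
    """Return one trigger cycle as a list of flow IDs, or None if there is none."""
    edges = trigger_edges(flows)
    in_progress, done = set(), set()
    path: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        in_progress.add(node)
        path.append(node)
        for nxt in edges.get(node, []):
            if nxt in in_progress:
                # Back edge: the cycle runs from nxt to the current node.
                return path[path.index(nxt):] + [nxt]
            if nxt not in done:
                cycle = visit(nxt)
                if cycle:
                    return cycle
        path.pop()
        in_progress.discard(node)
        done.add(node)
        return None

    for flow_id in edges:
        if flow_id not in done:
            cycle = visit(flow_id)
            if cycle:
                return cycle
    return None
```

The search is a depth-first traversal, so it reports the first cycle it finds rather than all of them; that is usually enough to fail a validation step with a useful message.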
Related Documentation
- Tasks Guide - Understanding task system
- Flows Guide - Flow execution
- XCom Guide - Inter-task data exchange
- Triggers Guide - Flow triggers