Flow Triggers
A comprehensive guide to using triggers to initiate flow execution in Qarion ETL.
Overview
Triggers are mechanisms that initiate flow execution based on various conditions. Qarion ETL supports multiple trigger types, allowing you to automate flow execution through:
- CLI: Manual command-line invocation
- Schedule: Time-based execution (cron-like)
- Sensor: File or change detection
- Flow: Dependencies on other flows
- API: HTTP API calls (future)
Quick Start
Defining Triggers in Flow Definitions
Add triggers to your flow definition in flows/my_flow.toml:
id = "my_flow"
name = "My Flow"
flow_type = "standard"
[[triggers]]
id = "cli_trigger"
type = "cli"
enabled = true
description = "Manual trigger via command line"
Triggering a Flow
Use the CLI command to trigger a flow:
qarion-etl trigger --flow-id my_flow
Trigger Types
CLI Trigger
Immediate execution via command line. Always fires when activated.
Configuration:
[[triggers]]
id = "cli_trigger"
type = "cli"
enabled = true
description = "Manual trigger"
Usage:
qarion-etl trigger --flow-id my_flow
qarion-etl trigger --flow-id my_flow --trigger-id cli_trigger
Schedule Trigger
Time-based execution using cron-like expressions.
Configuration:
[[triggers]]
id = "daily_trigger"
type = "schedule"
schedule = "0 0 * * *" # Daily at midnight
timezone = "UTC"
enabled = true
description = "Daily execution at midnight"
Schedule Format:
- Cron expression: `minute hour day month weekday`
- Examples:
  - `0 0 * * *` - Daily at midnight
  - `0 */6 * * *` - Every 6 hours
  - `0 9 * * 1-5` - Weekdays at 9 AM
  - `0 0 1 * *` - First day of each month
Note: Schedule triggers require a scheduler service to be running (future feature).
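To make the five-field semantics concrete, here is an illustrative sketch of how such an expression can be matched against a point in time. This is not Qarion ETL's scheduler; the function names are hypothetical and the sketch only covers the field forms used in the examples above (`*`, `*/n`, ranges, and literal values):

```python
from datetime import datetime


def field_matches(field: str, value: int) -> bool:
    """Check whether one cron field (e.g. '*', '*/6', '1-5', '0') matches a value."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):       # step values, e.g. */6
            if value % int(part[2:]) == 0:
                return True
        elif "-" in part:               # ranges, e.g. 1-5
            lo, hi = map(int, part.split("-"))
            if lo <= value <= hi:
                return True
        elif int(part) == value:        # literal value
            return True
    return False


def cron_matches(expr: str, dt: datetime) -> bool:
    """Check whether dt satisfies a 'minute hour day month weekday' expression."""
    minute, hour, day, month, weekday = expr.split()
    return (field_matches(minute, dt.minute)
            and field_matches(hour, dt.hour)
            and field_matches(day, dt.day)
            and field_matches(month, dt.month)
            and field_matches(weekday, dt.isoweekday() % 7))  # cron: 0 = Sunday
```

For example, `cron_matches("0 9 * * 1-5", dt)` is true only when `dt` is a weekday at exactly 9:00.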
Sensor Trigger
Detects changes in files or directories and triggers flow execution.
Configuration:
[[triggers]]
id = "file_sensor"
type = "sensor"
sensor_type = "file"
path = "/data/input.csv"
event_type = "created" # created, modified, deleted, all
enabled = true
description = "Trigger when input file is created"
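Conceptually, a file sensor polls the configured path and compares its state against the previous pass. The following is a minimal illustrative sketch of that polling logic (not Qarion ETL's actual sensor implementation; `poll_file` is a hypothetical name):

```python
import os


def poll_file(path, last_mtime):
    """One polling pass of a file sensor.

    last_mtime is the modification time seen on the previous poll, or None
    if the file did not exist then. Returns (event, new_mtime) where event
    is 'created', 'modified', 'deleted', or None if nothing changed.
    """
    exists = os.path.exists(path)
    mtime = os.path.getmtime(path) if exists else None
    if exists and last_mtime is None:
        return "created", mtime          # file appeared since last poll
    if not exists and last_mtime is not None:
        return "deleted", None           # file disappeared since last poll
    if exists and mtime != last_mtime:
        return "modified", mtime         # file changed since last poll
    return None, last_mtime              # no event
```

A sensor configured with `event_type = "created"` would fire the flow only when this pass returns `"created"`.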
Sensor Types:
- File Sensor:
[[triggers]]
id = "file_watcher"
type = "sensor"
sensor_type = "file"
path = "/data/input.csv"
event_type = "modified"
- Directory Sensor:
[[triggers]]
id = "directory_watcher"
type = "sensor"
sensor_type = "directory"
path = "/data/inputs"
pattern = "*.csv" # Optional file pattern
event_type = "created"
- Database Sensor:
[[triggers]]
id = "db_sensor"
type = "sensor"
sensor_type = "database"
table_name = "orders"
detection_mode = "row_count" # row_count, timestamp_column, custom_query
enabled = true
description = "Trigger when new orders are added"
Detection Modes:
a. Row Count Mode (detects when row count changes):
[[triggers]]
id = "row_count_sensor"
type = "sensor"
sensor_type = "database"
table_name = "orders"
detection_mode = "row_count"
condition = "status = 'new'" # Optional WHERE clause
b. Timestamp Column Mode (detects new/modified rows):
[[triggers]]
id = "timestamp_sensor"
type = "sensor"
sensor_type = "database"
table_name = "orders"
detection_mode = "timestamp_column"
timestamp_column = "created_at"
condition = "status = 'pending'" # Optional WHERE clause
c. Custom Query Mode (detects when query result changes):
[[triggers]]
id = "custom_query_sensor"
type = "sensor"
sensor_type = "database"
table_name = "orders"
detection_mode = "custom_query"
custom_query = "SELECT COUNT(*) FROM orders WHERE status = 'pending' AND created_at > CURRENT_DATE - INTERVAL '1 day'"
Database Sensor Options:
- `table_name`: Name of the table to monitor (required)
- `detection_mode`: How to detect changes - `row_count`, `timestamp_column`, or `custom_query` (required)
- `timestamp_column`: Column name for timestamp-based detection (required for `timestamp_column` mode)
- `custom_query`: SQL query that returns a single value (required for `custom_query` mode)
- `condition`: Optional SQL WHERE clause to filter rows
- `engine_config`: Optional database engine configuration (uses project engine if not provided):
[triggers.engine_config]
name = "postgresql"
[triggers.engine_config.config]
host = "localhost"
port = 5432
database = "mydb"
user = "myuser"
password = "${credential:db_password}"
- S3 Sensor (future):
[[triggers]]
id = "s3_sensor"
type = "sensor"
sensor_type = "s3"
path = "s3://my-bucket/data/"
pattern = "*.csv"
event_type = "created"
Event Types:
- `created`: Trigger when file/directory is created
- `modified`: Trigger when file/directory is modified
- `deleted`: Trigger when file/directory is deleted
- `all`: Trigger on any event
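The database sensor's `row_count` mode described above follows the same poll-and-compare pattern: fire when the monitored count differs from the count seen on the previous poll. A minimal sketch, using SQLite as a stand-in for whatever engine the trigger's `engine_config` selects (`row_count_changed` is a hypothetical name, not Qarion ETL's API):

```python
import sqlite3


def row_count_changed(conn, table_name, condition, last_count):
    """Return (changed, new_count) for one row_count-mode polling pass."""
    query = f"SELECT COUNT(*) FROM {table_name}"
    if condition:                        # optional WHERE clause from the trigger config
        query += f" WHERE {condition}"
    (count,) = conn.execute(query).fetchone()
    return count != last_count, count


# Example: the sensor fires once a matching row appears.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT)")
changed, last = row_count_changed(conn, "orders", "status = 'new'", 0)   # no rows yet
conn.execute("INSERT INTO orders VALUES (1, 'new')")
changed, last = row_count_changed(conn, "orders", "status = 'new'", last)  # count moved 0 -> 1
```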
Flow Trigger
Triggers a flow when another flow completes.
Configuration:
[[triggers]]
id = "downstream_trigger"
type = "flow"
source_flow_id = "upstream_flow"
source_task_id = "transform_task" # Optional: specific task
condition = "success" # success, failure, always
enabled = true
description = "Trigger after upstream flow succeeds"
Conditions:
- `success`: Trigger only if source flow succeeds
- `failure`: Trigger only if source flow fails
- `always`: Trigger regardless of source flow result
Example Flow Chain:
# flows/ingest_flow.toml
id = "ingest_flow"
flow_type = "standard"
# flows/transform_flow.toml
id = "transform_flow"
flow_type = "standard"
[[triggers]]
id = "after_ingest"
type = "flow"
source_flow_id = "ingest_flow"
condition = "success"
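The dispatch logic implied by flow triggers can be sketched as a lookup from a completed flow to the downstream flows whose condition is satisfied. This is an illustration of the semantics, not Qarion ETL's internals; the names are hypothetical:

```python
# Flow triggers as (downstream_flow, source_flow_id, condition) tuples,
# mirroring the [[triggers]] tables in the TOML example above.
FLOW_TRIGGERS = [
    ("transform_flow", "ingest_flow", "success"),
]


def flows_to_fire(completed_flow_id, result):
    """Given a finished flow and its result ('success' or 'failure'),
    return downstream flows whose trigger condition is met.
    'always' fires regardless of the result."""
    return [
        downstream
        for downstream, source, condition in FLOW_TRIGGERS
        if source == completed_flow_id and condition in (result, "always")
    ]
```

So when `ingest_flow` finishes successfully, `transform_flow` is queued; on failure, nothing downstream fires.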
API Trigger (Future)
HTTP API endpoint for triggering flows.
Configuration:
[[triggers]]
id = "api_trigger"
type = "api"
endpoint = "/api/flows/my_flow/trigger"
method = "POST"
enabled = true
Note: API triggers require the API server to be running (future feature).
Trigger Configuration
Common Properties
All triggers support these common properties:
[[triggers]]
id = "trigger_id" # Unique identifier
type = "cli" # Trigger type
enabled = true # Enable/disable trigger
description = "Description" # Optional description
metadata = {} # Optional metadata
Multiple Triggers
You can define multiple triggers for a single flow:
id = "my_flow"
flow_type = "standard"
[[triggers]]
id = "cli_trigger"
type = "cli"
enabled = true
[[triggers]]
id = "schedule_trigger"
type = "schedule"
schedule = "0 0 * * *"
enabled = true
[[triggers]]
id = "file_sensor"
type = "sensor"
sensor_type = "file"
path = "/data/input.csv"
event_type = "created"
enabled = true
CLI Usage
Trigger a Flow
# Trigger with default trigger
qarion-etl trigger --flow-id my_flow
# Trigger with specific trigger ID
qarion-etl trigger --flow-id my_flow --trigger-id cli_trigger
# Trigger with custom batch ID
qarion-etl trigger --flow-id my_flow --batch-id 5
# Force execution (skip condition checks)
qarion-etl trigger --flow-id my_flow --force
Options
- `--flow-id`: Flow ID to trigger (required)
- `--trigger-id`: Specific trigger ID (optional)
- `--batch-id`: Batch ID for execution (default: 1)
- `--force`: Force execution even if conditions not met
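As a hypothetical sketch of how these options map onto an argument parser (this is not Qarion ETL's actual CLI code, just a model of the documented interface):

```python
import argparse

parser = argparse.ArgumentParser(prog="qarion-etl trigger")
parser.add_argument("--flow-id", required=True, help="Flow ID to trigger")
parser.add_argument("--trigger-id", help="Specific trigger ID (optional)")
parser.add_argument("--batch-id", type=int, default=1, help="Batch ID for execution")
parser.add_argument("--force", action="store_true", help="Skip condition checks")

# Equivalent of: qarion-etl trigger --flow-id my_flow --force
args = parser.parse_args(["--flow-id", "my_flow", "--force"])
```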
Examples
Example 1: ETL Pipeline with File Sensor
# flows/etl_pipeline.toml
id = "etl_pipeline"
name = "ETL Pipeline"
flow_type = "standard"
[[triggers]]
id = "file_trigger"
type = "sensor"
sensor_type = "file"
path = "/data/raw/input.csv"
event_type = "created"
enabled = true
description = "Trigger when new input file arrives"
Usage:
# Flow automatically triggers when file is created
# Or manually trigger:
qarion-etl trigger --flow-id etl_pipeline
Example 2: Scheduled Daily Report
Complete Flow with Schedule Trigger:
# flows/daily_report.toml
id = "daily_report"
name = "Daily Report"
flow_type = "standard"
namespace = "reports"
[input]
primary_key = ["report_id"]
columns = [
{name = "report_id", schema_type = "string", required = true},
{name = "date", schema_type = "date", required = true},
{name = "metric", schema_type = "string", required = true},
{name = "value", schema_type = "float", required = true}
]
[[triggers]]
id = "daily_schedule"
type = "schedule"
schedule = "0 0 * * *" # Daily at midnight
timezone = "UTC"
enabled = true
description = "Daily execution at midnight UTC"
[[triggers]]
id = "manual_trigger"
type = "cli"
enabled = true
description = "Manual trigger for testing"
[[triggers]]
id = "weekday_schedule"
type = "schedule"
schedule = "0 8 * * 1-5" # Weekdays at 8 AM
timezone = "America/New_York"
enabled = true
description = "Generate daily report on weekdays"
Example 3: Flow Chain
# flows/ingest.toml
id = "ingest"
flow_type = "standard"
[[triggers]]
id = "manual_ingest"
type = "cli"
enabled = true
# flows/transform.toml
id = "transform"
flow_type = "standard"
[[triggers]]
id = "after_ingest"
type = "flow"
source_flow_id = "ingest"
condition = "success"
enabled = true
# flows/publish.toml
id = "publish"
flow_type = "standard"
[[triggers]]
id = "after_transform"
type = "flow"
source_flow_id = "transform"
condition = "success"
enabled = true
This creates a chain: ingest → transform → publish
Best Practices
1. Use Descriptive Trigger IDs:
id = "daily_midnight_trigger" # Good
id = "trigger1" # Avoid
2. Enable/Disable Triggers:
enabled = true # Active
enabled = false # Disabled (won't fire)
3. Add Descriptions:
description = "Triggers daily report generation at 8 AM on weekdays"
4. Use Flow Triggers for Dependencies:
- Define clear dependencies between flows
- Use appropriate conditions (`success`, `failure`, `always`)
5. Monitor Trigger Execution:
- Check logs for trigger activation
- Verify flow execution results
Troubleshooting
Trigger Not Firing
1. Check if trigger is enabled:
enabled = true
2. Verify trigger configuration:
- Check trigger type is correct
- Verify required fields are present
- Validate schedule expressions or paths
3. Check logs:
# Look for trigger activation messages
Schedule Trigger Not Working
- Schedule triggers require a scheduler service (future feature)
- For now, use CLI triggers for manual execution
Sensor Trigger Not Detecting Changes
1. Verify path exists:
ls -la /data/input.csv
2. Check file permissions:
- Ensure Qarion ETL can read the file/directory
3. Verify event type:
- Use `all` to catch any event type
- Use