Flow Triggers

A comprehensive guide to using triggers to initiate flow execution in Qarion ETL.

Overview

Triggers are mechanisms that initiate flow execution based on various conditions. Qarion ETL supports multiple trigger types, allowing you to automate flow execution through:

  • CLI: Manual command-line invocation
  • Schedule: Time-based execution (cron-like)
  • Sensor: File or change detection
  • Flow: Dependencies on other flows
  • API: HTTP API calls (future)

Quick Start

Defining Triggers in Flow Definitions

Add triggers to your flow definition in flows/my_flow.toml:

id = "my_flow"
name = "My Flow"
flow_type = "standard"

[[triggers]]
id = "cli_trigger"
type = "cli"
enabled = true
description = "Manual trigger via command line"

Triggering a Flow

Use the CLI command to trigger a flow:

qarion-etl trigger --flow-id my_flow

Trigger Types

CLI Trigger

Immediate execution via command line. Always fires when activated.

Configuration:

[[triggers]]
id = "cli_trigger"
type = "cli"
enabled = true
description = "Manual trigger"

Usage:

qarion-etl trigger --flow-id my_flow
qarion-etl trigger --flow-id my_flow --trigger-id cli_trigger

Schedule Trigger

Time-based execution using cron-like expressions.

Configuration:

[[triggers]]
id = "daily_trigger"
type = "schedule"
schedule = "0 0 * * *" # Daily at midnight
timezone = "UTC"
enabled = true
description = "Daily execution at midnight"

Schedule Format:

  • Cron expression: minute hour day month weekday
  • Examples:
    • 0 0 * * * - Daily at midnight
    • 0 */6 * * * - Every 6 hours
    • 0 9 * * 1-5 - Weekdays at 9 AM
    • 0 0 1 * * - First day of each month

Note: Schedule triggers require a scheduler service to be running (future feature).
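To make the five-field format concrete, here is a minimal sketch of how a cron expression can be matched against a timestamp. This is an illustration only, not Qarion ETL's scheduler implementation; it supports `*`, `*/n`, ranges, and comma lists, and uses the cron convention that weekday 0 is Sunday.

```python
from datetime import datetime

def field_matches(field: str, value: int) -> bool:
    """Check one cron field ('*', '*/n', 'a-b', or a comma list) against a value."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            if value % int(part[2:]) == 0:
                return True
        elif "-" in part:
            lo, hi = map(int, part.split("-"))
            if lo <= value <= hi:
                return True
        elif int(part) == value:
            return True
    return False

def cron_matches(expr: str, dt: datetime) -> bool:
    """Return True if dt satisfies a 5-field cron expression (minute hour day month weekday)."""
    minute, hour, day, month, weekday = expr.split()
    return (field_matches(minute, dt.minute)
            and field_matches(hour, dt.hour)
            and field_matches(day, dt.day)
            and field_matches(month, dt.month)
            # cron weekday: 0 = Sunday; Python's weekday(): 0 = Monday
            and field_matches(weekday, (dt.weekday() + 1) % 7))

# "0 9 * * 1-5" — weekdays at 9 AM
assert cron_matches("0 9 * * 1-5", datetime(2024, 1, 8, 9, 0))      # a Monday
assert not cron_matches("0 9 * * 1-5", datetime(2024, 1, 7, 9, 0))  # a Sunday
```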

Sensor Trigger

Detects changes in files, directories, or database tables and triggers flow execution.

Configuration:

[[triggers]]
id = "file_sensor"
type = "sensor"
sensor_type = "file"
path = "/data/input.csv"
event_type = "created" # created, modified, deleted, all
enabled = true
description = "Trigger when input file is created"

Sensor Types:

  1. File Sensor:

    [[triggers]]
    id = "file_watcher"
    type = "sensor"
    sensor_type = "file"
    path = "/data/input.csv"
    event_type = "modified"
  2. Directory Sensor:

    [[triggers]]
    id = "directory_watcher"
    type = "sensor"
    sensor_type = "directory"
    path = "/data/inputs"
    pattern = "*.csv" # Optional file pattern
    event_type = "created"
  3. Database Sensor:

    [[triggers]]
    id = "db_sensor"
    type = "sensor"
    sensor_type = "database"
    table_name = "orders"
    detection_mode = "row_count" # row_count, timestamp_column, custom_query
    enabled = true
    description = "Trigger when new orders are added"

    Detection Modes:

    a. Row Count Mode (detects when row count changes):

    [[triggers]]
    id = "row_count_sensor"
    type = "sensor"
    sensor_type = "database"
    table_name = "orders"
    detection_mode = "row_count"
    condition = "status = 'new'" # Optional WHERE clause

    b. Timestamp Column Mode (detects new/modified rows):

    [[triggers]]
    id = "timestamp_sensor"
    type = "sensor"
    sensor_type = "database"
    table_name = "orders"
    detection_mode = "timestamp_column"
    timestamp_column = "created_at"
    condition = "status = 'pending'" # Optional WHERE clause

    c. Custom Query Mode (detects when query result changes):

    [[triggers]]
    id = "custom_query_sensor"
    type = "sensor"
    sensor_type = "database"
    table_name = "orders"
    detection_mode = "custom_query"
    custom_query = "SELECT COUNT(*) FROM orders WHERE status = 'pending' AND created_at > CURRENT_DATE - INTERVAL '1 day'"

    Database Sensor Options:

    • table_name: Name of the table to monitor (required)
    • detection_mode: How to detect changes - row_count, timestamp_column, or custom_query (required)
    • timestamp_column: Column name for timestamp-based detection (required for timestamp_column mode)
    • custom_query: SQL query that returns a single value (required for custom_query mode)
    • condition: Optional SQL WHERE clause to filter rows
    • engine_config: Optional database engine configuration (uses project engine if not provided)
      [triggers.engine_config]
      name = "postgresql"
      [triggers.engine_config.config]
      host = "localhost"
      port = 5432
      database = "mydb"
      user = "myuser"
      password = "${credential:db_password}"
  4. S3 Sensor (future):

    [[triggers]]
    id = "s3_sensor"
    type = "sensor"
    sensor_type = "s3"
    path = "s3://my-bucket/data/"
    pattern = "*.csv"
    event_type = "created"
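The `row_count` detection mode above boils down to "remember the last count, fire when it changes." The sketch below illustrates that idea; it is not Qarion ETL's implementation, and it uses Python's stdlib `sqlite3` as a stand-in for whatever engine `engine_config` points at. The `RowCountSensor` class name is hypothetical.

```python
import sqlite3

class RowCountSensor:
    """Sketch of row_count detection: fires when the (optionally filtered) count changes."""

    def __init__(self, conn, table_name, condition=None):
        self.conn = conn
        where = f" WHERE {condition}" if condition else ""
        self.query = f"SELECT COUNT(*) FROM {table_name}{where}"
        self.last_count = self._count()  # baseline taken when the sensor starts

    def _count(self):
        return self.conn.execute(self.query).fetchone()[0]

    def poll(self) -> bool:
        """Return True (trigger fires) if the row count changed since the last poll."""
        current = self._count()
        changed = current != self.last_count
        self.last_count = current
        return changed

# Demo against an in-memory database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT)")
sensor = RowCountSensor(conn, "orders", condition="status = 'new'")

assert not sensor.poll()                            # nothing changed yet
conn.execute("INSERT INTO orders VALUES (1, 'new')")
assert sensor.poll()                                # a matching row appeared
```

The `timestamp_column` and `custom_query` modes follow the same pattern, differing only in the query whose result is compared between polls.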

Event Types:

  • created: Trigger when file/directory is created
  • modified: Trigger when file/directory is modified
  • deleted: Trigger when file/directory is deleted
  • all: Trigger on any event

Flow Trigger

Triggers a flow when another flow completes.

Configuration:

[[triggers]]
id = "downstream_trigger"
type = "flow"
source_flow_id = "upstream_flow"
source_task_id = "transform_task" # Optional: specific task
condition = "success" # success, failure, always
enabled = true
description = "Trigger after upstream flow succeeds"

Conditions:

  • success: Trigger only if source flow succeeds
  • failure: Trigger only if source flow fails
  • always: Trigger regardless of source flow result
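The three conditions reduce to a simple check against the source flow's final status. A minimal sketch (the function name `should_fire` is hypothetical, not part of the Qarion ETL API):

```python
def should_fire(condition: str, source_status: str) -> bool:
    """Decide whether a flow trigger fires, given the source flow's final status."""
    if condition == "always":
        return True
    if condition == "success":
        return source_status == "success"
    if condition == "failure":
        return source_status == "failure"
    raise ValueError(f"unknown trigger condition: {condition}")

assert should_fire("success", "success")
assert not should_fire("success", "failure")
assert should_fire("always", "failure")
```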

Example Flow Chain:

# flows/ingest_flow.toml
id = "ingest_flow"
flow_type = "standard"

# flows/transform_flow.toml
id = "transform_flow"
flow_type = "standard"

[[triggers]]
id = "after_ingest"
type = "flow"
source_flow_id = "ingest_flow"
condition = "success"

API Trigger (Future)

HTTP API endpoint for triggering flows.

Configuration:

[[triggers]]
id = "api_trigger"
type = "api"
endpoint = "/api/flows/my_flow/trigger"
method = "POST"
enabled = true

Note: API triggers require the API server to be running (future feature).

Trigger Configuration

Common Properties

All triggers support these common properties:

[[triggers]]
id = "trigger_id" # Unique identifier
type = "cli" # Trigger type
enabled = true # Enable/disable trigger
description = "Description" # Optional description
metadata = {} # Optional metadata

Multiple Triggers

You can define multiple triggers for a single flow:

id = "my_flow"
flow_type = "standard"

[[triggers]]
id = "cli_trigger"
type = "cli"
enabled = true

[[triggers]]
id = "schedule_trigger"
type = "schedule"
schedule = "0 0 * * *"
enabled = true

[[triggers]]
id = "file_sensor"
type = "sensor"
sensor_type = "file"
path = "/data/input.csv"
event_type = "created"
enabled = true

CLI Usage

Trigger a Flow

# Trigger with default trigger
qarion-etl trigger --flow-id my_flow

# Trigger with specific trigger ID
qarion-etl trigger --flow-id my_flow --trigger-id cli_trigger

# Trigger with custom batch ID
qarion-etl trigger --flow-id my_flow --batch-id 5

# Force execution (skip condition checks)
qarion-etl trigger --flow-id my_flow --force

Options

  • --flow-id: Flow ID to trigger (required)
  • --trigger-id: Specific trigger ID (optional)
  • --batch-id: Batch ID for execution (default: 1)
  • --force: Force execution even if conditions not met

Examples

Example 1: ETL Pipeline with File Sensor

# flows/etl_pipeline.toml
id = "etl_pipeline"
name = "ETL Pipeline"
flow_type = "standard"

[[triggers]]
id = "file_trigger"
type = "sensor"
sensor_type = "file"
path = "/data/raw/input.csv"
event_type = "created"
enabled = true
description = "Trigger when new input file arrives"

Usage:

# Flow automatically triggers when file is created
# Or manually trigger:
qarion-etl trigger --flow-id etl_pipeline

Example 2: Scheduled Daily Report

Complete Flow with Schedule Trigger:

# flows/daily_report.toml
id = "daily_report"
name = "Daily Report"
flow_type = "standard"
namespace = "reports"

[input]
primary_key = ["report_id"]
columns = [
{name = "report_id", schema_type = "string", required = true},
{name = "date", schema_type = "date", required = true},
{name = "metric", schema_type = "string", required = true},
{name = "value", schema_type = "float", required = true}
]

[[triggers]]
id = "daily_schedule"
type = "schedule"
schedule = "0 0 * * *" # Daily at midnight
timezone = "UTC"
enabled = true
description = "Daily execution at midnight UTC"

[[triggers]]
id = "manual_trigger"
type = "cli"
enabled = true
description = "Manual trigger for testing"

[[triggers]]
id = "weekday_schedule"
type = "schedule"
schedule = "0 8 * * 1-5" # Weekdays at 8 AM
timezone = "America/New_York"
enabled = true
description = "Generate daily report on weekdays"

Example 3: Flow Chain

# flows/ingest.toml
id = "ingest"
flow_type = "standard"

[[triggers]]
id = "manual_ingest"
type = "cli"
enabled = true

# flows/transform.toml
id = "transform"
flow_type = "standard"

[[triggers]]
id = "after_ingest"
type = "flow"
source_flow_id = "ingest"
condition = "success"
enabled = true

# flows/publish.toml
id = "publish"
flow_type = "standard"

[[triggers]]
id = "after_transform"
type = "flow"
source_flow_id = "transform"
condition = "success"
enabled = true

This creates a chain: ingest → transform → publish
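Qarion ETL evaluates flow triggers at runtime, but the chain can also be viewed as a dependency graph. A sketch using Python's stdlib `graphlib` to compute the resulting execution order (an illustration, not part of the tool):

```python
from graphlib import TopologicalSorter

# Each flow maps to the flows it depends on (its flow triggers' source_flow_id values).
dependencies = {
    "ingest": set(),
    "transform": {"ingest"},
    "publish": {"transform"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['ingest', 'transform', 'publish']
```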

Best Practices

  1. Use Descriptive Trigger IDs:

    id = "daily_midnight_trigger"  # Good
    id = "trigger1" # Avoid
  2. Enable/Disable Triggers:

    enabled = true   # Active
    enabled = false # Disabled (won't fire)
  3. Add Descriptions:

    description = "Triggers daily report generation at 8 AM on weekdays"
  4. Use Flow Triggers for Dependencies:

    • Define clear dependencies between flows
    • Use appropriate conditions (success, failure, always)
  5. Monitor Trigger Execution:

    • Check logs for trigger activation
    • Verify flow execution results

Troubleshooting

Trigger Not Firing

  1. Check if trigger is enabled:

    enabled = true
  2. Verify trigger configuration:

    • Check trigger type is correct
    • Verify required fields are present
    • Validate schedule expressions or paths
  3. Check logs:

    # Look for trigger activation messages

Schedule Trigger Not Working

  • Schedule triggers require a scheduler service (future feature)
  • For now, use CLI triggers for manual execution

Sensor Trigger Not Detecting Changes

  1. Verify path exists:

    ls -la /data/input.csv
  2. Check file permissions:

    • Ensure Qarion ETL can read the file/directory
  3. Verify event type:

    • Use all to catch any event type