
Flow Trigger System

Technical documentation for the Qarion ETL flow trigger system.

Overview

The trigger system provides a flexible, extensible mechanism for initiating flow execution based on various conditions. Triggers abstract the activation logic from flow execution, allowing flows to be triggered by CLI commands, schedules, sensors, other flows, or API calls.

Architecture

Core Components

  1. Trigger Base Classes (flows/triggers/base.py):

    • Trigger: Abstract base class for all triggers
    • TriggerType: Enum of trigger types
    • TriggerContext: Context information for trigger execution
    • TriggerResult: Result of trigger activation
  2. Trigger Implementations:

    • CLITrigger: Immediate CLI-based execution
    • ScheduleTrigger: Cron-like scheduled execution
    • SensorTrigger: File/directory change detection
    • FlowTrigger: Flow-to-flow dependencies
  3. Trigger Service (flows/triggers/service.py):

    • Manages trigger lifecycle
    • Loads triggers from flow definitions
    • Activates triggers and initiates flow execution
  4. Trigger Registry (flows/triggers/registry.py):

    • Plugin system for registering trigger types
    • Factory for creating trigger instances

Trigger Base Class

class Trigger(ABC):
    """Abstract base class for flow triggers."""

    @property
    @abstractmethod
    def trigger_type(self) -> TriggerType:
        """Return the type of this trigger."""
        pass

    @abstractmethod
    def should_trigger(self, context: Optional[TriggerContext] = None) -> bool:
        """Check if the trigger condition is met."""
        pass

    @abstractmethod
    def activate(self, context: Optional[TriggerContext] = None) -> TriggerResult:
        """Activate the trigger and initiate flow execution."""
        pass
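
The supporting types referenced above are small value objects. The TriggerResult fields below match those used elsewhere in this document; the TriggerContext fields and enum members are assumptions sketched for illustration, not the actual definitions:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class TriggerType(Enum):
    """Enumeration of supported trigger types."""
    CLI = "cli"
    SCHEDULE = "schedule"
    SENSOR = "sensor"
    FLOW = "flow"
    API = "api"


@dataclass
class TriggerContext:
    """Context information passed to a trigger at activation time."""
    flow_id: Optional[str] = None
    parameters: Dict[str, Any] = field(default_factory=dict)


@dataclass
class TriggerResult:
    """Result of a trigger activation."""
    success: bool
    trigger_id: str
    flow_id: str
    message: str = ""
```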

Trigger Types

CLI Trigger

File: flows/triggers/cli_trigger.py

Immediate execution via command line. Always fires when activated.

Configuration:

{
    'id': 'cli_trigger',
    'type': 'cli',
    'flow_id': 'my_flow',
    'enabled': True
}

Usage:

qarion-etl trigger --flow-id my_flow

Schedule Trigger

File: flows/triggers/schedule_trigger.py

Time-based execution using cron-like expressions.

Configuration:

{
    'id': 'schedule_trigger',
    'type': 'schedule',
    'flow_id': 'my_flow',
    'schedule': '0 0 * * *',  # Cron expression
    'timezone': 'UTC',
    'enabled': True
}

Implementation Notes:

  • Currently uses a placeholder for cron parsing
  • Future: Integrate with croniter or similar library
  • Requires scheduler service for automatic execution
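
Until croniter is integrated, a minimal stdlib-only matcher gives a feel for what the placeholder must eventually do. This sketch supports only '*' and plain integers (no ranges, lists, or steps), and uses datetime.weekday() (0 = Monday), whereas real cron treats 0 as Sunday:

```python
from datetime import datetime


def cron_matches(expression: str, moment: datetime) -> bool:
    """Check whether a datetime matches a simplified cron expression.

    Fields: minute, hour, day-of-month, month, day-of-week.
    Each field is either '*' or a single integer.
    """
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError(f"Expected 5 cron fields, got {len(fields)}")

    values = [
        moment.minute,
        moment.hour,
        moment.day,
        moment.month,
        moment.weekday(),
    ]
    for spec, value in zip(fields, values):
        if spec != "*" and int(spec) != value:
            return False
    return True
```

A scheduler service would poll this check (or, better, compute the next fire time) for each enabled schedule trigger.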

Sensor Trigger

File: flows/triggers/sensor_trigger.py

Detects changes in files, directories, or other sources.

Configuration:

{
    'id': 'file_sensor',
    'type': 'sensor',
    'flow_id': 'my_flow',
    'sensor_type': 'file',  # file, directory, s3, database
    'path': '/data/input.csv',
    'pattern': '*.csv',  # Optional
    'event_type': 'created',  # created, modified, deleted, all
    'enabled': True
}

Supported Sensor Types:

  • file: Single file monitoring
  • directory: Directory monitoring with optional pattern
  • s3: S3 bucket monitoring (future)
  • database: Database table change detection (implemented)
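
For the file sensor type, the heart of should_trigger is comparing the file's current state against the last observed state. A stdlib-only polling sketch, not the actual implementation (the state-tracking attribute and 'created on first sight' semantics are assumptions):

```python
import os
from typing import Optional


class FileSensor:
    """Polls a single file and reports created/modified events."""

    def __init__(self, path: str, event_type: str = "all"):
        self.path = path
        self.event_type = event_type  # created, modified, all
        self._last_mtime: Optional[float] = None

    def check(self) -> bool:
        """Return True if the watched event occurred since the last check."""
        if not os.path.exists(self.path):
            return False
        mtime = os.path.getmtime(self.path)
        if self._last_mtime is None:
            # First time we see the file: treat as a creation event.
            self._last_mtime = mtime
            return self.event_type in ("created", "all")
        changed = mtime != self._last_mtime
        self._last_mtime = mtime
        return changed and self.event_type in ("modified", "all")
```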

Database Sensor Configuration:

{
    'id': 'db_sensor',
    'type': 'sensor',
    'flow_id': 'my_flow',
    'sensor_type': 'database',
    'table_name': 'orders',
    'detection_mode': 'row_count',  # row_count, timestamp_column, custom_query
    'timestamp_column': 'created_at',  # Required for timestamp_column mode
    'custom_query': "SELECT COUNT(*) FROM orders WHERE status = 'pending'",  # For custom_query mode
    'condition': "status = 'new'",  # Optional WHERE clause
    'engine_config': {  # Optional - uses project engine if not provided
        'name': 'postgresql',
        'config': {...}
    },
    'enabled': True
}

Detection Modes:

  • row_count: Triggers when the row count changes
  • timestamp_column: Triggers when new/modified rows are detected based on a timestamp column
  • custom_query: Triggers when the result of a custom SQL query changes
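
The row_count mode reduces to remembering the last observed count and comparing. A sketch against sqlite3 for illustration only (the real sensor runs against the project's configured engine, e.g. PostgreSQL, and the class shape here is an assumption):

```python
import sqlite3
from typing import Optional


class RowCountSensor:
    """Detects changes in a table's row count, optionally filtered."""

    def __init__(self, conn: sqlite3.Connection, table_name: str,
                 condition: Optional[str] = None):
        self.conn = conn
        self.query = f"SELECT COUNT(*) FROM {table_name}"
        if condition:
            self.query += f" WHERE {condition}"
        self._last_count: Optional[int] = None

    def check(self) -> bool:
        """Return True if the row count changed since the last check."""
        count = self.conn.execute(self.query).fetchone()[0]
        if self._last_count is None:
            self._last_count = count
            return False  # first observation establishes the baseline
        changed = count != self._last_count
        self._last_count = count
        return changed
```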

Flow Trigger

File: flows/triggers/flow_trigger.py

Triggers a flow based on the execution result of another flow.

Configuration:

{
    'id': 'flow_trigger',
    'type': 'flow',
    'flow_id': 'downstream_flow',
    'source_flow_id': 'upstream_flow',
    'source_task_id': 'task_id',  # Optional
    'condition': 'success',  # success, failure, always
    'enabled': True
}

Conditions:

  • success: Trigger only if source flow succeeds
  • failure: Trigger only if source flow fails
  • always: Trigger regardless of result
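
Evaluating these conditions against the upstream flow's outcome is a small decision function; a sketch of the logic the three conditions imply:

```python
def condition_met(condition: str, source_succeeded: bool) -> bool:
    """Decide whether a flow trigger should fire given the upstream result."""
    if condition == "always":
        return True
    if condition == "success":
        return source_succeeded
    if condition == "failure":
        return not source_succeeded
    raise ValueError(f"Unknown trigger condition: {condition!r}")
```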

Trigger Service

The TriggerService manages trigger lifecycle and execution:

class TriggerService:
    """Service for managing and executing flow triggers."""

    def load_triggers_from_flow(self, flow_definition: Dict[str, Any]) -> List[Trigger]:
        """Load triggers from a flow definition."""
        pass

    def trigger_flow(
        self,
        flow_id: str,
        trigger_type: Optional[str] = None,
        trigger_id: Optional[str] = None,
        context: Optional[TriggerContext] = None
    ) -> TriggerResult:
        """Trigger a flow execution."""
        pass

Trigger Registry

The registry provides a plugin system for trigger types:

class TriggerRegistry:
    """Registry for trigger type implementations."""

    @classmethod
    def register_trigger(cls, trigger_type: str, trigger_class: Type[Trigger]):
        """Register a trigger type."""
        pass

    @classmethod
    def create_trigger(cls, config: Dict) -> Trigger:
        """Create a trigger instance from configuration."""
        pass
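
A minimal working version of such a registry is a dict keyed by type name. The Trigger stand-in below replaces flows.triggers.base.Trigger so the sketch is self-contained; the actual implementation may differ:

```python
from typing import Dict, Type


class Trigger:
    """Stand-in for flows.triggers.base.Trigger (illustration only)."""

    def __init__(self, config: Dict):
        self.config = config


class TriggerRegistry:
    """Maps trigger type names to trigger classes."""

    _registry: Dict[str, Type[Trigger]] = {}

    @classmethod
    def register_trigger(cls, trigger_type: str, trigger_class: Type[Trigger]) -> None:
        cls._registry[trigger_type] = trigger_class

    @classmethod
    def create_trigger(cls, config: Dict) -> Trigger:
        trigger_type = config["type"]
        if trigger_type not in cls._registry:
            raise ValueError(f"Unknown trigger type: {trigger_type!r}")
        return cls._registry[trigger_type](config)
```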

Creating Custom Triggers

Example: Custom API Trigger

from flows.triggers.base import Trigger, TriggerType, TriggerContext, TriggerResult
from flows.triggers.registry import register_trigger

class APITrigger(Trigger):
    """Trigger for HTTP API calls."""

    @property
    def trigger_type(self) -> TriggerType:
        return TriggerType.API

    def should_trigger(self, context: Optional[TriggerContext] = None) -> bool:
        # Check if API request is valid
        return True

    def activate(self, context: Optional[TriggerContext] = None) -> TriggerResult:
        # Process API request and trigger flow
        return TriggerResult(
            success=True,
            trigger_id=self.trigger_id,
            flow_id=self.flow_id,
            message="API trigger activated"
        )

# Register the trigger
register_trigger('api', APITrigger)

Integration with Flow Execution

Triggers integrate with FlowExecutionService:

# Load triggers from flow
trigger_service = TriggerService(execution_service)
triggers = trigger_service.load_triggers_from_flow(flow_definition)

# Activate trigger
result = trigger_service.trigger_flow(
    flow_id='my_flow',
    trigger_id='cli_trigger'
)

# If successful, execute flow
if result.success:
    execution_plan = execution_service.plan_flow_execution(...)
    execution_result = execution_service.execute_flow(execution_plan)

Flow Definition Schema

Triggers are defined in flow definitions:

id = "my_flow"
flow_type = "standard"

[[triggers]]
id = "trigger_id"
type = "cli" # cli, schedule, sensor, flow, api
enabled = true
description = "Optional description"

# Type-specific configuration
schedule = "0 0 * * *" # For schedule triggers
path = "/data/file.csv" # For sensor triggers
source_flow_id = "upstream" # For flow triggers

CLI Integration

The qarion-etl trigger command activates triggers:

@click.command(name="trigger")
@click.option('--flow-id', required=True)
@click.option('--trigger-id', default=None)
@click.option('--batch-id', default=1)
def cli_trigger(flow_id, trigger_id, batch_id):
    """Trigger a flow execution."""
    # Load flow and triggers
    # Activate trigger
    # Execute flow

Future Enhancements

  1. Schedule Implementation:

    • Full cron expression parsing
    • Timezone support
    • Scheduler service integration
  2. Sensor Enhancements:

    • S3 bucket monitoring
    • Message queue integration
  3. API Triggers:

    • HTTP endpoint implementation
    • Authentication/authorization
    • Webhook support
  4. Trigger Chains:

    • Complex trigger dependencies
    • Conditional trigger logic
    • Trigger groups

Testing Triggers

Unit Testing

def test_cli_trigger():
    trigger = CLITrigger({
        'id': 'test_trigger',
        'flow_id': 'test_flow',
        'enabled': True
    })

    result = trigger.activate()
    assert result.success
    assert result.flow_id == 'test_flow'

Integration Testing

def test_trigger_service():
    service = TriggerService(execution_service)
    triggers = service.load_triggers_from_flow(flow_definition)

    result = service.trigger_flow('my_flow')
    assert result.success