# Flow Trigger System

Technical documentation for the Qarion ETL flow trigger system.

## Overview

The trigger system provides a flexible, extensible mechanism for initiating flow execution based on various conditions. Triggers abstract the activation logic away from flow execution, allowing flows to be triggered by CLI commands, schedules, sensors, other flows, or API calls.

## Architecture

### Core Components
- **Trigger Base Classes** (`flows/triggers/base.py`):
  - `Trigger`: Abstract base class for all triggers
  - `TriggerType`: Enum of trigger types
  - `TriggerContext`: Context information for trigger execution
  - `TriggerResult`: Result of trigger activation
- **Trigger Implementations**:
  - `CLITrigger`: Immediate CLI-based execution
  - `ScheduleTrigger`: Cron-like scheduled execution
  - `SensorTrigger`: File/directory change detection
  - `FlowTrigger`: Flow-to-flow dependencies
- **Trigger Service** (`flows/triggers/service.py`):
  - Manages trigger lifecycle
  - Loads triggers from flow definitions
  - Activates triggers and initiates flow execution
- **Trigger Registry** (`flows/triggers/registry.py`):
  - Plugin system for registering trigger types
  - Factory for creating trigger instances
### Trigger Base Class

```python
from abc import ABC, abstractmethod
from typing import Optional


class Trigger(ABC):
    """Abstract base class for flow triggers."""

    @property
    @abstractmethod
    def trigger_type(self) -> TriggerType:
        """Return the type of this trigger."""
        pass

    @abstractmethod
    def should_trigger(self, context: Optional[TriggerContext] = None) -> bool:
        """Check if the trigger condition is met."""
        pass

    @abstractmethod
    def activate(self, context: Optional[TriggerContext] = None) -> TriggerResult:
        """Activate the trigger and initiate flow execution."""
        pass
```
## Trigger Types

### CLI Trigger

**File:** `flows/triggers/cli_trigger.py`

Immediate execution via the command line. Always fires when activated.

Configuration:

```python
{
    'id': 'cli_trigger',
    'type': 'cli',
    'flow_id': 'my_flow',
    'enabled': True
}
```

Usage:

```shell
qarion-etl trigger --flow-id my_flow
```
### Schedule Trigger

**File:** `flows/triggers/schedule_trigger.py`

Time-based execution using cron-like expressions.

Configuration:

```python
{
    'id': 'schedule_trigger',
    'type': 'schedule',
    'flow_id': 'my_flow',
    'schedule': '0 0 * * *',  # Cron expression
    'timezone': 'UTC',
    'enabled': True
}
```

Implementation Notes:

- Currently uses a placeholder for cron parsing
- Future: integrate with `croniter` or a similar library
- Requires a scheduler service for automatic execution
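Since cron parsing is still a placeholder, the following sketch illustrates what the matching step involves. It checks a datetime against a 5-field cron expression, supporting only literal numbers and `*` (no ranges, steps, or lists); a production implementation would delegate to `croniter`. The function name is hypothetical, not part of the actual codebase.

```python
from datetime import datetime


def cron_matches(expr: str, dt: datetime) -> bool:
    """Return True if dt matches a 5-field cron expression.

    Minimal sketch: supports only literal numbers and '*'.
    Fields are minute, hour, day-of-month, month, day-of-week
    (0 = Sunday, as in standard cron).
    """
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected 5 cron fields")
    # Map the datetime onto the five cron fields.
    values = [dt.minute, dt.hour, dt.day, dt.month, dt.isoweekday() % 7]
    for field, value in zip(fields, values):
        if field != '*' and int(field) != value:
            return False
    return True
```

A scheduler service would call a check like this once per minute and activate the trigger on a match.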
### Sensor Trigger

**File:** `flows/triggers/sensor_trigger.py`

Detects changes in files, directories, or other sources.

Configuration:

```python
{
    'id': 'file_sensor',
    'type': 'sensor',
    'flow_id': 'my_flow',
    'sensor_type': 'file',  # file, directory, s3, database
    'path': '/data/input.csv',
    'pattern': '*.csv',  # Optional
    'event_type': 'created',  # created, modified, deleted, all
    'enabled': True
}
```

Supported Sensor Types:

- `file`: Single file monitoring
- `directory`: Directory monitoring with optional pattern
- `s3`: S3 bucket monitoring (future)
- `database`: Database table change detection (implemented)
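For the `file` sensor type, change detection can be reduced to comparing a file's last-seen modification time with its current one. The sketch below shows that idea; the class name and structure are illustrative, not the actual `SensorTrigger` internals.

```python
import os


class FileState:
    """Track a file's last-seen mtime so repeated checks only
    fire when something changed (illustrative sketch)."""

    def __init__(self, path: str):
        self.path = path
        self.last_mtime = self._mtime()

    def _mtime(self):
        # None doubles as the "file does not exist" marker,
        # so deletion also registers as a change.
        try:
            return os.stat(self.path).st_mtime
        except FileNotFoundError:
            return None

    def changed(self) -> bool:
        current = self._mtime()
        fired = current != self.last_mtime
        self.last_mtime = current
        return fired
```

A sensor loop would poll `changed()` on an interval and activate the trigger when it returns `True`.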
Database Sensor Configuration:

```python
{
    'id': 'db_sensor',
    'type': 'sensor',
    'flow_id': 'my_flow',
    'sensor_type': 'database',
    'table_name': 'orders',
    'detection_mode': 'row_count',  # row_count, timestamp_column, custom_query
    'timestamp_column': 'created_at',  # Required for timestamp_column mode
    'custom_query': "SELECT COUNT(*) FROM orders WHERE status = 'pending'",  # For custom_query mode
    'condition': "status = 'new'",  # Optional WHERE clause
    'engine_config': {  # Optional - uses project engine if not provided
        'name': 'postgresql',
        'config': {...}
    },
    'enabled': True
}
```

Detection Modes:

- `row_count`: Triggers when the row count changes
- `timestamp_column`: Triggers when new/modified rows are detected based on a timestamp column
- `custom_query`: Triggers when the result of a custom SQL query changes
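The `row_count` mode amounts to persisting the last observed count and firing when it differs. A minimal sketch of that check, using SQLite for illustration (the real sensor presumably goes through the configured engine, and the function name and `state` dict are hypothetical stand-ins for its persisted state):

```python
import sqlite3


def row_count_changed(conn, table_name: str, state: dict) -> bool:
    """Return True when the table's row count differs from the
    last observed value stored in `state` (sketch only)."""
    (count,) = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()
    previous = state.get(table_name)
    state[table_name] = count
    # The first observation only establishes a baseline; it
    # does not fire the trigger.
    return previous is not None and count != previous
```

The `timestamp_column` and `custom_query` modes follow the same compare-against-last-seen pattern, substituting `MAX(timestamp_column)` or the custom query's result for the count.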
### Flow Trigger

**File:** `flows/triggers/flow_trigger.py`

Triggers based on the execution of another flow.

Configuration:

```python
{
    'id': 'flow_trigger',
    'type': 'flow',
    'flow_id': 'downstream_flow',
    'source_flow_id': 'upstream_flow',
    'source_task_id': 'task_id',  # Optional
    'condition': 'success',  # success, failure, always
    'enabled': True
}
```

Conditions:

- `success`: Trigger only if the source flow succeeds
- `failure`: Trigger only if the source flow fails
- `always`: Trigger regardless of the result
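The three conditions reduce to a small predicate over the source flow's outcome. A sketch of how `FlowTrigger.should_trigger` might evaluate them (the function name and signature are illustrative, not the actual implementation):

```python
def condition_met(condition: str, source_flow_succeeded: bool) -> bool:
    """Evaluate a flow-trigger condition against the source
    flow's outcome (sketch of the three documented conditions)."""
    if condition == 'success':
        return source_flow_succeeded
    if condition == 'failure':
        return not source_flow_succeeded
    if condition == 'always':
        return True
    raise ValueError(f"unknown condition: {condition}")
```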
## Trigger Service

The `TriggerService` manages trigger lifecycle and execution:

```python
class TriggerService:
    """Service for managing and executing flow triggers."""

    def load_triggers_from_flow(self, flow_definition: Dict[str, Any]) -> List[Trigger]:
        """Load triggers from a flow definition."""
        pass

    def trigger_flow(
        self,
        flow_id: str,
        trigger_type: Optional[str] = None,
        trigger_id: Optional[str] = None,
        context: Optional[TriggerContext] = None
    ) -> TriggerResult:
        """Trigger a flow execution."""
        pass
```
## Trigger Registry

The registry provides a plugin system for trigger types:

```python
class TriggerRegistry:
    """Registry for trigger type implementations."""

    @classmethod
    def register_trigger(cls, trigger_type: str, trigger_class: Type[Trigger]):
        """Register a trigger type."""
        pass

    @classmethod
    def create_trigger(cls, config: Dict) -> Trigger:
        """Create a trigger instance from configuration."""
        pass
```
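The registry pattern itself is small: a class-level mapping from type string to class, plus a factory that instantiates from a config dict. A self-contained sketch (class and names are illustrative, not the real `TriggerRegistry` internals):

```python
from typing import Dict


class MiniTriggerRegistry:
    """Minimal sketch of a register/create trigger registry."""

    _triggers: Dict[str, type] = {}

    @classmethod
    def register_trigger(cls, trigger_type: str, trigger_class: type):
        # Overwriting an existing entry is allowed, which lets
        # plugins replace built-in trigger types.
        cls._triggers[trigger_type] = trigger_class

    @classmethod
    def create_trigger(cls, config: dict):
        trigger_type = config.get('type')
        if trigger_type not in cls._triggers:
            raise ValueError(f"unknown trigger type: {trigger_type}")
        # The registered class receives the full config dict.
        return cls._triggers[trigger_type](config)
```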
## Creating Custom Triggers

### Example: Custom API Trigger

```python
from typing import Optional

from flows.triggers.base import Trigger, TriggerType, TriggerContext, TriggerResult
from flows.triggers.registry import register_trigger


class APITrigger(Trigger):
    """Trigger for HTTP API calls."""

    @property
    def trigger_type(self) -> TriggerType:
        return TriggerType.API

    def should_trigger(self, context: Optional[TriggerContext] = None) -> bool:
        # Check if the API request is valid
        return True

    def activate(self, context: Optional[TriggerContext] = None) -> TriggerResult:
        # Process the API request and trigger the flow
        return TriggerResult(
            success=True,
            trigger_id=self.trigger_id,
            flow_id=self.flow_id,
            message="API trigger activated"
        )


# Register the trigger
register_trigger('api', APITrigger)
```
## Integration with Flow Execution

Triggers integrate with `FlowExecutionService`:

```python
# Load triggers from the flow definition
trigger_service = TriggerService(execution_service)
triggers = trigger_service.load_triggers_from_flow(flow_definition)

# Activate a trigger
result = trigger_service.trigger_flow(
    flow_id='my_flow',
    trigger_id='cli_trigger'
)

# If successful, execute the flow
if result.success:
    execution_plan = execution_service.plan_flow_execution(...)
    execution_result = execution_service.execute_flow(execution_plan)
```
## Flow Definition Schema

Triggers are defined in flow definitions:

```toml
id = "my_flow"
flow_type = "standard"

[[triggers]]
id = "trigger_id"
type = "cli"  # cli, schedule, sensor, flow, api
enabled = true
description = "Optional description"

# Type-specific configuration
schedule = "0 0 * * *"       # For schedule triggers
path = "/data/file.csv"      # For sensor triggers
source_flow_id = "upstream"  # For flow triggers
```
## CLI Integration

The `qarion-etl trigger` command activates triggers:

```python
import click


@click.command(name="trigger")
@click.option('--flow-id', required=True)
@click.option('--trigger-id', default=None)
@click.option('--batch-id', default=1)
def cli_trigger(flow_id, trigger_id, batch_id):
    """Trigger a flow execution."""
    # Load flow and triggers
    # Activate trigger
    # Execute flow
```
## Future Enhancements

- **Schedule Implementation:**
  - Full cron expression parsing
  - Timezone support
  - Scheduler service integration
- **Sensor Enhancements:**
  - S3 bucket monitoring
  - Message queue integration
- **API Triggers:**
  - HTTP endpoint implementation
  - Authentication/authorization
  - Webhook support
- **Trigger Chains:**
  - Complex trigger dependencies
  - Conditional trigger logic
  - Trigger groups
## Testing Triggers

### Unit Testing

```python
def test_cli_trigger():
    trigger = CLITrigger({
        'id': 'test_trigger',
        'flow_id': 'test_flow',
        'enabled': True
    })
    result = trigger.activate()
    assert result.success
    assert result.flow_id == 'test_flow'
```
### Integration Testing

```python
def test_trigger_service():
    service = TriggerService(execution_service)
    triggers = service.load_triggers_from_flow(flow_definition)
    result = service.trigger_flow('my_flow')
    assert result.success
```