Task Type Plugin Architecture
Overview
The task system in Qarion ETL has been refactored to use a plugin-based architecture, making it extensible and maintainable. This document describes the architecture, how it works, and how to extend it.
Architecture Components
1. Task Definitions (flows/tasks/definitions.py)
Concrete task classes that represent logical operations:
- FlowTask: Base class for all tasks
- IngestionTask: Loads data from external sources
- TransformationTask: Transforms data between datasets
- QualityCheckTask: Runs data quality checks
- ExportTask: Exports data to external destinations
2. Task Properties (flows/tasks/properties.py)
Typed property classes for each task type:
- IngestionTaskProperties: Properties for ingestion tasks
- TransformationTaskProperties: Properties for transformation tasks
- QualityCheckTaskProperties: Properties for quality check tasks
- ExportTaskProperties: Properties for export tasks
Each property class includes:
- Operation enums (e.g., IngestionOperation, TransformationOperation)
- Typed fields with clear purposes
- Serialization/deserialization methods
- Custom properties support for extensibility
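As an illustration of those four points, here is a minimal, self-contained sketch of a property class. It reuses the IngestionOperation and IngestionTaskProperties names and the 'ingestion' and 'landing' values from this document, but the custom_properties field and the exact to_dict()/from_dict() behavior are assumptions, not the contents of flows/tasks/properties.py.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class IngestionOperation(Enum):
    # Sketch only: the 'ingestion' value is taken from this document.
    INGESTION = 'ingestion'


@dataclass
class IngestionTaskProperties:
    # Typed fields replace free-form dictionary keys.
    operation: IngestionOperation = IngestionOperation.INGESTION
    target_table_type: Optional[str] = None
    # Assumed catch-all for keys that the typed fields do not cover.
    custom_properties: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        # Start from custom keys so the typed fields take precedence.
        result: Dict[str, Any] = dict(self.custom_properties)
        result['operation'] = self.operation.value
        if self.target_table_type is not None:
            result['target_table_type'] = self.target_table_type
        return result

    @classmethod
    def from_dict(cls, props: Dict[str, Any]) -> 'IngestionTaskProperties':
        known = {'operation', 'target_table_type'}
        return cls(
            # Enum lookup by value raises ValueError for unknown operations.
            operation=IngestionOperation(props.get('operation', 'ingestion')),
            target_table_type=props.get('target_table_type'),
            custom_properties={k: v for k, v in props.items() if k not in known},
        )
```

Because operation is an enum, an unknown value such as 'ingest' fails in from_dict() with a ValueError instead of surfacing later as a silent dictionary lookup miss.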
3. Task Type Plugins (flows/tasks/base.py, flows/tasks/plugins/)
Plugin system for task types:
- TaskType: Abstract base class for task types
- TaskTypePlugin: Plugin interface with validation and property creation
- Built-in plugins in flows/tasks/plugins/builtin.py: IngestionTaskTypePlugin, TransformationTaskTypePlugin, QualityCheckTaskTypePlugin, ExportTaskTypePlugin
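To make the plugin interface concrete, the following is a hypothetical sketch of an abstract TaskTypePlugin built on Python's abc module. The property names (task_type, name, description, properties_class) and validate_task() follow the complete example later in this document; create_properties(), its reliance on from_dict(), the id check in validate_task(), and the NoteTaskTypePlugin example are assumptions.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Optional


class TaskTypePlugin(ABC):
    """Hypothetical sketch of a task type plugin interface."""

    @property
    @abstractmethod
    def task_type(self) -> str:
        """Registry key, e.g. 'ingestion'."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable name."""

    @property
    def description(self) -> str:
        # Optional in this sketch; subclasses may override.
        return ''

    @property
    @abstractmethod
    def properties_class(self) -> type:
        """Typed properties class for this task type."""

    def create_properties(self, props: Optional[Dict[str, Any]] = None) -> Any:
        # Assumes every properties class exposes a from_dict() classmethod.
        return self.properties_class.from_dict(props or {})

    def validate_task(self, task: Any) -> bool:
        # Shared check (assumed); subclasses call super() first, then add rules.
        if not getattr(task, 'id', None):
            raise ValueError('task id is required')
        return True


@dataclass
class NoteProperties:
    text: str = ''

    @classmethod
    def from_dict(cls, props: Dict[str, Any]) -> 'NoteProperties':
        return cls(text=props.get('text', ''))


class NoteTaskTypePlugin(TaskTypePlugin):
    """Hypothetical concrete plugin used only for illustration."""

    @property
    def task_type(self) -> str:
        return 'note'

    @property
    def name(self) -> str:
        return 'Note'

    @property
    def properties_class(self) -> type:
        return NoteProperties
```

Marking the identifying properties abstract means a plugin that forgets one fails at instantiation time with a TypeError rather than at registration or execution time.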
4. Plugin Registry (flows/tasks/registry.py)
Central registry for managing task type plugins:
- TaskTypeRegistry: Registry class
- Global functions: register_task_type(), get_task_type(), list_task_types(), etc.
- Automatic plugin discovery on import
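A registry like TaskTypeRegistry can be sketched as a dictionary keyed by each plugin's task_type. The function names register_task_type(), get_task_type(), and list_task_types() come from this document; their signatures here, the duplicate-registration check, and the replace flag are assumptions for illustration.

```python
from typing import Dict, List, Protocol


class TaskTypePluginLike(Protocol):
    # Structural type: anything exposing a task_type string can be registered.
    @property
    def task_type(self) -> str: ...


class TaskTypeRegistry:
    """Minimal registry sketch keyed by task_type (not the actual implementation)."""

    def __init__(self) -> None:
        self._plugins: Dict[str, TaskTypePluginLike] = {}

    def register(self, plugin: TaskTypePluginLike, *, replace: bool = False) -> None:
        key = plugin.task_type
        if key in self._plugins and not replace:
            raise ValueError(f"task type {key!r} is already registered")
        self._plugins[key] = plugin

    def get(self, task_type: str) -> TaskTypePluginLike:
        try:
            return self._plugins[task_type]
        except KeyError:
            raise KeyError(f"unknown task type {task_type!r}") from None

    def list_types(self) -> List[str]:
        # Dicts preserve insertion order, so types are listed in registration order.
        return list(self._plugins)


# Module-level default registry plus thin wrappers, mirroring the global functions.
_registry = TaskTypeRegistry()


def register_task_type(plugin: TaskTypePluginLike, replace: bool = False) -> None:
    _registry.register(plugin, replace=replace)


def get_task_type(task_type: str) -> TaskTypePluginLike:
    return _registry.get(task_type)


def list_task_types() -> List[str]:
    return _registry.list_types()
```

Rejecting duplicates by default is one design choice: it stops a third-party plugin from silently shadowing a built-in type such as 'ingestion'.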
Key Features
Explicit Typed Properties
Each task type has explicit, typed properties instead of generic dictionaries:
# Before (generic dict)
task = IngestionTask(
id='ingest',
properties={'operation': 'ingestion', 'target_table_type': 'landing'}
)
# After (typed properties)
props = IngestionTaskProperties(
operation=IngestionOperation.INGESTION,
target_table_type='landing'
)
task = IngestionTask(id='ingest', task_properties=props)
Plugin-Based Validation
Validation is handled by plugins, allowing custom validation logic:
plugin = get_task_type('ingestion')
plugin.validate_task(task) # Uses plugin's validation logic
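The delegation idea can be sketched without the library: a flow-level validator looks up the plugin for each task's type and lets that plugin decide. The Task dataclass, IngestionValidator, its target_dataset_id rule, and validate_tasks() are all hypothetical; they only show the dispatch pattern.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Task:
    # Hypothetical minimal task: only the fields this sketch needs.
    id: str
    task_type: str
    target_dataset_id: Optional[str] = None


class IngestionValidator:
    task_type = 'ingestion'

    def validate_task(self, task: Task) -> bool:
        # Example rule (assumed): ingestion tasks must write to a dataset.
        if not task.target_dataset_id:
            raise ValueError(f"{task.id}: target_dataset_id is required")
        return True


def validate_tasks(tasks: List[Task], plugins: Dict[str, Any]) -> List[str]:
    """Delegate each task to the plugin for its type and collect error messages."""
    errors: List[str] = []
    for task in tasks:
        plugin = plugins.get(task.task_type)
        if plugin is None:
            errors.append(f"{task.id}: unknown task type {task.task_type!r}")
            continue
        try:
            plugin.validate_task(task)
        except ValueError as exc:
            errors.append(str(exc))
    return errors
```

Collecting errors instead of stopping at the first one lets a caller report every invalid task in a flow in a single pass.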
Extensibility
New task types can be added without modifying core code:
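from qarion_etl.flows.tasks.base import TaskTypePlugin
from qarion_etl.flows.tasks.registry import register_task_type
class CustomTaskTypePlugin(TaskTypePlugin):
    @property
    def task_type(self) -> str:
        return 'custom_task'
    # ... implement the remaining required methods
register_task_type(CustomTaskTypePlugin())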
Backward Compatibility
The system maintains backward compatibility:
- task.properties still returns a dictionary
- Tasks can be created from dictionaries
- Existing code continues to work
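One way to keep task.properties returning a dictionary while storing typed properties is to derive the dictionary on read. The sketch below uses trimmed-down, hypothetical stand-ins for IngestionTask and IngestionTaskProperties (operation is a plain string here to keep it short); it is an assumption about the approach, not the actual implementation.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class IngestionTaskProperties:
    # Trimmed-down stand-in for the real properties class (assumed fields).
    operation: str = 'ingestion'
    target_table_type: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {'operation': self.operation, 'target_table_type': self.target_table_type}

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> 'IngestionTaskProperties':
        return cls(operation=d.get('operation', 'ingestion'),
                   target_table_type=d.get('target_table_type'))


class IngestionTask:
    """Hypothetical task that accepts legacy dicts or typed properties."""

    def __init__(self, id: str,
                 task_properties: Optional[IngestionTaskProperties] = None,
                 properties: Optional[Dict[str, Any]] = None) -> None:
        self.id = id
        if task_properties is None:
            # Legacy path: build typed properties from the plain dict.
            task_properties = IngestionTaskProperties.from_dict(properties or {})
        self.task_properties = task_properties

    @property
    def properties(self) -> Dict[str, Any]:
        # Legacy read path: callers still see a plain dictionary.
        return self.task_properties.to_dict()
```

In this sketch, task_properties wins if both arguments are supplied; the real classes may resolve that case differently.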
Usage Examples
Creating Tasks
from qarion_etl.flows.tasks import (
IngestionTask,
IngestionTaskProperties,
IngestionOperation
)
# Create properties
props = IngestionTaskProperties(
operation=IngestionOperation.INGESTION,
target_table_type='landing'
)
# Create task
task = IngestionTask(
id='ingestion',
name='Load Data',
target_dataset_id='landing_dataset',
task_properties=props
)
Using the Plugin Registry
from qarion_etl.flows.tasks import get_task_type, list_task_types
# List all available task types
types = list_task_types()
# ['ingestion', 'transformation', 'dq_check', 'export']
# Get a plugin
plugin = get_task_type('ingestion')
print(plugin.name) # "Ingestion"
print(plugin.description) # "Loads data from external sources..."
Creating Custom Task Types
See the Task System Guide for detailed examples of creating custom task types.
Migration Guide
For Task Generation Code
Task generation code has been updated to use explicit properties:
# Old way (still works via backward compatibility)
task = IngestionTask(
id='ingest',
target_dataset_id='dataset',
properties={'operation': 'ingestion'}
)
# New way (recommended)
props = IngestionTaskProperties(
operation=IngestionOperation.INGESTION
)
task = IngestionTask(
id='ingest',
target_dataset_id='dataset',
task_properties=props
)
For Code That Accesses Properties
Code that accesses task.properties continues to work:
# This still works
operation = task.properties.get('operation')
However, you can now access typed properties directly:
# Recommended for new code
operation = task.task_properties.operation
Testing
Tests have been updated to reflect the new architecture:
- Task creation tests use explicit properties
- Plugin registry tests verify plugin discovery
- Validation tests use plugin validation
Some tests may need updating if they relied on validation behavior that has since changed; for example, transformation tasks may now have a source_dataset_id of None, to support multi-source transformations.
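As an example of a validation test against the relaxed transformation rule, here is a self-contained unittest sketch. TransformationTask and validate_transformation() are hypothetical stand-ins; only the rule that source_dataset_id may be None comes from this document, and requiring target_dataset_id is an assumption.

```python
import unittest
from dataclasses import dataclass
from typing import Optional


@dataclass
class TransformationTask:
    # Hypothetical stand-in with only the fields the tests need.
    id: str
    target_dataset_id: Optional[str]
    source_dataset_id: Optional[str] = None


def validate_transformation(task: TransformationTask) -> bool:
    # Assumed rule: a target is required, but source_dataset_id may be None
    # because multi-source transformations do not have a single source.
    if not task.target_dataset_id:
        raise ValueError('target_dataset_id is required')
    return True


class TransformationValidationTest(unittest.TestCase):
    def test_allows_missing_source_for_multi_source(self):
        task = TransformationTask(id='join', target_dataset_id='joined')
        self.assertTrue(validate_transformation(task))

    def test_requires_target(self):
        task = TransformationTask(id='bad', target_dataset_id=None)
        with self.assertRaises(ValueError):
            validate_transformation(task)
```

Saved as a test module, it can be run with python -m unittest.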
Creating Custom Task Type Plugins
Complete Example
from qarion_etl.flows.tasks.base import TaskTypePlugin
from dataclasses import dataclass
from typing import Dict, Any, Optional, TYPE_CHECKING
if TYPE_CHECKING:
    from qarion_etl.flows.tasks.definitions import FlowTask
# Step 1: Define properties class
@dataclass
class CustomTaskProperties:
    """Properties for custom task type."""
    custom_field: str
    optional_field: Optional[str] = None
    def to_dict(self) -> Dict[str, Any]:
        return {
            'custom_field': self.custom_field,
            'optional_field': self.optional_field,
        }
    @classmethod
    def from_dict(cls, props_dict: Dict[str, Any]):
        return cls(
            custom_field=props_dict.get('custom_field', 'default'),
            optional_field=props_dict.get('optional_field'),
        )
# Step 2: Create plugin class
class CustomTaskTypePlugin(TaskTypePlugin):
    """Plugin for custom task type."""
    @property
    def task_type(self) -> str:
        return 'custom_task'
    @property
    def name(self) -> str:
        return 'Custom Task'
    @property
    def description(self) -> str:
        return 'Performs a custom operation'
    @property
    def properties_class(self):
        return CustomTaskProperties
    def validate_task(self, task: 'FlowTask') -> bool:
        """Validate custom task."""
        super().validate_task(task)
        props = task.task_properties
        if not props.custom_field:
            raise ValueError("custom_field is required")
        return True
# Step 3: Register the plugin
from qarion_etl.flows.tasks.registry import register_task_type
register_task_type(CustomTaskTypePlugin())
Plugin Discovery
Plugins placed in the qarion_etl/flows/tasks/plugins/ directory are discovered automatically: the plugin system registers them when the module is imported.
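Directory-based discovery of this kind is commonly built on the standard library's pkgutil.iter_modules() and importlib.import_module(). The sketch below is an assumption about how it could work, not Qarion ETL's discovery code: discover_plugins() is a hypothetical helper that imports each module in a package and instantiates every concrete subclass of a given base class defined there.

```python
import importlib
import inspect
import pkgutil
from types import ModuleType
from typing import List, Type, TypeVar

T = TypeVar('T')


def discover_plugins(package: ModuleType, base_class: Type[T]) -> List[T]:
    """Import every module in `package` and instantiate concrete subclasses of base_class."""
    found: List[T] = []
    # __path__ exists only on packages; iter_modules lists their direct submodules.
    for info in pkgutil.iter_modules(package.__path__):
        module = importlib.import_module(f"{package.__name__}.{info.name}")
        for _, obj in inspect.getmembers(module, inspect.isclass):
            # Skip the base class itself, abstract classes, and classes that were
            # merely imported into this module from elsewhere.
            if (issubclass(obj, base_class) and obj is not base_class
                    and not inspect.isabstract(obj)
                    and obj.__module__ == module.__name__):
                found.append(obj())
    return found
```

With hypothetical names, registration would then be a loop such as: for plugin in discover_plugins(plugins_package, TaskTypePlugin): register_task_type(plugin). The __module__ check keeps a plugin that is imported into several modules from being registered more than once.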
Documentation
- Task System Guide - User-facing documentation with complete examples
- Plugin Interfaces Reference - Complete API reference
- Plugin System Overview - Overview of all plugin types
Benefits
- Type Safety: Typed properties provide better IDE support and catch errors early
- Extensibility: Easy to add new task types via plugins
- Maintainability: Clear separation of concerns
- Documentation: Properties are self-documenting with clear purposes
- Backward Compatibility: Existing code continues to work