
Task Type Plugin Architecture

Overview

The task system in Qarion ETL has been refactored to use a plugin-based architecture, so that new task types can be added without modifying core code. This document describes the architecture, how it works, and how to extend it.

Architecture Components

1. Task Definitions (flows/tasks/definitions.py)

Task classes that represent logical operations in a flow:

  • FlowTask: Base class for all tasks
  • IngestionTask: Loads data from external sources
  • TransformationTask: Transforms data between datasets
  • QualityCheckTask: Runs data quality checks
  • ExportTask: Exports data to external destinations

2. Task Properties (flows/tasks/properties.py)

Typed property classes for each task type:

  • IngestionTaskProperties: Properties for ingestion tasks
  • TransformationTaskProperties: Properties for transformation tasks
  • QualityCheckTaskProperties: Properties for quality check tasks
  • ExportTaskProperties: Properties for export tasks

Each property class includes:

  • Operation enums (e.g., IngestionOperation, TransformationOperation)
  • Typed fields with clear purposes
  • Serialization/deserialization methods
  • Custom properties support for extensibility
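The sketch below shows one way a property class could combine those four ingredients. It is a simplified stand-in, not qarion_etl's actual IngestionTaskProperties: the single enum member, the custom_properties field name, and the rule that unknown keys are preserved on round-trip are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class IngestionOperation(Enum):
    # Only this member is named in this document; the real enum may define more.
    INGESTION = "ingestion"


# Keys that map onto typed fields; anything else is kept as a custom property.
_KNOWN_KEYS = {"operation", "target_table_type"}


@dataclass
class IngestionTaskProperties:
    """Simplified stand-in, not qarion_etl's actual class."""

    operation: IngestionOperation = IngestionOperation.INGESTION
    target_table_type: Optional[str] = None
    # Assumed extension point: extra keys survive serialization untouched.
    custom_properties: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        # Start from custom properties so typed fields win on key clashes.
        data: Dict[str, Any] = dict(self.custom_properties)
        data["operation"] = self.operation.value
        if self.target_table_type is not None:
            data["target_table_type"] = self.target_table_type
        return data

    @classmethod
    def from_dict(cls, props: Dict[str, Any]) -> "IngestionTaskProperties":
        # IngestionOperation(...) raises ValueError for an unknown operation string,
        # so typos fail at construction time instead of deep inside execution.
        return cls(
            operation=IngestionOperation(props.get("operation", "ingestion")),
            target_table_type=props.get("target_table_type"),
            custom_properties={k: v for k, v in props.items() if k not in _KNOWN_KEYS},
        )
```

With this shape, from_dict({'operation': 'ingestion', 'batch_size': 500}) yields a typed object whose custom_properties holds {'batch_size': 500}, and to_dict() puts that key back.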

3. Task Type Plugins (flows/tasks/base.py, flows/tasks/plugins/)

Plugin system for task types:

  • TaskType: Abstract base class for task types
  • TaskTypePlugin: Plugin interface with validation and property creation
  • Built-in plugins in flows/tasks/plugins/builtin.py:
    • IngestionTaskTypePlugin
    • TransformationTaskTypePlugin
    • QualityCheckTaskTypePlugin
    • ExportTaskTypePlugin
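A minimal sketch of how an abstract plugin interface can tie a task type identifier to its properties class and build typed properties from a plain dictionary. The create_properties method, the ExportTaskProperties.destination field, and this trimmed-down TaskTypePlugin are hypothetical; the real base class in flows/tasks/base.py defines its own members and signatures.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict


class TaskTypePlugin(ABC):
    """Hypothetical minimal plugin interface (not qarion_etl's actual class)."""

    @property
    @abstractmethod
    def task_type(self) -> str:
        """Identifier used to look the plugin up, e.g. 'export'."""

    @property
    @abstractmethod
    def properties_class(self) -> type:
        """Typed properties class for this task type."""

    def create_properties(self, props: Dict[str, Any]) -> Any:
        # Default behavior: delegate to the properties class's from_dict constructor.
        return self.properties_class.from_dict(props)


@dataclass
class ExportTaskProperties:
    # Hypothetical field; the real export properties are not listed here.
    destination: str

    @classmethod
    def from_dict(cls, props: Dict[str, Any]) -> "ExportTaskProperties":
        return cls(destination=props["destination"])


class ExportTaskTypePlugin(TaskTypePlugin):
    @property
    def task_type(self) -> str:
        return "export"

    @property
    def properties_class(self) -> type:
        return ExportTaskProperties
```

Because the abstract members are declared with abstractmethod, Python refuses to instantiate a plugin that forgets one of them, which surfaces incomplete plugins at registration time rather than at run time.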

4. Plugin Registry (flows/tasks/registry.py)

Central registry for managing task type plugins:

  • TaskTypeRegistry: Registry class
  • Global functions: register_task_type(), get_task_type(), list_task_types(), etc.
  • Automatic plugin discovery on import
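A dictionary keyed by task type identifier is enough to illustrate the registry. This sketch reuses the global function names listed above, but its behavior, in particular rejecting duplicate registrations and raising KeyError for unknown types, is an assumption and not a description of flows/tasks/registry.py.

```python
from typing import Any, Dict, List


class TaskTypeRegistry:
    """Maps task type identifiers (e.g. 'ingestion') to plugin instances."""

    def __init__(self) -> None:
        self._plugins: Dict[str, Any] = {}

    def register(self, plugin: Any) -> None:
        key = plugin.task_type
        # Assumption: duplicates are rejected rather than silently replaced.
        if key in self._plugins:
            raise ValueError(f"task type {key!r} is already registered")
        self._plugins[key] = plugin

    def get(self, task_type: str) -> Any:
        if task_type not in self._plugins:
            raise KeyError(f"unknown task type {task_type!r}")
        return self._plugins[task_type]

    def names(self) -> List[str]:
        # Dicts preserve insertion order, so names come back in registration order.
        return list(self._plugins)


# One shared instance behind module-level convenience functions.
_registry = TaskTypeRegistry()


def register_task_type(plugin: Any) -> None:
    _registry.register(plugin)


def get_task_type(task_type: str) -> Any:
    return _registry.get(task_type)


def list_task_types() -> List[str]:
    return _registry.names()
```

Keeping a single module-level instance lets core code and third-party plugins share one lookup table without passing a registry object around.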

Key Features

Explicit Typed Properties

Each task type has explicit, typed properties instead of generic dictionaries:

```python
# Before (generic dict)
task = IngestionTask(
    id='ingest',
    properties={'operation': 'ingestion', 'target_table_type': 'landing'}
)

# After (typed properties)
props = IngestionTaskProperties(
    operation=IngestionOperation.INGESTION,
    target_table_type='landing'
)
task = IngestionTask(id='ingest', task_properties=props)
```

Plugin-Based Validation

Validation is handled by plugins, allowing custom validation logic:

```python
plugin = get_task_type('ingestion')
plugin.validate_task(task)  # Uses the plugin's validation logic
```
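Plugin validation can layer type-specific checks on top of shared ones by calling super().validate_task(task), the same pattern the custom plugin example uses. In this sketch the Task stand-in, the shared checks in BaseTaskTypePlugin, and the rule that ingestion tasks need a target_dataset_id are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Task:
    """Hypothetical stand-in for FlowTask with only the fields used below."""
    id: str
    task_type: str
    target_dataset_id: Optional[str] = None


class BaseTaskTypePlugin:
    task_type = "base"

    def validate_task(self, task: Task) -> bool:
        # Checks shared by every task type.
        if not task.id:
            raise ValueError("task id is required")
        if task.task_type != self.task_type:
            raise ValueError(
                f"plugin {self.task_type!r} cannot validate a {task.task_type!r} task"
            )
        return True


class IngestionPlugin(BaseTaskTypePlugin):
    task_type = "ingestion"

    def validate_task(self, task: Task) -> bool:
        super().validate_task(task)  # shared checks first
        # Hypothetical type-specific rule, for illustration only.
        if not task.target_dataset_id:
            raise ValueError("ingestion tasks need a target_dataset_id")
        return True
```

Raising ValueError with a specific message, rather than returning False, tells the caller which rule failed.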

Extensibility

New task types can be added without modifying core code:

```python
class CustomTaskTypePlugin(TaskTypePlugin):
    @property
    def task_type(self) -> str:
        return 'custom_task'

    # ... implement required methods

register_task_type(CustomTaskTypePlugin())
```

Backward Compatibility

The system maintains backward compatibility:

  • task.properties still returns a dictionary
  • Tasks can be created from dictionaries
  • Existing code continues to work
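One way to keep both access styles working is to treat the typed object as the single source of truth and derive the dictionary from it on demand. The classes below are simplified, hypothetical stand-ins for IngestionTask and IngestionTaskProperties (operation is a plain string here), not the library's code; preferring task_properties when both arguments are given is also an assumption.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class IngestionTaskProperties:
    operation: str = "ingestion"
    target_table_type: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {"operation": self.operation, "target_table_type": self.target_table_type}

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "IngestionTaskProperties":
        return cls(
            operation=d.get("operation", "ingestion"),
            target_table_type=d.get("target_table_type"),
        )


class IngestionTask:
    """Hypothetical stand-in showing how a task can accept either form."""

    def __init__(
        self,
        id: str,
        properties: Optional[Dict[str, Any]] = None,
        task_properties: Optional[IngestionTaskProperties] = None,
    ) -> None:
        if task_properties is None:
            # Legacy path: convert the plain dict into typed properties.
            task_properties = IngestionTaskProperties.from_dict(properties or {})
        self.id = id
        self.task_properties = task_properties

    @property
    def properties(self) -> Dict[str, Any]:
        # Legacy accessor: always derived from the typed object, so both views agree.
        return self.task_properties.to_dict()
```

Because the dictionary is recomputed from the typed object, there is no second copy of the data that could drift out of sync.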

Usage Examples

Creating Tasks

```python
from qarion_etl.flows.tasks import (
    IngestionTask,
    IngestionTaskProperties,
    IngestionOperation
)

# Create properties
props = IngestionTaskProperties(
    operation=IngestionOperation.INGESTION,
    target_table_type='landing'
)

# Create task
task = IngestionTask(
    id='ingestion',
    name='Load Data',
    target_dataset_id='landing_dataset',
    task_properties=props
)
```

Using the Plugin Registry

```python
from qarion_etl.flows.tasks import get_task_type, list_task_types

# List all available task types
types = list_task_types()
# ['ingestion', 'transformation', 'dq_check', 'export']

# Get a plugin
plugin = get_task_type('ingestion')
print(plugin.name)  # "Ingestion"
print(plugin.description)  # "Loads data from external sources..."
```

Creating Custom Task Types

See the Task System Guide for detailed examples of creating custom task types.

Migration Guide

For Task Generation Code

Task generation code has been updated to use explicit properties:

```python
# Old way (still works via backward compatibility)
task = IngestionTask(
    id='ingest',
    target_dataset_id='dataset',
    properties={'operation': 'ingestion'}
)

# New way (recommended)
props = IngestionTaskProperties(
    operation=IngestionOperation.INGESTION
)
task = IngestionTask(
    id='ingest',
    target_dataset_id='dataset',
    task_properties=props
)
```

For Code That Accesses Properties

Code that accesses task.properties continues to work:

```python
# This still works
operation = task.properties.get('operation')
```

However, you can now access typed properties directly:

```python
# Recommended for new code
operation = task.task_properties.operation
```

Testing

Tests have been updated to reflect the new architecture:

  • Task creation tests use explicit properties
  • Plugin registry tests verify plugin discovery
  • Validation tests use plugin validation

Some tests may need updates if they relied on validation behavior that has changed. For example, transformation tasks may now have source_dataset_id set to None, to support multi-source transformations.

Creating Custom Task Type Plugins

Complete Example

```python
from qarion_etl.flows.tasks.base import TaskTypePlugin
from dataclasses import dataclass
from typing import Dict, Any, Optional, TYPE_CHECKING

if TYPE_CHECKING:
    from qarion_etl.flows.tasks.definitions import FlowTask

# Step 1: Define properties class
@dataclass
class CustomTaskProperties:
    """Properties for custom task type."""
    custom_field: str
    optional_field: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            'custom_field': self.custom_field,
            'optional_field': self.optional_field,
        }

    @classmethod
    def from_dict(cls, props_dict: Dict[str, Any]):
        return cls(
            custom_field=props_dict.get('custom_field', 'default'),
            optional_field=props_dict.get('optional_field'),
        )

# Step 2: Create plugin class
class CustomTaskTypePlugin(TaskTypePlugin):
    """Plugin for custom task type."""

    @property
    def task_type(self) -> str:
        return 'custom_task'

    @property
    def name(self) -> str:
        return 'Custom Task'

    @property
    def description(self) -> str:
        return 'Performs a custom operation'

    @property
    def properties_class(self):
        return CustomTaskProperties

    def validate_task(self, task: 'FlowTask') -> bool:
        """Validate custom task."""
        super().validate_task(task)
        props = task.task_properties
        if not props.custom_field:
            raise ValueError("custom_field is required")
        return True

# Step 3: Register the plugin
from qarion_etl.flows.tasks.registry import register_task_type

register_task_type(CustomTaskTypePlugin())
```

Plugin Discovery

Plugins placed in the qarion_etl/flows/tasks/plugins/ directory are discovered and registered automatically when the module is imported, so no explicit registration call is needed in application code.
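Discovery of this kind is commonly implemented by importing every submodule of a package so that module-level register_task_type(...) calls run as a side effect. This document does not say which mechanism qarion_etl uses; the sketch below assumes a pkgutil-based scan, and discover_plugins is a hypothetical helper name.

```python
import importlib
import pkgutil
from types import ModuleType
from typing import List


def discover_plugins(package: ModuleType) -> List[str]:
    """Import every direct submodule of `package`; return the imported names.

    Each plugin module is expected to register itself at import time, so
    importing it is all the "discovery" that is needed.
    """
    imported: List[str] = []
    # iter_modules lists modules and subpackages found on the package's __path__;
    # the prefix turns bare names like 'custom' into 'pkg.custom'.
    for info in pkgutil.iter_modules(package.__path__, prefix=package.__name__ + "."):
        importlib.import_module(info.name)
        imported.append(info.name)
    return imported
```

A package's own __init__.py could call this on itself, which is one way "registered when the module is imported" can be achieved without a hard-coded list of plugin modules.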


Benefits

  1. Type Safety: Typed properties provide better IDE support and catch errors early
  2. Extensibility: Easy to add new task types via plugins
  3. Maintainability: Clear separation of concerns
  4. Documentation: Properties are self-documenting with clear purposes
  5. Backward Compatibility: Existing code continues to work