Task System in Qarion ETL

Overview

The task system in Qarion ETL provides a unified, extensible way to define operations in data flows. Tasks represent logical operations like ingestion, transformation, quality checks, and exports. Each task type has explicit typed properties that define its purpose and behavior.

Task Architecture

Plugin-Based Design

Task types are implemented using a plugin architecture, making it easy to extend the system with custom task types without modifying core code. The architecture consists of:

  1. Task Definitions: Concrete task classes (IngestionTask, TransformationTask, etc.)
  2. Task Properties: Typed property classes for each task type
  3. Task Type Plugins: Plugin classes that define validation and behavior
  4. Plugin Registry: Central registry for discovering and managing task types

Defining Tasks in Standard Flows

In Standard flows, tasks are defined directly in the flow definition using TOML:

# flows/my_standard_flow.toml
id = "my_standard_flow"
name = "My Standard Flow"
flow_type = "standard"
namespace = "production"

[input]
primary_key = ["id"]
columns = [
    { name = "id", schema_type = "integer" },
    { name = "name", schema_type = "string" },
    { name = "amount", schema_type = "float" }
]

[[tasks]]
id = "ingest_data"
name = "Ingest Data"
type = "ingestion"
target_dataset_id = "landing_table"
[tasks.properties]
operation = "ingestion"
target_table_type = "landing"
processing_type = "FULL_REFRESH"
[tasks.config]
path = "data/input.csv"
format = "csv"

[[tasks]]
id = "transform_data"
name = "Transform Data"
type = "transformation"
source_dataset_id = "landing_table"
target_dataset_id = "staging_table"
dependencies = ["ingest_data"]
[tasks.properties]
operation = "landing_to_staging"
source_table_type = "landing"
target_table_type = "staging"
processing_type = "INCREMENTAL"
[tasks.config]
sql = """
SELECT
    id,
    UPPER(name) as name,
    amount * 1.1 as adjusted_amount
FROM {{ source_dataset }}
WHERE amount > 0
"""

[[tasks]]
id = "quality_check"
name = "Quality Check"
type = "dq_check"
source_dataset_id = "staging_table"
dependencies = ["transform_data"]
[tasks.properties]
operation = "quality_check"
table_type = "staging"
[[tasks.properties.quality_checks]]
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
enabled = true
[tasks.properties.quality_checks.config]
columns = ["id", "name", "adjusted_amount"]
threshold = 0.95
allow_null = false

[[tasks]]
id = "export_data"
name = "Export Data"
type = "export"
source_dataset_id = "staging_table"
dependencies = ["quality_check"]
[tasks.properties]
operation = "export"
export_type = "file"
destination = "exports/staging_data.csv"
format = "csv"
[tasks.config]
include_header = true
delimiter = ","

Processing Types

Every task has a processing type that determines how data is processed. This is a critical concept for understanding pipeline behavior, performance, and data processing strategies.

FULL_REFRESH

Rebuilds the entire dataset from scratch. All existing data is replaced with new data.

When to Use:

  • You need to completely refresh the dataset
  • Data volume is small enough for full rebuilds
  • You want to ensure data consistency by starting fresh
  • You're reprocessing historical data from scratch
  • You've fixed a bug and need to rebuild with corrected logic
  • You're doing initial data load

How It Works:

  • Target table is truncated or dropped and recreated
  • All data is processed from source, regardless of previous runs
  • No dependency on previous batch IDs
  • Complete historical reprocessing is possible

Example Use Cases:

  • Initial data load
  • Complete data refresh after schema changes
  • Reprocessing all historical data with corrected logic
  • Small reference datasets that change infrequently

INCREMENTAL

Processes only new or changed data since the last run. Existing data is preserved and updated.

When to Use:

  • You have large datasets where full refresh is expensive
  • You only receive new/changed records
  • You want to optimize processing time and resources
  • You're tracking changes over time (change feeds, SCD2, etc.)
  • You need to maintain historical state

How It Works:

  • Only processes records from the current batch
  • Compares with previous batch to identify changes
  • Uses merge/upsert strategies to update existing records
  • Requires previous_batch_id to determine what's new/changed
  • Preserves historical data while updating current state

Example Use Cases:

  • Daily incremental loads from source systems
  • Change tracking (change feeds, SCD2)
  • Event processing (outbox pattern)
  • Large fact tables that grow over time
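
The two strategies can be contrasted in plain SQL. The sketch below uses sqlite3 purely to illustrate the truncate-and-reload versus merge/upsert patterns described above; it is not Qarion ETL code, and the table and column names are invented:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, amount REAL)")
conn.execute("INSERT INTO target VALUES (1, 10.0), (2, 20.0)")

# FULL_REFRESH: truncate the target, then reload everything from source.
conn.execute("DELETE FROM target")
conn.executemany("INSERT INTO target VALUES (?, ?)",
                 [(1, 11.0), (2, 21.0), (3, 30.0)])

# INCREMENTAL: merge only the current batch into the existing target.
batch = [(2, 99.0), (4, 40.0)]  # one changed row, one new row
conn.executemany(
    "INSERT INTO target VALUES (?, ?) "
    "ON CONFLICT(id) DO UPDATE SET amount = excluded.amount",
    batch,
)
print(conn.execute("SELECT id, amount FROM target ORDER BY id").fetchall())
# -> [(1, 11.0), (2, 99.0), (3, 30.0), (4, 40.0)]
```

Note how the incremental merge touched only two rows while preserving the rest of the table, whereas the full refresh rewrote everything.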

Automatic Processing Type Assignment

The processing type is automatically determined by flow plugins based on the transformation pattern:

  • Change Feed flows: INCREMENTAL (merge strategy) - Tracks changes between batches
  • SCD2 flows: INCREMENTAL (merge strategy) - Maintains historical versions
  • Outbox flows: INCREMENTAL (insert strategy) - Always inserts new events
  • Delta Publishing flows: INCREMENTAL (merge strategy) - Processes transactions incrementally
  • Standard flows: Can be either, depending on task configuration

Forcing Full Refresh

Even for incremental tasks, you can force a full refresh when needed:

Via Execution API:

from qarion_etl.flows.execution import FlowExecutionService

service = FlowExecutionService(engine, transformation_service)
result = service.execute_flow(
    execution_plan=plan,
    batch_id=1,
    force_full_refresh=True  # Forces full refresh even for incremental tasks
)

Via Code Generation:

  • Set previous_batch_id=None when generating code
  • This triggers first batch scenario, which processes all data

When to Force Full Refresh:

  • Reprocessing all historical data
  • Fixing data quality issues
  • Schema changes requiring complete rebuild
  • Initial data load
  • Testing with complete dataset

Important Notes:

  • Full refresh processes ALL data from source, not just current batch
  • For historical reprocessing, ensure source data contains all historical records
  • Full refresh may take significantly longer than incremental processing
  • Use with caution on large datasets

Task Types

Qarion ETL provides six built-in task types:

1. Ingestion Tasks

Purpose: Load data from external sources into target datasets.

Properties:

  • operation: Type of ingestion operation (ingestion, load, import)
  • target_table_type: Type of table being populated (e.g., 'landing', 'staging')
  • source_connector_id: Optional connector ID for the source system
  • processing_type: Type of processing (FULL_REFRESH or INCREMENTAL) - Important: Determines if data is fully refreshed or incrementally loaded
  • load_strategy: Optional load strategy (deprecated, use processing_type instead)
  • custom_properties: Additional flow-specific properties

TOML Configuration Example:

[[tasks]]
id = "ingest_orders"
name = "Ingest Orders"
type = "ingestion"
target_dataset_id = "orders_landing"
[tasks.properties]
operation = "ingestion"
target_table_type = "landing"
processing_type = "FULL_REFRESH"
[tasks.config]
path = "data/orders"
pattern = "orders_*.csv"
format = "csv"

Python API Example:

from qarion_etl.flows.tasks import IngestionTask, IngestionTaskProperties, IngestionOperation

props = IngestionTaskProperties(
    operation=IngestionOperation.INGESTION,
    target_table_type='landing'
)

task = IngestionTask(
    id='ingestion',
    name='Load Data',
    target_dataset_id='landing_dataset',
    task_properties=props
)

2. Transformation Tasks

Purpose: Transform data from source dataset(s) to target dataset.

Properties:

  • operation: Type of transformation (e.g., 'landing_to_staging', 'sessionization')
  • source_table_type: Type of source table (e.g., 'landing', 'staging')
  • target_table_type: Type of target table (e.g., 'change_feed', 'delta_transaction')
  • processing_type: Type of processing (FULL_REFRESH or INCREMENTAL) - Important: Determines if transformation rebuilds entire dataset or processes incrementally
  • source_datasets: List of source dataset IDs for multi-source transformations
  • transformation_type: Optional type classification (e.g., 'sql', 'python', 'dbt')
  • custom_properties: Additional flow-specific properties

TOML Configuration Example:

[[tasks]]
id = "transform_orders"
name = "Transform Orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_staging"
dependencies = ["ingest_orders"]
[tasks.properties]
operation = "landing_to_staging"
source_table_type = "landing"
target_table_type = "staging"
processing_type = "INCREMENTAL"
[tasks.config]
sql = """
SELECT
    id,
    customer_id,
    amount,
    order_date,
    CASE
        WHEN amount > 1000 THEN 'high_value'
        WHEN amount > 100 THEN 'medium_value'
        ELSE 'low_value'
    END as value_category
FROM {{ source_dataset }}
WHERE order_date >= '2024-01-01'
"""

Python API Example:

from qarion_etl.flows.tasks import TransformationTask, TransformationTaskProperties, TransformationOperation

props = TransformationTaskProperties(
    operation=TransformationOperation.LANDING_TO_STAGING,
    source_table_type='landing',
    target_table_type='staging'
)

task = TransformationTask(
    id='landing_to_staging',
    name='Transform Landing to Staging',
    source_dataset_id='landing_dataset',
    target_dataset_id='staging_dataset',
    dependencies=['ingestion'],
    task_properties=props
)

3. Quality Check Tasks

Purpose: Run data quality checks to validate dataset integrity and quality.

Properties:

  • operation: Type of quality check operation
  • table_type: Type of table being checked (e.g., 'staging', 'publish')
  • quality_checks: List of quality check definitions to run
  • failure_action: Action to take on failure (e.g., 'fail', 'warn', 'continue')
  • custom_properties: Additional flow-specific properties

TOML Configuration Example:

[[tasks]]
id = "quality_check_orders"
name = "Quality Check Orders"
type = "dq_check"
source_dataset_id = "orders_staging"
dependencies = ["transform_orders"]
[tasks.properties]
operation = "quality_check"
table_type = "staging"
stop_on_first_failure = false
[[tasks.properties.quality_checks]]
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
enabled = true
[tasks.properties.quality_checks.config]
columns = ["id", "customer_id", "amount"]
threshold = 0.95
allow_null = false

[[tasks.properties.quality_checks]]
check_id = "uniqueness_check"
check_name = "Uniqueness Check"
check_type = "uniqueness"
enabled = true
[tasks.properties.quality_checks.config]
columns = ["id"]

[[tasks.properties.quality_checks]]
check_id = "range_check"
check_name = "Amount Range Check"
check_type = "range"
enabled = true
[tasks.properties.quality_checks.config]
columns = ["amount"]
min_value = 0
max_value = 1000000

Python API Example:

from qarion_etl.flows.tasks import QualityCheckTask, QualityCheckTaskProperties, QualityCheckOperation

props = QualityCheckTaskProperties(
    operation=QualityCheckOperation.QUALITY_CHECK,
    table_type='staging',
    quality_checks=[
        {'type': 'not_null', 'column': 'id'},
        {'type': 'unique', 'column': 'id'}
    ]
)

task = QualityCheckTask(
    id='staging_quality_checks',
    name='Staging Quality Checks',
    source_dataset_id='staging_dataset',
    target_dataset_id='staging_dataset',  # Quality checks operate on the same dataset
    dependencies=['landing_to_staging'],
    task_properties=props
)

4. Export Tasks

Purpose: Export data from datasets to external destinations.

Properties:

  • operation: Type of export operation
  • export_type: Type of export destination (e.g., 'file', 'api', 'database')
  • destination: Export destination identifier or configuration
  • export_config: Full export configuration dictionary
  • format: Optional export format (e.g., 'csv', 'parquet', 'json')
  • custom_properties: Additional flow-specific properties

TOML Configuration Example:

[[tasks]]
id = "export_orders"
name = "Export Orders"
type = "export"
source_dataset_id = "orders_staging"
dependencies = ["quality_check_orders"]
[tasks.properties]
operation = "export"
export_type = "file"
destination = "exports/orders.csv"
format = "csv"
[tasks.config]
include_header = true
delimiter = ","
encoding = "utf-8"
columns = ["id", "customer_id", "amount", "order_date"]

Export to Parquet Example:

[[tasks]]
id = "export_orders_parquet"
name = "Export Orders to Parquet"
type = "export"
source_dataset_id = "orders_staging"
dependencies = ["quality_check_orders"]
[tasks.properties]
operation = "export"
export_type = "file"
destination = "exports/orders.parquet"
format = "parquet"
[tasks.config]
compression = "snappy"
columns = ["id", "customer_id", "amount", "order_date", "value_category"]

Python API Example:

from qarion_etl.flows.tasks import ExportTask, ExportTaskProperties, ExportOperation

props = ExportTaskProperties(
    operation=ExportOperation.EXPORT,
    export_type='file',
    destination='/path/to/output.csv',
    format='csv'
)

task = ExportTask(
    id='export_data',
    name='Export to CSV',
    source_dataset_id='final_dataset',
    task_properties=props
)

5. Flow Trigger Tasks

Purpose: Trigger execution of another flow from within the current flow.

Properties:

  • operation: Type of flow trigger operation (trigger)
  • target_flow_id: ID of the flow to trigger (required)
  • trigger_id: Optional specific trigger ID to use
  • wait_for_completion: Whether to wait for triggered flow to complete (default: true)
  • pass_batch_id: Whether to pass current batch_id to triggered flow (default: true)
  • batch_id: Optional batch_id to use for triggered flow
  • execution_date: Optional execution date for triggered flow
  • variables: Optional variables to pass to triggered flow
  • condition: Condition for triggering ('success', 'failure', 'always') (default: 'success')
  • custom_properties: Additional flow-specific properties

TOML Configuration Example:

[[tasks]]
id = "trigger_child_flow"
name = "Trigger Child Flow"
type = "flow_trigger"
dependencies = ["process_data"]
[tasks.properties]
target_flow_id = "child_flow"
wait_for_completion = true
pass_batch_id = true
condition = "success"
variables = { min_value = 100, max_value = 1000 }

Python API Example:

from qarion_etl.flows.tasks import FlowTriggerTask, FlowTriggerTaskProperties, FlowTriggerOperation

props = FlowTriggerTaskProperties(
    operation=FlowTriggerOperation.TRIGGER,
    target_flow_id='child_flow',
    wait_for_completion=True,
    pass_batch_id=True,
    condition='success',
    variables={'min_value': 100, 'max_value': 1000}
)

task = FlowTriggerTask(
    id='trigger_child',
    name='Trigger Child Flow',
    dependencies=['process_data'],
    task_properties=props
)

For complete documentation, see Flow Trigger Tasks Guide.

6. API Call Tasks

Purpose: Make HTTP API calls with support for authentication and XCom data integration.

Properties:

  • operation: HTTP method (get, post, put, patch, delete)
  • url: API endpoint URL (supports Jinja2 templating)
  • auth_type: Authentication type (none, basic, oauth2, api_key, bearer)
  • credential_id: Credential ID from credential store
  • headers: Custom HTTP headers (supports Jinja2 templating)
  • params: URL query parameters (supports Jinja2 templating)
  • body: Request body (for POST, PUT, PATCH) - supports Jinja2 templating
  • body_format: Body format (json, form, raw, xml)
  • timeout: Request timeout in seconds
  • retry_count: Number of retries on failure
  • retry_delay: Delay between retries in seconds
  • xcom_pull: List of task IDs to pull data from via XCom
  • xcom_pull_key: Key to extract from XCom data
  • response_handling: How to handle response (store, validate, ignore)
  • response_validation: Optional validation rules for response

TOML Configuration Example:

[[tasks]]
id = "call_external_api"
name = "Call External API"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
auth_type = "basic"
credential_id = "api_credentials"
timeout = 30
retry_count = 3

OAuth2 Example:

[[tasks]]
id = "oauth2_api_call"
name = "OAuth2 API Call"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/protected/data"
operation = "get"
auth_type = "oauth2"
credential_id = "oauth2_credentials"

Using XCom Data:

[[tasks]]
id = "api_with_xcom"
name = "API Call with XCom"
type = "api_call"
dependencies = ["previous_task"]
[tasks.properties]
url = "https://api.example.com/users/{{ xcom_previous_task.user_id }}"
operation = "post"
xcom_pull = ["previous_task"]
body = { user_id = "{{ xcom_previous_task.user_id }}", status = "{{ xcom_previous_task.status }}" }

For complete documentation, see API Call Tasks Guide.

Task Dependencies

Tasks can define explicit dependencies to control execution order:

TOML Configuration Example:

[[tasks]]
id = "ingest_orders"
type = "ingestion"
target_dataset_id = "orders_landing"
# No dependencies - this is the first task

[[tasks]]
id = "transform_orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_staging"
dependencies = ["ingest_orders"] # Depends on ingestion task

[[tasks]]
id = "export_orders"
type = "export"
source_dataset_id = "orders_staging"
dependencies = ["transform_orders"] # Depends on transformation task

Python API Example:

task = TransformationTask(
    id='transform',
    source_dataset_id='source',
    target_dataset_id='target',
    dependencies=['ingestion'],  # This task depends on the 'ingestion' task
    task_properties=props
)

FAN IN and FAN OUT Patterns

Tasks support both FAN IN (multiple sources) and FAN OUT (multiple targets) patterns:

FAN IN (Multiple tasks → Single task):

# Task that depends on multiple upstream tasks
[[tasks]]
id = "join_task"
type = "transformation"
target_dataset_id = "joined_data"
dependencies = ["task_a", "task_b", "task_c"] # Multiple dependencies
[tasks.properties]
operation = "join"
source_datasets = ["dataset_a", "dataset_b", "dataset_c"]

Python Example:

# Task that depends on multiple upstream tasks
task = TransformationTask(
    id='join_task',
    target_dataset_id='joined_data',
    dependencies=['task_a', 'task_b', 'task_c'],  # Multiple dependencies
    task_properties=props
)

FAN OUT (Single task → Multiple tasks):

# Source task feeds into multiple downstream tasks
[[tasks]]
id = "source_task"
type = "transformation"
source_dataset_id = "source"
target_dataset_id = "target"
# Multiple downstream tasks will depend on this one

[[tasks]]
id = "task_a"
type = "transformation"
source_dataset_id = "target"
target_dataset_id = "output_a"
dependencies = ["source_task"] # Depends on source_task

[[tasks]]
id = "task_b"
type = "transformation"
source_dataset_id = "target"
target_dataset_id = "output_b"
dependencies = ["source_task"] # Also depends on source_task

Python Example:

# Task that feeds into multiple downstream tasks
task = TransformationTask(
    id='source_task',
    source_dataset_id='source',
    target_dataset_id='target',
    downstream_tasks=['task_a', 'task_b'],  # Multiple downstream tasks
    task_properties=props
)

Creating Custom Task Types

You can extend the task system by creating custom task type plugins. This allows you to define new task types with specific properties and validation logic.

Step 1: Define Properties Class

from dataclasses import dataclass
from typing import Dict, Any, Optional

@dataclass
class CustomTaskProperties:
    """Properties for custom task type."""
    custom_field: str
    optional_field: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            'custom_field': self.custom_field,
            'optional_field': self.optional_field,
        }

    @classmethod
    def from_dict(cls, props_dict: Dict[str, Any]):
        return cls(
            custom_field=props_dict.get('custom_field', 'default'),
            optional_field=props_dict.get('optional_field'),
        )

Step 2: Create Plugin Class

from qarion_etl.flows.tasks.base import TaskTypePlugin

class CustomTaskTypePlugin(TaskTypePlugin):
    """Plugin for custom task type."""

    @property
    def task_type(self) -> str:
        return 'custom_task'

    @property
    def name(self) -> str:
        return 'Custom Task'

    @property
    def description(self) -> str:
        return 'Performs a custom operation'

    @property
    def properties_class(self):
        return CustomTaskProperties

    def validate_task(self, task: 'FlowTask') -> bool:
        """Validate custom task."""
        super().validate_task(task)
        # Add custom validation logic
        props = task.task_properties
        if not props.custom_field:
            raise ValueError("custom_field is required")
        return True

Step 3: Register the Plugin

from qarion_etl.flows.tasks.registry import register_task_type

register_task_type(CustomTaskTypePlugin())

For more detailed information on creating custom task types, see the Task Type Plugin Architecture guide.

Using the Plugin Registry

The plugin registry provides functions to discover and use task types:

from qarion_etl.flows.tasks import (
    get_task_type,
    list_task_types,
    has_task_type
)

# List all available task types
types = list_task_types()
print(types)  # e.g. ['ingestion', 'transformation', 'dq_check', 'export', 'flow_trigger', 'api_call']

# Get a specific plugin
plugin = get_task_type('ingestion')
print(plugin.name)         # "Ingestion"
print(plugin.description)  # "Loads data from external sources..."

# Check if a task type exists
if has_task_type('custom_task'):
    print("Custom task type is available")

Task Serialization

Tasks can be converted to dictionaries and back:

# Convert task to dictionary
task_dict = task.to_dict()

# Recreate task from dictionary
task = FlowTask.from_dict(task_dict)

This is useful for:

  • Storing tasks in configuration files
  • Serializing tasks for API responses
  • Persisting task definitions
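
The round-trip can be illustrated with a plain dataclass standing in for FlowTask. This is a sketch only: the `ToyTask` class below is invented, and the real FlowTask carries more fields plus typed property objects.

```python
import json
from dataclasses import dataclass, asdict, field

@dataclass
class ToyTask:
    """Hypothetical stand-in for FlowTask; illustrative only."""
    id: str
    name: str
    dependencies: list = field(default_factory=list)

    def to_dict(self):
        return asdict(self)

    @classmethod
    def from_dict(cls, d):
        return cls(**d)

task = ToyTask(id='transform', name='Transform', dependencies=['ingest'])
payload = json.dumps(task.to_dict())            # serialize, e.g. for an API response
restored = ToyTask.from_dict(json.loads(payload))
assert restored == task                         # round-trip preserves the definition
```

Because `to_dict()` emits only plain types, the same dictionary works for TOML/JSON configuration files, API payloads, and database persistence alike.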

Integration with Flows

Tasks are used internally by flow plugins to define the logical steps in a flow. When a flow generates a DAG, it:

  1. Creates task definitions for each step
  2. Converts tasks to DAG nodes
  3. Builds dependencies between nodes
  4. Validates the DAG structure

You typically don't need to create tasks directly when using flows - the flow plugins handle this automatically. However, understanding the task system is useful for:

  • Creating custom flow plugins
  • Understanding how flows work internally
  • Extending the system with new task types

Best Practices

  1. Use explicit properties: Always use typed property classes rather than generic dictionaries
  2. Define clear dependencies: Explicitly declare task dependencies for better clarity
  3. Validate early: Use plugin validation to catch errors early
  4. Document custom task types: Provide clear descriptions and examples for custom types
  5. Follow naming conventions: Use consistent naming for task IDs and types

For developers: