Task System in Qarion ETL
Overview
The task system in Qarion ETL provides a unified, extensible way to define operations in data flows. Tasks represent logical operations like ingestion, transformation, quality checks, and exports. Each task type has explicit typed properties that define its purpose and behavior.
Task Architecture
Plugin-Based Design
Task types are implemented using a plugin architecture, making it easy to extend the system with custom task types without modifying core code. The architecture consists of:
- Task Definitions: Concrete task classes (IngestionTask, TransformationTask, etc.)
- Task Properties: Typed property classes for each task type
- Task Type Plugins: Plugin classes that define validation and behavior
- Plugin Registry: Central registry for discovering and managing task types
Defining Tasks in Standard Flows
In Standard flows, tasks are defined directly in the flow definition using TOML:
# flows/my_standard_flow.toml
id = "my_standard_flow"
name = "My Standard Flow"
flow_type = "standard"
namespace = "production"
[input]
primary_key = ["id"]
columns = [
{ name = "id", schema_type = "integer" },
{ name = "name", schema_type = "string" },
{ name = "amount", schema_type = "float" }
]
[[tasks]]
id = "ingest_data"
name = "Ingest Data"
type = "ingestion"
target_dataset_id = "landing_table"
[tasks.properties]
operation = "ingestion"
target_table_type = "landing"
processing_type = "FULL_REFRESH"
[tasks.config]
path = "data/input.csv"
format = "csv"
[[tasks]]
id = "transform_data"
name = "Transform Data"
type = "transformation"
source_dataset_id = "landing_table"
target_dataset_id = "staging_table"
dependencies = ["ingest_data"]
[tasks.properties]
operation = "landing_to_staging"
source_table_type = "landing"
target_table_type = "staging"
processing_type = "INCREMENTAL"
[tasks.config]
sql = """
SELECT
id,
UPPER(name) as name,
amount * 1.1 as adjusted_amount
FROM {{ source_dataset }}
WHERE amount > 0
"""
[[tasks]]
id = "quality_check"
name = "Quality Check"
type = "dq_check"
source_dataset_id = "staging_table"
dependencies = ["transform_data"]
[tasks.properties]
operation = "quality_check"
table_type = "staging"
[[tasks.properties.quality_checks]]
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
enabled = true
[tasks.properties.quality_checks.config]
columns = ["id", "name", "adjusted_amount"]
threshold = 0.95
allow_null = false
[[tasks]]
id = "export_data"
name = "Export Data"
type = "export"
source_dataset_id = "staging_table"
dependencies = ["quality_check"]
[tasks.properties]
operation = "export"
export_type = "file"
destination = "exports/staging_data.csv"
format = "csv"
[tasks.config]
include_header = true
delimiter = ","
Processing Types
Every task has a processing type that determines how data is processed. Understanding it is key to reasoning about pipeline behavior, performance, and data-processing strategy.
FULL_REFRESH
Rebuilds the entire dataset from scratch. All existing data is replaced with new data.
When to Use:
- You need to completely refresh the dataset
- Data volume is small enough for full rebuilds
- You want to ensure data consistency by starting fresh
- You're reprocessing historical data from scratch
- You've fixed a bug and need to rebuild with corrected logic
- You're doing initial data load
How It Works:
- Target table is truncated or dropped and recreated
- All data is processed from source, regardless of previous runs
- No dependency on previous batch IDs
- Complete historical reprocessing is possible
Example Use Cases:
- Initial data load
- Complete data refresh after schema changes
- Reprocessing all historical data with corrected logic
- Small reference datasets that change infrequently
INCREMENTAL
Processes only new or changed data since the last run. Existing data is preserved and updated.
When to Use:
- You have large datasets where full refresh is expensive
- You only receive new/changed records
- You want to optimize processing time and resources
- You're tracking changes over time (change feeds, SCD2, etc.)
- You need to maintain historical state
How It Works:
- Only processes records from the current batch
- Compares with previous batch to identify changes
- Uses merge/upsert strategies to update existing records
- Requires previous_batch_id to determine what's new/changed
- Preserves historical data while updating current state
Example Use Cases:
- Daily incremental loads from source systems
- Change tracking (change feeds, SCD2)
- Event processing (outbox pattern)
- Large fact tables that grow over time
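The two strategies above can be contrasted in a few lines. This is an illustrative sketch only, using sqlite3 as a stand-in target store; Qarion ETL's actual write strategies are more involved.

```python
import sqlite3

# Stand-in target table keyed on a primary key.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, amount REAL)")

def full_refresh(conn, rows):
    """FULL_REFRESH: wipe the target and rebuild it from all source rows."""
    conn.execute("DELETE FROM target")
    conn.executemany("INSERT INTO target VALUES (?, ?)", rows)

def incremental(conn, batch_rows):
    """INCREMENTAL: upsert only the current batch, preserving existing rows."""
    conn.executemany(
        "INSERT INTO target VALUES (?, ?) "
        "ON CONFLICT(id) DO UPDATE SET amount = excluded.amount",
        batch_rows,
    )

full_refresh(conn, [(1, 10.0), (2, 20.0)])   # initial load replaces everything
incremental(conn, [(2, 25.0), (3, 30.0)])    # updates id=2, inserts id=3
rows = sorted(conn.execute("SELECT id, amount FROM target"))
print(rows)  # [(1, 10.0), (2, 25.0), (3, 30.0)]
```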
Automatic Processing Type Assignment
The processing type is automatically determined by flow plugins based on the transformation pattern:
- Change Feed flows: INCREMENTAL (merge strategy) - Tracks changes between batches
- SCD2 flows: INCREMENTAL (merge strategy) - Maintains historical versions
- Outbox flows: INCREMENTAL (insert strategy) - Always inserts new events
- Delta Publishing flows: INCREMENTAL (merge strategy) - Processes transactions incrementally
- Standard flows: Can be either, depending on task configuration
Forcing Full Refresh
Even for incremental tasks, you can force a full refresh when needed:
Via Execution API:
from qarion_etl.flows.execution import FlowExecutionService
service = FlowExecutionService(engine, transformation_service)
result = service.execute_flow(
execution_plan=plan,
batch_id=1,
force_full_refresh=True # Forces full refresh even for incremental tasks
)
Via Code Generation:
- Set previous_batch_id=None when generating code - This triggers the first-batch scenario, which processes all data
When to Force Full Refresh:
- Reprocessing all historical data
- Fixing data quality issues
- Schema changes requiring complete rebuild
- Initial data load
- Testing with complete dataset
Important Notes:
- Full refresh processes ALL data from source, not just current batch
- For historical reprocessing, ensure source data contains all historical records
- Full refresh may take significantly longer than incremental processing
- Use with caution on large datasets
Task Types
Qarion ETL provides six built-in task types:
1. Ingestion Tasks
Purpose: Load data from external sources into target datasets.
Properties:
- operation: Type of ingestion operation (ingestion, load, import)
- target_table_type: Type of table being populated (e.g., 'landing', 'staging')
- source_connector_id: Optional connector ID for the source system
- processing_type: Type of processing (FULL_REFRESH or INCREMENTAL) - Important: determines whether data is fully refreshed or incrementally loaded
- load_strategy: Optional load strategy (deprecated, use processing_type instead)
- custom_properties: Additional flow-specific properties
TOML Configuration Example:
[[tasks]]
id = "ingest_orders"
name = "Ingest Orders"
type = "ingestion"
target_dataset_id = "orders_landing"
[tasks.properties]
operation = "ingestion"
target_table_type = "landing"
processing_type = "FULL_REFRESH"
[tasks.config]
path = "data/orders"
pattern = "orders_*.csv"
format = "csv"
Python API Example:
from qarion_etl.flows.tasks import IngestionTask, IngestionTaskProperties, IngestionOperation
props = IngestionTaskProperties(
operation=IngestionOperation.INGESTION,
target_table_type='landing'
)
task = IngestionTask(
id='ingestion',
name='Load Data',
target_dataset_id='landing_dataset',
task_properties=props
)
2. Transformation Tasks
Purpose: Transform data from source dataset(s) to target dataset.
Properties:
- operation: Type of transformation (e.g., 'landing_to_staging', 'sessionization')
- source_table_type: Type of source table (e.g., 'landing', 'staging')
- target_table_type: Type of target table (e.g., 'change_feed', 'delta_transaction')
- processing_type: Type of processing (FULL_REFRESH or INCREMENTAL) - Important: determines whether the transformation rebuilds the entire dataset or processes incrementally
- source_datasets: List of source dataset IDs for multi-source transformations
- transformation_type: Optional type classification (e.g., 'sql', 'python', 'dbt')
- custom_properties: Additional flow-specific properties
TOML Configuration Example:
[[tasks]]
id = "transform_orders"
name = "Transform Orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_staging"
dependencies = ["ingest_orders"]
[tasks.properties]
operation = "landing_to_staging"
source_table_type = "landing"
target_table_type = "staging"
processing_type = "INCREMENTAL"
[tasks.config]
sql = """
SELECT
id,
customer_id,
amount,
order_date,
CASE
WHEN amount > 1000 THEN 'high_value'
WHEN amount > 100 THEN 'medium_value'
ELSE 'low_value'
END as value_category
FROM {{ source_dataset }}
WHERE order_date >= '2024-01-01'
"""
Python API Example:
from qarion_etl.flows.tasks import TransformationTask, TransformationTaskProperties, TransformationOperation
props = TransformationTaskProperties(
operation=TransformationOperation.LANDING_TO_STAGING,
source_table_type='landing',
target_table_type='staging'
)
task = TransformationTask(
id='landing_to_staging',
name='Transform Landing to Staging',
source_dataset_id='landing_dataset',
target_dataset_id='staging_dataset',
dependencies=['ingestion'],
task_properties=props
)
3. Quality Check Tasks
Purpose: Run data quality checks to validate dataset integrity and quality.
Properties:
- operation: Type of quality check operation
- table_type: Type of table being checked (e.g., 'staging', 'publish')
- quality_checks: List of quality check definitions to run
- failure_action: Action to take on failure (e.g., 'fail', 'warn', 'continue')
- custom_properties: Additional flow-specific properties
TOML Configuration Example:
[[tasks]]
id = "quality_check_orders"
name = "Quality Check Orders"
type = "dq_check"
source_dataset_id = "orders_staging"
dependencies = ["transform_orders"]
[tasks.properties]
operation = "quality_check"
table_type = "staging"
stop_on_first_failure = false
[[tasks.properties.quality_checks]]
check_id = "completeness_check"
check_name = "Completeness Check"
check_type = "completeness"
enabled = true
[tasks.properties.quality_checks.config]
columns = ["id", "customer_id", "amount"]
threshold = 0.95
allow_null = false
[[tasks.properties.quality_checks]]
check_id = "uniqueness_check"
check_name = "Uniqueness Check"
check_type = "uniqueness"
enabled = true
[tasks.properties.quality_checks.config]
columns = ["id"]
[[tasks.properties.quality_checks]]
check_id = "range_check"
check_name = "Amount Range Check"
check_type = "range"
enabled = true
[tasks.properties.quality_checks.config]
columns = ["amount"]
min_value = 0
max_value = 1000000
Python API Example:
from qarion_etl.flows.tasks import QualityCheckTask, QualityCheckTaskProperties, QualityCheckOperation
props = QualityCheckTaskProperties(
operation=QualityCheckOperation.QUALITY_CHECK,
table_type='staging',
quality_checks=[
{'type': 'not_null', 'column': 'id'},
{'type': 'unique', 'column': 'id'}
]
)
task = QualityCheckTask(
id='staging_quality_checks',
name='Staging Quality Checks',
source_dataset_id='staging_dataset',
target_dataset_id='staging_dataset', # Quality checks operate on the same dataset
dependencies=['landing_to_staging'],
task_properties=props
)
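The completeness check configured above (threshold = 0.95) can be read as: the fraction of non-null values in each checked column must meet the threshold. The following is a hypothetical sketch of that semantics; the actual check implementation in Qarion ETL may differ.

```python
# Hypothetical completeness-check semantics: per-column non-null ratio
# must meet the configured threshold for the check to pass.
def completeness_check(rows, columns, threshold=0.95):
    """Return (passed, per-column completeness ratios) for the given rows."""
    if not rows:
        return False, {col: 0.0 for col in columns}
    ratios = {}
    for col in columns:
        non_null = sum(1 for row in rows if row.get(col) is not None)
        ratios[col] = non_null / len(rows)
    return all(r >= threshold for r in ratios.values()), ratios

sample = [
    {"id": 1, "name": "a"},
    {"id": 2, "name": None},
    {"id": 3, "name": "c"},
]
passed, ratios = completeness_check(sample, ["id", "name"], threshold=0.95)
print(passed, ratios)  # 'id' passes at 1.0, 'name' fails the 0.95 threshold
```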
4. Export Tasks
Purpose: Export data from datasets to external destinations.
Properties:
- operation: Type of export operation
- export_type: Type of export destination (e.g., 'file', 'api', 'database')
- destination: Export destination identifier or configuration
- export_config: Full export configuration dictionary
- format: Optional export format (e.g., 'csv', 'parquet', 'json')
- custom_properties: Additional flow-specific properties
TOML Configuration Example:
[[tasks]]
id = "export_orders"
name = "Export Orders"
type = "export"
source_dataset_id = "orders_staging"
dependencies = ["quality_check_orders"]
[tasks.properties]
operation = "export"
export_type = "file"
destination = "exports/orders.csv"
format = "csv"
[tasks.config]
include_header = true
delimiter = ","
encoding = "utf-8"
columns = ["id", "customer_id", "amount", "order_date"]
Export to Parquet Example:
[[tasks]]
id = "export_orders_parquet"
name = "Export Orders to Parquet"
type = "export"
source_dataset_id = "orders_staging"
dependencies = ["quality_check_orders"]
[tasks.properties]
operation = "export"
export_type = "file"
destination = "exports/orders.parquet"
format = "parquet"
[tasks.config]
compression = "snappy"
columns = ["id", "customer_id", "amount", "order_date", "value_category"]
Python API Example:
from qarion_etl.flows.tasks import ExportTask, ExportTaskProperties, ExportOperation
props = ExportTaskProperties(
operation=ExportOperation.EXPORT,
export_type='file',
destination='/path/to/output.csv',
format='csv'
)
task = ExportTask(
id='export_data',
name='Export to CSV',
source_dataset_id='final_dataset',
task_properties=props
)
5. Flow Trigger Tasks
Purpose: Trigger execution of another flow from within the current flow.
Properties:
- operation: Type of flow trigger operation (trigger)
- target_flow_id: ID of the flow to trigger (required)
- trigger_id: Optional specific trigger ID to use
- wait_for_completion: Whether to wait for the triggered flow to complete (default: true)
- pass_batch_id: Whether to pass the current batch_id to the triggered flow (default: true)
- batch_id: Optional batch_id to use for the triggered flow
- execution_date: Optional execution date for the triggered flow
- variables: Optional variables to pass to the triggered flow
- condition: Condition for triggering ('success', 'failure', 'always') (default: 'success')
- custom_properties: Additional flow-specific properties
TOML Configuration Example:
[[tasks]]
id = "trigger_child_flow"
name = "Trigger Child Flow"
type = "flow_trigger"
dependencies = ["process_data"]
[tasks.properties]
target_flow_id = "child_flow"
wait_for_completion = true
pass_batch_id = true
condition = "success"
variables = { min_value = 100, max_value = 1000 }
Python API Example:
from qarion_etl.flows.tasks import FlowTriggerTask, FlowTriggerTaskProperties, FlowTriggerOperation
props = FlowTriggerTaskProperties(
operation=FlowTriggerOperation.TRIGGER,
target_flow_id='child_flow',
wait_for_completion=True,
pass_batch_id=True,
condition='success',
variables={'min_value': 100, 'max_value': 1000}
)
task = FlowTriggerTask(
id='trigger_child',
name='Trigger Child Flow',
dependencies=['process_data'],
task_properties=props
)
For complete documentation, see Flow Trigger Tasks Guide.
6. API Call Tasks
Purpose: Make HTTP API calls with support for authentication and XCom data integration.
Properties:
- operation: HTTP method (get, post, put, patch, delete)
- url: API endpoint URL (supports Jinja2 templating)
- auth_type: Authentication type (none, basic, oauth2, api_key, bearer)
- credential_id: Credential ID from the credential store
- headers: Custom HTTP headers (supports Jinja2 templating)
- params: URL query parameters (supports Jinja2 templating)
- body: Request body for POST, PUT, PATCH (supports Jinja2 templating)
- body_format: Body format (json, form, raw, xml)
- timeout: Request timeout in seconds
- retry_count: Number of retries on failure
- retry_delay: Delay between retries in seconds
- xcom_pull: List of task IDs to pull data from via XCom
- xcom_pull_key: Key to extract from XCom data
- response_handling: How to handle the response (store, validate, ignore)
- response_validation: Optional validation rules for the response
TOML Configuration Example:
[[tasks]]
id = "call_external_api"
name = "Call External API"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
auth_type = "basic"
credential_id = "api_credentials"
timeout = 30
retry_count = 3
OAuth2 Example:
[[tasks]]
id = "oauth2_api_call"
name = "OAuth2 API Call"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/protected/data"
operation = "get"
auth_type = "oauth2"
credential_id = "oauth2_credentials"
Using XCom Data:
[[tasks]]
id = "api_with_xcom"
name = "API Call with XCom"
type = "api_call"
dependencies = ["previous_task"]
[tasks.properties]
url = "https://api.example.com/users/{{ xcom_previous_task.user_id }}"
operation = "post"
xcom_pull = ["previous_task"]
body = { "user_id" = "{{ xcom_previous_task.user_id }}", "status" = "{{ xcom_previous_task.status }}" }
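The retry_count and retry_delay properties above imply retry-on-failure semantics. A generic sketch of that behavior, written around an arbitrary callable (the task runner's actual retry logic is internal to Qarion ETL and may differ):

```python
import time

def call_with_retries(fn, retry_count=3, retry_delay=1.0):
    """Invoke fn(); on failure, retry up to retry_count more times,
    sleeping retry_delay seconds between attempts."""
    last_error = None
    for attempt in range(retry_count + 1):
        try:
            return fn()
        except Exception as exc:
            last_error = exc
            if attempt < retry_count:
                time.sleep(retry_delay)
    raise last_error

# Simulated flaky endpoint: fails twice, then succeeds on the third attempt.
attempts = []
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("transient failure")
    return {"status": 200}

result = call_with_retries(flaky, retry_count=3, retry_delay=0.0)
print(result, len(attempts))  # {'status': 200} 3
```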
For complete documentation, see API Call Tasks Guide.
Task Dependencies
Tasks can define explicit dependencies to control execution order:
TOML Configuration Example:
[[tasks]]
id = "ingest_orders"
type = "ingestion"
target_dataset_id = "orders_landing"
# No dependencies - this is the first task
[[tasks]]
id = "transform_orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_staging"
dependencies = ["ingest_orders"] # Depends on ingestion task
[[tasks]]
id = "export_orders"
type = "export"
source_dataset_id = "orders_staging"
dependencies = ["transform_orders"] # Depends on transformation task
Python API Example:
task = TransformationTask(
id='transform',
source_dataset_id='source',
target_dataset_id='target',
dependencies=['ingestion'], # This task depends on the 'ingestion' task
task_properties=props
)
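The dependency declarations above translate directly into an execution order. A sketch using the standard library's TopologicalSorter shows the idea; the engine's actual scheduler is more elaborate.

```python
from graphlib import TopologicalSorter

# Map each task ID to the task IDs it depends on, mirroring the
# dependencies fields in the TOML example above.
dependencies = {
    "ingest_orders": [],
    "transform_orders": ["ingest_orders"],
    "export_orders": ["transform_orders"],
}

# static_order() yields tasks with all of their dependencies ordered first.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['ingest_orders', 'transform_orders', 'export_orders']
```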
FAN IN and FAN OUT Patterns
Tasks support both FAN IN (multiple sources) and FAN OUT (multiple targets) patterns:
FAN IN (Multiple tasks → Single task):
# Task that depends on multiple upstream tasks
[[tasks]]
id = "join_task"
type = "transformation"
target_dataset_id = "joined_data"
dependencies = ["task_a", "task_b", "task_c"] # Multiple dependencies
[tasks.properties]
operation = "join"
source_datasets = ["dataset_a", "dataset_b", "dataset_c"]
Python Example:
# Task that depends on multiple upstream tasks
task = TransformationTask(
id='join_task',
target_dataset_id='joined_data',
dependencies=['task_a', 'task_b', 'task_c'], # Multiple dependencies
task_properties=props
)
FAN OUT (Single task → Multiple tasks):
# Source task feeds into multiple downstream tasks
[[tasks]]
id = "source_task"
type = "transformation"
source_dataset_id = "source"
target_dataset_id = "target"
# Multiple downstream tasks will depend on this one
[[tasks]]
id = "task_a"
type = "transformation"
source_dataset_id = "target"
target_dataset_id = "output_a"
dependencies = ["source_task"] # Depends on source_task
[[tasks]]
id = "task_b"
type = "transformation"
source_dataset_id = "target"
target_dataset_id = "output_b"
dependencies = ["source_task"] # Also depends on source_task
Python Example:
# Task that feeds into multiple downstream tasks
task = TransformationTask(
id='source_task',
source_dataset_id='source',
target_dataset_id='target',
downstream_tasks=['task_a', 'task_b'], # Multiple downstream tasks
task_properties=props
)
Creating Custom Task Types
You can extend the task system by creating custom task type plugins. This allows you to define new task types with specific properties and validation logic.
Step 1: Define Properties Class
from dataclasses import dataclass
from typing import Dict, Any, Optional
@dataclass
class CustomTaskProperties:
"""Properties for custom task type."""
custom_field: str
optional_field: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
'custom_field': self.custom_field,
'optional_field': self.optional_field,
}
@classmethod
def from_dict(cls, props_dict: Dict[str, Any]):
return cls(
custom_field=props_dict.get('custom_field', 'default'),
optional_field=props_dict.get('optional_field'),
)
Step 2: Create Plugin Class
from qarion_etl.flows.tasks.base import TaskTypePlugin
class CustomTaskTypePlugin(TaskTypePlugin):
"""Plugin for custom task type."""
@property
def task_type(self) -> str:
return 'custom_task'
@property
def name(self) -> str:
return 'Custom Task'
@property
def description(self) -> str:
return 'Performs a custom operation'
@property
def properties_class(self):
return CustomTaskProperties
def validate_task(self, task: 'FlowTask') -> bool:
"""Validate custom task."""
super().validate_task(task)
# Add custom validation logic
props = task.task_properties
if not props.custom_field:
raise ValueError("custom_field is required")
return True
Step 3: Register the Plugin
from qarion_etl.flows.tasks.registry import register_task_type
register_task_type(CustomTaskTypePlugin())
For more detailed information on creating custom task types, see the Task Type Plugin Architecture guide.
Using the Plugin Registry
The plugin registry provides functions to discover and use task types:
from qarion_etl.flows.tasks import (
get_task_type,
list_task_types,
has_task_type
)
# List all available task types
types = list_task_types()
print(types) # ['ingestion', 'transformation', 'dq_check', 'export', 'flow_trigger', 'api_call']
# Get a specific plugin
plugin = get_task_type('ingestion')
print(plugin.name) # "Ingestion"
print(plugin.description) # "Loads data from external sources..."
# Check if a task type exists
if has_task_type('custom_task'):
print("Custom task type is available")
Task Serialization
Tasks can be converted to dictionaries and back:
# Convert task to dictionary
task_dict = task.to_dict()
# Recreate task from dictionary
task = FlowTask.from_dict(task_dict)
This is useful for:
- Storing tasks in configuration files
- Serializing tasks for API responses
- Persisting task definitions
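The persistence pattern can be sketched with a hypothetical stand-in class; FlowTask's real to_dict()/from_dict() carry more fields, but the round trip is the same: dict, then JSON on disk or over the wire, then dict, then task.

```python
import json
from dataclasses import dataclass, asdict, field

# StubTask is a hypothetical stand-in for FlowTask, not the real class.
@dataclass
class StubTask:
    id: str
    name: str
    dependencies: list = field(default_factory=list)

    def to_dict(self):
        return asdict(self)

    @classmethod
    def from_dict(cls, d):
        return cls(**d)

task = StubTask(id="transform", name="Transform", dependencies=["ingestion"])
payload = json.dumps(task.to_dict())            # serialize for storage or an API response
restored = StubTask.from_dict(json.loads(payload))
print(restored == task)  # True
```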
Integration with Flows
Tasks are used internally by flow plugins to define the logical steps in a flow. When a flow generates a DAG, it:
- Creates task definitions for each step
- Converts tasks to DAG nodes
- Builds dependencies between nodes
- Validates the DAG structure
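The validation step can be illustrated with a cycle check: a dependency cycle makes the graph unschedulable. A sketch using the standard library (the flow plugins' actual validation covers more than cycles):

```python
from graphlib import TopologicalSorter, CycleError

def validate_dag(deps):
    """Return True if the task dependency graph is acyclic, False otherwise."""
    try:
        TopologicalSorter(deps).prepare()  # raises CycleError on a cycle
        return True
    except CycleError:
        return False

ok = validate_dag({"a": [], "b": ["a"]})
bad = validate_dag({"a": ["b"], "b": ["a"]})
print(ok, bad)  # True False
```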
You typically don't need to create tasks directly when using flows - the flow plugins handle this automatically. However, understanding the task system is useful for:
- Creating custom flow plugins
- Understanding how flows work internally
- Extending the system with new task types
Best Practices
- Use explicit properties: Always use typed property classes rather than generic dictionaries
- Define clear dependencies: Explicitly declare task dependencies for better clarity
- Validate early: Use plugin validation to catch errors early
- Document custom task types: Provide clear descriptions and examples for custom types
- Follow naming conventions: Use consistent naming for task IDs and types
Related Documentation
- Flows Guide - Learn how flows use tasks internally
- API Call Tasks - Making HTTP API calls with authentication
- Flow Trigger Tasks - Triggering other flows
- XCom Guide - Inter-task data exchange
- Flow Types Reference - Learn about different flow types
- Core Concepts - Understanding Qarion ETL fundamentals
For developers:
- Task Type Plugin Architecture - Technical details for extending the system
- Plugin Interfaces Reference - Complete API reference