XCom: Inter-Task Data Exchange

Overview

XCom (Cross-Communication) provides a mechanism for tasks to exchange data, similar to Airflow's XCom system. It allows tasks to push data that can be pulled by downstream tasks, enabling complex data dependencies and workflows.

Key Features

  • Multiple Persistence Backends: Database, S3, Local File
  • Automatic Task Result Storage: Task results are automatically pushed to XCom
  • Template Integration: Pull XCom values directly in SQL/template strings
  • Batch and Run Tracking: Support for batch_id and dag_run_id tracking
  • Type Support: JSON, String, Integer, Float, Boolean, Binary

Configuration

Enable XCom

XCom is configured in qarion-etl.toml:

[xcom]
enabled = true
backend = "database" # or "s3", "local_file"
backend_config = {}

Database Backend (Default)

Stores XCom values in the metadata database:

[xcom]
enabled = true
backend = "database"

# Optional: custom table name (default: uses metadata namespace)
[xcom.backend_config]
table_name = "xcom"

S3 Backend

Stores XCom values as JSON files in S3:

[xcom]
enabled = true
backend = "s3"

[xcom.backend_config]
bucket = "my-xcom-bucket"
prefix = "xcom"                         # Optional: S3 prefix/path
credentials = "${credential:aws_creds}" # Optional: credential reference

Or with explicit credentials:

[xcom]
enabled = true
backend = "s3"

[xcom.backend_config]
bucket = "my-xcom-bucket"
prefix = "xcom"

[xcom.backend_config.credentials]
aws_access_key_id = "your-key"
aws_secret_access_key = "your-secret"
region_name = "us-east-1"

Local File Backend

Stores XCom values as JSON files on the local filesystem:

[xcom]
enabled = true
backend = "local_file"

[xcom.backend_config]
base_path = "xcom" # Base directory for XCom files
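The exact on-disk layout is not specified above. A minimal sketch of how a local-file backend could store values, assuming one JSON file per flow/task/key under `base_path` (the path scheme and helper names are illustrative assumptions, not the tool's actual implementation):

```python
import json
import tempfile
from pathlib import Path

def xcom_path(base_path, flow_id, task_id, key="return_value"):
    # Hypothetical layout: <base_path>/<flow_id>/<task_id>/<key>.json
    return Path(base_path) / flow_id / task_id / f"{key}.json"

def push(base_path, flow_id, task_id, value, key="return_value"):
    path = xcom_path(base_path, flow_id, task_id, key)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(value))

def pull(base_path, flow_id, task_id, key="return_value"):
    path = xcom_path(base_path, flow_id, task_id, key)
    return json.loads(path.read_text()) if path.exists() else None

base = tempfile.mkdtemp()  # stand-in for backend_config.base_path
push(base, "my_flow", "ingest_data", {"rows_loaded": 100})
print(pull(base, "my_flow", "ingest_data"))  # {'rows_loaded': 100}
```

Because each value is a plain JSON file, this backend is easy to inspect and debug, which is why it suits development and testing.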

Usage

Automatic Task Result Storage

When XCom is enabled, task execution results are automatically pushed to XCom:

# flows/my_flow.toml
flow_type = "standard"

[[tasks]]
id = "ingest_data"
type = "ingestion"
target_dataset_id = "data_landing"
# Result is automatically pushed to XCom with key "return_value"

[[tasks]]
id = "process_data"
type = "transformation"
source_dataset_id = "data_landing"
dependencies = ["ingest_data"]
# Can pull result from ingest_data task

Pulling XCom Values in Templates

Use the xcom_pull() function in SQL/template strings:

[[tasks]]
id = "process_data"
type = "transformation"
source_dataset_id = "data_landing"
dependencies = ["ingest_data"]
[tasks.config]
sql = """
SELECT
*,
{{ xcom_pull('ingest_data', key='rows_loaded') }} as upstream_rows
FROM {{ source_dataset }}
WHERE batch_id = {{ batch_id }}
"""

Pull with custom key:

sql = """
SELECT * FROM {{ source_dataset }}
WHERE amount > {{ xcom_pull('upstream_task', key='min_amount') }}
"""

Pull from specific flow/run:

sql = """
SELECT * FROM {{ source_dataset }}
WHERE value = {{ xcom_pull('other_task', flow_id='other_flow', batch_id=5) }}
"""

Manual Push/Pull in Python

You can also use XCom programmatically:

from qarion_etl.xcom import XComService
from qarion_etl.xcom.backends.database import DatabaseXComBackend
from qarion_etl.db_service import DatabaseService

# Create the XCom service (assumes `engine` is an existing SQLAlchemy engine)
db_service = DatabaseService(engine)
backend = DatabaseXComBackend(db_service)
xcom_service = XComService(
    backend=backend,
    default_flow_id="my_flow",
    default_batch_id=1,
)

# Push a value
xcom_service.push(
    task_id="my_task",
    value={"rows_processed": 1000, "status": "success"},
    key="processing_result",
)

# Pull a value
result = xcom_service.pull(
    task_id="my_task",
    key="processing_result",
)
print(result)  # {"rows_processed": 1000, "status": "success"}

# Pull multiple values
results = xcom_service.pull_many(
    task_ids=["task1", "task2", "task3"],
    key="return_value",
)

XCom Value Types

XCom supports multiple value types:

  • JSON (default): Complex objects, dictionaries, lists
  • String: Text values
  • Integer: Numeric integers
  • Float: Numeric floats
  • Boolean: True/false values
  • Binary: Binary data (base64 encoded)

Example:

# Import path is assumed; adjust to wherever XComValueType lives in your install
from qarion_etl.xcom import XComValueType

# Push different types
xcom_service.push(task_id="task1", value="text", value_type=XComValueType.STRING)
xcom_service.push(task_id="task2", value=42, value_type=XComValueType.INTEGER)
xcom_service.push(task_id="task3", value={"key": "value"}, value_type=XComValueType.JSON)
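The per-type serialization scheme is not documented above. For the binary case, a plausible sketch is base64-encoding the bytes so they fit in a JSON/text column (this storage format is an assumption, not the tool's confirmed behavior):

```python
import base64
import json

def serialize_binary(value: bytes) -> str:
    # Raw bytes can't go straight into JSON; base64 makes them text-safe.
    return base64.b64encode(value).decode("ascii")

def deserialize_binary(encoded: str) -> bytes:
    return base64.b64decode(encoded)

payload = b"\x00\x01parquet-footer"
stored = json.dumps({"value": serialize_binary(payload), "value_type": "BINARY"})

# Round-trip: what comes back out matches what went in.
record = json.loads(stored)
assert deserialize_binary(record["value"]) == payload
```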

Batch and Run Tracking

XCom values are tracked by:

  • flow_id: Flow identifier
  • task_id: Task identifier
  • key: Optional key (default: "return_value")
  • dag_run_id: Optional DAG run identifier
  • batch_id: Optional batch ID

This allows you to:

  • Track values across different flow executions
  • Retrieve values from specific batches
  • Isolate values by run ID

Example:

# Push with batch tracking
xcom_service.push(
task_id="my_task",
value={"count": 100},
batch_id=5,
dag_run_id="run_2024-01-15"
)

# Pull from specific batch
result = xcom_service.pull(
task_id="my_task",
batch_id=5,
dag_run_id="run_2024-01-15"
)
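The tracking fields effectively form a composite key, so values pushed under different batches or runs never collide. A toy illustration using a dict keyed by the full tuple (not the actual storage schema):

```python
# Toy store: one value per (flow_id, task_id, key, dag_run_id, batch_id).
store = {}

def push(flow_id, task_id, value, key="return_value", dag_run_id=None, batch_id=None):
    store[(flow_id, task_id, key, dag_run_id, batch_id)] = value

def pull(flow_id, task_id, key="return_value", dag_run_id=None, batch_id=None):
    return store.get((flow_id, task_id, key, dag_run_id, batch_id))

push("my_flow", "my_task", {"count": 100}, batch_id=5, dag_run_id="run_2024-01-15")
push("my_flow", "my_task", {"count": 250}, batch_id=6, dag_run_id="run_2024-01-16")

# Each batch/run combination stays isolated.
print(pull("my_flow", "my_task", batch_id=5, dag_run_id="run_2024-01-15"))  # {'count': 100}
```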

Complete Example

Flow Definition:

# flows/data_pipeline.toml
id = "data_pipeline"
name = "Data Processing Pipeline"
flow_type = "standard"
namespace = "production"

[input]
primary_key = ["id"]
columns = [
    {name = "id", schema_type = "integer", required = true},
    {name = "value", schema_type = "float", required = true},
]

[[tasks]]
id = "ingest_orders"
name = "Ingest Orders"
type = "ingestion"
target_dataset_id = "orders_landing"
[tasks.properties]
operation = "ingestion"
target_table_type = "landing"
processing_type = "FULL_REFRESH"
[tasks.config]
path = "data/orders.csv"
format = "csv"
# Result automatically pushed to XCom

[[tasks]]
id = "calculate_stats"
name = "Calculate Statistics"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_stats"
dependencies = ["ingest_orders"]
[tasks.config]
sql = """
SELECT
COUNT(*) as total_count,
AVG(value) as avg_value,
MAX(value) as max_value,
MIN(value) as min_value
FROM {{ source_dataset }}
"""
# Can access ingest_orders result via xcom_pull('ingest_orders')

[[tasks]]
id = "filter_high_value"
name = "Filter High Value Orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "high_value_orders"
dependencies = ["calculate_stats"]
[tasks.config]
sql = """
SELECT *
FROM {{ source_dataset }}
WHERE value > {{ xcom_pull('calculate_stats', key='avg_value') }}
"""

Configuration:

# qarion-etl.toml
[xcom]
enabled = true
backend = "database"
backend_config = {}

Best Practices

  1. Use Descriptive Keys: Use meaningful keys instead of the default "return_value"

    xcom_service.push(task_id="task1", value=data, key="processed_data")
  2. Batch Tracking: Always use batch_id for proper data isolation

    xcom_service.push(task_id="task1", value=data, batch_id=batch_id)
  3. Error Handling: Check that an XCom value exists before using it

    result = xcom_service.pull(task_id="upstream_task")
    if result is None:
        raise ValueError("Upstream task result not found")
  4. Type Safety: Specify value types explicitly for better serialization

    xcom_service.push(
        task_id="task1",
        value=42,
        value_type=XComValueType.INTEGER,
    )
  5. Cleanup: Clear old XCom values periodically to manage storage

    # Clear all values for a flow
    count = xcom_service.clear(flow_id="my_flow")

    # Clear values for a specific task
    count = xcom_service.clear(flow_id="my_flow", task_id="old_task")

Backend Comparison

| Backend    | Best For                        | Persistence  | Performance | Setup Complexity |
|------------|---------------------------------|--------------|-------------|------------------|
| Database   | Production, multi-user          | ✅ Persistent | Good        | Low              |
| S3         | Cloud environments, distributed | ✅ Persistent | Good        | Medium           |
| Local File | Development, testing            | ✅ Persistent | Fast        | Very Low         |