XCom: Inter-Task Data Exchange
Overview
XCom (Cross-Communication) provides a mechanism for tasks to exchange data, similar to Airflow's XCom system. It allows tasks to push data that can be pulled by downstream tasks, enabling complex data dependencies and workflows.
Key Features
- Multiple Persistence Backends: Database, S3, Local File
- Automatic Task Result Storage: Task results are automatically pushed to XCom
- Template Integration: Pull XCom values directly in SQL/template strings
- Batch and Run Tracking: Support for batch_id and dag_run_id tracking
- Type Support: JSON, String, Integer, Float, Boolean, Binary
Configuration
Enable XCom
XCom is configured in qarion-etl.toml:
```toml
[xcom]
enabled = true
backend = "database"  # or "s3", "local_file"
backend_config = {}
```
Database Backend (Default)
Stores XCom values in the metadata database:
```toml
[xcom]
enabled = true
backend = "database"

# Optional: custom table name (default: uses the metadata namespace)
[xcom.backend_config]
table_name = "xcom"
```
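The exact table layout is internal to qarion-etl, but the identifiers XCom tracks (flow_id, task_id, key, batch_id, dag_run_id — see "Batch and Run Tracking" below) suggest a shape like the following sketch. It is illustrative only: the table and column names are assumptions, demonstrated here with an in-memory SQLite database rather than the real metadata store.

```python
import sqlite3

# Illustrative only: the real qarion-etl schema is internal. This sketch
# mirrors the fields the document says XCom tracks.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE xcom (
        flow_id    TEXT NOT NULL,
        task_id    TEXT NOT NULL,
        key        TEXT NOT NULL DEFAULT 'return_value',
        value      TEXT NOT NULL,          -- JSON-serialized payload
        value_type TEXT NOT NULL DEFAULT 'json',
        batch_id   INTEGER,
        dag_run_id TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# A "push" is an insert keyed by flow/task/key ...
conn.execute(
    "INSERT INTO xcom (flow_id, task_id, key, value) VALUES (?, ?, ?, ?)",
    ("my_flow", "ingest_data", "return_value", '{"rows_loaded": 100}'),
)
# ... and a "pull" is a lookup on the same identifiers.
row = conn.execute(
    "SELECT value FROM xcom WHERE flow_id = ? AND task_id = ? AND key = ?",
    ("my_flow", "ingest_data", "return_value"),
).fetchone()
print(row[0])  # '{"rows_loaded": 100}'
```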
S3 Backend
Stores XCom values as JSON files in S3:
```toml
[xcom]
enabled = true
backend = "s3"

[xcom.backend_config]
bucket = "my-xcom-bucket"
prefix = "xcom"                          # Optional: S3 prefix/path
credentials = "${credential:aws_creds}"  # Optional: credential reference
```
Or with explicit credentials:
```toml
[xcom]
enabled = true
backend = "s3"

[xcom.backend_config]
bucket = "my-xcom-bucket"
prefix = "xcom"

[xcom.backend_config.credentials]
aws_access_key_id = "your-key"
aws_secret_access_key = "your-secret"
region_name = "us-east-1"
```
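Conceptually, an S3 backend serializes each value to JSON and writes it under a per-flow/per-task object key. The sketch below illustrates that idea; the class name, method names, and the `{prefix}/{flow_id}/{task_id}/{key}.json` key layout are assumptions, not qarion-etl's actual implementation. The client is injected, so any object with boto3-style `put_object`/`get_object` methods works.

```python
import json

class S3XComBackendSketch:
    """Hedged sketch of an S3-backed XCom store (not the real backend)."""

    def __init__(self, client, bucket, prefix="xcom"):
        self.client = client  # boto3 S3 client, or any compatible stub
        self.bucket = bucket
        self.prefix = prefix

    def _object_key(self, flow_id, task_id, key):
        # Assumed layout: one JSON object per (flow, task, key)
        return f"{self.prefix}/{flow_id}/{task_id}/{key}.json"

    def set(self, flow_id, task_id, key, value):
        body = json.dumps(value).encode("utf-8")
        self.client.put_object(
            Bucket=self.bucket,
            Key=self._object_key(flow_id, task_id, key),
            Body=body,
        )

    def get(self, flow_id, task_id, key="return_value"):
        resp = self.client.get_object(
            Bucket=self.bucket,
            Key=self._object_key(flow_id, task_id, key),
        )
        return json.loads(resp["Body"].read())
```

Injecting the client also makes the backend easy to exercise in tests with an in-memory stub instead of a live bucket.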
Local File Backend
Stores XCom values as JSON files on the local filesystem:
```toml
[xcom]
enabled = true
backend = "local_file"

[xcom.backend_config]
base_path = "xcom"  # Base directory for XCom files
```
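The local file backend behaves like the S3 backend with the filesystem as the store. A minimal sketch of that behavior, assuming a `base_path/flow_id/task_id/key.json` layout (the real backend's file layout is not documented here):

```python
import json
from pathlib import Path

def local_xcom_push(base_path, flow_id, task_id, value, key="return_value"):
    """Write an XCom value as a JSON file under the assumed layout."""
    path = Path(base_path) / flow_id / task_id / f"{key}.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(value))
    return path

def local_xcom_pull(base_path, flow_id, task_id, key="return_value"):
    """Read an XCom value back; return None when no value was pushed."""
    path = Path(base_path) / flow_id / task_id / f"{key}.json"
    if not path.exists():
        return None
    return json.loads(path.read_text())
```

Because everything is plain JSON on disk, this backend is easy to inspect during development, which is why the comparison table below recommends it for development and testing.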
Usage
Automatic Task Result Storage
When XCom is enabled, task execution results are automatically pushed to XCom:
```toml
# flows/my_flow.toml
flow_type = "standard"

[[tasks]]
id = "ingest_data"
type = "ingestion"
target_dataset_id = "data_landing"
# Result is automatically pushed to XCom with key "return_value"

[[tasks]]
id = "process_data"
type = "transformation"
source_dataset_id = "data_landing"
dependencies = ["ingest_data"]
# Can pull the result from the ingest_data task
```
Pulling XCom Values in Templates
Use the xcom_pull() function in SQL/template strings:
```toml
[[tasks]]
id = "process_data"
type = "transformation"
source_dataset_id = "data_landing"
dependencies = ["ingest_data"]

[tasks.config]
sql = """
SELECT
    *,
    {{ xcom_pull('ingest_data', key='rows_loaded') }} as upstream_rows
FROM {{ source_dataset }}
WHERE batch_id = {{ batch_id }}
"""
```
Pull with custom key:
```toml
sql = """
SELECT * FROM {{ source_dataset }}
WHERE amount > {{ xcom_pull('upstream_task', key='min_amount') }}
"""
```
Pull from specific flow/run:
```toml
sql = """
SELECT * FROM {{ source_dataset }}
WHERE value = {{ xcom_pull('other_task', flow_id='other_flow', batch_id=5) }}
"""
```
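Conceptually, the template engine exposes `xcom_pull` as a callable bound to the executing flow and run, so omitted arguments default to the current context while explicit `flow_id`/`batch_id` arguments reach across flows. A minimal sketch of that binding (the real template integration is internal to qarion-etl; the dict here stands in for the configured backend):

```python
def make_xcom_pull(store, current_flow_id, current_batch_id):
    """Return an xcom_pull callable bound to the current flow/run.

    `store` is a dict keyed by (flow_id, task_id, key, batch_id) --
    a stand-in for the configured XCom backend.
    """
    def xcom_pull(task_id, key="return_value", flow_id=None, batch_id=None):
        lookup = (
            flow_id or current_flow_id,  # default to the executing flow
            task_id,
            key,
            batch_id if batch_id is not None else current_batch_id,
        )
        return store.get(lookup)
    return xcom_pull

store = {("my_flow", "ingest_data", "rows_loaded", 1): 100}
xcom_pull = make_xcom_pull(store, current_flow_id="my_flow", current_batch_id=1)
print(xcom_pull("ingest_data", key="rows_loaded"))  # 100
```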
Manual Push/Pull in Python
You can also use XCom programmatically:
```python
from qarion_etl.xcom import XComService
from qarion_etl.xcom.backends.database import DatabaseXComBackend
from qarion_etl.db_service import DatabaseService

# Create the XCom service
db_service = DatabaseService(engine)
backend = DatabaseXComBackend(db_service)
xcom_service = XComService(
    backend=backend,
    default_flow_id="my_flow",
    default_batch_id=1,
)

# Push a value
xcom_service.push(
    task_id="my_task",
    value={"rows_processed": 1000, "status": "success"},
    key="processing_result",
)

# Pull a value
result = xcom_service.pull(
    task_id="my_task",
    key="processing_result",
)
print(result)  # {"rows_processed": 1000, "status": "success"}

# Pull multiple values
results = xcom_service.pull_many(
    task_ids=["task1", "task2", "task3"],
    key="return_value",
)
```
XCom Value Types
XCom supports multiple value types:
- JSON (default): Complex objects, dictionaries, lists
- String: Text values
- Integer: Numeric integers
- Float: Numeric floats
- Boolean: True/false values
- Binary: Binary data (base64 encoded)
Example:
```python
# Push different types
xcom_service.push(task_id="task1", value="text", value_type=XComValueType.STRING)
xcom_service.push(task_id="task2", value=42, value_type=XComValueType.INTEGER)
xcom_service.push(task_id="task3", value={"key": "value"}, value_type=XComValueType.JSON)
```
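To make the type list above concrete, here is a hedged sketch of how each declared type could be serialized to a single string field and restored. The mappings (JSON via `json.dumps`, binary via base64 as noted above, the rest via string conversion) are assumptions for illustration, not qarion-etl's actual wire format:

```python
import base64
import json

def serialize(value, value_type):
    """Assumed serialization: one string per value, keyed by type."""
    if value_type == "binary":
        return base64.b64encode(value).decode("ascii")  # doc: base64 encoded
    if value_type == "json":
        return json.dumps(value)
    return str(value)  # string, integer, float, boolean

def deserialize(raw, value_type):
    """Inverse mapping for each supported type."""
    return {
        "binary":  lambda s: base64.b64decode(s),
        "json":    lambda s: json.loads(s),
        "integer": int,
        "float":   float,
        "boolean": lambda s: s == "True",
        "string":  str,
    }[value_type](raw)

assert deserialize(serialize(42, "integer"), "integer") == 42
assert deserialize(serialize(b"\x00\x01", "binary"), "binary") == b"\x00\x01"
```

Declaring the type explicitly (rather than relying on the JSON default) is what makes round trips like these lossless, which is why the Best Practices below recommend it.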
Batch and Run Tracking
XCom values are tracked by:
- flow_id: Flow identifier
- task_id: Task identifier
- key: Optional key (default: "return_value")
- dag_run_id: Optional DAG run identifier
- batch_id: Optional batch ID
This allows you to:
- Track values across different flow executions
- Retrieve values from specific batches
- Isolate values by run ID
Example:
```python
# Push with batch tracking
xcom_service.push(
    task_id="my_task",
    value={"count": 100},
    batch_id=5,
    dag_run_id="run_2024-01-15",
)

# Pull from specific batch
result = xcom_service.pull(
    task_id="my_task",
    batch_id=5,
    dag_run_id="run_2024-01-15",
)
```
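Together these identifiers form the full lookup key, so the same task can hold independent values per batch and per run. A dict-based sketch of that isolation (the identifiers come from the list above; the dict stands in for the configured backend):

```python
# Each entry is addressed by the full identifier tuple, so pushes for the
# same task under different batches/runs never collide.
store = {}

def push(flow_id, task_id, value, key="return_value",
         batch_id=None, dag_run_id=None):
    store[(flow_id, task_id, key, batch_id, dag_run_id)] = value

def pull(flow_id, task_id, key="return_value",
         batch_id=None, dag_run_id=None):
    return store.get((flow_id, task_id, key, batch_id, dag_run_id))

push("my_flow", "my_task", {"count": 100}, batch_id=5, dag_run_id="run_2024-01-15")
push("my_flow", "my_task", {"count": 250}, batch_id=6, dag_run_id="run_2024-01-16")

print(pull("my_flow", "my_task", batch_id=5, dag_run_id="run_2024-01-15"))  # {'count': 100}
print(pull("my_flow", "my_task", batch_id=6, dag_run_id="run_2024-01-16"))  # {'count': 250}
```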
Complete Example
Flow Definition:
```toml
# flows/data_pipeline.toml
id = "data_pipeline"
name = "Data Processing Pipeline"
flow_type = "standard"
namespace = "production"

[input]
primary_key = ["id"]
columns = [
    {name = "id", schema_type = "integer", required = true},
    {name = "value", schema_type = "float", required = true}
]

[[tasks]]
id = "ingest_orders"
name = "Ingest Orders"
type = "ingestion"
target_dataset_id = "orders_landing"

[tasks.properties]
operation = "ingestion"
target_table_type = "landing"
processing_type = "FULL_REFRESH"

[tasks.config]
path = "data/orders.csv"
format = "csv"
# Result automatically pushed to XCom

[[tasks]]
id = "calculate_stats"
name = "Calculate Statistics"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "orders_stats"
dependencies = ["ingest_orders"]

[tasks.config]
sql = """
SELECT
    COUNT(*) as total_count,
    AVG(value) as avg_value,
    MAX(value) as max_value,
    MIN(value) as min_value
FROM {{ source_dataset }}
"""
# Can access the ingest_orders result via xcom_pull('ingest_orders')

[[tasks]]
id = "filter_high_value"
name = "Filter High Value Orders"
type = "transformation"
source_dataset_id = "orders_landing"
target_dataset_id = "high_value_orders"
dependencies = ["calculate_stats"]

[tasks.config]
sql = """
SELECT *
FROM {{ source_dataset }}
WHERE value > {{ xcom_pull('calculate_stats', key='avg_value') }}
"""
```
Configuration:
```toml
# qarion-etl.toml
[xcom]
enabled = true
backend = "database"
backend_config = {}
```
Best Practices
- Use Descriptive Keys: use meaningful keys instead of the default "return_value":

```python
xcom_service.push(task_id="task1", value=data, key="processed_data")
```

- Batch Tracking: always pass batch_id for proper data isolation:

```python
xcom_service.push(task_id="task1", value=data, batch_id=batch_id)
```

- Error Handling: always check whether an XCom value exists before using it:

```python
result = xcom_service.pull(task_id="upstream_task")
if result is None:
    raise ValueError("Upstream task result not found")
```

- Type Safety: specify value types explicitly for better serialization:

```python
xcom_service.push(
    task_id="task1",
    value=42,
    value_type=XComValueType.INTEGER,
)
```

- Cleanup: clear old XCom values periodically to manage storage:

```python
# Clear all values for a flow
count = xcom_service.clear(flow_id="my_flow")

# Clear values for a specific task
count = xcom_service.clear(flow_id="my_flow", task_id="old_task")
```
Backend Comparison
| Backend | Best For | Persistence | Performance | Setup Complexity |
|---|---|---|---|---|
| Database | Production, multi-user | ✅ Persistent | Good | Low |
| S3 | Cloud environments, distributed | ✅ Persistent | Good | Medium |
| Local File | Development, testing | ✅ Persistent | Fast | Very Low |
Related Documentation
- Tasks Guide - Understanding task system
- Flows Guide - Flow execution
- Configuration Guide - Configuration options