API Call Tasks
The api_call task type in Qarion ETL makes HTTP requests from within a data pipeline, which lets a flow integrate with external services, webhooks, and REST APIs.
Overview
API call tasks support:
- Multiple HTTP Methods: GET, POST, PUT, PATCH, DELETE
- Authentication: Basic auth, OAuth2, API keys, Bearer tokens
- XCom Integration: Pull data from other tasks to use in API calls
- Jinja2 Templating: Dynamic URLs, headers, parameters, and request bodies
- Response Handling: Store responses in XCom, validate responses, or ignore
- Retry Logic: Automatic retries with configurable delays
- Error Handling: Comprehensive error reporting and validation
Basic Usage
Simple GET Request
[[tasks]]
id = "fetch_data"
type = "api_call"
properties = { url = "https://api.example.com/data", operation = "get" }
POST Request with JSON Body
[[tasks]]
id = "create_record"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/records"
operation = "post"
body = { "name" = "Test Record", "status" = "active" }
body_format = "json"
Authentication
Basic Authentication
Store credentials in the credential store:
qarion-etl credentials store --credential-id api_basic \
--type basic_auth \
--json '{"username": "user", "password": "pass"}'
Use in task:
[[tasks]]
id = "authenticated_api"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/protected"
operation = "get"
auth_type = "basic"
credential_id = "api_basic"
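For reference, HTTP Basic authentication (RFC 7617) sends the base64 encoding of `username:password` in the Authorization header. The sketch below shows the header a task with auth_type = "basic" would presumably produce from the stored credential; basic_auth_header is a hypothetical helper for illustration, not part of the Qarion ETL API.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build an HTTP Basic Authorization header (hypothetical helper)."""
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Same placeholder values as the credential stored above.
headers = basic_auth_header("user", "pass")
print(headers)
```

Because base64 is an encoding rather than encryption, Basic authentication should only be used over HTTPS.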
OAuth2 Authentication
Store OAuth2 credentials:
qarion-etl credentials store --credential-id api_oauth2 \
--type oauth \
--json '{
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"token_url": "https://api.example.com/oauth/token"
}'
Use in task:
[[tasks]]
id = "oauth2_api"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
auth_type = "oauth2"
credential_id = "api_oauth2"
Note: OAuth2 support requires the requests-oauthlib library:
pip install requests-oauthlib
API Key Authentication
Store API key:
qarion-etl credentials store --credential-id api_key \
--type api_key \
--json '{"api_key": "your_api_key", "header_name": "X-API-Key"}'
Use in task:
[[tasks]]
id = "api_key_request"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
auth_type = "api_key"
credential_id = "api_key"
Bearer Token Authentication
Store bearer token:
qarion-etl credentials store --credential-id bearer_token \
--type custom \
--json '{"token": "your_bearer_token"}'
Use in task:
[[tasks]]
id = "bearer_api"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
auth_type = "bearer"
credential_id = "bearer_token"
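The API key and bearer credentials above each map naturally onto a single request header. The following is a minimal sketch of that mapping, assuming the credential JSON is available as a dictionary; build_auth_headers and the fallback header name are assumptions for illustration, not the actual Qarion ETL implementation.

```python
def build_auth_headers(auth_type: str, credential: dict) -> dict:
    """Translate a stored credential into request headers (hypothetical helper)."""
    if auth_type == "none":
        return {}
    if auth_type == "api_key":
        # Falling back to "X-API-Key" is an assumption of this sketch.
        header_name = credential.get("header_name", "X-API-Key")
        return {header_name: credential["api_key"]}
    if auth_type == "bearer":
        return {"Authorization": f"Bearer {credential['token']}"}
    raise ValueError(f"auth_type not covered by this sketch: {auth_type}")


api_key_headers = build_auth_headers(
    "api_key", {"api_key": "your_api_key", "header_name": "X-API-Key"}
)
bearer_headers = build_auth_headers("bearer", {"token": "your_bearer_token"})

print(api_key_headers)
print(bearer_headers)
```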
XCom Integration
API call tasks can pull data from other tasks using XCom, enabling dynamic API calls based on previous task results.
Pulling Data from Other Tasks
[[tasks]]
id = "get_user_id"
type = "transformation"
# ... transformation config ...
[[tasks]]
id = "fetch_user_data"
type = "api_call"
dependencies = ["get_user_id"]
[tasks.properties]
url = "https://api.example.com/users/{{ xcom_get_user_id.user_id }}"
operation = "get"
xcom_pull = ["get_user_id"]
xcom_pull_key = "user_id" # Extract specific key from XCom data
Using XCom Data in Templates
XCom data is automatically available in Jinja2 templates:
[[tasks]]
id = "api_call"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/users/{{ xcom_previous_task.user_id }}"
operation = "post"
headers = { "X-User-ID" = "{{ xcom_previous_task.user_id }}" }
params = { "filter" = "{{ xcom_previous_task.filter_value }}" }
body = { "user_id" = "{{ xcom_previous_task.user_id }}", "action" = "update" }
xcom_pull = ["previous_task"]
Jinja2 Templating
All API call configuration supports Jinja2 templating with access to:
- Flow Variables: {{ variable_name }}
- XCom Data: {{ xcom_task_id.key }} or {{ xcom_task_id }}
- Execution Context: {{ batch_id }}, {{ execution_date }}
- Flow Metadata: {{ flow_id }}, {{ node_id }}
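To make the naming convention concrete, the sketch below builds a template context in which each pulled task's XCom value appears under `xcom_<task_id>`, then resolves placeholders against it. The render function is a deliberately simplified stand-in for Jinja2 that only handles dotted lookups such as `{{ a.b }}`; real Jinja2 supports filters, expressions, and control flow. build_context, render, and the sample values are assumptions for illustration.

```python
import re

# Matches simple placeholders such as {{ batch_id }} or {{ xcom_task.key }}.
PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def build_context(flow_vars: dict, xcom_values: dict, xcom_pull: list) -> dict:
    """Expose each pulled task's XCom value as xcom_<task_id> (assumed layout)."""
    context = dict(flow_vars)
    for task_id in xcom_pull:
        context[f"xcom_{task_id}"] = xcom_values[task_id]
    return context


def render(template: str, context: dict) -> str:
    """Simplified stand-in for Jinja2: resolves dotted names only."""
    def resolve(match):
        value = context
        for part in match.group(1).split("."):
            value = value[part]
        return str(value)

    return PLACEHOLDER.sub(resolve, template)


context = build_context(
    flow_vars={"batch_id": 7},
    xcom_values={"get_user_id": {"user_id": 42}},
    xcom_pull=["get_user_id"],
)
url = render(
    "https://api.example.com/users/{{ xcom_get_user_id.user_id }}?batch={{ batch_id }}",
    context,
)
print(url)
```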
Dynamic URLs
[tasks.properties]
url = "https://api.example.com/data?batch={{ batch_id }}&date={{ execution_date }}"
Dynamic Headers
[tasks.properties]
headers = { "X-Batch-ID" = "{{ batch_id }}", "Authorization" = "Bearer {{ xcom_auth_task.token }}" }
Dynamic Request Body
[tasks.properties]
body = { "batch_id" = "{{ batch_id }}", "data" = "{{ xcom_transform_task.result }}", "timestamp" = "{{ execution_date }}" }
Configuration Options
Task Properties
| Property | Type | Required | Description |
|---|---|---|---|
| url | string | Yes | API endpoint URL (supports Jinja2 templating) |
| operation | string | No | HTTP method: get, post, put, patch, delete (default: get) |
| auth_type | string | No | Authentication type: none, basic, oauth2, api_key, bearer (default: none) |
| credential_id | string | No | Credential ID from credential store (required if auth_type != none) |
| headers | dict | No | Custom HTTP headers (supports Jinja2 templating) |
| params | dict | No | URL query parameters (supports Jinja2 templating) |
| body | string/dict | No | Request body (for POST, PUT, PATCH) |
| body_format | string | No | Body format: json, form, raw, xml (default: json) |
| timeout | integer | No | Request timeout in seconds (default: 30) |
| retry_count | integer | No | Number of retries on failure (default: 0) |
| retry_delay | float | No | Delay between retries in seconds (default: 1.0) |
| xcom_pull | list | No | List of task IDs to pull data from via XCom |
| xcom_pull_key | string | No | Key to extract from XCom data (default: uses full value) |
| response_handling | string | No | How to handle response: store, validate, ignore (default: store) |
| response_validation | dict | No | Optional validation rules for response |
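The body_format property determines how the body value is serialized before it is sent. The sketch below shows one plausible mapping from each format to encoded bytes and a Content-Type header. The encode_body helper, the pass-through handling of raw and xml, and the chosen Content-Type values are assumptions; the table above only names the formats.

```python
import json
from urllib.parse import urlencode


def encode_body(body, body_format: str = "json"):
    """Return (payload_bytes, content_type) for a body (hypothetical helper)."""
    if body_format == "json":
        return json.dumps(body).encode("utf-8"), "application/json"
    if body_format == "form":
        return urlencode(body).encode("utf-8"), "application/x-www-form-urlencoded"
    if body_format == "xml":
        # Assumes the body is already an XML string.
        return str(body).encode("utf-8"), "application/xml"
    if body_format == "raw":
        # Assumes raw bodies are sent unchanged, with no Content-Type implied.
        return str(body).encode("utf-8"), None
    raise ValueError(f"unknown body_format: {body_format}")


record = {"name": "New Record", "status": "active"}
json_payload, json_type = encode_body(record, "json")
form_payload, form_type = encode_body(record, "form")

print(json_payload, json_type)
print(form_payload, form_type)
```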
Response Validation
Configure response validation to ensure API responses meet your requirements:
[[tasks]]
id = "validated_api"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
response_handling = "validate"
[tasks.properties.response_validation]
expected_status_code = 200
required_fields = ["data", "status"]
[tasks.properties.response_validation.field_checks]
status = { type = "str", value = "success" }
Validation Rules:
- expected_status_code: Expected HTTP status code (integer or list)
- required_fields: List of required fields in JSON response
- field_checks: Dictionary of field-specific checks (type, value)
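The three rules can be read as a small checking routine over the status code and the decoded JSON body. The following sketch shows how such rules might be evaluated; validate_response, the TYPE_NAMES mapping, and the error messages are assumptions for illustration, not the validator Qarion ETL ships.

```python
# Assumed mapping from the type names used in field_checks to Python types.
TYPE_NAMES = {"str": str, "int": int, "float": float, "bool": bool,
              "list": list, "dict": dict}


def validate_response(status_code: int, payload: dict, rules: dict) -> list:
    """Return a list of validation error messages (empty when valid)."""
    errors = []

    # expected_status_code may be a single integer or a list of integers.
    expected = rules.get("expected_status_code")
    if expected is not None:
        allowed = expected if isinstance(expected, list) else [expected]
        if status_code not in allowed:
            errors.append(f"status {status_code} not in {allowed}")

    for field in rules.get("required_fields", []):
        if field not in payload:
            errors.append(f"missing required field: {field}")

    for field, check in rules.get("field_checks", {}).items():
        if field not in payload:
            errors.append(f"cannot check absent field: {field}")
            continue
        value = payload[field]
        expected_type = TYPE_NAMES.get(check.get("type"))
        if expected_type is not None and not isinstance(value, expected_type):
            errors.append(f"field {field} is not of type {check['type']}")
        if "value" in check and value != check["value"]:
            errors.append(f"field {field} != {check['value']!r}")

    return errors


rules = {
    "expected_status_code": 200,
    "required_fields": ["data", "status"],
    "field_checks": {"status": {"type": "str", "value": "success"}},
}
ok_errors = validate_response(200, {"data": [1], "status": "success"}, rules)
bad_errors = validate_response(500, {"data": [], "status": "error"}, rules)
print(ok_errors)
print(bad_errors)
```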
Examples
Example 1: Simple API Call
[[tasks]]
id = "fetch_external_data"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
timeout = 60
Example 2: POST with Authentication
[[tasks]]
id = "create_record"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/records"
operation = "post"
auth_type = "basic"
credential_id = "api_credentials"
body = { "name" = "New Record", "status" = "active" }
body_format = "json"
headers = { "Content-Type" = "application/json" }
Example 3: Using XCom Data
[[tasks]]
id = "process_data"
type = "transformation"
# ... transformation that outputs user_id ...
[[tasks]]
id = "notify_api"
type = "api_call"
dependencies = ["process_data"]
[tasks.properties]
url = "https://api.example.com/notify/{{ xcom_process_data.user_id }}"
operation = "post"
xcom_pull = ["process_data"]
body = { "user_id" = "{{ xcom_process_data.user_id }}", "status" = "{{ xcom_process_data.status }}", "batch_id" = "{{ batch_id }}" }
Example 4: OAuth2 with Retries
[[tasks]]
id = "oauth_api_call"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/protected/data"
operation = "get"
auth_type = "oauth2"
credential_id = "oauth_credentials"
retry_count = 3
retry_delay = 2.0
timeout = 30
Example 5: API Call with Response Validation
[[tasks]]
id = "validated_api"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
response_handling = "validate"
[tasks.properties.response_validation]
expected_status_code = [200, 201]
required_fields = ["data", "timestamp"]
Response Handling
Storing Response in XCom
By default, API responses are stored in XCom for use by downstream tasks:
[[tasks]]
id = "api_call"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
response_handling = "store" # Default
Downstream tasks can access the response:
[[tasks]]
id = "use_response"
type = "transformation"
dependencies = ["api_call"]
# Can use {{ xcom_api_call }} in SQL/templates
Ignoring Response
If you don't need the response:
[tasks.properties]
response_handling = "ignore"
Validating Response
Validate response structure and values:
[tasks.properties]
response_handling = "validate"
[tasks.properties.response_validation]
expected_status_code = 200
required_fields = ["success", "data"]
Error Handling
API call tasks distinguish the following failure categories:
- Network Errors: Retried according to retry_count and retry_delay
- HTTP Errors: Status codes >= 400 are treated as failures
- Validation Errors: Response validation failures are reported
- Authentication Errors: Clear error messages for auth failures
Failed API calls are logged and tracked in metadata tables.
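The interaction between retry_count, retry_delay, and HTTP status handling can be sketched as a small loop. This is an illustration under stated assumptions, not the Qarion ETL implementation: call_with_retries is a hypothetical helper, network errors are modeled as ConnectionError, the delay is fixed between attempts, and HTTP errors fail immediately without a retry (the text above does not say whether they are retried).

```python
import time


def call_with_retries(send, retry_count=0, retry_delay=1.0, sleep=time.sleep):
    """Call send() up to retry_count + 1 times, retrying on network errors."""
    attempts = retry_count + 1
    for attempt in range(1, attempts + 1):
        try:
            status_code, body = send()
        except ConnectionError:
            if attempt == attempts:
                raise  # retries exhausted, surface the network error
            sleep(retry_delay)
            continue
        if status_code >= 400:
            # Assumption: HTTP errors are failures and are not retried here.
            raise RuntimeError(f"HTTP error {status_code}")
        return status_code, body


# Simulated endpoint: two network failures, then a successful response.
outcomes = [ConnectionError("reset"), ConnectionError("reset"), (200, "ok")]


def flaky_send():
    outcome = outcomes.pop(0)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


delays = []  # record requested delays instead of actually sleeping
result = call_with_retries(flaky_send, retry_count=3, retry_delay=2.0,
                           sleep=delays.append)
print(result, delays)
```

Passing the sleep function as a parameter keeps the sketch testable without real waiting.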
Metadata Tracking
All API call operations are automatically tracked in the metadata system:
- Operation Logs: Recorded in xt_operation_logs with operation type api_call
- Execution Details: URL, method, status code, response size, validation results
- Error Tracking: Failed calls are logged with error messages
Query API call history:
SELECT
operation_id,
operation_subtype,
flow_id,
task_id,
status,
success,
operation_details
FROM xt_operation_logs
WHERE operation_type = 'api_call'
ORDER BY start_time DESC;
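A common variation is to narrow the history to failed calls. The sketch below runs such a query against an in-memory SQLite table as a stand-in for the metadata database. The column types, the sample rows, and the JSON shape of operation_details are assumptions; only the table and column names come from the query above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the real metadata database

# Column names follow the query above; the types are assumed.
conn.execute("""
    CREATE TABLE xt_operation_logs (
        operation_id TEXT, operation_type TEXT, operation_subtype TEXT,
        flow_id TEXT, task_id TEXT, status TEXT, success INTEGER,
        operation_details TEXT, start_time TEXT
    )
""")

# Illustrative rows only; real log contents will differ.
rows = [
    ("op-1", "api_call", "get", "flow_a", "fetch_data", "completed", 1,
     '{"status_code": 200}', "2024-01-01T10:00:00"),
    ("op-2", "api_call", "post", "flow_a", "notify_api", "failed", 0,
     '{"status_code": 503}', "2024-01-01T10:05:00"),
    ("op-3", "transformation", "sql", "flow_a", "process_data", "completed", 1,
     "{}", "2024-01-01T09:55:00"),
]
conn.executemany("INSERT INTO xt_operation_logs VALUES (?,?,?,?,?,?,?,?,?)", rows)

failed_calls = conn.execute("""
    SELECT task_id, status, operation_details
    FROM xt_operation_logs
    WHERE operation_type = 'api_call' AND success = 0
    ORDER BY start_time DESC
""").fetchall()

print(failed_calls)
```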
Best Practices
- Use Credential Store: Never hardcode credentials in flow definitions
- Set Appropriate Timeouts: Configure timeouts based on expected API response times
- Use Retries: Enable retries for unreliable APIs
- Validate Responses: Use response validation for critical API calls
- Leverage XCom: Use XCom to pass data between tasks dynamically
- Error Handling: Configure appropriate error handling and alerts
- Template Variables: Use Jinja2 templating for dynamic API calls
Related Documentation
- XCom Guide - Inter-task data exchange
- Credential Management - Managing API credentials securely
- Tasks Guide - Understanding task types
- Metadata Tracking - Tracking and monitoring operations