API Call Tasks

Qarion ETL provides an API call task type for making HTTP requests from within your data pipelines. Use it to integrate with external services such as REST APIs and webhooks.

Overview

API call tasks support:

  • Multiple HTTP Methods: GET, POST, PUT, PATCH, DELETE
  • Authentication: Basic auth, OAuth2, API keys, Bearer tokens
  • XCom Integration: Pull data from other tasks to use in API calls
  • Jinja2 Templating: Dynamic URLs, headers, parameters, and request bodies
  • Response Handling: Store responses in XCom, validate responses, or ignore
  • Retry Logic: Automatic retries with configurable delays
  • Error Handling: Comprehensive error reporting and validation

Basic Usage

Simple GET Request

[[tasks]]
id = "fetch_data"
type = "api_call"
properties = { url = "https://api.example.com/data", operation = "get" }

POST Request with JSON Body

[[tasks]]
id = "create_record"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/records"
operation = "post"
body = { "name" = "Test Record", "status" = "active" }
body_format = "json"

Authentication

Basic Authentication

Store credentials in the credential store:

qarion-etl credentials store --credential-id api_basic \
--type basic_auth \
--json '{"username": "user", "password": "pass"}'

Use in task:

[[tasks]]
id = "authenticated_api"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/protected"
operation = "get"
auth_type = "basic"
credential_id = "api_basic"

OAuth2 Authentication

Store OAuth2 credentials:

qarion-etl credentials store --credential-id api_oauth2 \
--type oauth \
--json '{
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"token_url": "https://api.example.com/oauth/token"
}'

Use in task:

[[tasks]]
id = "oauth2_api"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
auth_type = "oauth2"
credential_id = "api_oauth2"

Note: OAuth2 support requires the requests-oauthlib library:

pip install requests-oauthlib

API Key Authentication

Store API key:

qarion-etl credentials store --credential-id api_key \
--type api_key \
--json '{"api_key": "your_api_key", "header_name": "X-API-Key"}'

Use in task:

[[tasks]]
id = "api_key_request"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
auth_type = "api_key"
credential_id = "api_key"

Bearer Token Authentication

Store bearer token:

qarion-etl credentials store --credential-id bearer_token \
--type custom \
--json '{"token": "your_bearer_token"}'

Use in task:

[[tasks]]
id = "bearer_api"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
auth_type = "bearer"
credential_id = "bearer_token"
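
The credential shapes above map onto request headers in a fairly conventional way. The following is a minimal sketch of that mapping, not Qarion ETL's actual implementation: `build_auth_headers` is a hypothetical helper, and the `X-API-Key` fallback is an assumption. OAuth2 is left out because it needs a token exchange against `token_url` before any header can be built.

```python
import base64


def build_auth_headers(auth_type: str, credential: dict) -> dict:
    """Return the HTTP headers implied by an auth_type and a stored credential.

    Hypothetical helper: it mirrors the credential JSON shapes shown above,
    not Qarion ETL's internal code.
    """
    if auth_type == "none":
        return {}
    if auth_type == "basic":
        # RFC 7617: "Basic " followed by base64("username:password").
        raw = f"{credential['username']}:{credential['password']}".encode("utf-8")
        return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}
    if auth_type == "api_key":
        # Header name comes from the credential; "X-API-Key" is an assumed fallback.
        return {credential.get("header_name", "X-API-Key"): credential["api_key"]}
    if auth_type == "bearer":
        return {"Authorization": f"Bearer {credential['token']}"}
    raise ValueError(f"unsupported auth_type: {auth_type}")


print(build_auth_headers("basic", {"username": "user", "password": "pass"}))
```

Keeping the mapping in one function makes it easy to see that only the credential store holds secrets; the task definition carries nothing but `auth_type` and `credential_id`.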

XCom Integration

API call tasks can pull data from other tasks using XCom, enabling dynamic API calls based on previous task results.

Pulling Data from Other Tasks

[[tasks]]
id = "get_user_id"
type = "transformation"
# ... transformation config ...

[[tasks]]
id = "fetch_user_data"
type = "api_call"
dependencies = ["get_user_id"]
[tasks.properties]
url = "https://api.example.com/users/{{ xcom_get_user_id.user_id }}"
operation = "get"
xcom_pull = ["get_user_id"]
xcom_pull_key = "user_id" # Extract specific key from XCom data

Using XCom Data in Templates

XCom data is automatically available in Jinja2 templates:

[[tasks]]
id = "api_call"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/users/{{ xcom_previous_task.user_id }}"
operation = "post"
headers = { "X-User-ID" = "{{ xcom_previous_task.user_id }}" }
params = { "filter" = "{{ xcom_previous_task.filter_value }}" }
body = { "user_id" = "{{ xcom_previous_task.user_id }}", "action" = "update" }
xcom_pull = ["previous_task"]

Jinja2 Templating

All API call configuration supports Jinja2 templating with access to:

  • Flow Variables: {{ variable_name }}
  • XCom Data: {{ xcom_task_id.key }} or {{ xcom_task_id }}
  • Execution Context: {{ batch_id }}, {{ execution_date }}
  • Flow Metadata: {{ flow_id }}, {{ node_id }}

Dynamic URLs

[tasks.properties]
url = "https://api.example.com/data?batch={{ batch_id }}&date={{ execution_date }}"

Dynamic Headers

# TOML 1.0 inline tables cannot span lines, so headers go in a sub-table
[tasks.properties.headers]
"X-Batch-ID" = "{{ batch_id }}"
"Authorization" = "Bearer {{ xcom_auth_task.token }}"

Dynamic Request Body

# TOML 1.0 inline tables cannot span lines, so the body goes in a sub-table
[tasks.properties.body]
"batch_id" = "{{ batch_id }}"
"data" = "{{ xcom_transform_task.result }}"
"timestamp" = "{{ execution_date }}"
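
To make the variable naming concrete, here is a simplified stand-in for the rendering step. It is not Jinja2: it uses a small regular expression that handles only `{{ name }}` and dotted lookups. The helpers `build_context` and `render` are hypothetical, and the `xcom_<task_id>` naming rule is inferred from the examples above.

```python
import re

# Matches {{ name }} or {{ name.key.subkey }} with optional inner whitespace.
_PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def build_context(flow_vars: dict, xcom: dict, batch_id: str) -> dict:
    """Merge flow variables, execution context and XCom values into one namespace."""
    context = dict(flow_vars)
    context["batch_id"] = batch_id
    for task_id, value in xcom.items():
        # Assumed naming rule, taken from the examples: xcom_<task_id>.
        context[f"xcom_{task_id}"] = value
    return context


def render(template: str, context: dict) -> str:
    """Replace each placeholder with the value found by walking its dotted keys."""
    def lookup(match: re.Match) -> str:
        value = context
        for part in match.group(1).split("."):
            value = value[part]  # raises KeyError on an unknown name
        return str(value)
    return _PLACEHOLDER.sub(lookup, template)


context = build_context({}, {"get_user_id": {"user_id": 42}}, batch_id="b-001")
print(render("https://api.example.com/users/{{ xcom_get_user_id.user_id }}?batch={{ batch_id }}", context))
```

Real Jinja2 additionally supports filters, conditionals and loops; the sketch only shows where the names in a template come from.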

Configuration Options

Task Properties

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| url | string | Yes | API endpoint URL (supports Jinja2 templating) |
| operation | string | No | HTTP method: get, post, put, patch, delete (default: get) |
| auth_type | string | No | Authentication type: none, basic, oauth2, api_key, bearer (default: none) |
| credential_id | string | No | Credential ID from credential store (required if auth_type != none) |
| headers | dict | No | Custom HTTP headers (supports Jinja2 templating) |
| params | dict | No | URL query parameters (supports Jinja2 templating) |
| body | string/dict | No | Request body (for POST, PUT, PATCH) |
| body_format | string | No | Body format: json, form, raw, xml (default: json) |
| timeout | integer | No | Request timeout in seconds (default: 30) |
| retry_count | integer | No | Number of retries on failure (default: 0) |
| retry_delay | float | No | Delay between retries in seconds (default: 1.0) |
| xcom_pull | list | No | List of task IDs to pull data from via XCom |
| xcom_pull_key | string | No | Key to extract from XCom data (default: uses full value) |
| response_handling | string | No | How to handle response: store, validate, ignore (default: store) |
| response_validation | dict | No | Optional validation rules for response |
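
As an illustration of how `params`, `body` and `body_format` could translate into an actual request, the sketch below encodes a body per format and appends query parameters to a URL. Both functions are hypothetical, and the content types chosen for each format are conventional defaults rather than confirmed Qarion ETL behaviour.

```python
import json
from urllib.parse import urlencode


def encode_body(body, body_format: str = "json"):
    """Return (payload_bytes, content_type) for a request body.

    Hypothetical helper; the content types are assumed defaults.
    """
    if body_format == "json":
        return json.dumps(body).encode("utf-8"), "application/json"
    if body_format == "form":
        # Form bodies are percent-encoded key=value pairs joined by "&".
        return urlencode(body).encode("ascii"), "application/x-www-form-urlencoded"
    if body_format == "xml":
        return str(body).encode("utf-8"), "application/xml"
    if body_format == "raw":
        data = body if isinstance(body, bytes) else str(body).encode("utf-8")
        return data, "application/octet-stream"
    raise ValueError(f"unsupported body_format: {body_format}")


def build_url(url: str, params: dict) -> str:
    """Append params as a query string, respecting an existing '?'."""
    if not params:
        return url
    separator = "&" if "?" in url else "?"
    return url + separator + urlencode(params)


payload, content_type = encode_body({"name": "Test Record"}, "json")
print(content_type, payload)
print(build_url("https://api.example.com/data", {"filter": "active"}))
```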

Response Validation

Configure response validation to ensure API responses meet your requirements:

[[tasks]]
id = "validated_api"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
response_handling = "validate"
[tasks.properties.response_validation]
expected_status_code = 200
required_fields = ["data", "status"]
[tasks.properties.response_validation.field_checks]
status = { type = "str", value = "success" }

Validation Rules:

  • expected_status_code: Expected HTTP status code (integer or list)
  • required_fields: List of required fields in JSON response
  • field_checks: Dictionary of field-specific checks (type, value)
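
The three rules can be read as a sequence of independent checks. The sketch below shows one plausible interpretation; `validate_response` is a hypothetical function, and comparing `type` against the Python type name (for example `"str"`) is an assumption based on the example above.

```python
def validate_response(status_code: int, payload: dict, rules: dict) -> list:
    """Return a list of validation error messages (empty means valid)."""
    errors = []

    # expected_status_code may be a single integer or a list of integers.
    expected = rules.get("expected_status_code")
    if expected is not None:
        allowed = expected if isinstance(expected, list) else [expected]
        if status_code not in allowed:
            errors.append(f"status {status_code} not in {allowed}")

    for field in rules.get("required_fields", []):
        if field not in payload:
            errors.append(f"missing required field: {field}")

    for field, check in rules.get("field_checks", {}).items():
        if field not in payload:
            continue  # absence is reported by required_fields, if listed there
        value = payload[field]
        # Assumed: "type" names a Python type such as "str" or "int".
        if "type" in check and type(value).__name__ != check["type"]:
            errors.append(f"{field}: expected type {check['type']}, got {type(value).__name__}")
        if "value" in check and value != check["value"]:
            errors.append(f"{field}: expected {check['value']!r}, got {value!r}")
    return errors


rules = {
    "expected_status_code": 200,
    "required_fields": ["data", "status"],
    "field_checks": {"status": {"type": "str", "value": "success"}},
}
print(validate_response(200, {"data": [], "status": "success"}, rules))
```

Collecting every failure instead of stopping at the first one gives a more useful error report when a response is wrong in several ways at once.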

Examples

Example 1: Simple API Call

[[tasks]]
id = "fetch_external_data"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
timeout = 60

Example 2: POST with Authentication

[[tasks]]
id = "create_record"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/records"
operation = "post"
auth_type = "basic"
credential_id = "api_credentials"
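body_format = "json"
# TOML 1.0 inline tables cannot span lines, so body and headers use sub-tables
[tasks.properties.body]
"name" = "New Record"
"status" = "active"
[tasks.properties.headers]
"Content-Type" = "application/json"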

Example 3: Using XCom Data

[[tasks]]
id = "process_data"
type = "transformation"
# ... transformation that outputs user_id ...

[[tasks]]
id = "notify_api"
type = "api_call"
dependencies = ["process_data"]
[tasks.properties]
url = "https://api.example.com/notify/{{ xcom_process_data.user_id }}"
operation = "post"
xcom_pull = ["process_data"]
[tasks.properties.body]
"user_id" = "{{ xcom_process_data.user_id }}"
"status" = "{{ xcom_process_data.status }}"
"batch_id" = "{{ batch_id }}"

Example 4: OAuth2 with Retries

[[tasks]]
id = "oauth_api_call"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/protected/data"
operation = "get"
auth_type = "oauth2"
credential_id = "oauth_credentials"
retry_count = 3
retry_delay = 2.0
timeout = 30

Example 5: API Call with Response Validation

[[tasks]]
id = "validated_api"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
operation = "get"
response_handling = "validate"
[tasks.properties.response_validation]
expected_status_code = [200, 201]
required_fields = ["data", "timestamp"]

Response Handling

Storing Response in XCom

By default, API responses are stored in XCom for use by downstream tasks:

[[tasks]]
id = "api_call"
type = "api_call"
[tasks.properties]
url = "https://api.example.com/data"
response_handling = "store" # Default

Downstream tasks can access the response:

[[tasks]]
id = "use_response"
type = "transformation"
dependencies = ["api_call"]
# Can use {{ xcom_api_call }} in SQL/templates

Ignoring Response

If you don't need the response:

[tasks.properties]
response_handling = "ignore"

Validating Response

Validate response structure and values:

[tasks.properties]
response_handling = "validate"
[tasks.properties.response_validation]
expected_status_code = 200
required_fields = ["success", "data"]

Error Handling

API call tasks distinguish the following error classes:

  • Network Errors: Retried according to retry_count and retry_delay
  • HTTP Errors: Status codes >= 400 are treated as failures
  • Validation Errors: Response validation failures are reported
  • Authentication Errors: Clear error messages for auth failures

Failed API calls are logged and tracked in metadata tables.
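
The interaction between `retry_count` and `retry_delay` can be sketched as a simple loop. This is an assumption-laden illustration, not the engine's code: `call_with_retries` and its `send` callback are hypothetical, and it assumes that only network-level errors are retried while any other exception propagates immediately.

```python
import time


def call_with_retries(send, retry_count: int = 0, retry_delay: float = 1.0, sleep=time.sleep):
    """Call send() once, then up to retry_count more times on network errors."""
    attempts = retry_count + 1
    for attempt in range(1, attempts + 1):
        try:
            return send()
        except (ConnectionError, TimeoutError):
            if attempt == attempts:
                raise  # retries exhausted: surface the last network error
            sleep(retry_delay)  # fixed delay between attempts


# Usage: a callable that fails twice with a network error, then succeeds.
outcomes = iter([ConnectionError("reset"), ConnectionError("reset"), "ok"])


def flaky_send():
    result = next(outcomes)
    if isinstance(result, Exception):
        raise result
    return result


print(call_with_retries(flaky_send, retry_count=3, retry_delay=0.0))
```

With `retry_count = 3` the request is attempted at most four times, so the worst-case wall time is roughly four timeouts plus three delays.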

Metadata Tracking

All API call operations are automatically tracked in the metadata system:

  • Operation Logs: Recorded in xt_operation_logs with operation type api_call
  • Execution Details: URL, method, status code, response size, validation results
  • Error Tracking: Failed calls are logged with error messages

Query API call history:

SELECT
operation_id,
operation_subtype,
flow_id,
task_id,
status,
success,
operation_details
FROM xt_operation_logs
WHERE operation_type = 'api_call'
ORDER BY start_time DESC;

Best Practices

  1. Use Credential Store: Never hardcode credentials in flow definitions
  2. Set Appropriate Timeouts: Configure timeouts based on expected API response times
  3. Use Retries: Enable retries for unreliable APIs
  4. Validate Responses: Use response validation for critical API calls
  5. Leverage XCom: Use XCom to pass data between tasks dynamically
  6. Error Handling: Configure appropriate error handling and alerts
  7. Template Variables: Use Jinja2 templating for dynamic API calls