Metadata Tracking

Qarion ETL automatically tracks all operations and executions in your data pipelines, providing comprehensive visibility into flow executions, task runs, load operations, quality checks, exports, alerts, and more.

Overview

The metadata tracking system provides:

  • Automatic Operation Tracking: All operations are automatically registered without additional configuration
  • Comprehensive History: Complete execution history with timing, status, and results
  • Hierarchical Structure: Flow executions → Task executions → Operations
  • Rich Metadata: Detailed information about each operation including source/target, configuration, and results
  • Queryable Database: All metadata is stored in database tables for analysis and reporting

How It Works

When you execute a flow via the CLI or programmatically, Qarion ETL automatically:

  1. Registers Flow Execution: Creates a record in xt_flow_executions when a flow starts
  2. Tracks Task Executions: Records each task/node execution in xt_task_executions
  3. Logs Operations: Captures detailed operation information in xt_operation_logs
  4. Records Specific Operations: Stores operation-specific details in dedicated tables:
    • xt_load_operations - File and database loads
    • xt_quality_check_executions - Quality check results
    • xt_export_operations - Data exports
    • xt_alert_operations - Alert notifications
    • xt_sync_operations - File synchronization
    • xt_flow_trigger_operations - Flow trigger operations
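The resulting hierarchy can be walked with a couple of joins. A minimal sketch against an in-memory SQLite database, using simplified stand-ins for the real tables (only the join-key columns are shown):

```python
import sqlite3

# Simplified stand-ins for the real metadata tables, reduced to join keys.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE xt_flow_executions (execution_id TEXT, flow_id TEXT);
CREATE TABLE xt_task_executions (task_execution_id TEXT, execution_id TEXT, task_id TEXT);
CREATE TABLE xt_operation_logs (operation_id TEXT, task_execution_id TEXT, operation_type TEXT);

INSERT INTO xt_flow_executions VALUES ('exec-1', 'daily_load');
INSERT INTO xt_task_executions VALUES ('task-1', 'exec-1', 'load_csv');
INSERT INTO xt_operation_logs VALUES ('op-1', 'task-1', 'ingestion');
""")

# Flow execution -> task executions -> operations, one row per operation.
rows = conn.execute("""
    SELECT fe.flow_id, te.task_id, ol.operation_type
    FROM xt_flow_executions fe
    JOIN xt_task_executions te ON te.execution_id = fe.execution_id
    JOIN xt_operation_logs ol ON ol.task_execution_id = te.task_execution_id
""").fetchall()
print(rows)  # [('daily_load', 'load_csv', 'ingestion')]
```

The same join pattern applies to the operation-specific tables, which all carry `execution_id` and `task_execution_id` back-references.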

Metadata Tables

Flow Executions (xt_flow_executions)

Tracks complete flow execution runs with results and timing.

Key Fields:

  • execution_id - Unique execution identifier
  • flow_id - Flow that was executed
  • batch_id - Batch ID for this execution
  • start_time / end_time - Execution timing
  • status - Execution status (running, success, failed, cancelled)
  • success - Boolean success indicator
  • rows_loaded - Total rows loaded during execution
  • rows_processed - Total rows processed during execution
  • error_message - Error message if execution failed
  • variables - JSON serialized variables passed to flow
  • execution_metadata - JSON serialized execution metadata

Example Query:

-- Get recent flow executions
SELECT
    execution_id,
    flow_id,
    batch_id,
    start_time,
    end_time,
    status,
    rows_loaded,
    rows_processed
FROM xt_flow_executions
ORDER BY start_time DESC
LIMIT 10;

Task Executions (xt_task_executions)

Tracks individual task/node executions within flows.

Key Fields:

  • task_execution_id - Unique task execution identifier
  • execution_id - Parent flow execution ID
  • flow_id - Flow containing the task
  • task_id - Task identifier
  • node_id - Node identifier in the DAG
  • task_type - Type of task (ingestion, transformation, export, etc.)
  • node_type - Node type
  • start_time / end_time - Execution timing
  • status - Execution status
  • success - Boolean success indicator
  • records_processed - Number of records processed
  • rows_affected - Number of rows affected
  • source_dataset_id / target_dataset_id - Source and target datasets

Example Query:

-- Get task executions for a specific flow execution
SELECT
    task_execution_id,
    task_id,
    task_type,
    status,
    records_processed,
    start_time,
    end_time
FROM xt_task_executions
WHERE execution_id = 'your-execution-id'
ORDER BY start_time;

Operation Logs (xt_operation_logs)

Generic operation tracking for all operation types.

Key Fields:

  • operation_id - Unique operation identifier
  • execution_id - Parent flow execution ID
  • task_execution_id - Parent task execution ID
  • operation_type - Type of operation (ingestion, transformation, quality_check, export, etc.)
  • operation_subtype - More specific type (file_load, database_load, completeness_check, etc.)
  • flow_id / task_id - Flow and task identifiers
  • dataset_id - Dataset involved in operation
  • batch_id - Batch ID
  • start_time / end_time - Operation timing
  • status - Operation status
  • success - Boolean success indicator
  • rows_processed - Number of rows processed
  • source_info - JSON serialized source information
  • target_info - JSON serialized target information
  • operation_details - JSON serialized operation-specific details
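This table can be queried like the others. A sketch using Python's sqlite3 against a simplified version of the table (columns taken from the list above), summarizing activity per operation type:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for xt_operation_logs.
conn.execute("""CREATE TABLE xt_operation_logs (
    operation_id TEXT, operation_type TEXT, operation_subtype TEXT,
    success INTEGER, rows_processed INTEGER)""")
conn.executemany(
    "INSERT INTO xt_operation_logs VALUES (?, ?, ?, ?, ?)",
    [("op-1", "ingestion", "file_load", 1, 500),
     ("op-2", "quality_check", "completeness_check", 0, 500)],
)

# Count operations and total rows per operation type/subtype.
summary = conn.execute("""
    SELECT operation_type, operation_subtype,
           COUNT(*), SUM(rows_processed)
    FROM xt_operation_logs
    GROUP BY operation_type, operation_subtype
    ORDER BY operation_type
""").fetchall()
print(summary)
```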

Load Operations (xt_load_operations)

Tracks file and database load operations.

Key Fields:

  • load_operation_id - Unique load operation identifier
  • execution_id - Parent flow execution ID
  • operation_id - Related operation log ID
  • load_type - Type of load (file_load, database_load, directory_load, batch_load)
  • source_type - Source type (file, directory, database)
  • source_path - File path or source location
  • source_query - SQL query for database loads
  • target_table - Target table name
  • target_dataset_id - Target dataset ID
  • batch_id - Batch ID
  • start_time / end_time - Operation timing
  • status - Operation status
  • success - Boolean success indicator
  • rows_loaded - Number of rows loaded
  • loader_config - JSON serialized loader configuration

Example Query:

-- Get load operations for a specific flow
SELECT
    load_operation_id,
    load_type,
    source_path,
    target_table,
    rows_loaded,
    status,
    start_time
FROM xt_load_operations
WHERE execution_id = 'your-execution-id'
ORDER BY start_time;

Quality Check Executions (xt_quality_check_executions)

Tracks data quality check execution results.

Key Fields:

  • quality_execution_id - Unique quality check execution identifier
  • execution_id - Parent flow execution ID
  • task_execution_id - Parent task execution ID
  • operation_id - Related operation log ID
  • quality_check_id - Quality check identifier
  • quality_suite_id - Quality suite identifier (if part of a suite)
  • check_type - Type of check (completeness, uniqueness, range, pattern, etc.)
  • dataset_id - Dataset being checked
  • batch_id - Batch ID
  • execution_time - When the check was executed
  • status - Check status (passed, failed, warning)
  • passed - Boolean pass indicator
  • rows_checked - Number of rows checked
  • rows_failed - Number of rows that failed
  • failure_rate - Percentage of failures
  • check_result - JSON serialized check result details

Example Query:

-- Get quality check results for a dataset
SELECT
    quality_execution_id,
    check_type,
    status,
    rows_checked,
    rows_failed,
    failure_rate,
    execution_time
FROM xt_quality_check_executions
WHERE dataset_id = 'your-dataset-id'
ORDER BY execution_time DESC;

Export Operations (xt_export_operations)

Tracks data export operations.

Key Fields:

  • export_operation_id - Unique export operation identifier
  • execution_id - Parent flow execution ID
  • task_execution_id - Parent task execution ID
  • operation_id - Related operation log ID
  • export_type - Type of export (file, database, api)
  • export_mode - Export mode (full, batch, incremental, changes_only)
  • source_dataset_id - Source dataset ID
  • destination_path - Export destination path
  • destination_format - Export format (csv, json, parquet, etc.)
  • batch_id - Batch ID
  • start_time / end_time - Operation timing
  • status - Operation status
  • success - Boolean success indicator
  • rows_exported - Number of rows exported
  • export_config - JSON serialized export configuration
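Export volumes can be summarized the same way as the other tables. A sketch (Python sqlite3, simplified columns) totaling exported rows per destination format:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for xt_export_operations.
conn.execute("""CREATE TABLE xt_export_operations (
    export_operation_id TEXT, export_type TEXT,
    destination_format TEXT, rows_exported INTEGER)""")
conn.executemany(
    "INSERT INTO xt_export_operations VALUES (?, ?, ?, ?)",
    [("exp-1", "file", "csv", 800),
     ("exp-2", "file", "csv", 200),
     ("exp-3", "file", "parquet", 1000)],
)

# Rows exported per destination format.
totals = conn.execute("""
    SELECT destination_format, SUM(rows_exported)
    FROM xt_export_operations
    GROUP BY destination_format
    ORDER BY destination_format
""").fetchall()
print(totals)  # [('csv', 1000), ('parquet', 1000)]
```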

Alert Operations (xt_alert_operations)

Tracks alert/notification operations.

Key Fields:

  • alert_operation_id - Unique alert operation identifier
  • execution_id - Parent flow execution ID
  • task_execution_id - Parent task execution ID
  • operation_id - Related operation log ID
  • alert_type - Type of alert (email, webhook, slack, teams)
  • channel - Alert channel
  • recipients - JSON array of recipients
  • subject - Alert subject
  • message - Alert message
  • condition_met - Boolean indicating if alert condition was met
  • sent_time - When alert was sent
  • status - Alert status (sent, failed, skipped)
  • success - Boolean success indicator

Sync Operations (xt_sync_operations)

Tracks file synchronization operations.

Key Fields:

  • sync_operation_id - Unique sync operation identifier
  • execution_id - Parent flow execution ID
  • task_execution_id - Parent task execution ID
  • operation_id - Related operation log ID
  • source_path - Source path
  • destination_path - Destination path
  • pattern - File pattern filter
  • recursive - Boolean indicating recursive sync
  • overwrite - Boolean indicating if files should be overwritten
  • start_time / end_time - Operation timing
  • status - Operation status
  • success - Boolean success indicator
  • files_synced - Number of files synced
  • files_skipped - Number of files skipped
  • sync_details - JSON serialized sync details

Flow Trigger Operations (xt_flow_trigger_operations)

Tracks flow trigger operations (when one flow triggers another).

Key Fields:

  • trigger_operation_id - Unique trigger operation identifier
  • execution_id - Parent flow execution ID (source flow)
  • task_execution_id - Parent task execution ID
  • operation_id - Related operation log ID
  • source_flow_id - Flow that initiated the trigger
  • target_flow_id - Flow that was triggered
  • triggered_execution_id - Execution ID of the triggered flow
  • batch_id - Source batch ID
  • target_batch_id - Target batch ID
  • wait_for_completion - Boolean indicating if trigger waited for completion
  • condition - Trigger condition (success, failure, always)
  • variables - JSON serialized variables passed to triggered flow
  • trigger_time - When flow was triggered
  • completion_time - When triggered flow completed
  • status - Trigger status (triggered, completed, failed, skipped)
  • success - Boolean success indicator

Accessing Metadata

Via SQL Queries

All metadata is stored in database tables with the xt_ prefix (configurable via namespace). You can query these tables directly using SQL:

-- Get execution summary for a flow
SELECT
    fe.execution_id,
    fe.flow_id,
    fe.batch_id,
    fe.start_time,
    fe.end_time,
    fe.status,
    fe.rows_loaded,
    fe.rows_processed,
    COUNT(te.task_execution_id) AS task_count
FROM xt_flow_executions fe
LEFT JOIN xt_task_executions te ON fe.execution_id = te.execution_id
WHERE fe.flow_id = 'your-flow-id'
GROUP BY fe.execution_id, fe.flow_id, fe.batch_id, fe.start_time,
    fe.end_time, fe.status, fe.rows_loaded, fe.rows_processed
ORDER BY fe.start_time DESC;

Via Python API

You can access metadata programmatically:

from qarion_etl.db_service import DatabaseService
from qarion_etl.engines import EngineLoader

# Initialize engine and database service
engine = EngineLoader.get_engine('sqlite', {'database_path': 'your_db.db'})
db_service = DatabaseService(engine)

# Query flow executions
select_statement = """
SELECT * FROM xt_flow_executions
WHERE flow_id = ?
ORDER BY start_time DESC
LIMIT 10
"""
results = engine.get_dataframe(select_statement, ('your-flow-id',))
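Once the executions are in a DataFrame, standard pandas analysis applies. A sketch, assuming get_dataframe returns a pandas DataFrame with the columns listed above (a hand-built frame stands in here):

```python
import pandas as pd

# Stand-in for the frame returned by engine.get_dataframe(...).
results = pd.DataFrame({
    "execution_id": ["exec-1", "exec-2", "exec-3"],
    "success": [1, 1, 0],
    "rows_processed": [1000, 1200, 0],
})

# Fraction of successful runs, and rows processed by successful runs only.
success_rate = results["success"].mean()
total_rows = results.loc[results["success"] == 1, "rows_processed"].sum()
print(f"success rate: {success_rate:.0%}, rows processed: {total_rows}")
```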

Use Cases

Monitoring Flow Executions

Track the success rate and performance of your flows:

-- Flow execution statistics
SELECT
    flow_id,
    COUNT(*) AS total_executions,
    SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) AS successful,
    SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) AS failed,
    AVG(rows_processed) AS avg_rows_processed,
    AVG((julianday(end_time) - julianday(start_time)) * 86400) AS avg_duration_seconds
FROM xt_flow_executions
WHERE start_time > datetime('now', '-7 days')
GROUP BY flow_id;
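The duration expression works because julianday() returns a fractional day count, so multiplying the difference by 86400 (seconds per day) yields seconds; note this is SQLite-specific, and other engines have their own interval arithmetic. A quick check in Python's sqlite3:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# julianday() returns fractional days; multiplying the difference
# by 86400 (seconds per day) converts it to a duration in seconds.
(duration,) = conn.execute("""
    SELECT (julianday('2024-01-01 00:05:30')
          - julianday('2024-01-01 00:00:00')) * 86400
""").fetchone()
print(round(duration))  # 330 seconds for a 5m30s run
```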

Debugging Failed Operations

Identify and investigate failed operations:

-- Get failed operations with details
SELECT
    ol.operation_id,
    ol.operation_type,
    ol.operation_subtype,
    ol.flow_id,
    ol.task_id,
    ol.error_message,
    ol.start_time,
    fe.execution_id
FROM xt_operation_logs ol
JOIN xt_flow_executions fe ON ol.execution_id = fe.execution_id
WHERE ol.success = 0
ORDER BY ol.start_time DESC
LIMIT 20;

Performance Analysis

Analyze operation performance and identify bottlenecks:

-- Average execution time by operation type
SELECT
    operation_type,
    operation_subtype,
    COUNT(*) AS count,
    AVG((julianday(end_time) - julianday(start_time)) * 86400) AS avg_duration_seconds,
    AVG(rows_processed) AS avg_rows_processed
FROM xt_operation_logs
WHERE end_time IS NOT NULL
GROUP BY operation_type, operation_subtype
ORDER BY avg_duration_seconds DESC;

Quality Check Monitoring

Monitor data quality trends over time:

-- Quality check trends
SELECT
    DATE(execution_time) AS check_date,
    check_type,
    COUNT(*) AS check_count,
    SUM(CASE WHEN passed = 1 THEN 1 ELSE 0 END) AS passed_count,
    SUM(CASE WHEN passed = 0 THEN 1 ELSE 0 END) AS failed_count,
    AVG(failure_rate) AS avg_failure_rate
FROM xt_quality_check_executions
WHERE execution_time > datetime('now', '-30 days')
GROUP BY DATE(execution_time), check_type
ORDER BY check_date DESC, check_type;

Configuration

Metadata tracking is automatically enabled when using the CLI or when initializing FlowExecutionService with a MetadataRegistry. The metadata tables are automatically created when DatabaseService is initialized.

Metadata Namespace

By default, metadata tables use the xt_ prefix. You can customize this via the metadata_namespace parameter:

from qarion_etl.db_service import DatabaseService

db_service = DatabaseService(engine, metadata_namespace='custom')
# Tables will be prefixed with 'custom_' instead of 'xt_'

Best Practices

  1. Regular Cleanup: Consider archiving old metadata records to maintain performance
  2. Indexing: Add indexes on frequently queried columns (flow_id, execution_id, start_time)
  3. Monitoring: Set up alerts based on metadata (e.g., alert on high failure rates)
  4. Analysis: Use metadata for capacity planning and performance optimization
  5. Auditing: Metadata provides a complete audit trail of all operations
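For point 2, the indexes can be created with plain DDL. A sketch against SQLite, assuming the default xt_ prefix and a simplified stand-in table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Minimal stand-in for the real table; only the indexed columns shown.
conn.execute("""CREATE TABLE xt_flow_executions (
    execution_id TEXT, flow_id TEXT, start_time TEXT)""")

# Index the columns that the queries on this page filter or sort by.
conn.execute("CREATE INDEX idx_fe_flow_id ON xt_flow_executions (flow_id)")
conn.execute("CREATE INDEX idx_fe_start_time ON xt_flow_executions (start_time)")

indexes = [r[1] for r in conn.execute("PRAGMA index_list('xt_flow_executions')")]
print(sorted(indexes))  # ['idx_fe_flow_id', 'idx_fe_start_time']
```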