# Metadata Tracking
Qarion ETL automatically tracks all operations and executions in your data pipelines, providing comprehensive visibility into flow executions, task runs, load operations, quality checks, exports, alerts, and more.
## Overview
The metadata tracking system provides:
- **Automatic Operation Tracking**: All operations are automatically registered without additional configuration
- **Comprehensive History**: Complete execution history with timing, status, and results
- **Hierarchical Structure**: Flow executions → Task executions → Operations
- **Rich Metadata**: Detailed information about each operation, including source/target, configuration, and results
- **Queryable Database**: All metadata is stored in database tables for analysis and reporting
## How It Works
When you execute a flow via the CLI or programmatically, Qarion ETL automatically:
- **Registers Flow Execution**: Creates a record in `xt_flow_executions` when a flow starts
- **Tracks Task Executions**: Records each task/node execution in `xt_task_executions`
- **Logs Operations**: Captures detailed operation information in `xt_operation_logs`
- **Records Specific Operations**: Stores operation-specific details in dedicated tables:
  - `xt_load_operations` - File and database loads
  - `xt_quality_check_executions` - Quality check results
  - `xt_export_operations` - Data exports
  - `xt_alert_operations` - Alert notifications
  - `xt_sync_operations` - File synchronization
  - `xt_flow_trigger_operations` - Flow trigger operations
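The three-level hierarchy can be traversed with a single join. The following is a sketch of such a query, using table and field names as documented in the sections below; the execution ID is a placeholder:

```sql
-- Walk the hierarchy: flow execution → task executions → operations
SELECT
    fe.execution_id,
    te.task_execution_id,
    ol.operation_id,
    ol.operation_type,
    ol.status
FROM xt_flow_executions fe
JOIN xt_task_executions te ON te.execution_id = fe.execution_id
JOIN xt_operation_logs ol ON ol.task_execution_id = te.task_execution_id
WHERE fe.execution_id = 'your-execution-id';
```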
## Metadata Tables
### Flow Executions (`xt_flow_executions`)
Tracks complete flow execution runs with results and timing.
**Key Fields:**

- `execution_id` - Unique execution identifier
- `flow_id` - Flow that was executed
- `batch_id` - Batch ID for this execution
- `start_time` / `end_time` - Execution timing
- `status` - Execution status (running, success, failed, cancelled)
- `success` - Boolean success indicator
- `rows_loaded` - Total rows loaded during execution
- `rows_processed` - Total rows processed during execution
- `error_message` - Error message if execution failed
- `variables` - JSON-serialized variables passed to the flow
- `execution_metadata` - JSON-serialized execution metadata
**Example Query:**

```sql
-- Get recent flow executions
SELECT
    execution_id,
    flow_id,
    batch_id,
    start_time,
    end_time,
    status,
    rows_loaded,
    rows_processed
FROM xt_flow_executions
ORDER BY start_time DESC
LIMIT 10;
```
### Task Executions (`xt_task_executions`)
Tracks individual task/node executions within flows.
**Key Fields:**

- `task_execution_id` - Unique task execution identifier
- `execution_id` - Parent flow execution ID
- `flow_id` - Flow containing the task
- `task_id` - Task identifier
- `node_id` - Node identifier in the DAG
- `task_type` - Type of task (ingestion, transformation, export, etc.)
- `node_type` - Node type
- `start_time` / `end_time` - Execution timing
- `status` - Execution status
- `success` - Boolean success indicator
- `records_processed` - Number of records processed
- `rows_affected` - Number of rows affected
- `source_dataset_id` / `target_dataset_id` - Source and target datasets
**Example Query:**

```sql
-- Get task executions for a specific flow execution
SELECT
    task_execution_id,
    task_id,
    task_type,
    status,
    records_processed,
    start_time,
    end_time
FROM xt_task_executions
WHERE execution_id = 'your-execution-id'
ORDER BY start_time;
```
### Operation Logs (`xt_operation_logs`)
Generic operation tracking for all operation types.
**Key Fields:**

- `operation_id` - Unique operation identifier
- `execution_id` - Parent flow execution ID
- `task_execution_id` - Parent task execution ID
- `operation_type` - Type of operation (ingestion, transformation, quality_check, export, etc.)
- `operation_subtype` - More specific type (file_load, database_load, completeness_check, etc.)
- `flow_id` / `task_id` - Flow and task identifiers
- `dataset_id` - Dataset involved in the operation
- `batch_id` - Batch ID
- `start_time` / `end_time` - Operation timing
- `status` - Operation status
- `success` - Boolean success indicator
- `rows_processed` - Number of rows processed
- `source_info` - JSON-serialized source information
- `target_info` - JSON-serialized target information
- `operation_details` - JSON-serialized operation-specific details
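A representative query against this table, sketched in the same style as the other tables' examples (the filter value is illustrative):

```sql
-- Get recent operations of a given type
SELECT
    operation_id,
    operation_type,
    operation_subtype,
    status,
    rows_processed,
    start_time
FROM xt_operation_logs
WHERE operation_type = 'ingestion'
ORDER BY start_time DESC
LIMIT 20;
```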
### Load Operations (`xt_load_operations`)
Tracks file and database load operations.
**Key Fields:**

- `load_operation_id` - Unique load operation identifier
- `execution_id` - Parent flow execution ID
- `operation_id` - Related operation log ID
- `load_type` - Type of load (file_load, database_load, directory_load, batch_load)
- `source_type` - Source type (file, directory, database)
- `source_path` - File path or source location
- `source_query` - SQL query for database loads
- `target_table` - Target table name
- `target_dataset_id` - Target dataset ID
- `batch_id` - Batch ID
- `start_time` / `end_time` - Operation timing
- `status` - Operation status
- `success` - Boolean success indicator
- `rows_loaded` - Number of rows loaded
- `loader_config` - JSON-serialized loader configuration
**Example Query:**

```sql
-- Get load operations for a specific flow
SELECT
    load_operation_id,
    load_type,
    source_path,
    target_table,
    rows_loaded,
    status,
    start_time
FROM xt_load_operations
WHERE execution_id = 'your-execution-id'
ORDER BY start_time;
```
### Quality Check Executions (`xt_quality_check_executions`)
Tracks data quality check execution results.
**Key Fields:**

- `quality_execution_id` - Unique quality check execution identifier
- `execution_id` - Parent flow execution ID
- `task_execution_id` - Parent task execution ID
- `operation_id` - Related operation log ID
- `quality_check_id` - Quality check identifier
- `quality_suite_id` - Quality suite identifier (if part of a suite)
- `check_type` - Type of check (completeness, uniqueness, range, pattern, etc.)
- `dataset_id` - Dataset being checked
- `batch_id` - Batch ID
- `execution_time` - When the check was executed
- `status` - Check status (passed, failed, warning)
- `passed` - Boolean pass indicator
- `rows_checked` - Number of rows checked
- `rows_failed` - Number of rows that failed
- `failure_rate` - Percentage of failures
- `check_result` - JSON-serialized check result details
**Example Query:**

```sql
-- Get quality check results for a dataset
SELECT
    quality_execution_id,
    check_type,
    status,
    rows_checked,
    rows_failed,
    failure_rate,
    execution_time
FROM xt_quality_check_executions
WHERE dataset_id = 'your-dataset-id'
ORDER BY execution_time DESC;
```
### Export Operations (`xt_export_operations`)
Tracks data export operations.
**Key Fields:**

- `export_operation_id` - Unique export operation identifier
- `execution_id` - Parent flow execution ID
- `task_execution_id` - Parent task execution ID
- `operation_id` - Related operation log ID
- `export_type` - Type of export (file, database, api)
- `export_mode` - Export mode (full, batch, incremental, changes_only)
- `source_dataset_id` - Source dataset ID
- `destination_path` - Export destination path
- `destination_format` - Export format (csv, json, parquet, etc.)
- `batch_id` - Batch ID
- `start_time` / `end_time` - Operation timing
- `status` - Operation status
- `success` - Boolean success indicator
- `rows_exported` - Number of rows exported
- `export_config` - JSON-serialized export configuration
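A sketch of an example query in the same style as the other tables (the execution ID is a placeholder):

```sql
-- Get export operations for a flow execution
SELECT
    export_operation_id,
    export_type,
    destination_path,
    destination_format,
    rows_exported,
    status
FROM xt_export_operations
WHERE execution_id = 'your-execution-id'
ORDER BY start_time;
```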
### Alert Operations (`xt_alert_operations`)
Tracks alert/notification operations.
**Key Fields:**

- `alert_operation_id` - Unique alert operation identifier
- `execution_id` - Parent flow execution ID
- `task_execution_id` - Parent task execution ID
- `operation_id` - Related operation log ID
- `alert_type` - Type of alert (email, webhook, slack, teams)
- `channel` - Alert channel
- `recipients` - JSON array of recipients
- `subject` - Alert subject
- `message` - Alert message
- `condition_met` - Boolean indicating if the alert condition was met
- `sent_time` - When the alert was sent
- `status` - Alert status (sent, failed, skipped)
- `success` - Boolean success indicator
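A sketch of an example query using the fields above:

```sql
-- Get recently sent alerts
SELECT
    alert_operation_id,
    alert_type,
    channel,
    subject,
    status,
    sent_time
FROM xt_alert_operations
ORDER BY sent_time DESC
LIMIT 20;
```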
### Sync Operations (`xt_sync_operations`)
Tracks file synchronization operations.
**Key Fields:**

- `sync_operation_id` - Unique sync operation identifier
- `execution_id` - Parent flow execution ID
- `task_execution_id` - Parent task execution ID
- `operation_id` - Related operation log ID
- `source_path` - Source path
- `destination_path` - Destination path
- `pattern` - File pattern filter
- `recursive` - Boolean indicating recursive sync
- `overwrite` - Boolean indicating if files should be overwritten
- `start_time` / `end_time` - Operation timing
- `status` - Operation status
- `success` - Boolean success indicator
- `files_synced` - Number of files synced
- `files_skipped` - Number of files skipped
- `sync_details` - JSON-serialized sync details
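A sketch of an example query summarizing sync activity with the fields above:

```sql
-- Summarize sync activity per destination
SELECT
    destination_path,
    COUNT(*) AS sync_count,
    SUM(files_synced) AS total_files_synced,
    SUM(files_skipped) AS total_files_skipped
FROM xt_sync_operations
GROUP BY destination_path;
```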
### Flow Trigger Operations (`xt_flow_trigger_operations`)
Tracks flow trigger operations (when one flow triggers another).
**Key Fields:**

- `trigger_operation_id` - Unique trigger operation identifier
- `execution_id` - Parent flow execution ID (source flow)
- `task_execution_id` - Parent task execution ID
- `operation_id` - Related operation log ID
- `source_flow_id` - Flow that initiated the trigger
- `target_flow_id` - Flow that was triggered
- `triggered_execution_id` - Execution ID of the triggered flow
- `batch_id` - Source batch ID
- `target_batch_id` - Target batch ID
- `wait_for_completion` - Boolean indicating if the trigger waited for completion
- `condition` - Trigger condition (success, failure, always)
- `variables` - JSON-serialized variables passed to the triggered flow
- `trigger_time` - When the flow was triggered
- `completion_time` - When the triggered flow completed
- `status` - Trigger status (triggered, completed, failed, skipped)
- `success` - Boolean success indicator
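A sketch of an example query tracing trigger lineage with the fields above:

```sql
-- Trace which flows triggered which, and how the triggered runs ended
SELECT
    source_flow_id,
    target_flow_id,
    triggered_execution_id,
    condition,
    status,
    trigger_time
FROM xt_flow_trigger_operations
ORDER BY trigger_time DESC;
```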
## Accessing Metadata
### Via SQL Queries
All metadata is stored in database tables with the `xt_` prefix (configurable via namespace). You can query these tables directly using SQL:
```sql
-- Get execution summary for a flow
SELECT
    fe.execution_id,
    fe.flow_id,
    fe.batch_id,
    fe.start_time,
    fe.end_time,
    fe.status,
    fe.rows_loaded,
    fe.rows_processed,
    COUNT(te.task_execution_id) AS task_count
FROM xt_flow_executions fe
LEFT JOIN xt_task_executions te ON fe.execution_id = te.execution_id
WHERE fe.flow_id = 'your-flow-id'
GROUP BY
    fe.execution_id,
    fe.flow_id,
    fe.batch_id,
    fe.start_time,
    fe.end_time,
    fe.status,
    fe.rows_loaded,
    fe.rows_processed
ORDER BY fe.start_time DESC;
```
### Via Python API
You can access metadata programmatically:
```python
from qarion_etl.db_service import DatabaseService
from qarion_etl.engines import EngineLoader

# Initialize engine and database service
engine = EngineLoader.get_engine('sqlite', {'database_path': 'your_db.db'})
db_service = DatabaseService(engine)

# Query flow executions
select_statement = """
    SELECT * FROM xt_flow_executions
    WHERE flow_id = ?
    ORDER BY start_time DESC
    LIMIT 10
"""
results = engine.get_dataframe(select_statement, ('your-flow-id',))
```
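Because the metadata tables are ordinary database tables, they can also be read with standard tooling outside the engine abstraction. A minimal sketch using Python's built-in `sqlite3` module, assuming a SQLite-backed deployment and the default `xt_` namespace (the helper function is illustrative, not part of the Qarion ETL API):

```python
import sqlite3

def recent_failures(db_path, limit=10):
    """Return (flow_id, error_message, start_time) for recent failed executions."""
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.execute(
            """
            SELECT flow_id, error_message, start_time
            FROM xt_flow_executions
            WHERE success = 0
            ORDER BY start_time DESC
            LIMIT ?
            """,
            (limit,),
        )
        return cur.fetchall()
    finally:
        conn.close()
```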
## Use Cases
### Monitoring Flow Executions
Track the success rate and performance of your flows:
```sql
-- Flow execution statistics (uses SQLite date functions)
SELECT
    flow_id,
    COUNT(*) AS total_executions,
    SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) AS successful,
    SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) AS failed,
    AVG(rows_processed) AS avg_rows_processed,
    AVG((julianday(end_time) - julianday(start_time)) * 86400) AS avg_duration_seconds
FROM xt_flow_executions
WHERE start_time > datetime('now', '-7 days')
GROUP BY flow_id;
```
### Debugging Failed Operations
Identify and investigate failed operations:
```sql
-- Get failed operations with details
SELECT
    ol.operation_id,
    ol.operation_type,
    ol.operation_subtype,
    ol.flow_id,
    ol.task_id,
    ol.error_message,
    ol.start_time,
    fe.execution_id
FROM xt_operation_logs ol
JOIN xt_flow_executions fe ON ol.execution_id = fe.execution_id
WHERE ol.success = 0
ORDER BY ol.start_time DESC
LIMIT 20;
```
### Performance Analysis
Analyze operation performance and identify bottlenecks:
```sql
-- Average execution time by operation type
SELECT
    operation_type,
    operation_subtype,
    COUNT(*) AS count,
    AVG((julianday(end_time) - julianday(start_time)) * 86400) AS avg_duration_seconds,
    AVG(rows_processed) AS avg_rows_processed
FROM xt_operation_logs
WHERE end_time IS NOT NULL
GROUP BY operation_type, operation_subtype
ORDER BY avg_duration_seconds DESC;
```
### Quality Check Monitoring
Monitor data quality trends over time:
```sql
-- Quality check trends
SELECT
    DATE(execution_time) AS check_date,
    check_type,
    COUNT(*) AS check_count,
    SUM(CASE WHEN passed = 1 THEN 1 ELSE 0 END) AS passed_count,
    SUM(CASE WHEN passed = 0 THEN 1 ELSE 0 END) AS failed_count,
    AVG(failure_rate) AS avg_failure_rate
FROM xt_quality_check_executions
WHERE execution_time > datetime('now', '-30 days')
GROUP BY DATE(execution_time), check_type
ORDER BY check_date DESC, check_type;
```
## Configuration
Metadata tracking is automatically enabled when using the CLI or when initializing `FlowExecutionService` with a `MetadataRegistry`. The metadata tables are automatically created when `DatabaseService` is initialized.
### Metadata Namespace
By default, metadata tables use the `xt_` prefix. You can customize this via the `metadata_namespace` parameter:
```python
from qarion_etl.db_service import DatabaseService

db_service = DatabaseService(engine, metadata_namespace='custom')
# Tables will be prefixed with 'custom_' instead of 'xt_'
```
## Best Practices
- **Regular Cleanup**: Consider archiving old metadata records to maintain performance
- **Indexing**: Add indexes on frequently queried columns (`flow_id`, `execution_id`, `start_time`)
- **Monitoring**: Set up alerts based on metadata (e.g., alert on high failure rates)
- **Analysis**: Use metadata for capacity planning and performance optimization
- **Auditing**: Metadata provides a complete audit trail of all operations
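For the indexing recommendation above, the DDL might look like the following sketch (index names and column choices are illustrative; tune them to your query patterns):

```sql
-- Illustrative indexes on frequently queried metadata columns
CREATE INDEX idx_flow_exec_flow_start ON xt_flow_executions (flow_id, start_time);
CREATE INDEX idx_task_exec_execution ON xt_task_executions (execution_id);
CREATE INDEX idx_op_logs_execution ON xt_operation_logs (execution_id, start_time);
```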
## Related Documentation
- Flows Guide - Understanding flow execution
- Data Quality - Quality check system
- Data Export - Export operations
- Engines and Storage - Database configuration