Code Guide

This guide helps developers understand the Qarion ETL codebase structure, organization, and common patterns.

Codebase Overview

Qarion ETL follows a modular architecture with clear separation of concerns:

qarion_etl/
├── engines/ # Database engine implementations
├── transformations/ # Transformation instruction system
├── flows/ # Flow execution and DAG system
├── quality/ # Data quality checking system
├── connectors/ # Data source connectors
├── loaders/ # File loaders
├── exporters/ # Data exporters
├── repository/ # Metadata repository
├── plugins/ # Plugin system infrastructure
└── cli/ # Command-line interface

Module Organization

Quality Module Structure

The quality module (qarion_etl/quality/) demonstrates the plugin-based architecture:

quality/
├── __init__.py # Public API exports
├── base.py # Base classes and enums
├── instructions.py # Engine-agnostic instructions
├── planning.py # Quality check planning
├── service.py # Quality service orchestration
├── executors.py # Engine-specific executors
├── pipeline.py # Complete processing pipeline
├── sql_generator.py # SQL query generation
├── transformation_converter.py # Converts to transformation instructions
├── reconciliation.py # Reconciliation checks
├── query_consolidation.py # Backward compatibility shim
├── consolidators/ # Query consolidation strategies
│ ├── base.py
│ ├── cross_type.py
│ ├── grouping.py
│ └── models.py
├── optimizers/ # Query optimizers
│ ├── base.py
│ ├── main.py
│ ├── completeness.py
│ ├── uniqueness.py
│ ├── range.py
│ └── multi_step.py
└── dq_check/ # DQ Check Converter Plugins
    ├── __init__.py
    ├── base.py # Plugin base class
    ├── registry.py # Plugin registry
    ├── _init_plugins.py # Auto-registration
    ├── completeness.py # Completeness plugin
    ├── uniqueness.py # Uniqueness plugin
    ├── range.py # Range plugin
    ├── pattern.py # Pattern plugin
    └── referential_integrity.py # Referential integrity plugin

Key Patterns

1. Plugin Architecture

Qarion ETL uses a plugin-based architecture for extensibility:

Plugin Base Class:

from abc import ABC, abstractmethod
from typing import Any, Dict, List

from qarion_etl.quality.base import QualityCheckType

class DQCheckConverterPlugin(ABC):
    @property
    @abstractmethod
    def check_type(self) -> QualityCheckType:
        """Return the check type this plugin handles."""
        pass

    @abstractmethod
    def generate_operations(self, instruction) -> List[Dict[str, Any]]:
        """Generate transformation operations."""
        pass

Plugin Registration:

from qarion_etl.quality.dq_check import register_converter_plugin

register_converter_plugin(MyPlugin())

Plugin Discovery:

from qarion_etl.quality.dq_check import get_converter_plugin

plugin = get_converter_plugin(QualityCheckType.COMPLETENESS)
operations = plugin.generate_operations(instruction)

2. Instruction-Based Design

The system uses engine-agnostic instructions that are converted to engine-specific code:

# Instruction (engine-agnostic)
instruction = QualityCheckInstruction(
    table_name='orders',
    check_type=QualityCheckType.COMPLETENESS,
    columns=['order_id']
)

# Converted to transformation operations via plugin
# Then executed by engine-specific executor
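
To make the split between an engine-agnostic instruction and engine-specific output concrete, here is a minimal sketch. `CompletenessInstruction`, `quote_identifier`, and `to_sql` are hypothetical names invented for illustration, and the real conversion goes through plugins and transformation operations rather than straight to SQL.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class CompletenessInstruction:
    # Hypothetical, simplified instruction: it carries no SQL and no engine details.
    table_name: str
    columns: Tuple[str, ...]


def quote_identifier(name: str, engine: str) -> str:
    """Quote an identifier the way the target engine expects."""
    if engine == "mysql":
        return "`" + name.replace("`", "``") + "`"
    # Double quotes are the SQL-standard form (used by SQLite and PostgreSQL).
    return '"' + name.replace('"', '""') + '"'


def to_sql(instruction: CompletenessInstruction, engine: str) -> List[str]:
    """Render one null-count query per column for the given engine."""
    table = quote_identifier(instruction.table_name, engine)
    return [
        f"SELECT COUNT(*) FROM {table} WHERE {quote_identifier(col, engine)} IS NULL"
        for col in instruction.columns
    ]


instruction = CompletenessInstruction(table_name="orders", columns=("order_id",))
sqlite_sql = to_sql(instruction, "sqlite")
mysql_sql = to_sql(instruction, "mysql")
```

The same instruction object yields different SQL text per engine, which is the property the instruction-based design relies on.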

3. Service Layer Pattern

Services orchestrate execution:

# Service orchestrates components
service = QualityService(engine)
result = service.execute_plan(plan)

# Service uses:
# - Executor (engine-specific execution)
# - Consolidator (query optimization)
# - Pipeline (complete processing)
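
As a rough illustration of the service layer, the sketch below shows a service that holds no engine logic of its own and only coordinates its collaborators. `SketchQualityService`, `EchoExecutor`, and `drop_duplicates` are hypothetical stand-ins, and the plan is reduced to a list of strings.

```python
from typing import Callable, Dict, List


class EchoExecutor:
    """Hypothetical executor: pretends to run a check and reports a pass."""

    def execute(self, check: str) -> Dict[str, object]:
        return {"check": check, "passed": True}


def drop_duplicates(plan: List[str]) -> List[str]:
    # Stand-in for consolidation: keep the first occurrence, preserve order.
    return list(dict.fromkeys(plan))


class SketchQualityService:
    """Coordinates collaborators; contains no engine-specific logic itself."""

    def __init__(self, executor: EchoExecutor,
                 consolidate: Callable[[List[str]], List[str]]) -> None:
        self._executor = executor
        self._consolidate = consolidate

    def execute_plan(self, plan: List[str]) -> List[Dict[str, object]]:
        checks = self._consolidate(plan)                      # optimization step
        return [self._executor.execute(c) for c in checks]    # execution step


service = SketchQualityService(EchoExecutor(), drop_duplicates)
results = service.execute_plan(
    ["completeness:orders", "completeness:orders", "range:orders"]
)
```

Passing the collaborators in through the constructor keeps the service easy to test with fakes, as done here.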

4. Pipeline Pattern

Complex processing uses pipelines:

pipeline = QualityCheckPipeline(
    engine=engine,
    db_service=db_service,
    enable_consolidation=True,
    enable_optimization=True
)

# Pipeline stages:
# 1. Consolidation
# 2. Conversion (via plugins)
# 3. Optimization
# 4. SQL Generation
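
The four stages can be sketched as plain functions chained together. This is an illustrative toy, not the QualityCheckPipeline implementation: checks are dicts with `table` and `column` keys, every check is treated as a null count, and all function names are assumptions. Only the two flags and the `sql_queries` result key mirror names used in this guide.

```python
from typing import Dict, List, Tuple

Check = Dict[str, str]
Group = Tuple[str, List[str]]  # (table, select expressions)


def consolidate(checks: List[Check]) -> List[Check]:
    """Stage 1 stand-in: drop duplicate (table, column) checks, keeping order."""
    unique = {(c["table"], c["column"]): c for c in checks}
    return list(unique.values())


def convert(checks: List[Check]) -> List[Group]:
    """Stage 2 stand-in: one null-count expression per check."""
    return [
        (c["table"],
         [f"SUM(CASE WHEN {c['column']} IS NULL THEN 1 ELSE 0 END) AS {c['column']}_nulls"])
        for c in checks
    ]


def optimize(groups: List[Group]) -> List[Group]:
    """Stage 3 stand-in: merge expressions per table so each table is scanned once."""
    merged: Dict[str, List[str]] = {}
    for table, exprs in groups:
        merged.setdefault(table, []).extend(exprs)
    return list(merged.items())


def generate_sql(groups: List[Group]) -> List[str]:
    """Stage 4: render one SELECT statement per group."""
    return [f"SELECT {', '.join(exprs)} FROM {table}" for table, exprs in groups]


def process_plan(checks: List[Check],
                 enable_consolidation: bool = True,
                 enable_optimization: bool = True) -> Dict[str, List[str]]:
    if enable_consolidation:
        checks = consolidate(checks)
    groups = convert(checks)
    if enable_optimization:
        groups = optimize(groups)
    return {"sql_queries": generate_sql(groups)}


plan = [
    {"table": "orders", "column": "order_id"},
    {"table": "orders", "column": "order_id"},  # duplicate check
    {"table": "orders", "column": "amount"},
]
result = process_plan(plan)
unoptimized = process_plan(plan, enable_consolidation=False, enable_optimization=False)
```

With both flags on, the three input checks collapse into a single query against `orders`; with both off, each check produces its own query.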

Code Reading Guide

Understanding a Module

When exploring a module, follow this order:

  1. Read __init__.py: Understand the public API
  2. Read base classes: Understand the abstractions
  3. Read service/executor classes: Understand execution flow
  4. Read plugin implementations: Understand extensibility points

Example: Understanding Quality Checks

  1. Start with quality/__init__.py: See what's exported
  2. Read quality/base.py: Understand enums and base classes
  3. Read quality/instructions.py: Understand instruction structure
  4. Read quality/service.py: Understand execution orchestration
  5. Read quality/dq_check/base.py: Understand plugin interface
  6. Read quality/dq_check/completeness.py: See plugin implementation
  7. Read quality/pipeline.py: Understand complete processing

Common Code Patterns

1. Lazy Imports

Used to avoid circular dependencies. If the import fails, the name falls back to None, so code that uses it must check for None first:

# Lazy import to avoid circular dependencies
try:
    from qarion_etl.transformations.instructions import TransformationInstruction
except ImportError:
    TransformationInstruction = None
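
A related technique, shown here only as a general Python sketch and not as something this codebase is known to use, is to defer the import to call time and keep the name available to type checkers through `typing.TYPE_CHECKING`. The standard-library `decimal` module stands in for the module that would otherwise cause the cycle, and `parse_amount` is a hypothetical function.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers, never at runtime,
    # so this line cannot take part in an import cycle.
    from decimal import Decimal


def parse_amount(text: str) -> "Decimal":
    # Deferred import: resolved when the function is first called,
    # by which time all modules have finished loading.
    from decimal import Decimal

    return Decimal(text)


amount = parse_amount("19.99")
```

Unlike the try/except form, this variant never leaves the name set to None, so callers need no extra check.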

2. Registry Pattern

Plugins are registered in registries:

class DQCheckConverterRegistry:
    _plugins: Dict[QualityCheckType, DQCheckConverterPlugin] = {}

    @classmethod
    def register(cls, plugin: DQCheckConverterPlugin):
        cls._plugins[plugin.check_type] = plugin
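
The snippet above shows only registration. A minimal sketch of the lookup side might look like the following, where `SketchRegistry`, `StubPlugin`, `CheckType`, and the `get` and `registered_types` methods are hypothetical and the real registry's API may differ.

```python
from enum import Enum
from typing import Dict, List


class CheckType(Enum):
    # Stand-in for QualityCheckType.
    COMPLETENESS = "completeness"
    RANGE = "range"


class StubPlugin:
    def __init__(self, check_type: CheckType) -> None:
        self.check_type = check_type


class SketchRegistry:
    # Class-level dict: shared by every caller, so registration is process-wide.
    _plugins: Dict[CheckType, StubPlugin] = {}

    @classmethod
    def register(cls, plugin: StubPlugin) -> None:
        cls._plugins[plugin.check_type] = plugin  # last registration wins

    @classmethod
    def get(cls, check_type: CheckType) -> StubPlugin:
        try:
            return cls._plugins[check_type]
        except KeyError:
            # Raise a descriptive error instead of a bare KeyError.
            raise LookupError(
                f"No plugin registered for {check_type.value!r}"
            ) from None

    @classmethod
    def registered_types(cls) -> List[CheckType]:
        return list(cls._plugins)


SketchRegistry.register(StubPlugin(CheckType.COMPLETENESS))
```

Because the dict lives on the class, a plugin registered anywhere in the process is visible everywhere, which is worth keeping in mind when tests register temporary plugins.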

3. Factory Pattern

Services create appropriate executors:

# Fragment from inside a service method (note the use of self)
engine_name = engine.__class__.__name__.lower()
if 'sqlite' in engine_name:
    self.executor = SQLQualityCheckExecutor(engine)
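
The same name-based selection can be written as a standalone factory function. The sketch below is illustrative only: `create_executor`, the `_EXECUTORS` mapping, and the engine and executor classes are hypothetical stand-ins.

```python
from typing import Dict


class SQLiteEngine:
    """Stand-in engine class."""


class CsvEngine:
    """Stand-in engine class with no matching executor."""


class SQLExecutor:
    def __init__(self, engine: object) -> None:
        self.engine = engine


# Maps a substring of the lowercased engine class name to an executor class.
_EXECUTORS: Dict[str, type] = {
    "sqlite": SQLExecutor,
    "postgres": SQLExecutor,
}


def create_executor(engine: object) -> object:
    engine_name = engine.__class__.__name__.lower()
    for marker, executor_cls in _EXECUTORS.items():
        if marker in engine_name:
            return executor_cls(engine)
    # Fail loudly rather than leaving the caller without an executor.
    raise ValueError(f"No executor available for engine {engine_name!r}")


executor = create_executor(SQLiteEngine())
```

Keeping the mapping in one table means a new engine needs one new entry instead of another `if` branch.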

4. Strategy Pattern

Different strategies for consolidation/optimization:

# Consolidator uses different strategies
consolidator = QualityCheckQueryConsolidator()
# Uses grouping strategy, cross-type strategy, etc.
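
A minimal sketch of how interchangeable consolidation strategies might be structured is shown below. The class names, the `key` method, and the tuple representation of a check are all assumptions made for illustration; the real strategies live under quality/consolidators/ and may work differently.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple

Check = Tuple[str, str]  # (table, check_type); simplified stand-in


class ConsolidationStrategy(ABC):
    @abstractmethod
    def key(self, check: Check) -> Tuple[str, ...]:
        """Checks that share a key end up in the same query group."""


class SameTypeStrategy(ConsolidationStrategy):
    def key(self, check: Check) -> Tuple[str, ...]:
        return check  # same table and same check type


class CrossTypeStrategy(ConsolidationStrategy):
    def key(self, check: Check) -> Tuple[str, ...]:
        return (check[0],)  # same table, any check type


class SketchConsolidator:
    def __init__(self, strategy: ConsolidationStrategy) -> None:
        self._strategy = strategy

    def consolidate(self, checks: List[Check]) -> List[List[Check]]:
        groups: Dict[Tuple[str, ...], List[Check]] = {}
        for check in checks:
            groups.setdefault(self._strategy.key(check), []).append(check)
        return list(groups.values())


checks = [
    ("orders", "completeness"),
    ("orders", "range"),
    ("orders", "completeness"),
    ("users", "completeness"),
]
by_type = SketchConsolidator(SameTypeStrategy()).consolidate(checks)
by_table = SketchConsolidator(CrossTypeStrategy()).consolidate(checks)
```

The consolidator's grouping loop stays the same; only the strategy object decides which checks may share a query.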

File Naming Conventions

  • base.py: Base classes and interfaces
  • service.py: Service orchestration classes
  • executors.py: Engine-specific executors
  • instructions.py: Engine-agnostic instructions
  • planning.py: Planning and plan classes
  • registry.py: Plugin registries
  • _init_plugins.py: Plugin auto-registration
  • __init__.py: Module exports

Import Patterns

Public API Imports

Always import from the package itself, which re-exports the public API through its __init__.py:

# Good
from qarion_etl.quality import QualityService, QualityCheckInstruction

# Avoid
from qarion_etl.quality.service import QualityService

Internal Imports

Use relative imports within a package:

# Within quality module
from .base import QualityCheckType
from .instructions import QualityCheckInstruction

Extension Points

Adding a New Quality Check Type

  1. Create plugin class in quality/dq_check/:
     class MyCheckConverterPlugin(DQCheckConverterPlugin):
         @property
         def check_type(self) -> QualityCheckType:
             return QualityCheckType.CUSTOM # Or add new type

         def generate_operations(self, instruction):
             # Implementation
             pass
  2. Register plugin:
     from qarion_etl.quality.dq_check import register_converter_plugin
     register_converter_plugin(MyCheckConverterPlugin())
  3. Add to enum (if new type):
     # In quality/base.py
     class QualityCheckType(Enum):
         MY_NEW_TYPE = "my_new_type"

Testing Patterns

Unit Tests

Test individual components:

def test_completeness_plugin():
    plugin = CompletenessConverterPlugin()
    instruction = QualityCheckInstruction(...)
    operations = plugin.generate_operations(instruction)
    assert len(operations) > 0

Integration Tests

Test complete flows:

def test_quality_pipeline():
    pipeline = QualityCheckPipeline(engine, db_service)
    result = pipeline.process_plan(plan)
    assert 'sql_queries' in result

Code Navigation Tips

Finding Implementation

  1. Start with public API (__init__.py)
  2. Follow imports to find implementations
  3. Check plugin registries for extensibility points
  4. Look for _init_plugins.py for auto-registration

Understanding Execution Flow

  1. Entry point: Service or CLI command
  2. Orchestration: Service classes
  3. Execution: Executor classes
  4. Conversion: Plugin classes
  5. Generation: Generator classes

Quality Module Deep Dive

Architecture Flow

QualityCheckPlan
    ↓
QualityCheckQueryConsolidator (consolidates compatible checks)
    ↓
ConsolidatedPlan
    ↓
QualityCheckToTransformationConverter (uses plugins)
    ↓
TransformationInstruction
    ↓
MultiStepQualityOptimizer (optimizes)
    ↓
OptimizedQuery
    ↓
SQLTransformationExecutor (generates SQL)
    ↓
SQL Query

Plugin System Flow

QualityCheckInstruction
    ↓
get_converter_plugin(check_type) # Registry lookup
    ↓
DQCheckConverterPlugin.generate_operations()
    ↓
List[Dict[str, Any]] # Transformation operations
    ↓
TransformationRule

Key Classes

  • QualityCheckInstruction: Engine-agnostic check definition
  • QualityCheckPlan: Collection of checks to execute
  • QualityService: Orchestrates execution
  • QualityCheckPipeline: Complete processing pipeline
  • DQCheckConverterPlugin: Plugin base class
  • QualityCheckQueryConsolidator: Consolidates queries
  • SQLQualityCheckExecutor: Executes SQL-based checks

Best Practices

  1. Use Instructions: Define checks as engine-agnostic instructions rather than writing engine-specific code directly
  2. Extend via Plugins: Create plugins for new functionality
  3. Follow Patterns: Use established patterns (registry, factory, strategy)
  4. Document Public API: Keep __init__.py clean with clear exports
  5. Lazy Imports: Use lazy imports to avoid circular dependencies
  6. Type Hints: Use type hints for better IDE support
  7. Abstract Base Classes: Use ABC for plugin interfaces