Code Guide
This guide helps developers understand the Qarion ETL codebase structure, organization, and common patterns.
Codebase Overview
Qarion ETL follows a modular architecture with clear separation of concerns:
qarion_etl/
├── engines/ # Database engine implementations
├── transformations/ # Transformation instruction system
├── flows/ # Flow execution and DAG system
├── quality/ # Data quality checking system
├── connectors/ # Data source connectors
├── loaders/ # File loaders
├── exporters/ # Data exporters
├── repository/ # Metadata repository
├── plugins/ # Plugin system infrastructure
└── cli/ # Command-line interface
Module Organization
Quality Module Structure
The quality module (qarion_etl/quality/) demonstrates the plugin-based architecture:
quality/
├── __init__.py # Public API exports
├── base.py # Base classes and enums
├── instructions.py # Engine-agnostic instructions
├── planning.py # Quality check planning
├── service.py # Quality service orchestration
├── executors.py # Engine-specific executors
├── pipeline.py # Complete processing pipeline
├── sql_generator.py # SQL query generation
├── transformation_converter.py # Converts to transformation instructions
├── reconciliation.py # Reconciliation checks
├── query_consolidation.py # Backward compatibility shim
├── consolidators/ # Query consolidation strategies
│ ├── base.py
│ ├── cross_type.py
│ ├── grouping.py
│ └── models.py
├── optimizers/ # Query optimizers
│ ├── base.py
│ ├── main.py
│ ├── completeness.py
│ ├── uniqueness.py
│ ├── range.py
│ └── multi_step.py
└── dq_check/ # DQ Check Converter Plugins
    ├── __init__.py
    ├── base.py # Plugin base class
    ├── registry.py # Plugin registry
    ├── _init_plugins.py # Auto-registration
    ├── completeness.py # Completeness plugin
    ├── uniqueness.py # Uniqueness plugin
    ├── range.py # Range plugin
    ├── pattern.py # Pattern plugin
    └── referential_integrity.py # Referential integrity plugin
Key Patterns
1. Plugin Architecture
Qarion ETL uses a plugin-based architecture for extensibility:
Plugin Base Class:
from abc import ABC, abstractmethod
from typing import Any, Dict, List
# QualityCheckType is the enum defined in quality/base.py
class DQCheckConverterPlugin(ABC):
    @property
    @abstractmethod
    def check_type(self) -> QualityCheckType:
        """Return the check type this plugin handles."""
        pass
    @abstractmethod
    def generate_operations(self, instruction) -> List[Dict[str, Any]]:
        """Generate transformation operations."""
        pass
Plugin Registration:
from qarion_etl.quality.dq_check import register_converter_plugin
register_converter_plugin(MyPlugin())
Plugin Discovery:
from qarion_etl.quality.dq_check import get_converter_plugin
plugin = get_converter_plugin(QualityCheckType.COMPLETENESS)
operations = plugin.generate_operations(instruction)
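To illustrate what the abstract base class buys you, here is a minimal, self-contained sketch. `CheckType`, `ConverterPlugin`, `CompletenessPlugin`, and `IncompletePlugin` are hypothetical stand-ins, not the real Qarion ETL classes, and the `count_nulls` operation shape is an assumption made for the example. It shows that a subclass which omits an abstract member cannot be instantiated.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List


class CheckType(Enum):
    """Hypothetical stand-in for QualityCheckType."""
    COMPLETENESS = "completeness"


class ConverterPlugin(ABC):
    """Hypothetical stand-in for DQCheckConverterPlugin."""

    @property
    @abstractmethod
    def check_type(self) -> CheckType:
        """Return the check type this plugin handles."""

    @abstractmethod
    def generate_operations(self, instruction: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate transformation operations."""


class CompletenessPlugin(ConverterPlugin):
    @property
    def check_type(self) -> CheckType:
        return CheckType.COMPLETENESS

    def generate_operations(self, instruction: Dict[str, Any]) -> List[Dict[str, Any]]:
        # One illustrative "count nulls" operation per column (assumed shape).
        return [
            {"op": "count_nulls", "table": instruction["table_name"], "column": col}
            for col in instruction["columns"]
        ]


class IncompletePlugin(ConverterPlugin):
    """Omits generate_operations, so the class stays abstract."""

    @property
    def check_type(self) -> CheckType:
        return CheckType.COMPLETENESS


operations = CompletenessPlugin().generate_operations(
    {"table_name": "orders", "columns": ["order_id"]}
)

try:
    IncompletePlugin()
    incomplete_rejected = False
except TypeError:
    # ABC refuses to instantiate a class with unimplemented abstract members.
    incomplete_rejected = True
```

The failure happens at instantiation time, so a plugin with a missing method would be caught when it is registered rather than when a check first runs.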
2. Instruction-Based Design
The system uses engine-agnostic instructions that are converted to engine-specific code:
# Instruction (engine-agnostic)
instruction = QualityCheckInstruction(
    table_name='orders',
    check_type=QualityCheckType.COMPLETENESS,
    columns=['order_id']
)
# Converted to transformation operations via plugin
# Then executed by engine-specific executor
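As a rough sketch of the idea, the same engine-agnostic instruction can be rendered differently per target. Everything here is an assumption for illustration: `CheckInstruction` is a hypothetical stand-in for `QualityCheckInstruction`, and `quote_identifier` and `render_completeness_sql` are invented helpers, not part of the Qarion ETL API.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class CheckInstruction:
    """Hypothetical stand-in for QualityCheckInstruction."""
    table_name: str
    check_type: str
    columns: List[str]


def quote_identifier(name: str, dialect: str) -> str:
    """Quote an identifier for the given (illustrative) dialect."""
    if dialect == "mysql":
        return "`" + name.replace("`", "``") + "`"
    return '"' + name.replace('"', '""') + '"'  # ANSI-style quoting


def render_completeness_sql(instruction: CheckInstruction, dialect: str) -> str:
    """Render one null-count expression per column for one dialect."""
    table = quote_identifier(instruction.table_name, dialect)
    parts = []
    for col in instruction.columns:
        quoted = quote_identifier(col, dialect)
        alias = quote_identifier(col + "_nulls", dialect)
        parts.append(f"SUM(CASE WHEN {quoted} IS NULL THEN 1 ELSE 0 END) AS {alias}")
    return f"SELECT {', '.join(parts)} FROM {table}"


instruction = CheckInstruction("orders", "completeness", ["order_id"])
ansi_sql = render_completeness_sql(instruction, "ansi")
mysql_sql = render_completeness_sql(instruction, "mysql")
```

The instruction itself never mentions a dialect; only the rendering step does, which is the property the instruction-based design relies on.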
3. Service Layer Pattern
Services orchestrate execution:
# Service orchestrates components
service = QualityService(engine)
result = service.execute_plan(plan)
# Service uses:
# - Executor (engine-specific execution)
# - Consolidator (query optimization)
# - Pipeline (complete processing)
4. Pipeline Pattern
Complex processing uses pipelines:
pipeline = QualityCheckPipeline(
    engine=engine,
    db_service=db_service,
    enable_consolidation=True,
    enable_optimization=True
)
# Pipeline stages:
# 1. Consolidation
# 2. Conversion (via plugins)
# 3. Optimization
# 4. SQL Generation
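The four stages and the two `enable_*` flags can be sketched as a list of stage functions, where the optional stages are simply left out when disabled. This is a toy model under stated assumptions: `SketchPipeline` and the stage functions are hypothetical, and checks are plain strings rather than real plan objects.

```python
from typing import Callable, List

Stage = Callable[[List[str]], List[str]]


def consolidate(items: List[str]) -> List[str]:
    """Drop duplicate checks while keeping first-seen order."""
    return list(dict.fromkeys(items))


def convert(items: List[str]) -> List[str]:
    """Turn each check name into a placeholder operation string."""
    return [f"op:{item}" for item in items]


def optimize(items: List[str]) -> List[str]:
    """Sort operations so the output order is deterministic."""
    return sorted(items)


def generate_sql(items: List[str]) -> List[str]:
    """Wrap each operation in a placeholder SQL comment."""
    return [f"-- sql for {item}" for item in items]


class SketchPipeline:
    """Hypothetical stand-in for QualityCheckPipeline."""

    def __init__(self, enable_consolidation: bool = True, enable_optimization: bool = True):
        self.stages: List[Stage] = []
        if enable_consolidation:
            self.stages.append(consolidate)
        self.stages.append(convert)  # conversion always runs
        if enable_optimization:
            self.stages.append(optimize)
        self.stages.append(generate_sql)  # SQL generation always runs

    def process(self, checks: List[str]) -> List[str]:
        result = checks
        for stage in self.stages:
            result = stage(result)
        return result


checks = ["uniqueness", "completeness", "uniqueness"]
full = SketchPipeline().process(checks)
minimal = SketchPipeline(enable_consolidation=False, enable_optimization=False).process(checks)
```

With both flags off, the duplicate `uniqueness` check survives to the output, which makes the effect of the consolidation stage easy to see.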
Code Reading Guide
Understanding a Module
When exploring a module, follow this order:
- Read __init__.py: Understand the public API
- Read base classes: Understand the abstractions
- Read service/executor classes: Understand execution flow
- Read plugin implementations: Understand extensibility points
Example: Understanding Quality Checks
- Start with quality/__init__.py: See what's exported
- Read quality/base.py: Understand enums and base classes
- Read quality/instructions.py: Understand instruction structure
- Read quality/service.py: Understand execution orchestration
- Read quality/dq_check/base.py: Understand plugin interface
- Read quality/dq_check/completeness.py: See plugin implementation
- Read quality/pipeline.py: Understand complete processing
Common Code Patterns
1. Lazy Imports
Used to avoid circular dependencies:
# Lazy import to avoid circular dependencies
try:
    from qarion_etl.transformations.instructions import TransformationInstruction
except ImportError:
    TransformationInstruction = None
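A complementary way to defer an import is to move it inside the function that needs it, and to keep annotation-only imports behind `typing.TYPE_CHECKING`. The sketch below uses the standard-library `decimal` module purely as a stand-in dependency; `parse_amount` is a hypothetical function, and this is not necessarily how the Qarion ETL codebase does it.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen by type checkers only; this branch never runs at runtime.
    from decimal import Decimal


def parse_amount(text: str) -> "Decimal":
    """Parse a decimal string, importing the dependency at call time."""
    # Deferred import: executed on the first call, not when this module loads,
    # so modules that depend on each other can both finish importing first.
    from decimal import Decimal

    return Decimal(text)


amount = parse_amount("12.50")
```

The string annotation `"Decimal"` keeps the signature typed without requiring the name to exist at module load time.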
2. Registry Pattern
Plugins are registered in registries:
class DQCheckConverterRegistry:
    _plugins: Dict[QualityCheckType, DQCheckConverterPlugin] = {}
    @classmethod
    def register(cls, plugin: DQCheckConverterPlugin):
        cls._plugins[plugin.check_type] = plugin
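A registry usually needs a lookup side as well. The following self-contained sketch pairs `register` with a `get` method that fails loudly for unknown types. `ConverterRegistry`, `StubPlugin`, and `CheckType` are hypothetical stand-ins, and the "last registration wins" behaviour and the error message are assumptions, not confirmed behaviour of `DQCheckConverterRegistry`.

```python
from enum import Enum
from typing import Dict


class CheckType(Enum):
    """Hypothetical stand-in for QualityCheckType."""
    COMPLETENESS = "completeness"
    UNIQUENESS = "uniqueness"


class StubPlugin:
    """Minimal plugin object that only carries its check type."""

    def __init__(self, check_type: CheckType):
        self.check_type = check_type


class ConverterRegistry:
    """Hypothetical stand-in for DQCheckConverterRegistry."""

    # Class-level dict: shared by every caller in the process.
    _plugins: Dict[CheckType, StubPlugin] = {}

    @classmethod
    def register(cls, plugin: StubPlugin) -> None:
        # Assumption: a later registration replaces an earlier one.
        cls._plugins[plugin.check_type] = plugin

    @classmethod
    def get(cls, check_type: CheckType) -> StubPlugin:
        try:
            return cls._plugins[check_type]
        except KeyError:
            raise LookupError(f"No plugin registered for {check_type.name}") from None


first = StubPlugin(CheckType.COMPLETENESS)
ConverterRegistry.register(first)
found = ConverterRegistry.get(CheckType.COMPLETENESS)

try:
    ConverterRegistry.get(CheckType.UNIQUENESS)
    missing_error = ""
except LookupError as exc:
    missing_error = str(exc)
```

Because `_plugins` lives on the class, tests that register stub plugins may want to clear it between runs to avoid leaking state.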
3. Factory Pattern
Services create appropriate executors:
engine_name = engine.__class__.__name__.lower()
if 'sqlite' in engine_name:
    self.executor = SQLQualityCheckExecutor(engine)
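One way to keep this kind of name-based dispatch readable as engines are added is a lookup table plus an explicit error for unknown engines. This is a sketch under assumptions: the engine and executor classes, the `EXECUTORS` table, and `create_executor` are all hypothetical names invented for the example.

```python
from typing import Callable, Dict


class SQLiteEngine:
    """Hypothetical engine class."""


class PandasEngine:
    """Hypothetical engine class."""


class SQLExecutor:
    def __init__(self, engine: object):
        self.engine = engine


class DataFrameExecutor:
    def __init__(self, engine: object):
        self.engine = engine


# Substring of the lower-cased engine class name -> executor constructor.
EXECUTORS: Dict[str, Callable[[object], object]] = {
    "sqlite": SQLExecutor,
    "pandas": DataFrameExecutor,
}


def create_executor(engine: object) -> object:
    """Pick an executor by matching a marker in the engine's class name."""
    engine_name = engine.__class__.__name__.lower()
    for marker, executor_cls in EXECUTORS.items():
        if marker in engine_name:
            return executor_cls(engine)
    raise ValueError(f"No executor for engine type {engine.__class__.__name__}")


sql_executor = create_executor(SQLiteEngine())
df_executor = create_executor(PandasEngine())

try:
    create_executor(object())
    unknown_error = ""
except ValueError as exc:
    unknown_error = str(exc)
```

Raising for an unmatched engine avoids the silent case where no executor is assigned at all.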
4. Strategy Pattern
Different strategies for consolidation/optimization:
# Consolidator uses different strategies
consolidator = QualityCheckQueryConsolidator()
# Uses grouping strategy, cross-type strategy, etc.
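A minimal sketch of the strategy pattern in this setting: the consolidator delegates the grouping decision to an interchangeable strategy object. `SketchConsolidator`, `GroupByTable`, and `GroupByType` are hypothetical names, and representing a check as a `(table, check type)` tuple is an assumption; the real grouping and cross-type strategies may work quite differently.

```python
from typing import Dict, List, Protocol, Tuple

Check = Tuple[str, str]  # (table name, check type); an illustrative shape


class ConsolidationStrategy(Protocol):
    """Anything with a matching group() method counts as a strategy."""

    def group(self, checks: List[Check]) -> Dict[str, List[Check]]:
        ...


class GroupByTable:
    """Put checks on the same table into one group."""

    def group(self, checks: List[Check]) -> Dict[str, List[Check]]:
        groups: Dict[str, List[Check]] = {}
        for table, check_type in checks:
            groups.setdefault(table, []).append((table, check_type))
        return groups


class GroupByType:
    """Put checks of the same type into one group, across tables."""

    def group(self, checks: List[Check]) -> Dict[str, List[Check]]:
        groups: Dict[str, List[Check]] = {}
        for table, check_type in checks:
            groups.setdefault(check_type, []).append((table, check_type))
        return groups


class SketchConsolidator:
    """Delegates grouping to whichever strategy it was given."""

    def __init__(self, strategy: ConsolidationStrategy):
        self.strategy = strategy

    def consolidate(self, checks: List[Check]) -> Dict[str, List[Check]]:
        return self.strategy.group(checks)


checks = [("orders", "completeness"), ("orders", "uniqueness"), ("users", "completeness")]
by_table = SketchConsolidator(GroupByTable()).consolidate(checks)
by_type = SketchConsolidator(GroupByType()).consolidate(checks)
```

Swapping the strategy changes the grouping without touching the consolidator itself.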
File Naming Conventions
- base.py: Base classes and interfaces
- service.py: Service orchestration classes
- executors.py: Engine-specific executors
- instructions.py: Engine-agnostic instructions
- planning.py: Planning and plan classes
- registry.py: Plugin registries
- _init_plugins.py: Plugin auto-registration
- __init__.py: Module exports
Import Patterns
Public API Imports
Always import from the module's __init__.py:
# Good
from qarion_etl.quality import QualityService, QualityCheckInstruction
# Avoid
from qarion_etl.quality.service import QualityService
Internal Imports
Use relative imports within a package:
# Within quality module
from .base import QualityCheckType
from .instructions import QualityCheckInstruction
Extension Points
Adding a New Quality Check Type
- Create plugin class in quality/dq_check/:
class MyCheckConverterPlugin(DQCheckConverterPlugin):
    @property
    def check_type(self) -> QualityCheckType:
        return QualityCheckType.CUSTOM # Or add new type
    def generate_operations(self, instruction):
        # Implementation
        pass
- Register plugin:
from qarion_etl.quality.dq_check import register_converter_plugin
register_converter_plugin(MyCheckConverterPlugin())
- Add to enum (if new type):
# In quality/base.py
class QualityCheckType(Enum):
    MY_NEW_TYPE = "my_new_type"
Testing Patterns
Unit Tests
Test individual components:
def test_completeness_plugin():
    plugin = CompletenessConverterPlugin()
    instruction = QualityCheckInstruction(...)
    operations = plugin.generate_operations(instruction)
    assert len(operations) > 0
Integration Tests
Test complete flows:
def test_quality_pipeline():
    pipeline = QualityCheckPipeline(engine, db_service)
    result = pipeline.process_plan(plan)
    assert 'sql_queries' in result
Code Navigation Tips
Finding Implementation
- Start with public API (__init__.py)
- Follow imports to find implementations
- Check plugin registries for extensibility points
- Look for _init_plugins.py for auto-registration
Understanding Execution Flow
- Entry point: Service or CLI command
- Orchestration: Service classes
- Execution: Executor classes
- Conversion: Plugin classes
- Generation: Generator classes
Quality Module Deep Dive
Architecture Flow
QualityCheckPlan
↓
QualityCheckQueryConsolidator (consolidates compatible checks)
↓
ConsolidatedPlan
↓
QualityCheckToTransformationConverter (uses plugins)
↓
TransformationInstruction
↓
MultiStepQualityOptimizer (optimizes)
↓
OptimizedQuery
↓
SQLTransformationExecutor (generates SQL)
↓
SQL Query
Plugin System Flow
QualityCheckInstruction
↓
get_converter_plugin(check_type) # Registry lookup
↓
DQCheckConverterPlugin.generate_operations()
↓
List[Dict[str, Any]] # Transformation operations
↓
TransformationRule
Key Classes
- QualityCheckInstruction: Engine-agnostic check definition
- QualityCheckPlan: Collection of checks to execute
- QualityService: Orchestrates execution
- QualityCheckPipeline: Complete processing pipeline
- DQCheckConverterPlugin: Plugin base class
- QualityCheckQueryConsolidator: Consolidates queries
- SQLQualityCheckExecutor: Executes SQL-based checks
Best Practices
- Use Instructions: Always use the instruction-based approach, not concrete classes
- Extend via Plugins: Create plugins for new functionality
- Follow Patterns: Use established patterns (registry, factory, strategy)
- Document Public API: Keep __init__.py clean with clear exports
- Lazy Imports: Use lazy imports to avoid circular dependencies
- Type Hints: Use type hints for better IDE support
- Abstract Base Classes: Use ABC for plugin interfaces
Related Documentation
- Architecture Overview - System architecture
- Plugin System Overview - Plugin architecture
- Code Style - Coding standards
- Quality Check Integration - Quality check integration