Developer Guide

This guide provides detailed information for developers working on Qarion ETL.

Development Setup

Prerequisites

  • Python 3.11+
  • Poetry (for dependency management)
  • Git

Setup Steps

# Clone repository
git clone https://github.com/yourorg/qarion-etl.git
cd qarion-etl

# Install dependencies
poetry install

# Run tests
poetry run pytest

# Run linting
poetry run ruff check .

Project Structure

qarion_etl/
├── qarion_etl/          # Main package
│   ├── cli/             # CLI commands
│   ├── flows/           # Flow system
│   ├── transformations/ # Transformation system
│   ├── engines/         # Engine implementations
│   ├── migrations/      # Migration system
│   ├── loaders/         # File loaders
│   └── repository/      # Repository pattern
├── tests/               # Test suite
├── docs/                # Documentation
└── README.md            # Project overview

Adding a New Flow Type

Before creating a new flow type, review the Flows documentation to understand existing patterns and when to create a new flow type vs. using an existing one.

Step 1: Understand Flow Patterns

Each flow type implements a specific pattern:

  • Change Feed: State comparison pattern for change tracking
  • Delta Publishing: Transaction pattern for financial systems
  • Sessionization: Time-window grouping pattern for event streams

Consider:

  • Does your use case fit an existing pattern?
  • What transformation logic is unique to your flow?
  • What datasets do you need to create?
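To ground the discussion, a flow definition is typically a plain dictionary. The key names below are illustrative assumptions for this guide, not the exact Qarion ETL schema — check existing plugins for the real fields:

```python
# Hypothetical flow definition; key names are illustrative only.
flow_definition = {
    "flow_type": "sessionization",
    "name": "web_events_sessions",
    "source": {"format": "csv", "path": "data/events.csv"},
    "session_gap_seconds": 1800,  # a parameter unique to this flow type
}

def required_keys_present(definition, keys=("flow_type", "name", "source")):
    """Check that the common top-level keys are present."""
    return all(k in definition for k in keys)
```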

Step 2: Create Plugin Class

Create a new file in flows/plugins/your_flow_type/plugin.py:

from typing import Any, Dict, List, Optional

from flows.base import FlowPlugin
from flows.flow_orchestration import DAGNode, FlowDAG, StepType
from transformations.instructions import TransformationInstruction


class YourFlowPlugin(FlowPlugin):
    """
    Plugin for the your_flow_type flow.

    Describe what pattern this flow implements and when to use it.
    """

    @property
    def flow_type(self) -> str:
        return "your_flow_type"

    @property
    def name(self) -> str:
        return "Your Flow Type"

    @property
    def description(self) -> str:
        return "Description of your flow type and when to use it"

    def get_required_table_types(self) -> List[str]:
        """Return the table types required by this flow."""
        return ["landing", "your_output_type"]

    def validate_flow_definition(self, flow_definition: Dict[str, Any]) -> bool:
        """Validate that the flow definition has the required properties."""
        super().validate_flow_definition(flow_definition)
        # Add flow-specific validation here
        return True

    def generate_datasets(self, flow_definition: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Generate dataset definitions for your flow type.

        Typically creates:
        - Landing table (raw input)
        - Output table(s) (transformed data)
        """
        # Implementation
        pass

    def generate_dag(
        self,
        flow_definition: Dict[str, Any],
        datasets: List[Dict[str, Any]],
    ) -> FlowDAG:
        """
        Generate the execution DAG for your flow type.

        Creates nodes for:
        - Ingestion (if file loading is configured)
        - Transformations (landing → output)
        """
        # Implementation
        pass

    def generate_transformation_instructions(
        self,
        flow_definition: Dict[str, Any],
        source_dataset: Dict[str, Any],
        target_dataset: Dict[str, Any],
        batch_id: int,
        previous_batch_id: Optional[int] = None,
    ) -> Optional[TransformationInstruction]:
        """
        Generate transformation instructions for your flow type.

        These are engine-agnostic instructions describing the transformation logic.
        """
        # Implementation
        pass

Step 3: Register Plugin

In flows/plugins/_init_plugins.py:

from flows.plugins.your_flow_type.plugin import YourFlowPlugin
from flows import register_flow_plugin

register_flow_plugin(YourFlowPlugin())

Step 4: Implement Required Methods

See existing flow plugins for examples:

  • Change Feed: flows/plugins/change_feed/ - State comparison pattern
  • Delta Publishing: flows/plugins/delta_publishing/ - Transaction pattern
  • Sessionization: flows/plugins/sessionization/ - Time-window grouping

Key methods to implement:

  • generate_datasets(): Create dataset definitions with proper schemas
  • generate_dag(): Create execution DAG with nodes and edges
  • generate_transformation_instructions(): Generate transformation logic
  • validate_flow_definition(): Validate flow-specific requirements
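As a sketch of what `generate_datasets()` typically produces, the standalone function below builds a landing and an output dataset definition. The dictionary keys (`name`, `table_type`, `schema`) are assumptions for illustration, not the exact Qarion ETL schema:

```python
from typing import Any, Dict, List

def generate_datasets_sketch(flow_definition: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Illustrative only: derive a landing table and one output table
    from the flow definition. Key names here are hypothetical."""
    base = flow_definition["name"]
    return [
        {"name": f"{base}_landing", "table_type": "landing",
         "schema": flow_definition.get("source_schema", [])},
        {"name": f"{base}_output", "table_type": "your_output_type",
         "schema": flow_definition.get("output_schema", [])},
    ]
```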

Step 5: Add Tests

Create tests in tests/test_your_flow_type.py:

def test_generate_datasets():
    """Test dataset generation."""
    pass


def test_generate_dag():
    """Test DAG generation."""
    pass


def test_transformation_instructions():
    """Test transformation instruction generation."""
    pass
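A more concrete test exercises dataset generation end to end. The example below runs against a minimal stand-in rather than the real `FlowPlugin` base class, so it is a pattern to adapt, not a drop-in test:

```python
class StubFlowPlugin:
    """Minimal stand-in for FlowPlugin, for illustration only."""

    flow_type = "your_flow_type"

    def generate_datasets(self, flow_definition):
        name = flow_definition["name"]
        return [
            {"name": f"{name}_landing", "table_type": "landing"},
            {"name": f"{name}_output", "table_type": "your_output_type"},
        ]


def test_generate_datasets():
    plugin = StubFlowPlugin()
    datasets = plugin.generate_datasets({"name": "demo"})
    assert len(datasets) == 2
    assert {d["table_type"] for d in datasets} == {"landing", "your_output_type"}
```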

Step 6: Document Your Flow Type

Add documentation to docs/FLOWS.md explaining:

  • What pattern it implements
  • When to use it
  • How it works
  • Example configuration

Adding a New Engine

Step 1: Create Engine Class

Create engines/your_engine.py:

from typing import Optional

from engines.base import BaseEngine


class YourEngine(BaseEngine):
    def connect(self):
        # Establish a connection to the backing store
        pass

    def execute_query(self, query: str, params: Optional[tuple] = None):
        # Execute a statement
        pass

    def get_dataframe(self, statement: str, params: Optional[tuple] = None):
        # Return query results as a dataframe
        pass

Step 2: Create Dialect Adapter

Create engines/dialects.py (or add to existing):

from engines.dialects import BaseDialectAdapter


class YourDialectAdapter(BaseDialectAdapter):
    def get_select_statement(self, ...):
        # Convert to your engine's syntax
        pass

    # ... implement the other abstract methods

Step 3: Register Engine

In engines/loader.py:

from engines.your_engine import YourEngine
from engines.dialects import YourDialectAdapter

class EngineLoader:
    ENGINE_MAP = {
        # ... existing engines
        "your_engine": YourEngine,
    }

    DIALECT_MAP = {
        # ... existing adapters
        "your_engine": YourDialectAdapter,
    }
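The lookup itself is just a dictionary mapping. A self-contained sketch of how a loader might resolve an engine name, with a clear error for unknown names (the engine classes here are placeholders, not Qarion ETL's real ones):

```python
class DuckDBEngine:
    """Placeholder engine class for illustration."""

class PostgresEngine:
    """Placeholder engine class for illustration."""

ENGINE_MAP = {"duckdb": DuckDBEngine, "postgres": PostgresEngine}

def load_engine(name: str):
    """Resolve and instantiate an engine class by name."""
    engine_cls = ENGINE_MAP.get(name)
    if engine_cls is None:
        raise ValueError(f"Unknown engine '{name}'; known: {sorted(ENGINE_MAP)}")
    return engine_cls()
```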

Adding a New Executor

Step 1: Create Executor Class

Create transformations/executors/your_executor.py:

from transformations.executors.base import BaseTransformationExecutor
from transformations.instructions import TransformationInstruction


class YourTransformationExecutor(BaseTransformationExecutor):
    def __init__(self, engine, db_service):
        self.engine = engine
        self.db_service = db_service

    def execute_transformation_instruction(self, instruction: TransformationInstruction):
        # Execute the transformation described by the instruction
        pass

Step 2: Update Service

In transformations/service.py:

from transformations.executors import YourTransformationExecutor

class TransformationService:
    def __init__(self, engine, db_service):
        # ... existing code
        if engine.__class__.__name__ == 'YourEngine':
            self.executor = YourTransformationExecutor(engine, db_service)
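Comparing `__class__.__name__` strings is brittle under subclassing and renames. One alternative, sketched below with placeholder classes (not Qarion ETL's real API), is a registry keyed by engine class and resolved with `isinstance`:

```python
class DuckDBEngine:
    """Placeholder engine class for illustration."""

class DuckDBExecutor:
    """Placeholder executor class for illustration."""
    def __init__(self, engine, db_service):
        self.engine = engine
        self.db_service = db_service

# Map engine classes to executor classes instead of comparing name strings.
EXECUTOR_REGISTRY = {DuckDBEngine: DuckDBExecutor}

def make_executor(engine, db_service):
    """Pick the executor registered for this engine's class (or a subclass)."""
    for engine_cls, executor_cls in EXECUTOR_REGISTRY.items():
        if isinstance(engine, engine_cls):
            return executor_cls(engine, db_service)
    raise ValueError(f"No executor registered for {type(engine).__name__}")
```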

Adding a New File Loader

Step 1: Create Loader Class

Create loaders/your_loader.py:

from loaders.base import BaseFileLoader, FileLoadResult


class YourFileLoader(BaseFileLoader):
    @property
    def supported_format(self) -> str:
        return "your_format"

    def can_load(self, file_path: str) -> bool:
        # Check whether this loader can handle the file
        pass

    def load(self, file_path, target_table, engine, batch_id, config):
        # Load the data and return a FileLoadResult
        pass

Step 2: Register Loader

In loaders/factory.py:

from loaders.your_loader import YourFileLoader

class FileLoaderFactory:
    _LOADERS = {
        # ... existing loaders
        'your_format': YourFileLoader,
    }
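A common way such a factory resolves a loader is by file extension. The sketch below is self-contained with placeholder loader classes; the function name and behavior are assumptions, not the real factory API:

```python
import os

class CsvLoader:
    """Placeholder loader for illustration."""
    supported_format = "csv"

class JsonLoader:
    """Placeholder loader for illustration."""
    supported_format = "json"

_LOADERS = {"csv": CsvLoader, "json": JsonLoader}

def loader_for(file_path: str):
    """Pick a loader by file extension; return None if unsupported."""
    ext = os.path.splitext(file_path)[1].lstrip(".").lower()
    loader_cls = _LOADERS.get(ext)
    return loader_cls() if loader_cls else None
```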

Testing

Writing Tests

Tests should be placed in tests/ directory:

import pytest
from qarion_etl import YourClass

def test_your_function():
    # Test implementation
    result = YourClass().your_function()
    assert result == expected_value

Running Tests

# Run all tests
pytest

# Run specific test file
pytest tests/test_your_file.py

# Run with coverage
pytest --cov=qarion_etl --cov-report=html

Test Structure

  • Unit Tests: Test individual components
  • Integration Tests: Test component interactions
  • End-to-End Tests: Test complete workflows

Code Style

Python Style Guide

  • Follow PEP 8
  • Use type hints
  • Document all public functions
  • Keep functions focused and small

Naming Conventions

  • Classes: PascalCase (e.g., MyClass)
  • Functions: snake_case (e.g., my_function)
  • Constants: UPPER_SNAKE_CASE (e.g., MY_CONSTANT)
  • Private: Leading underscore (e.g., _private_method)

Documentation

  • Use docstrings for all public functions
  • Include type information
  • Provide usage examples where helpful
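Putting these conventions together, a function in the house style might look like the following (the function itself is an example, not part of the codebase):

```python
from typing import List

MAX_BATCH_SIZE = 500  # constant: UPPER_SNAKE_CASE


def chunk_records(records: List[dict], size: int = MAX_BATCH_SIZE) -> List[List[dict]]:
    """Split records into batches of at most `size` items.

    Example:
        >>> chunk_records([{"id": 1}, {"id": 2}, {"id": 3}], size=2)
        [[{'id': 1}, {'id': 2}], [{'id': 3}]]
    """
    if size <= 0:
        raise ValueError("size must be positive")
    return [records[i:i + size] for i in range(0, len(records), size)]
```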

Debugging

Logging

Qarion ETL uses Python's logging module:

import logging

logger = logging.getLogger(__name__)
logger.debug("Debug message")
logger.info("Info message")
logger.warning("Warning message")
logger.error("Error message")
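Note that log output only appears once logging is configured somewhere (typically at application entry). A minimal setup; the level and format shown are a matter of preference:

```python
import logging

# Configure the root logger once, at startup; no-op if already configured.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

logger = logging.getLogger("qarion_etl.example")
logger.info("pipeline started")  # emitted at INFO level
logger.debug("row detail")       # suppressed unless the level is DEBUG
```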

Common Issues

  1. Import Errors: Check that modules are in correct locations
  2. Engine Errors: Verify engine is properly connected
  3. Transformation Errors: Check transformation instructions
  4. File Loading Errors: Verify file format and permissions

Performance Tips

  1. Use Batch Operations: Process multiple records together
  2. Avoid N+1 Queries: Use joins instead of multiple queries
  3. Cache Results: Cache frequently accessed data
  4. Profile Code: Use profiling tools to identify bottlenecks
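The batch-operation tip in practice: with sqlite3 standing in for a real engine, `executemany` inserts all rows in one call instead of a per-row loop. This is a generic sketch, not Qarion ETL's loading code:

```python
import sqlite3

def insert_batch(rows):
    """Insert all rows in a single executemany call (batch operation)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER, name TEXT)")
    # One round trip for the whole batch, instead of one execute() per row.
    conn.executemany("INSERT INTO events VALUES (?, ?)", rows)
    count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
    conn.close()
    return count
```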

Contributing

Pull Request Process

  1. Create feature branch
  2. Make changes
  3. Add tests
  4. Update documentation
  5. Submit pull request

Code Review

  • All code must be reviewed
  • Tests must pass
  • Documentation must be updated
  • Code style must be followed

Resources