# Developer Guide

This guide provides detailed information for developers working on Qarion ETL.
## Development Setup

### Prerequisites

- Python 3.11+
- Poetry (for dependency management)
- Git

### Setup Steps

```bash
# Clone repository
git clone https://github.com/yourorg/qarion-etl.git
cd qarion-etl

# Install dependencies
poetry install

# Run tests
poetry run pytest

# Run linting
poetry run ruff check .
```
## Project Structure

```text
qarion_etl/
├── qarion_etl/           # Main package
│   ├── cli/              # CLI commands
│   ├── flows/            # Flow system
│   ├── transformations/  # Transformation system
│   ├── engines/          # Engine implementations
│   ├── migrations/       # Migration system
│   ├── loaders/          # File loaders
│   └── repository/       # Repository pattern
├── tests/                # Test suite
├── docs/                 # Documentation
└── README.md             # Project overview
```
## Adding a New Flow Type

Before creating a new flow type, review the Flows documentation to understand existing patterns and when to create a new flow type vs. using an existing one.

### Step 1: Understand Flow Patterns

Each flow type implements a specific pattern:

- **Change Feed**: State comparison pattern for change tracking
- **Delta Publishing**: Transaction pattern for financial systems
- **Sessionization**: Time-window grouping pattern for event streams

Consider:

- Does your use case fit an existing pattern?
- What transformation logic is unique to your flow?
- What datasets do you need to create?
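For orientation, a flow definition is a plain dictionary. The exact schema is defined elsewhere in the codebase; every key in this sketch is an illustrative assumption, not the real field set:

```python
# Hypothetical flow definition -- all keys here are illustrative;
# check the repository layer for the actual schema.
flow_definition = {
    "flow_type": "your_flow_type",
    "name": "orders_feed",
    "source": {"format": "csv", "path": "landing/orders/"},
    "key_columns": ["order_id"],
}
```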
### Step 2: Create Plugin Class

Create a new file in `flows/plugins/your_flow_type/plugin.py`:

```python
from typing import Any, Dict, List, Optional

from flows.base import FlowPlugin
from flows.flow_orchestration import FlowDAG, DAGNode, StepType


class YourFlowPlugin(FlowPlugin):
    """
    Plugin for your_flow_type flow.

    Describe what pattern this flow implements and when to use it.
    """

    @property
    def flow_type(self) -> str:
        return "your_flow_type"

    @property
    def name(self) -> str:
        return "Your Flow Type"

    @property
    def description(self) -> str:
        return "Description of your flow type and when to use it"

    def get_required_table_types(self) -> List[str]:
        """Return the table types this flow requires."""
        return ["landing", "your_output_type"]

    def validate_flow_definition(self, flow_definition: Dict[str, Any]) -> bool:
        """Validate that the flow definition has the required properties."""
        super().validate_flow_definition(flow_definition)
        # Add flow-specific validation
        return True

    def generate_datasets(self, flow_definition: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Generate dataset definitions for your flow type.

        Typically creates:
        - Landing table (raw input)
        - Output table(s) (transformed data)
        """
        # Implementation
        pass

    def generate_dag(
        self,
        flow_definition: Dict[str, Any],
        datasets: List[Dict[str, Any]],
    ) -> FlowDAG:
        """
        Generate the execution DAG for your flow type.

        Creates nodes for:
        - Ingestion (if file loading is configured)
        - Transformations (landing → output)
        """
        # Implementation
        pass

    def generate_transformation_instructions(
        self,
        flow_definition: Dict[str, Any],
        source_dataset: Dict[str, Any],
        target_dataset: Dict[str, Any],
        batch_id: int,
        previous_batch_id: Optional[int] = None,
    ) -> Optional["TransformationInstruction"]:
        """
        Generate transformation instructions for your flow type.

        Creates engine-agnostic instructions that describe the transformation logic.
        """
        # Implementation
        pass
```
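As a sketch of what `generate_datasets()` might return: the keys `name`, `table_type`, and `schema` below are assumptions for illustration, so match them to the dataset schema the repository layer actually expects.

```python
from typing import Any, Dict, List


def generate_datasets(flow_definition: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Sketch: derive a landing and an output dataset from the flow name.

    The dict keys used here are illustrative, not the project's real schema.
    """
    name = flow_definition["name"]
    schema = flow_definition.get("schema", {})
    return [
        {"name": f"{name}_landing", "table_type": "landing", "schema": schema},
        {"name": f"{name}_output", "table_type": "your_output_type", "schema": schema},
    ]
```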
### Step 3: Register Plugin

In `flows/plugins/_init_plugins.py`:

```python
from flows import register_flow_plugin
from flows.plugins.your_flow_type.plugin import YourFlowPlugin

register_flow_plugin(YourFlowPlugin())
```
### Step 4: Implement Required Methods

See existing flow plugins for examples:

- **Change Feed**: `flows/plugins/change_feed/` - State comparison pattern
- **Delta Publishing**: `flows/plugins/delta_publishing/` - Transaction pattern
- **Sessionization**: `flows/plugins/sessionization/` - Time-window grouping

Key methods to implement:

- `generate_datasets()`: Create dataset definitions with proper schemas
- `generate_dag()`: Create execution DAG with nodes and edges
- `generate_transformation_instructions()`: Generate transformation logic
- `validate_flow_definition()`: Validate flow-specific requirements
### Step 5: Add Tests

Create tests in `tests/test_your_flow_type.py`:

```python
def test_generate_datasets():
    """Test dataset generation."""
    pass


def test_generate_dag():
    """Test DAG generation."""
    pass


def test_transformation_instructions():
    """Test transformation instruction generation."""
    pass
```
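A self-contained illustration of the test pattern the stubs above would follow; `FakePlugin` stands in for the real plugin class, which your tests would import instead:

```python
class FakePlugin:
    """Stand-in for YourFlowPlugin; the real test would import the plugin."""

    def generate_datasets(self, flow_definition):
        name = flow_definition["name"]
        return [
            {"name": f"{name}_landing", "table_type": "landing"},
            {"name": f"{name}_output", "table_type": "your_output_type"},
        ]


def test_generate_datasets():
    datasets = FakePlugin().generate_datasets({"name": "orders"})
    # Every required table type should be covered by the generated datasets
    assert {d["table_type"] for d in datasets} == {"landing", "your_output_type"}
```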
### Step 6: Document Your Flow Type

Add documentation to `docs/FLOWS.md` explaining:

- What pattern it implements
- When to use it
- How it works
- Example configuration
## Adding a New Engine

### Step 1: Create Engine Class

Create `engines/your_engine.py`:

```python
from typing import Optional

from engines.base import BaseEngine


class YourEngine(BaseEngine):
    def connect(self):
        # Establish connection
        pass

    def execute_query(self, query: str, params: Optional[tuple] = None):
        # Execute query
        pass

    def get_dataframe(self, statement: str, params: Optional[tuple] = None):
        # Get data
        pass
```
### Step 2: Create Dialect Adapter

Create `engines/dialects.py` (or add to the existing module):

```python
from engines.dialects import BaseDialectAdapter


class YourDialectAdapter(BaseDialectAdapter):
    def get_select_statement(self, ...):
        # Convert to your engine's syntax
        pass

    # ... implement other methods
```
### Step 3: Register Engine

In `engines/loader.py`:

```python
from engines.your_engine import YourEngine
from engines.dialects import YourDialectAdapter


class EngineLoader:
    ENGINE_MAP = {
        # ... existing engines
        "your_engine": YourEngine,
    }

    DIALECT_MAP = {
        # ... existing adapters
        "your_engine": YourDialectAdapter,
    }
```
## Adding a New Executor

### Step 1: Create Executor Class

Create `transformations/executors/your_executor.py`:

```python
from transformations.executors.base import BaseTransformationExecutor
from transformations.instructions import TransformationInstruction


class YourTransformationExecutor(BaseTransformationExecutor):
    def __init__(self, engine, db_service):
        self.engine = engine
        self.db_service = db_service

    def execute_transformation_instruction(self, instruction: TransformationInstruction):
        # Execute transformation
        pass
```
### Step 2: Update Service

In `transformations/service.py`:

```python
from transformations.executors import YourTransformationExecutor


class TransformationService:
    def __init__(self, engine, db_service):
        # ... existing code
        if engine.__class__.__name__ == "YourEngine":
            self.executor = YourTransformationExecutor(engine, db_service)
```
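If the number of engines grows, chained class-name checks can be replaced by a small registry. This is a design sketch under that assumption, not existing project code:

```python
# Design sketch: map engine class names to executor classes instead of
# chaining name checks inside TransformationService.__init__.
EXECUTOR_REGISTRY: dict = {}


def register_executor(engine_name: str):
    """Class decorator that records an executor for a given engine name."""
    def decorator(cls):
        EXECUTOR_REGISTRY[engine_name] = cls
        return cls
    return decorator


@register_executor("YourEngine")
class YourTransformationExecutor:
    def __init__(self, engine, db_service):
        self.engine = engine
        self.db_service = db_service


def executor_for(engine, db_service):
    """Look up and construct the executor matching the engine's class name."""
    cls = EXECUTOR_REGISTRY[engine.__class__.__name__]
    return cls(engine, db_service)
```

New engines then register themselves at import time and the service needs no edits.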
## Adding a New File Loader

### Step 1: Create Loader Class

Create `loaders/your_loader.py`:

```python
from loaders.base import BaseFileLoader, FileLoadResult


class YourFileLoader(BaseFileLoader):
    @property
    def supported_format(self) -> str:
        return "your_format"

    def can_load(self, file_path: str) -> bool:
        # Check if file can be loaded
        pass

    def load(self, file_path, target_table, engine, batch_id, config):
        # Load data
        pass
```
### Step 2: Register Loader

In `loaders/factory.py`:

```python
from loaders.your_loader import YourFileLoader


class FileLoaderFactory:
    _LOADERS = {
        # ... existing loaders
        "your_format": YourFileLoader,
    }
```
## Testing

### Writing Tests

Tests should be placed in the `tests/` directory:

```python
import pytest

from qarion_etl import YourClass


def test_your_function():
    result = YourClass().your_function()
    assert result == expected_value  # compare against a known expected value
```
### Running Tests

```bash
# Run all tests
pytest

# Run specific test file
pytest tests/test_your_file.py

# Run with coverage (the coverage target is the package name, with an underscore)
pytest --cov=qarion_etl --cov-report=html
```
### Test Structure

- **Unit Tests**: Test individual components
- **Integration Tests**: Test component interactions
- **End-to-End Tests**: Test complete workflows
## Code Style

### Python Style Guide

- Follow PEP 8
- Use type hints
- Document all public functions
- Keep functions focused and small
### Naming Conventions

- Classes: PascalCase (e.g., `MyClass`)
- Functions: snake_case (e.g., `my_function`)
- Constants: UPPER_SNAKE_CASE (e.g., `MY_CONSTANT`)
- Private: leading underscore (e.g., `_private_method`)
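The conventions above in one place (the names themselves are illustrative):

```python
MAX_RETRIES = 3  # constant: UPPER_SNAKE_CASE


class BatchRunner:  # class: PascalCase
    def run_batch(self) -> bool:  # function/method: snake_case
        return self._validate()

    def _validate(self) -> bool:  # private helper: leading underscore
        return True
```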
### Documentation

- Use docstrings for all public functions
- Include type information
- Provide usage examples where helpful
## Debugging

### Logging

Qarion ETL uses Python's `logging` module:

```python
import logging

logger = logging.getLogger(__name__)

logger.debug("Debug message")
logger.info("Info message")
logger.warning("Warning message")
logger.error("Error message")
```
### Common Issues

- **Import Errors**: Check that modules are in the correct locations
- **Engine Errors**: Verify the engine is properly connected
- **Transformation Errors**: Check the transformation instructions
- **File Loading Errors**: Verify the file format and permissions
### Performance Tips

- **Use Batch Operations**: Process multiple records together
- **Avoid N+1 Queries**: Use joins instead of multiple queries
- **Cache Results**: Cache frequently accessed data
- **Profile Code**: Use profiling tools to identify bottlenecks
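The first two tips in miniature, using `sqlite3` as a stand-in backend: one `executemany` call replaces a thousand single-row inserts, and one join replaces a lookup per order (the classic N+1 shape):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER)")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT)")

# Batch insert: one call instead of 1000 individual execute() round trips
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(i, i % 10) for i in range(1000)],
)
conn.executemany(
    "INSERT INTO customers VALUES (?, ?)",
    [(i, f"c{i}") for i in range(10)],
)

# Join once instead of issuing one customer lookup per order
rows = conn.execute(
    "SELECT o.id, c.name FROM orders o JOIN customers c ON c.id = o.customer_id"
).fetchall()
```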
## Contributing

### Pull Request Process

1. Create a feature branch
2. Make changes
3. Add tests
4. Update documentation
5. Submit a pull request

### Code Review

- All code must be reviewed
- Tests must pass
- Documentation must be updated
- Code style must be followed