Code Generation Guide

Qarion ETL can generate executable code in multiple formats from flow definitions. This guide covers how to use and extend the code generation system.

Overview

The code generation system provides an extensible architecture for generating code in various formats:

  • SQL: Raw SQL files with parameterized queries
  • DBT: DBT models, schemas, and project configuration
  • Airflow: Airflow DAGs and task definitions

Architecture

Base Classes

All code generators inherit from BaseCodeGenerator:

from code_generators.base import BaseCodeGenerator

class MyCodeGenerator(BaseCodeGenerator):
    @property
    def format_name(self) -> str:
        return "my_format"

    def generate_for_node(self, ...) -> CodeGenerationResult:
        # Generate code for a single node
        pass

    def generate_for_flow(self, ...) -> FlowCodeGenerationResult:
        # Generate code for a complete flow
        pass

Generator Registry

Generators are automatically registered via the plugin system:

from code_generators.plugins.registry import get_code_generator_plugin

# Get a generator plugin
plugin = get_code_generator_plugin('dbt')

# Create generator instance
generator = plugin.create_generator(config={'dialect': 'postgres'})

Built-in Generators

SQL Generator

Generates SQL files with parameterized queries.

Usage:

qarion-etl generate-code --format sql --flow my_flow --output-dir sql_output

Output:

  • {table_name}.sql - SQL files with parameterized batch IDs
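As a rough illustration, a generated file could carry a batch-ID placeholder that is bound at execution time. The table name, column name, and placeholder syntax below are hypothetical, not the generator's actual output:

```python
# Hypothetical example of a generated {table_name}.sql file; the real
# generator's placeholder syntax may differ.
generated_sql = "SELECT * FROM landing.orders WHERE batch_id = :batch_id"

# At execution time the parameter is bound, e.g. by the database driver
# or by simple substitution:
rendered = generated_sql.replace(":batch_id", str(42))
```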

DBT Generator

Generates complete DBT projects with models, schemas, and documentation.

Usage:

qarion-etl generate-code --format dbt --flow my_flow --output-dir dbt_output --dialect postgres

Output:

  • dbt_project.yml - DBT project configuration
  • models/{model_name}.sql - DBT model files
  • models/schema.yml - Model and source definitions
  • models/{model_name}.md - Documentation files

Features:

  • Dialect-aware (SQLite, PostgreSQL, MySQL, Snowflake, etc.)
  • Incremental model support
  • Source definitions for landing tables
  • Documentation generation
  • Template-based SQL generation

Airflow Generator

Generates Airflow DAGs with task definitions and dependencies.

Usage:

qarion-etl generate-code --format airflow --flow my_flow --output-dir airflow_dags

Output:

  • dags/{flow_id}_dag.py - Complete Airflow DAG file

Features:

  • Task definitions for each transformation
  • Automatic dependency management
  • Batch ID parameterization
  • Customizable execution function
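The automatic dependency management above can be sketched as deriving an upstream map from the flow DAG's edge list. The edge format and task names here are illustrative, not the real Qarion schema:

```python
# Hypothetical sketch: derive per-task upstream dependencies from DAG edges.
edges = [("extract_orders", "clean_orders"), ("clean_orders", "orders_summary")]

def upstream_map(edges):
    """Map each task to the set of tasks it depends on."""
    deps = {}
    for src, dst in edges:
        deps.setdefault(dst, set()).add(src)
        deps.setdefault(src, set())  # ensure roots appear with no deps
    return deps

deps = upstream_map(edges)
```

In a generated DAG file, each edge would typically become a `src_task >> dst_task` dependency statement.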

Using Code Generators

Command Line

# Generate SQL
qarion-etl generate-code --format sql --flow my_flow --output-dir output

# Generate DBT
qarion-etl generate-code --format dbt --flow my_flow --output-dir dbt_project --dialect postgres

# Generate Airflow
qarion-etl generate-code --format airflow --flow my_flow --output-dir dags

Python API

from code_generators.plugins.registry import get_code_generator_plugin

# Get generator plugin
plugin = get_code_generator_plugin('dbt')

# Create generator with configuration
generator = plugin.create_generator(config={
    'dialect': 'postgres',
    'engine': my_engine
})

# Generate code for a flow
result = generator.generate_for_flow(
    flow_definition=flow_def,
    datasets=datasets,
    dag=dag,
    transformation_service=transformation_service,
    output_dir='output',
    batch_id=1,
    previous_batch_id=None,
    overwrite=False
)

# Check results
print(f"Generated {result.total_files} files")
print(f"Successful: {result.successful_nodes}/{result.total_nodes}")

Generator Results

CodeGenerationResult

Result for a single node:

@dataclass
class CodeGenerationResult:
    node_id: str
    files: List[str]          # Generated file paths
    errors: List[str]         # Any errors encountered
    metadata: Dict[str, Any]  # Additional metadata

    @property
    def success(self) -> bool:
        return len(self.errors) == 0

FlowCodeGenerationResult

Result for a complete flow:

@dataclass
class FlowCodeGenerationResult:
    flow_id: str
    total_nodes: int
    successful_nodes: int
    failed_nodes: int
    total_files: int
    results: List[CodeGenerationResult]  # Per-node results
    metadata: Dict[str, Any]

    @property
    def success(self) -> bool:
        return self.failed_nodes == 0
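Since each flow result carries its per-node results, failed nodes can be reported individually. The helper below is a hypothetical sketch, with a minimal stand-in for the result class so it runs on its own:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

# Minimal stand-in mirroring CodeGenerationResult above, for illustration only.
@dataclass
class CodeGenerationResult:
    node_id: str
    files: List[str] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def success(self) -> bool:
        return len(self.errors) == 0

def report_failures(results):
    """Collect (node_id, errors) pairs for nodes that failed generation."""
    return [(r.node_id, r.errors) for r in results if not r.success]

results = [
    CodeGenerationResult("clean_orders", files=["clean_orders.sql"]),
    CodeGenerationResult("orders_summary", errors=["unknown column: amout"]),
]
failures = report_failures(results)
```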

Creating Custom Generators

Step 1: Create Generator Class

from code_generators.base import BaseCodeGenerator, CodeGenerationResult, FlowCodeGenerationResult

class MyCodeGenerator(BaseCodeGenerator):
    @property
    def format_name(self) -> str:
        return "my_format"

    @property
    def file_extension(self) -> str:
        return ".myext"

    @property
    def description(self) -> str:
        return "Generates code in my custom format"

    def generate_for_node(self, ...) -> CodeGenerationResult:
        # Implement node-level code generation
        pass

    def generate_for_flow(self, ...) -> FlowCodeGenerationResult:
        # Implement flow-level code generation
        pass

Step 2: Create Plugin Wrapper

from code_generators.plugins.base import CodeGeneratorPlugin
from typing import Type
from code_generators.base import BaseCodeGenerator

class MyCodeGeneratorPlugin(CodeGeneratorPlugin):
    @property
    def format_name(self) -> str:
        return "my_format"

    @property
    def name(self) -> str:
        return "My Code Generator"

    @property
    def description(self) -> str:
        return "Generates code in my custom format"

    @property
    def generator_class(self) -> Type[BaseCodeGenerator]:
        return MyCodeGenerator

    def create_generator(self, config: dict = None):
        return MyCodeGenerator()

Step 3: Register Plugin

from code_generators.plugins.registry import register_code_generator_plugin

register_code_generator_plugin(MyCodeGeneratorPlugin())
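Internally, a registry like this can be as simple as a dict keyed by `format_name`. The sketch below is a simplified stand-in for `register_code_generator_plugin` / `get_code_generator_plugin`, not the actual implementation:

```python
# Illustrative registry sketch: plugins keyed by their format_name.
_REGISTRY = {}

def register_plugin(plugin):
    _REGISTRY[plugin.format_name] = plugin

def get_plugin(format_name):
    try:
        return _REGISTRY[format_name]
    except KeyError:
        raise ValueError(f"No code generator registered for format {format_name!r}")

class DummyPlugin:  # hypothetical plugin for demonstration
    format_name = "my_format"

register_plugin(DummyPlugin())
plugin = get_plugin("my_format")
```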

DBT Generator Details

Dialect Support

The DBT generator supports multiple SQL dialects:

  • SQLite (default)
  • PostgreSQL
  • MySQL
  • Snowflake
  • Redshift
  • BigQuery

Dialect is determined by:

  1. Explicit --dialect CLI option
  2. Engine type from configuration
  3. Default to SQLite
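The precedence above can be sketched as a small resolver. The `engine.dialect.name` attribute is an assumption (SQLAlchemy-style), not necessarily how the real generator inspects the engine:

```python
from types import SimpleNamespace

def resolve_dialect(cli_dialect=None, engine=None, default="sqlite"):
    """Resolve the SQL dialect following the precedence described above."""
    # 1. An explicit --dialect CLI option wins
    if cli_dialect:
        return cli_dialect
    # 2. Otherwise infer from the engine (SQLAlchemy-style attribute, assumed)
    if engine is not None and getattr(engine, "dialect", None) is not None:
        return engine.dialect.name
    # 3. Fall back to SQLite
    return default

# Fake engine standing in for a configured database engine
fake_engine = SimpleNamespace(dialect=SimpleNamespace(name="mysql"))
```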

Incremental Models

The DBT generator automatically detects when models should be incremental, based on the flow plugin's materialization configuration:

# Flow plugin defines materialization
materialization_config = flow_plugin.get_materialization_config(...)

# DBT generator uses this to configure incremental models
if materialization_config['is_incremental']:
    # Generate incremental model with unique_key, strategy, etc.
    ...

Documentation

The DBT generator automatically creates documentation files:

  • One .md file per dataset/table
  • Linked from schema.yml using {{ doc('model_name') }}
  • Includes metadata, schema, dependencies, and usage examples

Templates

The DBT generator uses Jinja2 templates for SQL generation:

  • templates/model.sql.j2 - Main model template
  • templates/incremental_wrapper.sql.j2 - Incremental wrapper template
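To show the template-rendering idea without the Jinja2 dependency, here is a minimal stand-in using Python's string.Template; the real templates/model.sql.j2 will have different placeholders and logic:

```python
from string import Template

# Stand-in for a model template; $-placeholders are filled per model, while
# the literal {{ ... }} parts pass through for DBT to evaluate later.
MODEL_TEMPLATE = Template(
    "{{ config(materialized='$materialization') }}\n\n"
    "select * from $source_relation"
)

rendered = MODEL_TEMPLATE.substitute(
    materialization="incremental",
    source_relation="{{ source('landing', 'orders') }}",
)
```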

Best Practices

  1. Modular Design: Break generators into focused components (e.g., SQL adapter, schema generator, model generator)

  2. Template-Based: Use templates for code generation when possible (improves maintainability)

  3. Validation: Validate generated code before writing files

  4. Error Handling: Provide clear error messages with context

  5. Documentation: Generate documentation alongside code

  6. Testing: Test generators with various flow types and configurations

Troubleshooting

Common Issues

Issue: Generated code has syntax errors

  • Solution: Check dialect configuration and template rendering

Issue: Missing dependencies in generated code

  • Solution: Verify DAG structure and node relationships

Issue: Incremental models not working

  • Solution: Check flow plugin's get_materialization_config implementation