
Code Generator System

Overview

The code generator system provides an extensible architecture for generating executable code from flow transformations in various formats (SQL, DBT, Airflow, etc.).

Architecture

Base Classes

BaseCodeGenerator

Abstract base class for all code generators. Defines the interface for generating code:

```python
from abc import ABC, abstractmethod


class BaseCodeGenerator(ABC):
    # Parameter lists are abbreviated with `...`; see the usage example
    # for the arguments passed to generate_for_flow().

    @property
    @abstractmethod
    def format_name(self) -> str:
        """Returns format identifier (e.g., 'sql', 'dbt', 'airflow')"""
        pass

    @abstractmethod
    def generate_for_node(self, ...) -> CodeGenerationResult:
        """Generate code for a single DAG node"""
        pass

    @abstractmethod
    def generate_for_flow(self, ...) -> FlowCodeGenerationResult:
        """Generate code for a complete flow"""
        pass
```
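Because the base class derives from `ABC`, Python refuses to instantiate any subclass that leaves an abstract member unimplemented, which is what keeps every generator on the same interface. The sketch below illustrates this with a simplified stand-in for the base class; the `node_id`/`flow_id` parameters, the plain-dict return values, and the `IncompleteGenerator`/`EchoGenerator` classes are hypothetical and exist only for illustration.

```python
from abc import ABC, abstractmethod


class BaseCodeGenerator(ABC):
    """Simplified stand-in for the documented base class (signatures are hypothetical)."""

    @property
    @abstractmethod
    def format_name(self) -> str: ...

    @abstractmethod
    def generate_for_node(self, node_id: str) -> dict: ...

    @abstractmethod
    def generate_for_flow(self, flow_id: str) -> dict: ...


class IncompleteGenerator(BaseCodeGenerator):
    # Implements format_name only; both generate_* methods are still abstract.
    @property
    def format_name(self) -> str:
        return "incomplete"


class EchoGenerator(BaseCodeGenerator):
    # Hypothetical toy generator that implements every abstract member.
    @property
    def format_name(self) -> str:
        return "echo"

    def generate_for_node(self, node_id: str) -> dict:
        return {"node_id": node_id, "files": [f"{node_id}.txt"]}

    def generate_for_flow(self, flow_id: str) -> dict:
        return {"flow_id": flow_id, "results": [self.generate_for_node("n1")]}


try:
    IncompleteGenerator()
except TypeError as exc:
    # The error message names the abstract methods that are still missing.
    print(f"Rejected: {exc}")

print(EchoGenerator().format_name)  # echo
```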

Built-in Generators

  1. SQLCodeGenerator (format_name: "sql")

    • Generates SQL files and Python scripts
    • Fully implemented
    • Wraps existing code_generation.py functionality
  2. DBTCodeGenerator (format_name: "dbt")

    • Generates DBT models and configurations
    • Placeholder implementation (TODO)
  3. AirflowCodeGenerator (format_name: "airflow")

    • Generates Airflow DAGs and tasks
    • Placeholder implementation (TODO)

Usage

Using Code Generators

```python
from code_generators import get_code_generator, list_code_generators

# List available generators
formats = list_code_generators()  # ['sql', 'dbt', 'airflow']

# Get a generator
generator = get_code_generator('sql')

# Generate code for a flow
result = generator.generate_for_flow(
    flow_definition=flow_def,
    datasets=datasets,
    dag=dag,
    transformation_service=transformation_service,
    output_dir='generated_code',
    batch_id=1,
    previous_batch_id=None,
    overwrite=False
)

# Check results
print(f"Generated {result.total_files} files")
print(f"Successful nodes: {result.successful_nodes}/{result.total_nodes}")
```

Generator Registry

```python
from code_generators import (
    register_code_generator,
    get_code_generator,
    list_code_generators
)

# Register a custom generator
register_code_generator(MyCustomGenerator())

# Get a generator
generator = get_code_generator('my_format')

# List all formats
formats = list_code_generators()
```

Adding a New Code Generator

1. Create Generator Class

```python
from code_generators.base import BaseCodeGenerator, CodeGenerationResult, FlowCodeGenerationResult


class MyCodeGenerator(BaseCodeGenerator):
    @property
    def format_name(self) -> str:
        return "my_format"

    @property
    def file_extension(self) -> str:
        return ".myext"

    @property
    def description(self) -> str:
        return "Generates code in my custom format"

    def generate_for_node(self, ...) -> CodeGenerationResult:
        # Implement node-level code generation
        pass

    def generate_for_flow(self, ...) -> FlowCodeGenerationResult:
        # Implement flow-level code generation
        pass
```

2. Register Generator

```python
from code_generators import register_code_generator

register_code_generator(MyCodeGenerator())
```

Or add to code_generators/_init_generators.py:

```python
from .my_generator import MyCodeGenerator
from .registry import register_code_generator

register_code_generator(MyCodeGenerator())
```

Generator Results

CodeGenerationResult

Result for a single node:

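```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class CodeGenerationResult:
    node_id: str
    files: List[str]          # Generated file paths
    errors: List[str]         # Any errors encountered
    metadata: Dict[str, Any]  # Additional metadata

    @property
    def success(self) -> bool:
        # A node succeeds only if no errors were recorded
        return len(self.errors) == 0
```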

FlowCodeGenerationResult

Result for a complete flow:

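```python
from dataclasses import dataclass
from typing import Any, Dict, List

# Defined alongside CodeGenerationResult


@dataclass
class FlowCodeGenerationResult:
    flow_id: str
    total_nodes: int
    successful_nodes: int
    failed_nodes: int
    total_files: int
    results: List[CodeGenerationResult]  # Per-node results
    metadata: Dict[str, Any]

    @property
    def success(self) -> bool:
        # A flow succeeds only if every node succeeded
        return self.failed_nodes == 0
```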
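The flow-level counters can be derived from the per-node results. The helper `build_flow_result` below is hypothetical (the generators may compute these fields differently), the node IDs and errors are made-up values, and it assumes `total_files` counts files from every node, including nodes that also reported errors. The `metadata` defaults are added for convenience and may not match the real classes.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class CodeGenerationResult:
    node_id: str
    files: List[str]
    errors: List[str]
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def success(self) -> bool:
        return len(self.errors) == 0


@dataclass
class FlowCodeGenerationResult:
    flow_id: str
    total_nodes: int
    successful_nodes: int
    failed_nodes: int
    total_files: int
    results: List[CodeGenerationResult]
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def success(self) -> bool:
        return self.failed_nodes == 0


def build_flow_result(flow_id: str, results: List[CodeGenerationResult]) -> FlowCodeGenerationResult:
    """Hypothetical helper: aggregate per-node results into flow-level counters."""
    successful = sum(1 for r in results if r.success)
    return FlowCodeGenerationResult(
        flow_id=flow_id,
        total_nodes=len(results),
        successful_nodes=successful,
        failed_nodes=len(results) - successful,
        # Assumption: files from every node count, even if that node also had errors.
        total_files=sum(len(r.files) for r in results),
        results=results,
    )


flow = build_flow_result("daily_orders", [
    CodeGenerationResult("load_orders", ["load_orders.sql"], []),
    CodeGenerationResult("dedupe_orders", ["dedupe_orders.sql", "dedupe_orders.py"], []),
    CodeGenerationResult("join_customers", [], ["Unknown column: customer_id"]),
])
print(f"Generated {flow.total_files} files")  # Generated 3 files
print(f"Successful nodes: {flow.successful_nodes}/{flow.total_nodes}")  # Successful nodes: 2/3
print("Flow succeeded" if flow.success else "Flow had failures")  # Flow had failures
```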

Implementation Status

DBT Generator ✅ Implemented

The DBT generator:

  1. Generates DBT Models

    • Creates .sql files for each transformation
    • Uses DBT's ref() function for dependencies
    • Includes proper DBT model structure with {{ config() }} macros
  2. Generates Schema Files

    • Creates schema.yml files with column definitions
    • Includes column metadata and data types
  3. Generates Project Configuration

    • Creates dbt_project.yml with project settings
    • Configures model paths and materialization
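A minimal sketch of what a generated model and its `schema.yml` entry might look like. The helpers `render_dbt_model` and `render_schema_yml`, the `materialized='table'` default, and the input shape (a SQL body with `{upstream}` placeholders plus a column-to-type mapping) are all hypothetical; the real generator's templates are not shown in this document. The YAML is built by hand to keep the example dependency-free, and it uses dbt's `data_type` column property for types.

```python
from typing import Dict, List


def render_dbt_model(sql_body: str, upstream_models: List[str], materialized: str = "table") -> str:
    """Render a dbt model: a config() macro plus SQL whose {name} placeholders become ref() calls.

    Hypothetical template. Literal braces in sql_body would need doubling ({{ }}),
    because str.format() is used for the substitution.
    """
    refs = {name: "{{ ref('" + name + "') }}" for name in upstream_models}
    header = "{{ config(materialized='" + materialized + "') }}"
    return header + "\n\n" + sql_body.format(**refs) + "\n"


def render_schema_yml(models: Dict[str, Dict[str, str]]) -> str:
    """Render a minimal schema.yml listing each model's columns and data types."""
    lines = ["version: 2", "", "models:"]
    for model_name, columns in models.items():
        lines.append(f"  - name: {model_name}")
        lines.append("    columns:")
        for column, data_type in columns.items():
            lines.append(f"      - name: {column}")
            lines.append(f"        data_type: {data_type}")
    return "\n".join(lines) + "\n"


model_sql = render_dbt_model(
    "select o.order_id, c.customer_name\nfrom {orders} o\njoin {customers} c using (customer_id)",
    upstream_models=["orders", "customers"],
)
print(model_sql)
print(render_schema_yml({"orders_enriched": {"order_id": "integer", "customer_name": "varchar"}}))
```

Routing upstream names through `ref()` rather than hard-coding table names is what lets dbt infer the model dependency graph on its own.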

Airflow Generator ✅ Implemented

The Airflow generator:

  1. Generates DAG Files

    • Creates Python files with Airflow DAG definitions
    • Includes proper imports and configuration
    • Sets up default arguments and scheduling
  2. Generates Tasks

    • Creates task definitions for each transformation
    • Uses PythonOperator with transformation execution function
    • Includes proper parameter handling
  3. Generates Dependencies

    • Sets up task dependencies based on DAG structure
    • Handles transformation step dependencies correctly
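The Airflow output described above could be produced by rendering Python source text. The sketch below is an assumption about the shape of that output, not the generator's actual template: `render_airflow_dag`, the `run_transformation` stub, the start date, and `schedule=None` are placeholders. `schedule` is the parameter name in Airflow 2.4 and later (older releases use `schedule_interval`). Each dependency edge is emitted with Airflow's `>>` operator, and step IDs are assumed to be valid Python identifiers because they double as variable names.

```python
from typing import List, Tuple


def render_airflow_dag(dag_id: str, steps: List[str], edges: List[Tuple[str, str]]) -> str:
    """Render Airflow DAG source: one PythonOperator per step, one >> line per edge."""
    out = [
        "from datetime import datetime",
        "",
        "from airflow import DAG",
        "from airflow.operators.python import PythonOperator",
        "",
        "",
        "def run_transformation(step_id):",
        "    # Placeholder: the real generator would execute the transformation here.",
        "    print(f'Running {step_id}')",
        "",
        "",
        "default_args = {'owner': 'airflow', 'retries': 1}",
        "",
        "with DAG(",
        f"    dag_id={dag_id!r},",
        "    default_args=default_args,",
        "    start_date=datetime(2024, 1, 1),",
        "    schedule=None,",
        "    catchup=False,",
        ") as dag:",
    ]
    if not steps:
        out.append("    pass")  # keep the with-block syntactically valid
    for step in steps:
        out.append(
            f"    {step} = PythonOperator(task_id={step!r}, "
            f"python_callable=run_transformation, op_kwargs={{'step_id': {step!r}}})"
        )
    for upstream, downstream in edges:
        out.append(f"    {upstream} >> {downstream}")
    return "\n".join(out) + "\n"


src = render_airflow_dag(
    "daily_orders",
    ["load_orders", "dedupe_orders", "join_customers"],
    [("load_orders", "dedupe_orders"), ("dedupe_orders", "join_customers")],
)
print(src)
```

Compiling the rendered text with the built-in `compile()` is a cheap syntax check that does not require Airflow to be installed.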

Migration from code_generation.py

The existing code_generation.py functions are still available for backward compatibility. The new generator system wraps these functions:

  • generate_code_for_node() → SQLCodeGenerator.generate_for_node()
  • generate_code_for_flow() → SQLCodeGenerator.generate_for_flow()

Future refactoring can move the implementation directly into the generator.
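The wrapping relationship can be pictured as a thin adapter that forwards to the legacy function and converts its output into the shared result type. The sketch stubs out `generate_code_for_node` with a hypothetical signature and return value (a list of file paths), since the real one is not documented here; catching exceptions and recording them as errors is also an assumption. Only the delegation pattern is the point.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


def generate_code_for_node(node_id: str, output_dir: str) -> List[str]:
    """Stand-in for the legacy code_generation.py function (hypothetical signature)."""
    return [f"{output_dir}/{node_id}.sql", f"{output_dir}/{node_id}.py"]


@dataclass
class CodeGenerationResult:
    node_id: str
    files: List[str]
    errors: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)


class SQLCodeGenerator:
    format_name = "sql"

    def generate_for_node(self, node_id: str, output_dir: str) -> CodeGenerationResult:
        # Delegate to the legacy function, then wrap its output in the common result type.
        try:
            files = generate_code_for_node(node_id, output_dir)
        except Exception as exc:  # assumption: failures are reported, not raised
            return CodeGenerationResult(node_id=node_id, files=[], errors=[str(exc)])
        return CodeGenerationResult(node_id=node_id, files=files)


result = SQLCodeGenerator().generate_for_node("orders_clean", "generated_code")
print(result.files)  # ['generated_code/orders_clean.sql', 'generated_code/orders_clean.py']
```

Because callers only ever see `CodeGenerationResult`, the body of `generate_for_node` can later be replaced with the inlined implementation without changing any call sites.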

Benefits

  1. Extensibility: Easy to add new output formats
  2. Consistency: All generators follow the same interface
  3. Testability: Generators can be tested independently
  4. Flexibility: Can generate multiple formats from the same flow
  5. Maintainability: Clear separation of concerns