Code Generator System
Overview
The code generator system provides an extensible architecture for turning flow transformations into executable code in several output formats (SQL, DBT, Airflow, etc.).
Architecture
Base Classes
BaseCodeGenerator
Abstract base class for all code generators. Defines the interface for generating code:
from abc import ABC, abstractmethod

class BaseCodeGenerator(ABC):
    @property
    @abstractmethod
    def format_name(self) -> str:
        """Returns format identifier (e.g., 'sql', 'dbt', 'airflow')"""
        pass

    # Parameter lists are elided (...) here; see Usage for the arguments passed in practice.
    @abstractmethod
    def generate_for_node(self, ...) -> CodeGenerationResult:
        """Generate code for a single DAG node"""
        pass

    @abstractmethod
    def generate_for_flow(self, ...) -> FlowCodeGenerationResult:
        """Generate code for a complete flow"""
        pass
Built-in Generators
- SQLCodeGenerator (format_name: "sql")
  - Generates SQL files and Python scripts
  - Fully implemented
  - Wraps existing code_generation.py functionality
- DBTCodeGenerator (format_name: "dbt")
  - Generates DBT models and configurations
  - Implemented (see Implementation Status)
- AirflowCodeGenerator (format_name: "airflow")
  - Generates Airflow DAGs and tasks
  - Implemented (see Implementation Status)
Usage
Using Code Generators
from code_generators import get_code_generator, list_code_generators

# List available generators
formats = list_code_generators()  # ['sql', 'dbt', 'airflow']

# Get a generator
generator = get_code_generator('sql')

# Generate code for a flow
# (flow_def, datasets, dag and transformation_service are built elsewhere)
result = generator.generate_for_flow(
    flow_definition=flow_def,
    datasets=datasets,
    dag=dag,
    transformation_service=transformation_service,
    output_dir='generated_code',
    batch_id=1,
    previous_batch_id=None,
    overwrite=False,
)

# Check results
print(f"Generated {result.total_files} files")
print(f"Successful nodes: {result.successful_nodes}/{result.total_nodes}")
Generator Registry
from code_generators import (
    register_code_generator,
    get_code_generator,
    list_code_generators,
)
# Register a custom generator
register_code_generator(MyCustomGenerator())
# Get a generator
generator = get_code_generator('my_format')
# List all formats
formats = list_code_generators()
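
To make the registry behaviour concrete, here is a minimal sketch of how the three functions could be backed by a dictionary keyed on format_name. This is an illustration, not the package's actual implementation: the `_REGISTRY` dict, the `DemoGenerator` stand-in, and the choice to raise `ValueError` for an unknown format are all assumptions.

```python
from typing import Dict, List


class DemoGenerator:
    """Hypothetical stand-in for a BaseCodeGenerator subclass."""

    format_name = "my_format"


# Hypothetical module-level registry, keyed by format name.
_REGISTRY: Dict[str, object] = {}


def register_code_generator(generator) -> None:
    """Store the generator under its format_name (a later registration overwrites)."""
    _REGISTRY[generator.format_name] = generator


def get_code_generator(format_name: str):
    """Return the registered generator, or raise a descriptive error (assumed behaviour)."""
    try:
        return _REGISTRY[format_name]
    except KeyError:
        known = ", ".join(sorted(_REGISTRY)) or "none"
        raise ValueError(
            f"Unknown format '{format_name}' (registered: {known})"
        ) from None


def list_code_generators() -> List[str]:
    """Return the registered format names in registration order."""
    return list(_REGISTRY)


register_code_generator(DemoGenerator())
print(list_code_generators())
```

Keying on format_name keeps lookups simple and means a generator only has to expose that one property to be discoverable.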
Adding a New Code Generator
1. Create Generator Class
from code_generators.base import BaseCodeGenerator, CodeGenerationResult, FlowCodeGenerationResult

class MyCodeGenerator(BaseCodeGenerator):
    @property
    def format_name(self) -> str:
        return "my_format"

    @property
    def file_extension(self) -> str:
        return ".myext"

    @property
    def description(self) -> str:
        return "Generates code in my custom format"

    def generate_for_node(self, ...) -> CodeGenerationResult:
        # Implement node-level code generation
        pass

    def generate_for_flow(self, ...) -> FlowCodeGenerationResult:
        # Implement flow-level code generation
        pass
2. Register Generator
from code_generators import register_code_generator
register_code_generator(MyCodeGenerator())
Or add it to code_generators/_init_generators.py:
from .my_generator import MyCodeGenerator
from .registry import register_code_generator
register_code_generator(MyCodeGenerator())
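
The skeleton above leaves the method bodies empty. As a rough, self-contained sketch of what a node-level implementation might look like, the example below writes one file per node and records any I/O error in the result. The base class and result type are reduced local stand-ins, and the `node_id`, `sql`, and `output_dir` parameters are hypothetical, since the real method signatures are elided in this document.

```python
import tempfile
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List


@dataclass
class CodeGenerationResult:
    """Local stand-in mirroring the fields documented under Generator Results."""
    node_id: str
    files: List[str] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def success(self) -> bool:
        return len(self.errors) == 0


class BaseCodeGenerator(ABC):
    """Reduced stand-in for the real base class (node-level method only)."""

    @property
    @abstractmethod
    def format_name(self) -> str: ...

    # Hypothetical parameters; the real signature is not shown in this document.
    @abstractmethod
    def generate_for_node(self, node_id: str, sql: str, output_dir: str) -> CodeGenerationResult: ...


class TextCodeGenerator(BaseCodeGenerator):
    @property
    def format_name(self) -> str:
        return "my_format"

    @property
    def file_extension(self) -> str:
        return ".myext"

    def generate_for_node(self, node_id: str, sql: str, output_dir: str) -> CodeGenerationResult:
        result = CodeGenerationResult(node_id=node_id)
        try:
            path = Path(output_dir) / f"{node_id}{self.file_extension}"
            path.write_text(f"-- node: {node_id}\n{sql}\n", encoding="utf-8")
            result.files.append(str(path))
        except OSError as exc:
            # Report the failure through the result instead of raising.
            result.errors.append(str(exc))
        return result


with tempfile.TemporaryDirectory() as tmp:
    demo_result = TextCodeGenerator().generate_for_node("orders_clean", "SELECT 1", tmp)
    demo_text = Path(demo_result.files[0]).read_text(encoding="utf-8")

print(demo_result.success, demo_text.splitlines()[0])
```

Collecting errors in the result rather than raising lets a flow-level method keep processing the remaining nodes after one of them fails.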
Generator Results
CodeGenerationResult
Result for a single node:
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class CodeGenerationResult:
    node_id: str
    files: List[str]          # Generated file paths
    errors: List[str]         # Any errors encountered
    metadata: Dict[str, Any]  # Additional metadata

    @property
    def success(self) -> bool:
        return len(self.errors) == 0
FlowCodeGenerationResult
Result for a complete flow:
@dataclass
class FlowCodeGenerationResult:
    flow_id: str
    total_nodes: int
    successful_nodes: int
    failed_nodes: int
    total_files: int
    results: List[CodeGenerationResult]  # Per-node results
    metadata: Dict[str, Any]

    @property
    def success(self) -> bool:
        return self.failed_nodes == 0
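
The two result types relate in a simple way: the flow-level counters can be derived from the per-node results. The sketch below shows one possible aggregation. The `summarize_flow` helper and the default field values are assumptions added for illustration, and the dataclasses are redeclared locally so the example runs on its own.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class CodeGenerationResult:
    node_id: str
    files: List[str] = field(default_factory=list)   # defaults are an assumption
    errors: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def success(self) -> bool:
        return len(self.errors) == 0


@dataclass
class FlowCodeGenerationResult:
    flow_id: str
    total_nodes: int
    successful_nodes: int
    failed_nodes: int
    total_files: int
    results: List[CodeGenerationResult]
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def success(self) -> bool:
        return self.failed_nodes == 0


def summarize_flow(flow_id: str, results: List[CodeGenerationResult]) -> FlowCodeGenerationResult:
    """Hypothetical helper: derive the flow-level counters from per-node results."""
    successful = sum(1 for r in results if r.success)
    return FlowCodeGenerationResult(
        flow_id=flow_id,
        total_nodes=len(results),
        successful_nodes=successful,
        failed_nodes=len(results) - successful,
        total_files=sum(len(r.files) for r in results),
        results=list(results),
    )


flow_result = summarize_flow(
    "daily_orders",
    [
        CodeGenerationResult("extract", files=["extract.sql", "extract.py"]),
        CodeGenerationResult("load", errors=["missing input dataset"]),
    ],
)
print(flow_result.successful_nodes, flow_result.failed_nodes,
      flow_result.total_files, flow_result.success)
```

With one failing node, the flow reports one success, one failure, two files, and an overall success of False.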
Implementation Status
DBT Generator ✅ Implemented
The DBT generator:
- Generates DBT Models ✅
  - Creates .sql files for each transformation
  - Uses DBT's ref() function for dependencies
  - Includes proper DBT model structure with {{ config() }} macros
- Generates Schema Files ✅
  - Creates schema.yml files with column definitions
  - Includes column metadata and data types
- Generates Project Configuration ✅
  - Creates dbt_project.yml with project settings
  - Configures model paths and materialization
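
As an illustration of the model output described above, the following sketch renders a DBT model as text: a {{ config() }} header followed by a SELECT that reads from an upstream model through ref(). The helper names `dbt_ref` and `render_dbt_model`, and the column and model names, are hypothetical; the actual DBTCodeGenerator may structure its templates differently.

```python
from typing import List


def dbt_ref(model_name: str) -> str:
    """Wrap a model name in DBT's ref() Jinja call."""
    return "{{ ref('" + model_name + "') }}"


def render_dbt_model(columns: List[str], upstream_model: str,
                     materialized: str = "table") -> str:
    """Render a model file: config() header, then a SELECT from the upstream model."""
    header = "{{ config(materialized='" + materialized + "') }}"
    select_list = ",\n    ".join(columns)
    return f"{header}\n\nSELECT\n    {select_list}\nFROM {dbt_ref(upstream_model)}\n"


model_sql = render_dbt_model(["order_id", "amount"], "stg_orders")
print(model_sql)
```

Referencing upstream models through ref() rather than hard-coded table names is what lets DBT infer the dependency graph between generated models.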
Airflow Generator ✅ Implemented
The Airflow generator:
- Generates DAG Files ✅
  - Creates Python files with Airflow DAG definitions
  - Includes proper imports and configuration
  - Sets up default arguments and scheduling
- Generates Tasks ✅
  - Creates task definitions for each transformation
  - Uses PythonOperator with transformation execution function
  - Includes proper parameter handling
- Generates Dependencies ✅
  - Sets up task dependencies based on DAG structure
  - Handles transformation step dependencies correctly
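
The dependency step can be pictured as translating DAG edges into Airflow's `>>` operator lines inside the generated DAG file. The sketch below emits those lines as text; the `render_dependencies` helper and the edge-dictionary input shape are assumptions for illustration, not the generator's actual internals.

```python
from typing import Dict, List


def render_dependencies(dag_edges: Dict[str, List[str]]) -> List[str]:
    """Turn {upstream: [downstream, ...]} edges into 'upstream >> downstream' lines.

    Names are sorted so the generated file is stable between runs.
    """
    lines = []
    for upstream in sorted(dag_edges):
        for downstream in sorted(dag_edges[upstream]):
            lines.append(f"{upstream} >> {downstream}")
    return lines


# Hypothetical flow: extract feeds clean, which feeds aggregate and export.
edges = {"extract": ["clean"], "clean": ["aggregate", "export"]}
dependency_lines = render_dependencies(edges)
print("\n".join(dependency_lines))
```

Sorting the task names is a small design choice that keeps regenerated DAG files free of spurious diffs.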
Migration from code_generation.py
The existing code_generation.py functions are still available for backward compatibility. The new generator system wraps these functions:
- generate_code_for_node() → SQLCodeGenerator.generate_for_node()
- generate_code_for_flow() → SQLCodeGenerator.generate_for_flow()
Future refactoring can move the implementation directly into the generator.
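
The wrapping relationship can be sketched as a thin adapter that delegates to the legacy function and converts exceptions into error entries. Everything here is a stand-in: the legacy function's real signature is not shown in this document, so the `node_id` and `output_dir` parameters, the dictionary result, and the `SQLCodeGeneratorSketch` class name are assumptions.

```python
from typing import Any, Dict, List


def generate_code_for_node(node_id: str, output_dir: str) -> List[str]:
    """Hypothetical stand-in for the legacy code_generation.py function."""
    if not node_id:
        raise ValueError("node_id must not be empty")
    return [f"{output_dir}/{node_id}.sql"]


class SQLCodeGeneratorSketch:
    """Adapter exposing the legacy function through the generator interface."""

    format_name = "sql"

    def generate_for_node(self, node_id: str, output_dir: str) -> Dict[str, Any]:
        try:
            files = generate_code_for_node(node_id, output_dir)
            return {"node_id": node_id, "files": files, "errors": []}
        except Exception as exc:
            # Surface legacy failures as data so callers can aggregate them.
            return {"node_id": node_id, "files": [], "errors": [str(exc)]}


wrapper = SQLCodeGeneratorSketch()
print(wrapper.generate_for_node("orders", "generated_code"))
print(wrapper.generate_for_node("", "generated_code"))
```

Because callers only see the generator interface, the legacy body can later be moved into the class without changing any call sites.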
Benefits
- Extensibility: Easy to add new output formats
- Consistency: All generators follow the same interface
- Testability: Generators can be tested independently
- Flexibility: Can generate multiple formats from the same flow
- Maintainability: Clear separation of concerns