Skip to main content

Engine Plugin System

Overview

Qarion ETL uses a plugin-based architecture for engines, similar to the flow plugin system. This allows easy extension with new engine types without modifying core code.

Architecture

EnginePlugin Interface

All engines implement the EnginePlugin interface:

class EnginePlugin(ABC):
@property
def engine_type(self) -> str:
"""Engine type identifier (e.g., 'sqlite', 'pandas_memory')"""

@property
def engine_class(self) -> Type[BaseEngine]:
"""Engine class implementation"""

@property
def adapter_class(self) -> Type[BaseDialectAdapter]:
"""Dialect adapter class"""

def create_engine(self, config: Dict[str, Any]) -> BaseEngine:
"""Create engine instance from configuration"""

def create_adapter(self) -> BaseDialectAdapter:
"""Create adapter instance"""

Built-in Engines

SQLite Engine

  • Type: sqlite
  • Description: SQLite database engine for local file-based storage
  • Configuration:
    [engine]
    name = "sqlite"
    [engine.config]
    path = "data/db.sqlite"

Pandas In-Memory Engine

  • Type: pandas_memory
  • Description: Pandas-based in-memory engine for testing and development
  • Configuration:
    [engine]
    name = "pandas_memory"
    [engine.config]
    # No configuration required

Pandas Local Storage Engine

  • Type: pandas_local
  • Description: Pandas-based engine with local file persistence (Parquet)
  • Configuration:
    [engine]
    name = "pandas_local"
    [engine.config]
    storage_dir = "data/pandas"

Creating a Custom Engine Plugin

Step 1: Create Engine Class

from engines.base import BaseEngine

class MyEngine(BaseEngine):
def connect(self):
# Establish connection
pass

def execute_query(self, query: str, params: tuple = None):
# Execute query
pass

def get_dataframe(self, statement: str, params: tuple = None):
# Get data
pass

Step 2: Create Dialect Adapter

from engines.dialects import BaseDialectAdapter

class MyDialectAdapter(BaseDialectAdapter):
def get_select_statement(self, ...):
# Convert to your engine's syntax
pass
# ... implement other methods

Step 3: Create Plugin Class

from engines.plugins.base import EnginePlugin

class MyEnginePlugin(EnginePlugin):
@property
def engine_type(self) -> str:
return "my_engine"

@property
def engine_class(self):
return MyEngine

@property
def adapter_class(self):
return MyDialectAdapter

def validate_config(self, config):
# Validate configuration
return True

def get_default_config(self):
return {'my_setting': 'default_value'}

Step 4: Register Plugin

from engines.plugins import register_engine_plugin

register_engine_plugin(MyEnginePlugin())

Usage

Loading an Engine

from engines import EngineLoader

config = {
'name': 'sqlite',
'config': {'path': 'data/db.sqlite'}
}

engine = EngineLoader.load(config)
engine.connect()

Getting an Adapter

from engines import EngineLoader

adapter = EngineLoader.get_adapter('sqlite')
select_stmt = adapter.get_select_statement('users', ['id', 'name'])

Listing Available Engines

from engines.plugins import list_engine_types

engines = list_engine_types()
# Returns: ['sqlite', 'pandas_memory', 'pandas_local']

Benefits

  1. Extensibility: Easy to add new engines without modifying core code
  2. Consistency: All engines follow the same interface
  3. Configuration: Engine-specific configuration validation
  4. Discovery: Can list available engines programmatically

Migration from Old System

The old ENGINE_MAP and DIALECT_MAP dictionaries have been replaced with the plugin system. The EngineLoader API remains the same for backward compatibility.