Engine Plugin System
Overview
Qarion ETL uses a plugin-based architecture for engines, similar to the flow plugin system. This allows easy extension with new engine types without modifying core code.
Architecture
EnginePlugin Interface
All engines implement the EnginePlugin interface:
class EnginePlugin(ABC):
@property
def engine_type(self) -> str:
"""Engine type identifier (e.g., 'sqlite', 'pandas_memory')"""
@property
def engine_class(self) -> Type[BaseEngine]:
"""Engine class implementation"""
@property
def adapter_class(self) -> Type[BaseDialectAdapter]:
"""Dialect adapter class"""
def create_engine(self, config: Dict[str, Any]) -> BaseEngine:
"""Create engine instance from configuration"""
def create_adapter(self) -> BaseDialectAdapter:
"""Create adapter instance"""
Built-in Engines
SQLite Engine
- Type:
sqlite - Description: SQLite database engine for local file-based storage
- Configuration:
[engine]
name = "sqlite"
[engine.config]
path = "data/db.sqlite"
Pandas In-Memory Engine
- Type:
pandas_memory - Description: Pandas-based in-memory engine for testing and development
- Configuration:
[engine]
name = "pandas_memory"
[engine.config]
# No configuration required
Pandas Local Storage Engine
- Type:
pandas_local - Description: Pandas-based engine with local file persistence (Parquet)
- Configuration:
[engine]
name = "pandas_local"
[engine.config]
storage_dir = "data/pandas"
Creating a Custom Engine Plugin
Step 1: Create Engine Class
from engines.base import BaseEngine
class MyEngine(BaseEngine):
def connect(self):
# Establish connection
pass
def execute_query(self, query: str, params: tuple = None):
# Execute query
pass
def get_dataframe(self, statement: str, params: tuple = None):
# Get data
pass
Step 2: Create Dialect Adapter
from engines.dialects import BaseDialectAdapter
class MyDialectAdapter(BaseDialectAdapter):
def get_select_statement(self, ...):
# Convert to your engine's syntax
pass
# ... implement other methods
Step 3: Create Plugin Class
from engines.plugins.base import EnginePlugin
class MyEnginePlugin(EnginePlugin):
@property
def engine_type(self) -> str:
return "my_engine"
@property
def engine_class(self):
return MyEngine
@property
def adapter_class(self):
return MyDialectAdapter
def validate_config(self, config):
# Validate configuration
return True
def get_default_config(self):
return {'my_setting': 'default_value'}
Step 4: Register Plugin
from engines.plugins import register_engine_plugin
register_engine_plugin(MyEnginePlugin())
Usage
Loading an Engine
from engines import EngineLoader
config = {
'name': 'sqlite',
'config': {'path': 'data/db.sqlite'}
}
engine = EngineLoader.load(config)
engine.connect()
Getting an Adapter
from engines import EngineLoader
adapter = EngineLoader.get_adapter('sqlite')
select_stmt = adapter.get_select_statement('users', ['id', 'name'])
Listing Available Engines
from engines.plugins import list_engine_types
engines = list_engine_types()
# Returns: ['sqlite', 'pandas_memory', 'pandas_local']
Benefits
- Extensibility: Easy to add new engines without modifying core code
- Consistency: All engines follow the same interface
- Configuration: Engine-specific configuration validation
- Discovery: Can list available engines programmatically
Migration from Old System
The old ENGINE_MAP and DIALECT_MAP dictionaries have been replaced with the plugin system. The EngineLoader API remains the same for backward compatibility.