Plugin System Overview
Qarion ETL uses a plugin-based architecture that makes it easy to extend functionality without modifying core code.
Plugin Types
Qarion ETL supports several plugin types:
- Flow Plugins - Define flow types and their behavior
- Task Type Plugins - Define task types and their properties (Ingestion, Transformation, Quality Check, Export)
- Node Type Plugins - Define node types for execution DAGs
- Engine Plugins - Provide execution engines
- Code Generator Plugins - Generate code in various formats
- Repository Plugins - Provide storage backends
- File Loader Plugins - Load different file formats
- File Exporter Plugins - Export data to different file formats
- Credential Store Plugins - Provide credential storage backends (database, local keystore, AWS SSM Parameter Store)
Plugin Discovery
Qarion ETL uses a PluginLoader system for automatic plugin discovery and registration. Plugins are automatically discovered and registered when:
- They are in the qarion_etl/ package structure
- They implement the appropriate plugin interface
- They are registered in the plugin registry
Plugin Loader
The PluginLoader class provides automatic plugin discovery from:
- Directory Scanning: Scans directories for Python files containing plugin classes
- Entry Points: Discovers plugins from setuptools entry points
- External Paths: Loads plugins from external directories specified in configuration
Features:
- Automatic discovery of plugin classes that inherit from base plugin classes
- Support for external plugin directories via plugin_dir and plugin_paths in configuration
- Entry point support for package-based plugin distribution
- File exclusion (e.g., base.py, __init__.py) to avoid loading base classes
Usage:
from flows import register_flow_plugin
from flows.base import FlowPlugin
from plugins.loader import PluginLoader

# Configure a loader for flow plugins
loader = PluginLoader(
    plugin_base_class=FlowPlugin,
    register_function=register_flow_plugin,
    plugin_directory="/path/to/plugins",
    entry_point_group="qarion-etl.flows",
    plugin_type="flow",
)

# Load and register all plugins
plugins = loader.load_and_register_plugins()
For more details, see Plugin Loader.
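To make the discovery sources described above more concrete, here is a minimal sketch of directory scanning (with file exclusion) and entry-point discovery using only the Python standard library. It is not the actual PluginLoader implementation. The function names `discover_in_directory` and `discover_from_entry_points`, and the `EXCLUDED_FILES` constant, are hypothetical and chosen for illustration only.

```python
import importlib.util
import inspect
from importlib.metadata import entry_points
from pathlib import Path

# Assumption: files skipped during scanning, mirroring the exclusions named above.
EXCLUDED_FILES = {"base.py", "__init__.py"}


def discover_in_directory(directory, base_class, excluded=EXCLUDED_FILES):
    """Import each .py file in `directory` and collect subclasses of `base_class`."""
    found = []
    for path in sorted(Path(directory).glob("*.py")):
        if path.name in excluded:
            continue  # skip base classes and package markers
        spec = importlib.util.spec_from_file_location(path.stem, str(path))
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for _, obj in inspect.getmembers(module, inspect.isclass):
            # Keep only subclasses that are defined in this file itself,
            # not classes the file merely imported.
            if (
                issubclass(obj, base_class)
                and obj is not base_class
                and obj.__module__ == module.__name__
            ):
                found.append(obj)
    return found


def discover_from_entry_points(group, base_class):
    """Load every entry point in `group` and keep subclasses of `base_class`."""
    eps = entry_points()
    # Newer Python versions expose .select(); older ones return a plain dict.
    if hasattr(eps, "select"):
        selected = eps.select(group=group)
    else:
        selected = eps.get(group, [])
    found = []
    for ep in selected:
        obj = ep.load()
        if inspect.isclass(obj) and issubclass(obj, base_class):
            found.append(obj)
    return found
```

In this sketch, a caller would pass the relevant base class (for example FlowPlugin) and then instantiate and register each returned class. Filtering on `obj.__module__` is a design choice of the sketch that avoids picking up the base class when a plugin file imports it.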
Plugin Interfaces
Flow Plugin
from typing import Any, Dict, List

from flows.base import FlowPlugin
# FlowDAG must also be imported; its module path is not shown in this overview.

class MyFlowPlugin(FlowPlugin):
    @property
    def flow_type(self) -> str:
        return "my_flow_type"

    def generate_datasets(self, flow_definition: Dict[str, Any]) -> List[Dict[str, Any]]:
        # Generate dataset definitions
        pass

    def generate_dag(self, flow_definition: Dict[str, Any], datasets: List[Dict[str, Any]]) -> FlowDAG:
        # Generate execution DAG
        pass
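As an illustration of how the two methods fit together, the following self-contained sketch fills in the interface with a trivial two-dataset flow. The `FlowPlugin` and `FlowDAG` classes here are simplified stand-ins, not the real Qarion ETL classes. The `"name"` key in the flow definition, the dataset dictionary layout, and the `nodes`/`edges` fields are assumptions made for the example.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass
class FlowDAG:
    """Hypothetical stand-in: a DAG as node names plus directed edges."""
    nodes: List[str]
    edges: List[Tuple[str, str]]


class FlowPlugin(ABC):
    """Hypothetical stand-in mirroring the interface shown above."""

    @property
    @abstractmethod
    def flow_type(self) -> str: ...

    @abstractmethod
    def generate_datasets(self, flow_definition: Dict[str, Any]) -> List[Dict[str, Any]]: ...

    @abstractmethod
    def generate_dag(self, flow_definition: Dict[str, Any], datasets: List[Dict[str, Any]]) -> FlowDAG: ...


class CopyFlowPlugin(FlowPlugin):
    """Illustrative flow: one source dataset feeding one target dataset."""

    @property
    def flow_type(self) -> str:
        return "copy_flow"

    def generate_datasets(self, flow_definition: Dict[str, Any]) -> List[Dict[str, Any]]:
        # Assumption: the flow definition carries a "name" key.
        name = flow_definition["name"]
        return [{"name": f"{name}_source"}, {"name": f"{name}_target"}]

    def generate_dag(self, flow_definition: Dict[str, Any], datasets: List[Dict[str, Any]]) -> FlowDAG:
        # Chain the datasets in order: each one feeds the next.
        nodes = [d["name"] for d in datasets]
        edges = list(zip(nodes, nodes[1:]))
        return FlowDAG(nodes=nodes, edges=edges)


plugin = CopyFlowPlugin()
definition = {"name": "orders"}
datasets = plugin.generate_datasets(definition)
dag = plugin.generate_dag(definition, datasets)
```

The point of the sketch is the call order: datasets are generated first and then passed to `generate_dag`, which matches the method signatures in the interface.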
Code Generator Plugin
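from typing import Type

from code_generators.plugins.base import CodeGeneratorPlugin
# BaseCodeGenerator and MyCodeGenerator (your generator class) must also be in scope;
# their import paths are not shown in this overview.

class MyCodeGeneratorPlugin(CodeGeneratorPlugin):
    @property
    def format_name(self) -> str:
        return "my_format"

    @property
    def generator_class(self) -> Type[BaseCodeGenerator]:
        return MyCodeGenerator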
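The interface above suggests that a plugin maps a format name to a generator class, which the system can then instantiate on demand. The sketch below shows one way that lookup might work. All classes here are simplified stand-ins. The `generate` method, the `EchoSqlGenerator` and `EchoSqlPlugin` classes, and the `build_generator` helper are hypothetical and not part of the documented API.

```python
from abc import ABC, abstractmethod
from typing import Iterable, Type


class BaseCodeGenerator(ABC):
    """Hypothetical stand-in; the real base class may expose a different API."""

    @abstractmethod
    def generate(self, flow_name: str) -> str: ...


class CodeGeneratorPlugin(ABC):
    """Hypothetical stand-in mirroring the two properties shown above."""

    @property
    @abstractmethod
    def format_name(self) -> str: ...

    @property
    @abstractmethod
    def generator_class(self) -> Type[BaseCodeGenerator]: ...


class EchoSqlGenerator(BaseCodeGenerator):
    def generate(self, flow_name: str) -> str:
        # Emit a placeholder SQL script for the given flow.
        return f"-- generated for {flow_name}\nSELECT 1;"


class EchoSqlPlugin(CodeGeneratorPlugin):
    @property
    def format_name(self) -> str:
        return "echo_sql"

    @property
    def generator_class(self) -> Type[BaseCodeGenerator]:
        return EchoSqlGenerator


def build_generator(plugins: Iterable[CodeGeneratorPlugin], format_name: str) -> BaseCodeGenerator:
    """Pick the plugin for `format_name` and instantiate its generator class."""
    by_format = {p.format_name: p for p in plugins}
    if format_name not in by_format:
        raise KeyError(f"No code generator plugin for format {format_name!r}")
    return by_format[format_name].generator_class()


generator = build_generator([EchoSqlPlugin()], "echo_sql")
script = generator.generate("orders")
```

Returning the class rather than an instance, as the `generator_class` property does, lets the caller decide when and how to construct the generator.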
Built-in Plugins
Flow Plugins
- change_feed - Change detection pattern
- delta_publishing - Transaction pattern
- export_flow - Data export with multiple modes (full, batch, incremental, changes_only)
- sessionization - Time-window grouping pattern
- growth_accounting - User growth analysis pattern
- standard - Task-based flexible flow
Task Type Plugins
- ingestion - Load data from external sources
- transformation - Transform data between datasets
- dq_check - Run data quality checks
- export - Export data to external destinations
Engine Plugins
- sqlite - SQLite database engine
- pandas - Pandas in-memory engine
- duckdb - DuckDB database engine
Code Generator Plugins
- sql - SQL file generation
- dbt - DBT project generation
- airflow - Airflow DAG generation
File Exporter Plugins
- csv - CSV file export
- json - JSON file export
- parquet - Parquet file export
Credential Store Plugins
- database - Database-backed credential store
- local_keystore - Local encrypted keystore file
- aws_ssm - AWS SSM Parameter Store credential store
Creating Custom Plugins
See the specific plugin documentation:
- Flow Plugins
- Task Type Plugins - Task type plugin development
- Engine Plugins
- Code Generator Plugins
- Repository Plugins
- Export System - File export system and plugins
- Credential Store Architecture - Credential store system and plugins
Plugin Registration
Plugins are registered automatically when imported. To register a custom plugin explicitly:
from flows import register_flow_plugin
from code_generators.plugins.registry import register_code_generator_plugin
# Register flow plugin
register_flow_plugin(MyFlowPlugin())
# Register code generator plugin
register_code_generator_plugin(MyCodeGeneratorPlugin())
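For readers curious what a registration function might do internally, here is a minimal sketch of a registry keyed by a plugin attribute such as `flow_type` or `format_name`. It is an assumption about the general pattern, not the actual Qarion ETL registry. The `PluginRegistry` class and its methods are hypothetical, and rejecting duplicate keys is a design choice of the sketch.

```python
from typing import Any, Dict, List


class PluginRegistry:
    """Hypothetical registry: stores plugin instances under a key attribute."""

    def __init__(self, key_attribute: str) -> None:
        self._key_attribute = key_attribute
        self._plugins: Dict[str, Any] = {}

    def register(self, plugin: Any) -> None:
        # Read the identifying attribute, e.g. plugin.flow_type.
        key = getattr(plugin, self._key_attribute)
        if key in self._plugins:
            raise ValueError(f"Plugin already registered for {key!r}")
        self._plugins[key] = plugin

    def get(self, key: str) -> Any:
        if key not in self._plugins:
            raise KeyError(f"No plugin registered for {key!r}")
        return self._plugins[key]

    def keys(self) -> List[str]:
        return sorted(self._plugins)


class DemoFlowPlugin:
    """Stand-in plugin exposing only the attribute the registry needs."""
    flow_type = "demo_flow"


flow_registry = PluginRegistry(key_attribute="flow_type")
demo_plugin = DemoFlowPlugin()
flow_registry.register(demo_plugin)
```

With a structure like this, looking up a plugin by its type string is a dictionary access, and a second registration under the same key fails loudly instead of silently replacing the first.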
Related Documentation
- Flow Plugins
- Task Type Plugins - Developing task type plugins
- Engine Plugins
- Code Generator Plugins
- Repository Plugins
- Task System Guide - User guide for tasks
- Architecture Overview