
Plugin System Overview

Qarion ETL uses a plugin-based architecture that makes it easy to extend functionality without modifying core code.

Plugin Types

Qarion ETL supports several plugin types:

  1. Flow Plugins - Define flow types and their behavior
  2. Task Type Plugins - Define task types and their properties (Ingestion, Transformation, Quality Check, Export)
  3. Node Type Plugins - Define node types for execution DAGs
  4. Engine Plugins - Provide execution engines
  5. Code Generator Plugins - Generate code in various formats
  6. Repository Plugins - Provide storage backends
  7. File Loader Plugins - Load different file formats
  8. File Exporter Plugins - Export data to different file formats
  9. Credential Store Plugins - Provide credential storage backends (database, local keystore, AWS SSM Parameter Store)

Plugin Discovery

Qarion ETL uses a PluginLoader system to discover and register plugins automatically. Plugins are picked up when:

  • They are in the qarion_etl/ package structure
  • They implement the appropriate plugin interface
  • They are registered in the plugin registry

Plugin Loader

The PluginLoader class provides automatic plugin discovery from:

  1. Directory Scanning: Scans directories for Python files containing plugin classes
  2. Entry Points: Discovers plugins from setuptools entry points
  3. External Paths: Loads plugins from external directories specified in configuration

Features:

  • Automatic discovery of plugin classes that inherit from base plugin classes
  • Support for external plugin directories via plugin_dir and plugin_paths in configuration
  • Entry point support for package-based plugin distribution
  • File exclusion (e.g., base.py, __init__.py) to avoid loading base classes
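Directory scanning with file exclusion might look roughly like the sketch below. It is an assumption about the general technique, not the real PluginLoader internals: the function name discover_plugin_classes and the EXCLUDED_FILES constant are hypothetical, and only the top level of the directory is scanned.

```python
import importlib.util
import inspect
from pathlib import Path
from typing import FrozenSet, List

# Files skipped so that base classes and package markers are not loaded
# (hypothetical constant mirroring the exclusions named above).
EXCLUDED_FILES: FrozenSet[str] = frozenset({"base.py", "__init__.py"})


def discover_plugin_classes(
    directory: str,
    base_class: type,
    excluded: FrozenSet[str] = EXCLUDED_FILES,
) -> List[type]:
    """Import each .py file in `directory` and collect subclasses of `base_class`."""
    found: List[type] = []
    for path in sorted(Path(directory).glob("*.py")):
        if path.name in excluded:
            continue
        spec = importlib.util.spec_from_file_location(f"_scanned_{path.stem}", path)
        if spec is None or spec.loader is None:
            continue
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for _, obj in inspect.getmembers(module, inspect.isclass):
            # Keep only classes defined in this file that derive from the base,
            # skipping the base class itself and anything merely imported.
            if (
                issubclass(obj, base_class)
                and obj is not base_class
                and obj.__module__ == module.__name__
            ):
                found.append(obj)
    return found
```

Filtering on `obj.__module__` is a design choice in this sketch: it prevents a class that a plugin file only imports from being reported as if that file had defined it.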

Usage:

from flows import register_flow_plugin
from flows.base import FlowPlugin
from plugins.loader import PluginLoader

# Configure a loader for flow plugins
loader = PluginLoader(
    plugin_base_class=FlowPlugin,
    register_function=register_flow_plugin,
    plugin_directory="/path/to/plugins",
    entry_point_group="qarion-etl.flows",
    plugin_type="flow",
)

# Load and register all plugins
plugins = loader.load_and_register_plugins()

For more details, see Plugin Loader.

Plugin Interfaces

Flow Plugin

from typing import Any, Dict, List

from flows.base import FlowPlugin
# Note: FlowDAG must also be imported; its module path is not shown in this overview.

class MyFlowPlugin(FlowPlugin):
    @property
    def flow_type(self) -> str:
        # Identifier under which this flow type is registered
        return "my_flow_type"

    def generate_datasets(self, flow_definition: Dict[str, Any]) -> List[Dict[str, Any]]:
        # Generate dataset definitions
        pass

    def generate_dag(self, flow_definition: Dict[str, Any], datasets: List[Dict[str, Any]]) -> FlowDAG:
        # Generate execution DAG
        pass
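To make the shape of this interface concrete, here is a self-contained sketch that runs without Qarion ETL installed. Everything in it is a stand-in or an assumption: the FlowPlugin and FlowDAG classes below only mimic the members shown above, the "name" key in the flow definition is an invented field, and StagingFlowPlugin is a hypothetical flow type.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class FlowDAG:
    """Stand-in for the real FlowDAG: node names plus (upstream, downstream) edges."""
    nodes: List[str] = field(default_factory=list)
    edges: List[Tuple[str, str]] = field(default_factory=list)


class FlowPlugin(ABC):
    """Stand-in mirroring the three members of the interface shown above."""

    @property
    @abstractmethod
    def flow_type(self) -> str: ...

    @abstractmethod
    def generate_datasets(self, flow_definition: Dict[str, Any]) -> List[Dict[str, Any]]: ...

    @abstractmethod
    def generate_dag(
        self, flow_definition: Dict[str, Any], datasets: List[Dict[str, Any]]
    ) -> FlowDAG: ...


class StagingFlowPlugin(FlowPlugin):
    """Hypothetical flow: one landing dataset feeding one staging dataset."""

    @property
    def flow_type(self) -> str:
        return "staging_example"

    def generate_datasets(self, flow_definition: Dict[str, Any]) -> List[Dict[str, Any]]:
        name = flow_definition["name"]  # "name" is an assumed field
        return [{"name": f"{name}_landing"}, {"name": f"{name}_staging"}]

    def generate_dag(
        self, flow_definition: Dict[str, Any], datasets: List[Dict[str, Any]]
    ) -> FlowDAG:
        names = [dataset["name"] for dataset in datasets]
        # Chain the datasets in order: each one feeds the next.
        return FlowDAG(nodes=names, edges=list(zip(names, names[1:])))


plugin = StagingFlowPlugin()
definition = {"name": "orders"}
datasets = plugin.generate_datasets(definition)
dag = plugin.generate_dag(definition, datasets)
print(dag.edges)  # [('orders_landing', 'orders_staging')]
```

The two-step split (datasets first, then a DAG built from them) follows the method signatures above, where generate_dag receives the datasets as its second argument.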

Code Generator Plugin

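from typing import Type

from code_generators.plugins.base import CodeGeneratorPlugin
# Note: BaseCodeGenerator must also be imported (its module path is not shown
# in this overview), and MyCodeGenerator is your own BaseCodeGenerator subclass.

class MyCodeGeneratorPlugin(CodeGeneratorPlugin):
    @property
    def format_name(self) -> str:
        # Name of the output format this plugin provides
        return "my_format"

    @property
    def generator_class(self) -> Type[BaseCodeGenerator]:
        # Class that performs the actual code generation
        return MyCodeGenerator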
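The pairing of a format name with a generator class can be illustrated with the standalone sketch below. It is not the library's code: BaseCodeGenerator and CodeGeneratorPlugin here are stand-ins, the generate method is an assumed signature (the real base class's methods are not shown on this page), and the "comment" format is invented for the example.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Type


class BaseCodeGenerator(ABC):
    """Stand-in base class; `generate` is an assumed method, not a documented one."""

    @abstractmethod
    def generate(self, flow_definition: Dict[str, Any]) -> str: ...


class CodeGeneratorPlugin(ABC):
    """Stand-in mirroring the two properties shown above."""

    @property
    @abstractmethod
    def format_name(self) -> str: ...

    @property
    @abstractmethod
    def generator_class(self) -> Type[BaseCodeGenerator]: ...


class CommentCodeGenerator(BaseCodeGenerator):
    """Hypothetical generator that emits a single SQL-style comment."""

    def generate(self, flow_definition: Dict[str, Any]) -> str:
        return f"-- flow: {flow_definition['name']}"


class CommentCodeGeneratorPlugin(CodeGeneratorPlugin):
    @property
    def format_name(self) -> str:
        return "comment"

    @property
    def generator_class(self) -> Type[BaseCodeGenerator]:
        return CommentCodeGenerator


plugin = CommentCodeGeneratorPlugin()
generator = plugin.generator_class()  # the plugin returns a class; the caller instantiates it
output = generator.generate({"name": "orders"})
print(plugin.format_name, output)  # comment -- flow: orders
```

Returning a class rather than an instance, as the generator_class property does, leaves the caller free to decide when and how the generator is constructed.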

Built-in Plugins

Flow Plugins

  • change_feed - Change detection pattern
  • delta_publishing - Transaction pattern
  • export_flow - Data export with multiple modes (full, batch, incremental, changes_only)
  • sessionization - Time-window grouping pattern
  • growth_accounting - User growth analysis pattern
  • standard - Task-based flexible flow

Task Type Plugins

  • ingestion - Load data from external sources
  • transformation - Transform data between datasets
  • dq_check - Run data quality checks
  • export - Export data to external destinations

Engine Plugins

  • sqlite - SQLite database engine
  • pandas - Pandas in-memory engine
  • duckdb - DuckDB database engine

Code Generator Plugins

  • sql - SQL file generation
  • dbt - dbt project generation
  • airflow - Airflow DAG generation

File Exporter Plugins

  • csv - CSV file export
  • json - JSON file export
  • parquet - Parquet file export

Credential Store Plugins

  • database - Database-backed credential store
  • local_keystore - Local encrypted keystore file
  • aws_ssm - AWS SSM Parameter Store credential store

Creating Custom Plugins

See the documentation for the specific plugin type.

Plugin Registration

Built-in plugins are registered automatically when their modules are imported. Custom plugins can be registered explicitly:

from flows import register_flow_plugin
from code_generators.plugins.registry import register_code_generator_plugin

# Register flow plugin
register_flow_plugin(MyFlowPlugin())

# Register code generator plugin
register_code_generator_plugin(MyCodeGeneratorPlugin())
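For intuition about what a registration call does, a registry can be as simple as a dictionary keyed by the plugin's identifying attribute. The sketch below is an assumption about the general pattern, not Qarion ETL's registry: PluginRegistry, its methods, and ExampleFlowPlugin are all hypothetical names.

```python
from typing import Dict, List


class PluginRegistry:
    """Hypothetical registry keyed by one identifying attribute of each plugin."""

    def __init__(self, key_attribute: str) -> None:
        self._key_attribute = key_attribute
        self._plugins: Dict[str, object] = {}

    def register(self, plugin: object) -> None:
        key = getattr(plugin, self._key_attribute)
        if key in self._plugins:
            # Rejecting duplicates is a choice made for this sketch.
            raise ValueError(f"Plugin already registered for {key!r}")
        self._plugins[key] = plugin

    def get(self, key: str) -> object:
        try:
            return self._plugins[key]
        except KeyError:
            raise KeyError(f"No plugin registered for {key!r}") from None

    def keys(self) -> List[str]:
        return sorted(self._plugins)


class ExampleFlowPlugin:
    """Minimal plugin-like object exposing only the key attribute."""
    flow_type = "my_flow_type"


flow_registry = PluginRegistry(key_attribute="flow_type")
flow_registry.register(ExampleFlowPlugin())
print(flow_registry.keys())  # ['my_flow_type']
```

Raising on a duplicate key surfaces naming collisions early; a real registry might instead overwrite the earlier entry or log a warning.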