Skip to main content

Credential Store Development Guide

A guide for developers on implementing custom credential stores and integrating with the credential management system.

Overview

The credential store system provides a plugin-based architecture for storing and retrieving credentials from various backends. This guide covers:

  • Implementing custom credential stores
  • Integrating credential stores with existing components
  • Best practices for credential storage and encryption

Architecture

Core Components

CredentialStore (Abstract)
├── DatabaseCredentialStore
├── LocalKeystoreCredentialStore
└── AWSSSMCredentialStore

Key Interfaces:

  • CredentialStore: Abstract base class
  • CredentialDefinition: Credential metadata
  • CredentialStoreFactory: Factory for creating stores
  • CredentialStoreRegistry: Registry for store types

Implementing a Custom Credential Store

Step 1: Create Store Class

Create a class inheriting from CredentialStore:

from qarion_etl.credentials.base import (
CredentialStore,
CredentialDefinition,
CredentialStoreError
)
from typing import Dict, Any, Optional

class MyCustomCredentialStore(CredentialStore):
"""Custom credential store implementation."""

def __init__(self, config: Dict[str, Any]):
"""
Initialize credential store.

Args:
config: Configuration dictionary
"""
self.config = config
# Initialize your store here

@property
def store_type(self) -> str:
"""Return store type identifier."""
return 'my_custom_store'

def store_credential(
self,
credential_id: str,
credential_data: Dict[str, Any],
credential_definition: Optional[CredentialDefinition] = None
) -> bool:
"""Store a credential."""
# Implementation
pass

def get_credential(self, credential_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve a credential."""
# Implementation
pass

def delete_credential(self, credential_id: str) -> bool:
"""Delete a credential."""
# Implementation
pass

def list_credentials(self) -> list[str]:
"""List all credential IDs."""
# Implementation
pass

def credential_exists(self, credential_id: str) -> bool:
"""Check if a credential exists."""
# Implementation
pass

Step 2: Register Store

Register your store with the registry:

from qarion_etl.credentials.registry import register_credential_store

register_credential_store('my_custom_store', MyCustomCredentialStore)

Step 3: Configuration Schema

Add configuration schema to CONFIG_SCHEMA in configuration.py:

# Add to credential store enum
CREDENTIAL_STORE_SCHEMA = {
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": ["database", "local_keystore", "aws_ssm", "my_custom_store"],
# ...
}
}
}

Example: HashiCorp Vault Store

Here's a complete example implementing a HashiCorp Vault credential store:

"""
HashiCorp Vault credential store implementation.
"""
import logging
from typing import Dict, Any, Optional

from qarion_etl.credentials.base import (
CredentialStore,
CredentialDefinition,
CredentialStoreError
)

logger = logging.getLogger(__name__)


class VaultCredentialStore(CredentialStore):
"""
Credential store using HashiCorp Vault.

Configuration:
- vault_url: Vault server URL
- vault_token: Vault authentication token
- mount_path: Secret mount path (default: 'secret')
"""

def __init__(self, config: Dict[str, Any]):
"""Initialize Vault credential store."""
self.config = config
self.vault_url = config.get('vault_url')
self.vault_token = config.get('vault_token')
self.mount_path = config.get('mount_path', 'secret')

if not self.vault_url:
raise CredentialStoreError(
"Vault credential store requires 'vault_url' in config"
)

try:
import hvac
self.client = hvac.Client(url=self.vault_url, token=self.vault_token)
except ImportError:
raise ImportError(
"hvac is required for Vault support. Install with: pip install hvac"
)

@property
def store_type(self) -> str:
"""Return store type identifier."""
return 'vault'

def _get_secret_path(self, credential_id: str) -> str:
"""Get Vault secret path for credential."""
return f"{self.mount_path}/data/{credential_id}"

def store_credential(
self,
credential_id: str,
credential_data: Dict[str, Any],
credential_definition: Optional[CredentialDefinition] = None
) -> bool:
"""Store credential in Vault."""
try:
secret_path = self._get_secret_path(credential_id)
self.client.secrets.kv.v2.create_or_update_secret(
path=secret_path,
secret=credential_data
)
return True
except Exception as e:
logger.error(f"Failed to store credential {credential_id}: {e}")
raise CredentialStoreError(f"Failed to store credential: {e}") from e

def get_credential(self, credential_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve credential from Vault."""
try:
secret_path = self._get_secret_path(credential_id)
response = self.client.secrets.kv.v2.read_secret_version(path=secret_path)
return response['data']['data']
except Exception as e:
logger.error(f"Failed to retrieve credential {credential_id}: {e}")
return None

def delete_credential(self, credential_id: str) -> bool:
"""Delete credential from Vault."""
try:
secret_path = self._get_secret_path(credential_id)
self.client.secrets.kv.v2.delete_metadata_and_all_versions(path=secret_path)
return True
except Exception as e:
logger.error(f"Failed to delete credential {credential_id}: {e}")
return False

def list_credentials(self) -> list[str]:
"""List all credentials in Vault."""
try:
response = self.client.secrets.kv.v2.list_secrets(path=f"{self.mount_path}/metadata")
if 'data' in response and 'keys' in response['data']:
return response['data']['keys']
return []
except Exception as e:
logger.error(f"Failed to list credentials: {e}")
return []

def credential_exists(self, credential_id: str) -> bool:
"""Check if credential exists in Vault."""
try:
secret_path = self._get_secret_path(credential_id)
self.client.secrets.kv.v2.read_secret_metadata(path=secret_path)
return True
except Exception:
return False

Integration Points

Configuration Resolution

Credential references are automatically resolved during configuration loading:

from qarion_etl.credentials.utils import resolve_credential_reference

config = {
"credentials": "${credential:my_aws_creds}",
"path": "s3://my-bucket/data/"
}

# Resolve credential references
resolved = resolve_credential_reference(config, credential_store)

Storage Backend Integration

Update storage backends to support credential stores:

class S3StorageBackend(StorageBackend):
def __init__(
self,
credentials: Optional[Dict[str, Any]] = None,
credential_store: Optional[CredentialStore] = None
):
# Resolve credential references
if credential_store and credentials:
from qarion_etl.credentials.utils import resolve_credential_reference
credentials = resolve_credential_reference(credentials, credential_store)

self.credentials = credentials or {}

Security Considerations

Encryption

  1. Always encrypt sensitive data

    • Use encryption libraries (cryptography, etc.)
    • Never store plaintext credentials
    • Use secure key management
  2. Key Management

    • Store encryption keys securely
    • Use environment variables or key management services
    • Rotate keys regularly
  3. Access Control

    • Implement proper access controls
    • Use IAM roles for cloud services
    • Restrict file permissions for local stores

Best Practices

  1. Error Handling

    • Never expose credential data in error messages
    • Log errors without sensitive information
    • Use specific exception types
  2. Validation

    • Validate credential data before storing
    • Check credential format and structure
    • Verify credential IDs are valid
  3. Audit Logging

    • Log credential access (without sensitive data)
    • Track credential creation and deletion
    • Monitor for suspicious activity

Testing

Unit Tests

import pytest
from qarion_etl.credentials.base import CredentialStoreError

def test_store_credential():
store = MyCustomCredentialStore(config)

# Store credential
result = store.store_credential(
credential_id="test_cred",
credential_data={"key": "value"}
)
assert result is True

# Retrieve credential
cred = store.get_credential("test_cred")
assert cred == {"key": "value"}

def test_credential_not_found():
store = MyCustomCredentialStore(config)
cred = store.get_credential("nonexistent")
assert cred is None

def test_delete_credential():
store = MyCustomCredentialStore(config)
store.store_credential("test_cred", {"key": "value"})

result = store.delete_credential("test_cred")
assert result is True

assert not store.credential_exists("test_cred")