Credential Store Development Guide
A guide for developers on implementing custom credential stores and integrating with the credential management system.
Overview
The credential store system provides a plugin-based architecture for storing and retrieving credentials from various backends. This guide covers:
- Implementing custom credential stores
- Integrating credential stores with existing components
- Best practices for credential storage and encryption
Architecture
Core Components
CredentialStore (Abstract)
├── DatabaseCredentialStore
├── LocalKeystoreCredentialStore
└── AWSSSMCredentialStore
Key Interfaces:
CredentialStore: Abstract base classCredentialDefinition: Credential metadataCredentialStoreFactory: Factory for creating storesCredentialStoreRegistry: Registry for store types
Implementing a Custom Credential Store
Step 1: Create Store Class
Create a class inheriting from CredentialStore:
from qarion_etl.credentials.base import (
CredentialStore,
CredentialDefinition,
CredentialStoreError
)
from typing import Dict, Any, Optional
class MyCustomCredentialStore(CredentialStore):
"""Custom credential store implementation."""
def __init__(self, config: Dict[str, Any]):
"""
Initialize credential store.
Args:
config: Configuration dictionary
"""
self.config = config
# Initialize your store here
@property
def store_type(self) -> str:
"""Return store type identifier."""
return 'my_custom_store'
def store_credential(
self,
credential_id: str,
credential_data: Dict[str, Any],
credential_definition: Optional[CredentialDefinition] = None
) -> bool:
"""Store a credential."""
# Implementation
pass
def get_credential(self, credential_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve a credential."""
# Implementation
pass
def delete_credential(self, credential_id: str) -> bool:
"""Delete a credential."""
# Implementation
pass
def list_credentials(self) -> list[str]:
"""List all credential IDs."""
# Implementation
pass
def credential_exists(self, credential_id: str) -> bool:
"""Check if a credential exists."""
# Implementation
pass
Step 2: Register Store
Register your store with the registry:
from qarion_etl.credentials.registry import register_credential_store
register_credential_store('my_custom_store', MyCustomCredentialStore)
Step 3: Configuration Schema
Add configuration schema to CONFIG_SCHEMA in configuration.py:
# Add to credential store enum
CREDENTIAL_STORE_SCHEMA = {
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": ["database", "local_keystore", "aws_ssm", "my_custom_store"],
# ...
}
}
}
Example: HashiCorp Vault Store
Here's a complete example implementing a HashiCorp Vault credential store:
"""
HashiCorp Vault credential store implementation.
"""
import logging
from typing import Dict, Any, Optional
from qarion_etl.credentials.base import (
CredentialStore,
CredentialDefinition,
CredentialStoreError
)
logger = logging.getLogger(__name__)
class VaultCredentialStore(CredentialStore):
"""
Credential store using HashiCorp Vault.
Configuration:
- vault_url: Vault server URL
- vault_token: Vault authentication token
- mount_path: Secret mount path (default: 'secret')
"""
def __init__(self, config: Dict[str, Any]):
"""Initialize Vault credential store."""
self.config = config
self.vault_url = config.get('vault_url')
self.vault_token = config.get('vault_token')
self.mount_path = config.get('mount_path', 'secret')
if not self.vault_url:
raise CredentialStoreError(
"Vault credential store requires 'vault_url' in config"
)
try:
import hvac
self.client = hvac.Client(url=self.vault_url, token=self.vault_token)
except ImportError:
raise ImportError(
"hvac is required for Vault support. Install with: pip install hvac"
)
@property
def store_type(self) -> str:
"""Return store type identifier."""
return 'vault'
def _get_secret_path(self, credential_id: str) -> str:
"""Get Vault secret path for credential."""
return f"{self.mount_path}/data/{credential_id}"
def store_credential(
self,
credential_id: str,
credential_data: Dict[str, Any],
credential_definition: Optional[CredentialDefinition] = None
) -> bool:
"""Store credential in Vault."""
try:
secret_path = self._get_secret_path(credential_id)
self.client.secrets.kv.v2.create_or_update_secret(
path=secret_path,
secret=credential_data
)
return True
except Exception as e:
logger.error(f"Failed to store credential {credential_id}: {e}")
raise CredentialStoreError(f"Failed to store credential: {e}") from e
def get_credential(self, credential_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve credential from Vault."""
try:
secret_path = self._get_secret_path(credential_id)
response = self.client.secrets.kv.v2.read_secret_version(path=secret_path)
return response['data']['data']
except Exception as e:
logger.error(f"Failed to retrieve credential {credential_id}: {e}")
return None
def delete_credential(self, credential_id: str) -> bool:
"""Delete credential from Vault."""
try:
secret_path = self._get_secret_path(credential_id)
self.client.secrets.kv.v2.delete_metadata_and_all_versions(path=secret_path)
return True
except Exception as e:
logger.error(f"Failed to delete credential {credential_id}: {e}")
return False
def list_credentials(self) -> list[str]:
"""List all credentials in Vault."""
try:
response = self.client.secrets.kv.v2.list_secrets(path=f"{self.mount_path}/metadata")
if 'data' in response and 'keys' in response['data']:
return response['data']['keys']
return []
except Exception as e:
logger.error(f"Failed to list credentials: {e}")
return []
def credential_exists(self, credential_id: str) -> bool:
"""Check if credential exists in Vault."""
try:
secret_path = self._get_secret_path(credential_id)
self.client.secrets.kv.v2.read_secret_metadata(path=secret_path)
return True
except Exception:
return False
Integration Points
Configuration Resolution
Credential references are automatically resolved during configuration loading:
from qarion_etl.credentials.utils import resolve_credential_reference
config = {
"credentials": "${credential:my_aws_creds}",
"path": "s3://my-bucket/data/"
}
# Resolve credential references
resolved = resolve_credential_reference(config, credential_store)
Storage Backend Integration
Update storage backends to support credential stores:
class S3StorageBackend(StorageBackend):
def __init__(
self,
credentials: Optional[Dict[str, Any]] = None,
credential_store: Optional[CredentialStore] = None
):
# Resolve credential references
if credential_store and credentials:
from qarion_etl.credentials.utils import resolve_credential_reference
credentials = resolve_credential_reference(credentials, credential_store)
self.credentials = credentials or {}
Security Considerations
Encryption
-
Always encrypt sensitive data
- Use encryption libraries (cryptography, etc.)
- Never store plaintext credentials
- Use secure key management
-
Key Management
- Store encryption keys securely
- Use environment variables or key management services
- Rotate keys regularly
-
Access Control
- Implement proper access controls
- Use IAM roles for cloud services
- Restrict file permissions for local stores
Best Practices
-
Error Handling
- Never expose credential data in error messages
- Log errors without sensitive information
- Use specific exception types
-
Validation
- Validate credential data before storing
- Check credential format and structure
- Verify credential IDs are valid
-
Audit Logging
- Log credential access (without sensitive data)
- Track credential creation and deletion
- Monitor for suspicious activity
Testing
Unit Tests
import pytest
from qarion_etl.credentials.base import CredentialStoreError
def test_store_credential():
store = MyCustomCredentialStore(config)
# Store credential
result = store.store_credential(
credential_id="test_cred",
credential_data={"key": "value"}
)
assert result is True
# Retrieve credential
cred = store.get_credential("test_cred")
assert cred == {"key": "value"}
def test_credential_not_found():
store = MyCustomCredentialStore(config)
cred = store.get_credential("nonexistent")
assert cred is None
def test_delete_credential():
store = MyCustomCredentialStore(config)
store.store_credential("test_cred", {"key": "value"})
result = store.delete_credential("test_cred")
assert result is True
assert not store.credential_exists("test_cred")
Related Documentation
- Credential Store Architecture - Complete technical documentation
- Credential Management Guide - User guide
- Plugin Interfaces - Plugin interface reference