Source code for kerb.config.manager

"""Core configuration manager.

This module contains the main ConfigManager class for centralized configuration management.
"""

import json
import os
import warnings
from copy import deepcopy
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

from .enums import ProviderType
from .types import AppConfig, ModelConfig, ProviderConfig


[docs] class ConfigManager: """Centralized configuration manager for LLM applications. Manages model configs, provider settings, and API keys in a unified way. Focuses on LLM-specific configuration without general application settings. """
[docs] def __init__( self, app_name: str = "llm_app", config_file: Optional[str] = None, auto_load_env: bool = True, encryption_key: Optional[str] = None, encryption_salt: Optional[bytes] = None, ): """Initialize configuration manager. Args: app_name: Application name config_file: Path to configuration file auto_load_env: Automatically load from environment variables encryption_key: Optional encryption key for secrets (auto-generated if None) encryption_salt: Optional salt for key derivation (auto-generated if None) """ self.app_name = app_name self._config: AppConfig = AppConfig(app_name=app_name) self._config_history: List[AppConfig] = [] self._change_listeners: List[Callable[[AppConfig], None]] = [] # Initialize secure secrets storage self._secrets: Dict[str, bytes] = {} self._init_encryption(encryption_key, encryption_salt) if config_file: self.load_from_file(config_file) if auto_load_env: self.load_from_environment()
def _init_encryption( self, key: Optional[str] = None, salt: Optional[bytes] = None ) -> None: """Initialize encryption for secrets storage. Args: key: Optional encryption key (auto-generated if None) salt: Optional salt for key derivation (auto-generated if None) """ try: import base64 import secrets from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC if key: # Generate or use provided salt if salt is None: # Generate a cryptographically secure random salt salt = secrets.token_bytes(16) # Store salt for potential serialization/persistence needs self._encryption_salt = salt # Derive a proper key from the provided key kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, ) derived_key = base64.urlsafe_b64encode(kdf.derive(key.encode())) self._fernet = Fernet(derived_key) else: # Generate a new key for this session self._fernet = Fernet(Fernet.generate_key()) self._encryption_salt = None self._encryption_available = True except ImportError: # Fallback to no encryption if cryptography not available self._fernet = None self._encryption_available = False self._encryption_salt = None
[docs] def get_config(self) -> AppConfig: """Get current application configuration.""" return deepcopy(self._config)
[docs] def set_config(self, config: AppConfig) -> None: """Set application configuration and notify listeners.""" self._config_history.append(deepcopy(self._config)) self._config = config self._notify_listeners()
[docs] def add_change_listener(self, listener: Callable[[AppConfig], None]) -> None: """Add a listener for configuration changes.""" self._change_listeners.append(listener)
def _notify_listeners(self) -> None: """Notify all listeners of configuration changes.""" for listener in self._change_listeners: try: listener(self.get_config()) except Exception: pass # Don't let listener errors break the manager def _save_history(self) -> None: """Save current configuration to history before making changes.""" self._config_history.append(deepcopy(self._config)) # ======================================================================== # Model Configuration # ========================================================================
[docs] def add_model(self, config: ModelConfig) -> None: """Add or update a model configuration.""" self._save_history() self._config.models[config.name] = config self._notify_listeners()
[docs] def get_model(self, name: str) -> Optional[ModelConfig]: """Get model configuration by name.""" return self._config.models.get(name)
[docs] def remove_model(self, name: str) -> bool: """Remove a model configuration.""" if name in self._config.models: del self._config.models[name] self._notify_listeners() return True return False
[docs] def list_models(self, provider: Optional[ProviderType] = None) -> List[str]: """List all configured models, optionally filtered by provider.""" if provider: return [ name for name, config in self._config.models.items() if config.provider == provider ] return list(self._config.models.keys())
[docs] def set_default_model(self, name: str) -> None: """Set the default model.""" if name not in self._config.models: raise ValueError(f"Model '{name}' not found in configuration") self._config.default_model = name self._notify_listeners()
[docs] def get_default_model(self) -> Optional[ModelConfig]: """Get the default model configuration.""" if self._config.default_model: return self._config.models.get(self._config.default_model) return None
# ======================================================================== # Provider Configuration # ========================================================================
[docs] def add_provider(self, config: ProviderConfig) -> None: """Add or update a provider configuration.""" self._config.providers[config.provider] = config self._notify_listeners()
[docs] def get_provider(self, provider: ProviderType) -> Optional[ProviderConfig]: """Get provider configuration.""" return self._config.providers.get(provider)
[docs] def remove_provider(self, provider: ProviderType) -> bool: """Remove a provider configuration.""" if provider in self._config.providers: del self._config.providers[provider] self._notify_listeners() return True return False
[docs] def list_providers(self) -> List[ProviderType]: """List all configured providers.""" return list(self._config.providers.keys())
[docs] def switch_provider( self, from_provider: ProviderType, to_provider: ProviderType, model_mapping: Optional[Dict[str, str]] = None, ) -> None: """Switch from one provider to another. Args: from_provider: Current provider to_provider: Target provider model_mapping: Optional mapping of old model names to new ones """ if to_provider not in self._config.providers: raise ValueError(f"Target provider '{to_provider}' not configured") # Update models that use the old provider models_to_update = [] for name, config in list(self._config.models.items()): if config.provider == from_provider: models_to_update.append((name, config)) for old_name, config in models_to_update: config.provider = to_provider # Handle renaming if mapping provided if model_mapping and old_name in model_mapping: new_name = model_mapping[old_name] config.name = new_name # Remove old entry and add new one del self._config.models[old_name] self._config.models[new_name] = config # Update default model if needed if self._config.default_model == old_name: self._config.default_model = new_name self._notify_listeners()
# ======================================================================== # API Key Management # ========================================================================
[docs] def set_api_key( self, provider: ProviderType, api_key: Optional[str] = None, env_var: Optional[str] = None, ) -> None: """Set API key for a provider. Args: provider: Provider type api_key: Direct API key (not recommended for production) env_var: Environment variable name containing API key (recommended) """ if provider not in self._config.providers: raise ValueError(f"Provider '{provider}' not configured") provider_config = self._config.providers[provider] if env_var: provider_config.api_key_env_var = env_var provider_config.api_key = None elif api_key: provider_config.api_key = api_key provider_config.api_key_env_var = None else: raise ValueError("Either api_key or env_var must be provided") self._notify_listeners()
[docs] def get_api_key(self, provider: ProviderType) -> Optional[str]: """Get API key for a provider (resolves env vars).""" provider_config = self._config.providers.get(provider) if not provider_config: return None return provider_config.get_api_key()
[docs] def validate_api_keys(self) -> Dict[ProviderType, bool]: """Validate that all configured providers have API keys. Returns: Dictionary mapping providers to validation status """ results = {} for provider, config in self._config.providers.items(): api_key = config.get_api_key() results[provider] = api_key is not None and len(api_key) > 0 return results
# ======================================================================== # File I/O # ========================================================================
[docs] def load_from_file(self, file_path: str) -> None: """Load configuration from JSON file. Args: file_path: Path to configuration file """ path = Path(file_path) if not path.exists(): raise FileNotFoundError(f"Configuration file not found: {file_path}") with open(path, "r") as f: data = json.load(f) self._config = AppConfig.from_dict(data) self._notify_listeners()
[docs] def save_to_file(self, file_path: str, include_secrets: bool = False) -> None: """Save configuration to JSON file. Args: file_path: Path to save configuration include_secrets: Include API keys in export (not recommended) """ path = Path(file_path) path.parent.mkdir(parents=True, exist_ok=True) data = self._config.to_dict() if not include_secrets: # Remove sensitive data for provider_data in data.get("providers", {}).values(): provider_data["api_key"] = None with open(path, "w") as f: json.dump(data, f, indent=2)
[docs] def load_from_environment(self) -> None: """Load configuration from environment variables. Looks for variables in the format: - {APP_NAME}_MODEL_CONFIG__{MODEL_NAME}__{PARAM} - {APP_NAME}_PROVIDER__{PROVIDER}__{PARAM} - {APP_NAME}__{PARAM} """ prefix = self.app_name.upper().replace("-", "_") # Load general settings for key, value in os.environ.items(): if key.startswith(f"{prefix}__"): param = key[len(f"{prefix}__") :].lower() if param == "default_model": self._config.default_model = value self._notify_listeners()
[docs] def export_environment_vars(self) -> Dict[str, str]: """Export configuration as environment variables. Returns: Dictionary of environment variable names to values """ prefix = self.app_name.upper().replace("-", "_") env_vars = {} # Export default model if self._config.default_model: env_vars[f"{prefix}__DEFAULT_MODEL"] = self._config.default_model # Export provider API key env vars for provider, config in self._config.providers.items(): if config.api_key_env_var: provider_prefix = f"{prefix}_PROVIDER__{provider.value.upper()}" env_vars[f"{provider_prefix}__API_KEY_ENV"] = config.api_key_env_var return env_vars
# ======================================================================== # Validation # ========================================================================
[docs] def validate(self) -> List[str]: """Validate configuration and return list of issues. Returns: List of validation error messages (empty if valid) """ issues = [] # Check default model exists if self._config.default_model: if self._config.default_model not in self._config.models: issues.append( f"Default model '{self._config.default_model}' not found in models" ) # Check models reference valid providers for name, model in self._config.models.items(): if model.provider not in self._config.providers: issues.append( f"Model '{name}' references unconfigured provider '{model.provider.value}'" ) # Check API keys api_key_status = self.validate_api_keys() for provider, has_key in api_key_status.items(): if not has_key: issues.append(f"Provider '{provider.value}' missing API key") return issues
[docs] def is_valid(self) -> bool: """Check if configuration is valid.""" return len(self.validate()) == 0
# ======================================================================== # Secrets Management # ========================================================================
[docs] def set_secret(self, key: str, value: str) -> None: """Store a secret value with encryption. Secrets are encrypted in memory using Fernet symmetric encryption. While not as secure as dedicated secrets management services, this provides reasonable protection for prototyping and development. Args: key: Secret identifier value: Secret value to store """ if self._encryption_available and self._fernet: # Encrypt the secret before storing encrypted_value = self._fernet.encrypt(value.encode()) self._secrets[key] = encrypted_value else: # Fallback: store without encryption (with warning) warnings.warn( "Cryptography library not available. Secrets stored without encryption. " "Install with: pip install cryptography", RuntimeWarning, ) self._secrets[key] = value.encode()
[docs] def get_secret(self, key: str) -> Optional[str]: """Retrieve and decrypt a secret value. Args: key: Secret identifier Returns: Decrypted secret value or None if not found """ if key not in self._secrets: return None encrypted_value = self._secrets[key] if self._encryption_available and self._fernet: try: # Decrypt the secret decrypted_value = self._fernet.decrypt(encrypted_value) return decrypted_value.decode() except Exception: return None else: # Fallback: return unencrypted value return encrypted_value.decode()
[docs] def remove_secret(self, key: str) -> bool: """Remove a secret and securely clear it from memory. Args: key: Secret identifier Returns: True if secret was removed, False if not found """ if key in self._secrets: # Securely overwrite before deletion self._secrets[key] = b"\x00" * len(self._secrets[key]) del self._secrets[key] return True return False
[docs] def clear_secrets(self) -> None: """Clear all secrets from memory securely.""" for key in list(self._secrets.keys()): self.remove_secret(key)
[docs] def list_secret_keys(self) -> List[str]: """List all secret keys (not values). Returns: List of secret identifiers """ return list(self._secrets.keys())
[docs] def has_secret(self, key: str) -> bool: """Check if a secret exists. Args: key: Secret identifier Returns: True if secret exists """ return key in self._secrets
# ======================================================================== # Utilities # ========================================================================
[docs] def reset(self) -> None: """Reset configuration to initial state and clear secrets.""" self._config = AppConfig(app_name=self.app_name) self.clear_secrets() self._notify_listeners()
[docs] def rollback(self) -> bool: """Rollback to previous configuration. Returns: True if rollback successful, False if no history """ if self._config_history: self._config = self._config_history.pop() self._notify_listeners() return True return False
[docs] def merge_config(self, other: AppConfig, override: bool = True) -> None: """Merge another configuration into current. Args: other: Configuration to merge override: Whether to override existing values """ if override or not self._config.default_model: self._config.default_model = other.default_model # Merge providers for provider, config in other.providers.items(): if override or provider not in self._config.providers: self._config.providers[provider] = config # Merge models for name, config in other.models.items(): if override or name not in self._config.models: self._config.models[name] = config # Merge metadata if override: self._config.metadata.update(other.metadata) else: for k, v in other.metadata.items(): if k not in self._config.metadata: self._config.metadata[k] = v self._notify_listeners()
[docs] def get_model_for_task( self, task: str, fallback: Optional[str] = None, ) -> Optional[ModelConfig]: """Get recommended model for a specific task. Args: task: Task type (e.g., "completion", "embedding", "chat") fallback: Fallback model name if no match found Returns: Model configuration or None """ # Check metadata for task-specific models for name, config in self._config.models.items(): if config.metadata.get("recommended_for") == task: return config # Use fallback if fallback: return self._config.models.get(fallback) # Use default return self.get_default_model()
[docs] def clone(self) -> "ConfigManager": """Create a deep copy of the configuration manager. Note: Secrets are not cloned for security reasons. """ new_manager = ConfigManager( app_name=self.app_name, auto_load_env=False, ) new_manager._config = deepcopy(self._config) return new_manager