Source code for symfluence.optimization.optimization_manager

# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2024-2026 SYMFLUENCE Team <dev@symfluence.org>

"""Optimization Manager

Coordinates calibration runs by selecting algorithms and model-specific
optimizers. Keeps orchestration lean; algorithm details and examples are in
``docs/source/calibration`` and optimizer pages.
"""

from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional

import pandas as pd

from symfluence.core.base_manager import BaseManager
from symfluence.core.registries import R
from symfluence.optimization.core import TransformationManager
from symfluence.optimization.optimization_results_manager import OptimizationResultsManager

if TYPE_CHECKING:
    pass


[docs] class OptimizationManager(BaseManager): """Facade over model-specific optimizers for iterative calibration. Chooses algorithm, fetches optimizer from the registry, and runs it; heavy lifting stays in the optimizer classes. See docs for workflow details. """ def _initialize_services(self) -> None: """Initialize optimization services.""" self.results_manager = self._get_service( OptimizationResultsManager, self.project_dir, self.experiment_id, self.logger, self.reporting_manager ) self.transformation_manager = self._get_service( TransformationManager, self.config, self.logger ) @property def optimizers(self) -> Any: """Backward compatibility: expose registered optimizers/algorithms.""" # Return a dict-like object that satisfies 'in' and '[]' for algorithms expected by tests class OptimizerMapper: """Maps algorithm names for backward compatibility with test assertions. Provides dict-like interface supporting 'in' and '[]' operators for checking algorithm availability without instantiation. """ def __init__(self): self.algorithms = { 'DDS', 'DE', 'PSO', 'SCE-UA', 'NSGA-II', 'ASYNC-DDS', 'POP-DDS', 'ADAM', 'LBFGS' } def __contains__(self, item): return item in self.algorithms def __getitem__(self, item): if item in self.algorithms: return True # Return something truthy raise KeyError(item) return OptimizerMapper()
[docs] def run_optimization_workflow(self) -> Dict[str, Any]: """Run main optimization workflow based on configuration. Entry point for complete optimization process. Checks configuration to determine which optimization methods to execute and runs them in sequence. Currently supports 'iteration' (calibration) and handles deprecated method warnings. Workflow: 1. Check config.optimization.methods (list of methods) 2. For each enabled method: - 'iteration': Run calibrate_model() for iterative optimization - Deprecated methods: Log warnings 3. Return results from all executed methods Configuration: optimization.methods: List of optimization methods - Example: ['iteration'] enables calibration - Example: ['iteration', 'emulation'] enables both (emulation deprecated) Supported Methods: - 'iteration': Iterative parameter optimization (calibration) Deprecated Methods (logged as warnings): - 'differentiable_parameter_emulation': Use gradient-based (ADAM/LBFGS) instead - 'emulation': Use model emulation libraries instead Returns: Dict[str, Any]: Results from completed workflows - Keys: Method names (e.g., 'calibration') - Values: Path to results file as string, or None if failed - Example: {'calibration': '/path/to/results.csv'} Side Effects: - Logs method execution and warnings to logger - Calls calibrate_model() if 'iteration' enabled - May create results files and directories Examples: >>> # Standard workflow >>> opt_mgr = OptimizationManager(config, logger) >>> results = opt_mgr.run_optimization_workflow() >>> if 'calibration' in results: ... print(f"Calibration results: {results['calibration']}") >>> # With deprecated method (warning logged) >>> # config.optimization.methods = ['iteration', 'emulation'] >>> results = opt_mgr.run_optimization_workflow() >>> # Logs warning about 'emulation' being deprecated Notes: - Only 'iteration' currently implemented - Deprecated methods logged but not executed - Non-empty results dict indicates at least one method ran - Empty dict means no methods were enabled or all failed See Also: calibrate_model(): Run iterative optimization get_optimization_status(): Check optimization configuration """ results = {} optimization_methods = self._get_config_value( lambda: self.config.optimization.methods, [] ) self.logger.info(f"Running optimization workflows: {optimization_methods}") # Run iterative optimization (calibration) if 'iteration' in optimization_methods: calibration_results = self.calibrate_model() if calibration_results: results['calibration'] = str(calibration_results) # Run data assimilation (EnKF) if 'data_assimilation' in optimization_methods: from symfluence.data_assimilation.da_manager import DataAssimilationManager da_manager = DataAssimilationManager(self.config, self.logger, self.reporting_manager) da_results = da_manager.run_data_assimilation() if da_results: results['data_assimilation'] = str(da_results) # Check for deprecated methods and warn deprecated_methods = [ 'differentiable_parameter_emulation', 'emulation' ] for method in deprecated_methods: if method in optimization_methods: self.logger.warning( f"Optimization method '{method}' is deprecated and no longer supported. " "Use gradient-based optimization (ADAM/LBFGS) via standard model optimizers instead." ) return results
[docs] def calibrate_model(self) -> Optional[Path]: """Calibrate model(s) using configured optimization algorithm. Coordinates iterative parameter optimization for one or more hydrological models using the registry-based unified optimizer infrastructure. Handles configuration validation, optimizer instantiation, algorithm selection, and execution. Calibration Workflow: 1. Check if 'iteration' in config.optimization.methods - If not, log info and return None (disabled) 2. Get algorithm from config.optimization.algorithm (default: 'PSO') - Supported: DDS, ASYNC_DDS, PSO, DE, SCE-UA, NSGA-II, ADAM, LBFGS 3. Parse configured hydrological models (config.model.hydrological_model) - Comma-separated list, e.g., 'SUMMA,FUSE' - Upper-case normalization 4. For each model: a. Call _calibrate_with_registry(model, algorithm) b. Collect results c. Log completion 5. Return last result (for single model) or last of multiple Algorithm Selection: Via config.optimization.algorithm: - DDS, ASYNC-DDS, ASYNCDDS, ASYNC_DDS: Dynamically Dimensioned Search - PSO: Particle Swarm Optimization - SCE-UA: Shuffled Complex Evolution - DE: Differential Evolution - NSGA-II: Multi-objective non-dominated sorting - ADAM: Gradient-based with adaptive moments - LBFGS: Gradient-based quasi-Newton method Registry-Based Model Optimization: Each model uses model-specific optimizer from OptimizerRegistry: - OptimizerRegistry.get_optimizer('MODELNAME') returns optimizer class - Optimizer class inherits from BaseModelOptimizer - Example: SUMMAOptimizer for SUMMA calibration Configuration Parameters: Workflow Control: optimization.methods: Must contain 'iteration' optimization.algorithm: Algorithm name (PSO, DDS, ADAM, etc.) Model Selection: model.hydrological_model: Comma-separated model names Algorithm-Specific (if applicable): optimization.adam_steps: Number of steps (default: 100) optimization.adam_learning_rate: Learning rate (default: 0.01) optimization.lbfgs_steps: Max steps (default: 50) optimization.lbfgs_learning_rate: Step size (default: 0.1) Returns: Optional[Path]: Path to last completed calibration results file - None if: disabled, no models configured, or all failed - Path if: at least one model calibration completed successfully - Typically: project_dir/optimization/{model}_{algorithm}_results.csv Raises: (Caught internally, returns None instead): - Registry lookup failures - Optimizer instantiation errors - Algorithm execution failures Side Effects: - Creates project_dir/optimization/ directory - Generates model-specific results files - Logs calibration progress and status to logger - Modifies reporting_manager state (if configured) Examples: >>> # Single model with DDS >>> opt_mgr = OptimizationManager(config, logger) >>> results_path = opt_mgr.calibrate_model() >>> if results_path: ... print(f"Calibration completed: {results_path}") >>> # Multiple models (SUMMA + FUSE) >>> # config.model.hydrological_model = 'SUMMA,FUSE' >>> results_path = opt_mgr.calibrate_model() # Returns FUSE results >>> # Disabled calibration >>> # config.optimization.methods = ['forward'] (no 'iteration') >>> results_path = opt_mgr.calibrate_model() >>> assert results_path is None Notes: - Disabled silently returns None (no error) - Registry lookup errors logged and skipped - Execution errors caught and logged (non-fatal) - Multiple models: Last result returned (not aggregated) See Also: _calibrate_with_registry(): Registry-based optimizer execution run_optimization_workflow(): Top-level workflow coordinator OptimizerRegistry: Registry for model-specific optimizers BaseModelOptimizer: Base class for model optimizers """ self.logger.info("Starting model calibration") # Check if iterative optimization is enabled optimization_methods = self._get_config_value( lambda: self.config.optimization.methods, [] ) if 'iteration' not in optimization_methods: self.logger.info("Iterative optimization is disabled in configuration") return None # Get the optimization algorithm from config opt_algorithm = self._get_config_value( lambda: self.config.optimization.algorithm, 'PSO' ) try: models_str = self._get_config_value( lambda: self.config.model.hydrological_model, '' ) hydrological_models = [m.strip().upper() for m in str(models_str).split(',') if m.strip()] # Skip external DDS for FUSE when built-in SCE calibration is enabled. # FUSE's internal calib_sce (run in step 11) makes external DDS redundant. # Multi-gauge calibration always needs the external worker because # FUSE-internal SCE is single-basin only. if 'FUSE' in hydrological_models: use_internal = self._get_config_value( lambda: self.config.model.fuse.run_internal_calibration, default=True, dict_key='FUSE_RUN_INTERNAL_CALIBRATION' ) multi_gauge = self._get_config_value( lambda: self.config.calibration.multi_gauge, default=False, dict_key='MULTI_GAUGE_CALIBRATION' ) if use_internal and not multi_gauge: self.logger.info( "Skipping external optimization for FUSE — " "using built-in SCE calibration (FUSE_RUN_INTERNAL_CALIBRATION=True). " "Set FUSE_RUN_INTERNAL_CALIBRATION=False to use external DDS instead." ) hydrological_models = [m for m in hydrological_models if m != 'FUSE'] if not hydrological_models: return None # Detect coupled groundwater calibration — when a GROUNDWATER_MODEL # is configured alongside the land-surface model, route to the # COUPLED_GW optimizer so that GW parameters (K, SY, etc.) are # included in the calibration parameter space. gw_model = self._get_config_value( lambda: self.config.model.groundwater_model, None, ) if gw_model and str(gw_model).strip().upper() == 'MODFLOW': hydrological_models = ['COUPLED_GW'] self.logger.info( "Using COUPLED_GW calibration pipeline " "(GROUNDWATER_MODEL: MODFLOW detected)" ) results = [] for model in hydrological_models: result = self._calibrate_with_registry(model, opt_algorithm) if result: results.append(result) if not results: return None if len(results) > 1: self.logger.info(f"Completed calibration for {len(results)} model(s)") # Generate model comparison overview after calibration if self.reporting_manager: self.reporting_manager.generate_model_comparison_overview( experiment_id=self.experiment_id, context='calibrate_model' ) return results[-1] except (ValueError, KeyError, TypeError, AttributeError, RuntimeError) as e: self.logger.error(f"Error during model calibration: {str(e)}") import traceback self.logger.error(traceback.format_exc()) return None
def _calibrate_with_registry(self, model_name: str, algorithm: str) -> Optional[Path]: """ Calibrate a model using the OptimizerRegistry. This method uses the new unified optimizer infrastructure based on BaseModelOptimizer. It provides a cleaner, more maintainable approach to model calibration with consistent algorithm support across all models. Args: model_name: Name of the model (e.g., 'SUMMA', 'FUSE', 'NGEN') algorithm: Optimization algorithm to use Returns: Optional[Path]: Path to results file or None if calibration failed """ # Import model optimizers and parameter managers to trigger registration from symfluence.optimization import ( model_optimizers, # noqa: F401 parameter_managers, # noqa: F401 ) # Get optimizer class from registry optimizer_cls = R.optimizers.get(model_name) if optimizer_cls is None: self.logger.error(f"No optimizer registered for model: {model_name}") self.logger.info(f"Registered models: {R.optimizers.keys()}") return None # Create optimization directory opt_dir = self.project_dir / "optimization" opt_dir.mkdir(parents=True, exist_ok=True) try: # Initialize model-specific optimizer self.logger.debug(f"Using {algorithm} optimization for {model_name} (registry-based)") optimizer = optimizer_cls(self.config, self.logger, None, reporting_manager=self.reporting_manager) # Map algorithm name to method algorithm_methods = { 'DDS': optimizer.run_dds, 'ASYNC-DDS': optimizer.run_async_dds, 'ASYNCDDS': optimizer.run_async_dds, 'ASYNC_DDS': optimizer.run_async_dds, 'PSO': optimizer.run_pso, 'SCE-UA': optimizer.run_sce, 'DE': optimizer.run_de, 'NSGA-II': optimizer.run_nsga2, 'ADAM': lambda: optimizer.run_adam( steps=self._get_config_value( lambda: self.config.optimization.adam_steps, 100 ), lr=self._get_config_value( lambda: self.config.optimization.adam_learning_rate, 0.01 ) ), 'LBFGS': lambda: optimizer.run_lbfgs( steps=self._get_config_value( lambda: self.config.optimization.lbfgs_steps, 50 ), lr=self._get_config_value( lambda: self.config.optimization.lbfgs_learning_rate, 0.1 ) ), 'CMA-ES': optimizer.run_cmaes, 'CMAES': optimizer.run_cmaes, 'DREAM': optimizer.run_dream, 'GLUE': optimizer.run_glue, 'BASIN-HOPPING': optimizer.run_basin_hopping, 'BASINHOPPING': optimizer.run_basin_hopping, 'BH': optimizer.run_basin_hopping, 'NELDER-MEAD': optimizer.run_nelder_mead, 'NELDERMEAD': optimizer.run_nelder_mead, 'NM': optimizer.run_nelder_mead, 'SIMPLEX': optimizer.run_nelder_mead, 'GA': optimizer.run_ga, 'BAYESIAN-OPT': optimizer.run_bayesian_opt, 'BAYESIAN_OPT': optimizer.run_bayesian_opt, 'BAYESIAN': optimizer.run_bayesian_opt, 'BO': optimizer.run_bayesian_opt, 'MOEAD': optimizer.run_moead, 'MOEA-D': optimizer.run_moead, 'MOEA_D': optimizer.run_moead, 'SIMULATED-ANNEALING': optimizer.run_simulated_annealing, 'SIMULATED_ANNEALING': optimizer.run_simulated_annealing, 'SA': optimizer.run_simulated_annealing, 'ANNEALING': optimizer.run_simulated_annealing, 'ABC': optimizer.run_abc, 'ABC-SMC': optimizer.run_abc, 'ABC_SMC': optimizer.run_abc, 'APPROXIMATE-BAYESIAN': optimizer.run_abc, } # Get algorithm method run_method = algorithm_methods.get(algorithm) if run_method is None: self.logger.error(f"Algorithm {algorithm} not supported for registry-based optimization") self.logger.info(f"Supported algorithms: {list(algorithm_methods.keys())}") return None # Run optimization results_file = run_method() if results_file and Path(results_file).exists(): self.logger.info(f"{model_name} calibration completed: {results_file}") # Generate calibration diagnostics if self.reporting_manager: try: # Load results for diagnostic visualization results_df = pd.read_csv(results_file) # Extract optimization history history = [] iter_candidates = ['iteration', 'Iteration'] iter_col = next((c for c in iter_candidates if c in results_df.columns), None) if iter_col: obj_cols = [c for c in results_df.columns if any(k in c.lower() for k in ['objective', 'kge', 'fitness', 'score', 'nse', 'rmse'])] obj_col = obj_cols[0] if obj_cols else None if obj_col: for _, row in results_df.iterrows(): history.append({ 'iteration': row[iter_col], 'objective': row[obj_col] }) # Extract best parameters - find actual best row, not just row 0 non_param = {'iteration', 'Iteration', 'objective', 'Objective', 'kge', 'KGE', 'fitness', 'score', 'timestamp', 'crash_count', 'crash_rate', 'Calib_RMSE', 'Calib_KGE', 'Calib_KGEp', 'Calib_KGEnp', 'Calib_NSE', 'Calib_MAE'} param_cols = [c for c in results_df.columns if c not in non_param] best_params = {} if not results_df.empty and param_cols: # Try to load best params from JSON file first (most reliable) import json best_params_json = results_file.parent / f"{self.experiment_id}_{algorithm.lower().replace('/', '_')}_best_params.json" if best_params_json.exists(): try: with open(best_params_json, 'r', encoding='utf-8') as f: best_data = json.load(f) best_params = best_data.get('best_params', {}) except (json.JSONDecodeError, IOError): pass # Fallback: find best row by score column if not best_params: score_cols = [c for c in results_df.columns if c.lower() in ['score', 'objective', 'kge', 'fitness', 'nse']] if score_cols: score_col = score_cols[0] # Determine if we should minimize or maximize metric = self._get_config_value(lambda: self.config.optimization.metric, 'KGE').upper() minimize_metrics = {'RMSE', 'MAE', 'BIAS', 'MSE', 'MARE', 'PBIAS', 'NRMSE'} if metric in minimize_metrics: best_idx = results_df[score_col].idxmin() else: best_idx = results_df[score_col].idxmax() best_row = results_df.loc[best_idx] else: # No score column found, fall back to last row (most recent) best_row = results_df.iloc[-1] for col in param_cols: try: best_params[col] = float(best_row[col]) except (ValueError, TypeError): pass self.reporting_manager.diagnostic_calibration( history=history if history else None, best_params=best_params if best_params else None, obs_vs_sim=None, # Would need to load from model outputs model_name=model_name ) except (OSError, FileNotFoundError, KeyError, ValueError, TypeError, RuntimeError) as e: self.logger.debug(f"Could not generate calibration diagnostics: {e}") except Exception as e: # noqa: BLE001 — must-not-raise contract self.logger.exception(f"Unexpected calibration diagnostics failure: {e}") return results_file else: self.logger.warning(f"{model_name} calibration completed but results file not found") return None except (KeyError, TypeError, ValueError, RuntimeError) as e: self.logger.error(f"Error during {model_name} {algorithm} optimization: {str(e)}") import traceback self.logger.error(traceback.format_exc()) return None
[docs] def get_optimization_status(self) -> Dict[str, Any]: """ Get status of optimization operations. Returns: Dict[str, Any]: Dictionary containing optimization status information """ optimization_methods = self._get_config_value( lambda: self.config.optimization.methods, [] ) algorithm = self._get_config_value( lambda: self.config.optimization.algorithm, 'PSO' ).lower() status = { 'iterative_optimization_enabled': 'iteration' in optimization_methods, 'optimization_algorithm': algorithm.upper(), 'optimization_metric': self._get_config_value( lambda: self.config.optimization.metric, 'KGE' ), 'optimization_dir': str(self.project_dir / "optimization"), 'results_exist': False, } # Check for optimization results - include algorithm subdirectory # BaseModelOptimizer saves to: {project_dir}/optimization/{algorithm}_{experiment_id}/... results_dir = self.project_dir / "optimization" / f"{algorithm}_{self.experiment_id}" results_file = results_dir / f"{self.experiment_id}_parallel_iteration_results.csv" status['results_exist'] = results_file.exists() # Also check legacy path for backward compatibility if not status['results_exist']: legacy_file = self.project_dir / "optimization" / f"{self.experiment_id}_parallel_iteration_results.csv" status['results_exist'] = legacy_file.exists() return status
[docs] def validate_optimization_configuration(self) -> Dict[str, bool]: """ Validate optimization configuration settings. .. deprecated:: Use :meth:`validate_readiness` instead. Algorithm and metric validation is now handled by Pydantic validators at config construction time. Returns: Dict[str, bool]: Dictionary containing validation results """ import warnings warnings.warn( "validate_optimization_configuration() is deprecated, use validate_readiness()", DeprecationWarning, stacklevel=2, ) readiness = self.validate_readiness() # Return legacy-compatible shape return { 'model_supported': readiness.get('model_supported', False), 'parameters_defined': readiness.get('parameters_defined', False), }
[docs] def validate_readiness(self) -> Dict[str, bool]: """ Validate that this manager is ready for execution. Checks runtime prerequisites that Pydantic cannot verify: whether the configured model has a registered optimizer and whether calibration parameters are defined. Returns: Dict mapping check names to pass/fail booleans. """ results = {} # Check model support via optimizer registry models_str = self._get_config_value( lambda: self.config.model.hydrological_model, '' ) models = [m.strip().upper() for m in str(models_str).split(',') if m.strip()] model_supported = any( R.optimizers.get(m) is not None for m in models ) if models else False results['model_supported'] = model_supported # Check parameters to calibrate local_params = self._get_config_value( lambda: self.config.optimization.params_to_calibrate, '' ) basin_params = self._get_config_value( lambda: self.config.optimization.basin_params_to_calibrate, '' ) results['parameters_defined'] = bool(local_params or basin_params) return results
[docs] def get_available_optimizers(self) -> Dict[str, str]: """ Get list of available optimization algorithms. Returns: Dict[str, str]: Dictionary mapping algorithm identifiers to their descriptions """ return { 'PSO': 'Particle Swarm Optimization', 'SCE-UA': 'Shuffled Complex Evolution', 'DDS': 'Dynamically Dimensioned Search', 'DE': 'Differential Evolution', 'NSGA-II': 'Non-dominated Sorting Genetic Algorithm II', 'ASYNC-DDS': 'Asynchronous Dynamically Dimensioned Search', 'POP-DDS': 'Population Dynamically Dimensioned Search', }
def _apply_parameter_transformations(self, params: Dict[str, float], settings_dir: Path) -> bool: """ Applies transformations to parameters (e.g., soil depth multipliers). """ return self.transformation_manager.transform(params, settings_dir) def _calculate_multivariate_objective(self, sim_results: Dict[str, pd.Series]) -> float: """ Calculates a composite objective score from multivariate simulation results. """ # 1. Get the multivariate objective handler obj_cls = R.objectives.get('MULTIVARIATE') objective_handler = obj_cls(self.config, self.logger) if obj_cls else None if not objective_handler: return 1000.0 # 2. Use AnalysisManager to evaluate variables from symfluence.evaluation.analysis_manager import AnalysisManager am = AnalysisManager(self.config, self.logger) eval_results = am.run_multivariate_evaluation(sim_results) # 3. Calculate scalar objective return objective_handler.calculate(eval_results)
[docs] def load_optimization_results(self, filename: str = None) -> Optional[Dict]: """ Load optimization results from file. Args: filename (str, optional): Name of results file to load. If None, uses the default filename based on experiment_id. Returns: Optional[Dict]: Dictionary with optimization results. Returns None if loading fails. """ try: results_df = self.results_manager.load_optimization_results(filename) if results_df is None: return None # Find best iteration by score instead of assuming row 0 score_cols = [c for c in results_df.columns if c.lower() in ['score', 'objective', 'kge', 'fitness', 'nse']] if score_cols and not results_df.empty: score_col = score_cols[0] # Determine if we should minimize or maximize metric = self._get_config_value(lambda: self.config.optimization.metric, 'KGE').upper() minimize_metrics = {'RMSE', 'MAE', 'BIAS', 'MSE', 'MARE', 'PBIAS', 'NRMSE'} if metric in minimize_metrics: best_idx = results_df[score_col].idxmin() else: best_idx = results_df[score_col].idxmax() best_iteration = results_df.loc[best_idx].to_dict() else: # Fall back to last row if no score column found best_iteration = results_df.iloc[-1].to_dict() if not results_df.empty else {} # Convert DataFrame to dictionary format results = { 'parameters': results_df.to_dict(orient='records'), 'best_iteration': best_iteration, 'columns': results_df.columns.tolist() } return results except (FileNotFoundError, ValueError, KeyError) as e: self.logger.error(f"Error loading optimization results: {str(e)}") return None