# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2024-2026 SYMFLUENCE Team <dev@symfluence.org>
"""Optimization Manager
Coordinates calibration runs by selecting algorithms and model-specific
optimizers. Keeps orchestration lean; algorithm details and examples are in
``docs/source/calibration`` and optimizer pages.
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional
import pandas as pd
from symfluence.core.base_manager import BaseManager
from symfluence.core.constants import SupportedModels
from symfluence.core.registries import R
from symfluence.optimization.core import TransformationManager
from symfluence.optimization.optimization_results_manager import OptimizationResultsManager
if TYPE_CHECKING:
pass
# Models whose "calibration" is internal training (gradient descent during the
# run step), not an external DDS/PSO parameter search. They register no
# optimizer/worker and are skipped by the registry calibration path. Canonical
# definition lives in SupportedModels so the sensitivity-analysis path shares it.
_SELF_TRAINING_MODELS = SupportedModels.SELF_TRAINING
[docs]
class OptimizationManager(BaseManager):
"""Facade over model-specific optimizers for iterative calibration.
Chooses algorithm, fetches optimizer from the registry, and runs it; heavy
lifting stays in the optimizer classes. See docs for workflow details.
"""
def _initialize_services(self) -> None:
"""Initialize optimization services."""
self.results_manager = self._get_service(
OptimizationResultsManager,
self.project_dir,
self.experiment_id,
self.logger,
self.reporting_manager
)
self.transformation_manager = self._get_service(
TransformationManager,
self.config,
self.logger
)
@property
def optimizers(self) -> Any:
"""Backward compatibility: expose registered optimizers/algorithms."""
# Return a dict-like object that satisfies 'in' and '[]' for algorithms expected by tests
class OptimizerMapper:
"""Maps algorithm names for backward compatibility with test assertions.
Provides dict-like interface supporting 'in' and '[]' operators
for checking algorithm availability without instantiation.
"""
def __init__(self):
self.algorithms = {
'DDS', 'DE', 'PSO', 'SCE-UA', 'NSGA-II', 'ASYNC-DDS', 'POP-DDS',
'ADAM', 'LBFGS'
}
def __contains__(self, item):
return item in self.algorithms
def __getitem__(self, item):
if item in self.algorithms:
return True # Return something truthy
raise KeyError(item)
return OptimizerMapper()
[docs]
def run_optimization_workflow(self) -> Dict[str, Any]:
"""Run main optimization workflow based on configuration.
Entry point for complete optimization process. Checks configuration
to determine which optimization methods to execute and runs them
in sequence. Currently supports 'iteration' (calibration) and handles
deprecated method warnings.
Workflow:
1. Check config.optimization.methods (list of methods)
2. For each enabled method:
- 'iteration': Run calibrate_model() for iterative optimization
- Deprecated methods: Log warnings
3. Return results from all executed methods
Configuration:
optimization.methods: List of optimization methods
- Example: ['iteration'] enables calibration
- Example: ['iteration', 'emulation'] enables both (emulation deprecated)
Supported Methods:
- 'iteration': Iterative parameter optimization (calibration)
Deprecated Methods (logged as warnings):
- 'differentiable_parameter_emulation': Use gradient-based (ADAM/LBFGS) instead
- 'emulation': Use model emulation libraries instead
Returns:
Dict[str, Any]: Results from completed workflows
- Keys: Method names (e.g., 'calibration')
- Values: Path to results file as string, or None if failed
- Example: {'calibration': '/path/to/results.csv'}
Side Effects:
- Logs method execution and warnings to logger
- Calls calibrate_model() if 'iteration' enabled
- May create results files and directories
Examples:
>>> # Standard workflow
>>> opt_mgr = OptimizationManager(config, logger)
>>> results = opt_mgr.run_optimization_workflow()
>>> if 'calibration' in results:
... print(f"Calibration results: {results['calibration']}")
>>> # With deprecated method (warning logged)
>>> # config.optimization.methods = ['iteration', 'emulation']
>>> results = opt_mgr.run_optimization_workflow()
>>> # Logs warning about 'emulation' being deprecated
Notes:
- Only 'iteration' currently implemented
- Deprecated methods logged but not executed
- Non-empty results dict indicates at least one method ran
- Empty dict means no methods were enabled or all failed
See Also:
calibrate_model(): Run iterative optimization
get_optimization_status(): Check optimization configuration
"""
results = {}
optimization_methods = self._get_config_value(
lambda: self.config.optimization.methods,
[]
)
self.logger.info(f"Running optimization workflows: {optimization_methods}")
# Run iterative optimization (calibration)
if 'iteration' in optimization_methods:
calibration_results = self.calibrate_model()
if calibration_results:
results['calibration'] = str(calibration_results)
# Run data assimilation (EnKF)
if 'data_assimilation' in optimization_methods:
from symfluence.data_assimilation.da_manager import DataAssimilationManager
da_manager = DataAssimilationManager(self.config, self.logger, self.reporting_manager)
da_results = da_manager.run_data_assimilation()
if da_results:
results['data_assimilation'] = str(da_results)
# Check for deprecated methods and warn
deprecated_methods = [
'differentiable_parameter_emulation',
'emulation'
]
for method in deprecated_methods:
if method in optimization_methods:
self.logger.warning(
f"Optimization method '{method}' is deprecated and no longer supported. "
"Use gradient-based optimization (ADAM/LBFGS) via standard model optimizers instead."
)
return results
[docs]
def calibrate_model(self) -> Optional[Path]:
"""Calibrate model(s) using configured optimization algorithm.
Coordinates iterative parameter optimization for one or more hydrological
models using the registry-based unified optimizer infrastructure. Handles
configuration validation, optimizer instantiation, algorithm selection,
and execution.
Calibration Workflow:
1. Check if 'iteration' in config.optimization.methods
- If not, log info and return None (disabled)
2. Get algorithm from config.optimization.algorithm (default: 'PSO')
- Supported: DDS, ASYNC_DDS, PSO, DE, SCE-UA, NSGA-II, ADAM, LBFGS
3. Parse configured hydrological models (config.model.hydrological_model)
- Comma-separated list, e.g., 'SUMMA,FUSE'
- Upper-case normalization
4. For each model:
a. Call _calibrate_with_registry(model, algorithm)
b. Collect results
c. Log completion
5. Return last result (for single model) or last of multiple
Algorithm Selection:
Via config.optimization.algorithm:
- DDS, ASYNC-DDS, ASYNCDDS, ASYNC_DDS: Dynamically Dimensioned Search
- PSO: Particle Swarm Optimization
- SCE-UA: Shuffled Complex Evolution
- DE: Differential Evolution
- NSGA-II: Multi-objective non-dominated sorting
- ADAM: Gradient-based with adaptive moments
- LBFGS: Gradient-based quasi-Newton method
Registry-Based Model Optimization:
Each model uses model-specific optimizer from OptimizerRegistry:
- OptimizerRegistry.get_optimizer('MODELNAME') returns optimizer class
- Optimizer class inherits from BaseModelOptimizer
- Example: SUMMAOptimizer for SUMMA calibration
Configuration Parameters:
Workflow Control:
optimization.methods: Must contain 'iteration'
optimization.algorithm: Algorithm name (PSO, DDS, ADAM, etc.)
Model Selection:
model.hydrological_model: Comma-separated model names
Algorithm-Specific (if applicable):
optimization.adam_steps: Number of steps (default: 100)
optimization.adam_learning_rate: Learning rate (default: 0.01)
optimization.lbfgs_steps: Max steps (default: 50)
optimization.lbfgs_learning_rate: Step size (default: 0.1)
Returns:
Optional[Path]: Path to last completed calibration results file
- None if: disabled, no models configured, or all failed
- Path if: at least one model calibration completed successfully
- Typically: project_dir/optimization/{model}_{algorithm}_results.csv
Raises:
(Caught internally, returns None instead):
- Registry lookup failures
- Optimizer instantiation errors
- Algorithm execution failures
Side Effects:
- Creates project_dir/optimization/ directory
- Generates model-specific results files
- Logs calibration progress and status to logger
- Modifies reporting_manager state (if configured)
Examples:
>>> # Single model with DDS
>>> opt_mgr = OptimizationManager(config, logger)
>>> results_path = opt_mgr.calibrate_model()
>>> if results_path:
... print(f"Calibration completed: {results_path}")
>>> # Multiple models (SUMMA + FUSE)
>>> # config.model.hydrological_model = 'SUMMA,FUSE'
>>> results_path = opt_mgr.calibrate_model() # Returns FUSE results
>>> # Disabled calibration
>>> # config.optimization.methods = ['forward'] (no 'iteration')
>>> results_path = opt_mgr.calibrate_model()
>>> assert results_path is None
Notes:
- Disabled silently returns None (no error)
- Registry lookup errors logged and skipped
- Execution errors caught and logged (non-fatal)
- Multiple models: Last result returned (not aggregated)
See Also:
_calibrate_with_registry(): Registry-based optimizer execution
run_optimization_workflow(): Top-level workflow coordinator
OptimizerRegistry: Registry for model-specific optimizers
BaseModelOptimizer: Base class for model optimizers
"""
self.logger.info("Starting model calibration")
# Check if iterative optimization is enabled
optimization_methods = self._get_config_value(
lambda: self.config.optimization.methods,
[]
)
if 'iteration' not in optimization_methods:
self.logger.info("Iterative optimization is disabled in configuration")
return None
# Get the optimization algorithm from config
opt_algorithm = self._get_config_value(
lambda: self.config.optimization.algorithm,
'PSO'
)
try:
models_str = self._get_config_value(
lambda: self.config.model.hydrological_model,
''
)
hydrological_models = [m.strip().upper() for m in str(models_str).split(',') if m.strip()]
# Skip external DDS for FUSE when built-in SCE calibration is enabled.
# FUSE's internal calib_sce (run in step 11) makes external DDS redundant.
# Multi-gauge calibration always needs the external worker because
# FUSE-internal SCE is single-basin only.
if 'FUSE' in hydrological_models:
use_internal = self._get_config_value(
lambda: self.config.model.fuse.run_internal_calibration,
default=True,
dict_key='FUSE_RUN_INTERNAL_CALIBRATION'
)
multi_gauge = self._get_config_value(
lambda: self.config.calibration.multi_gauge,
default=False,
dict_key='MULTI_GAUGE_CALIBRATION'
)
if use_internal and not multi_gauge:
self.logger.info(
"Skipping external optimization for FUSE — "
"using built-in SCE calibration (FUSE_RUN_INTERNAL_CALIBRATION=True). "
"Set FUSE_RUN_INTERNAL_CALIBRATION=False to use external DDS instead."
)
hydrological_models = [m for m in hydrological_models if m != 'FUSE']
if not hydrological_models:
return None
# Skip external optimization for self-training models (ML).
# LSTM/GNN learn their weights during the run step (run_lstm/run_gnn)
# via gradient descent — there are no physical parameters for the
# DDS/PSO loop to calibrate, so they register no optimizer/worker.
self_training = [m for m in hydrological_models if m in _SELF_TRAINING_MODELS]
if self_training:
self.logger.info(
"Skipping external optimization for %s — these models train "
"internally during run_model; no parameter calibration is performed.",
', '.join(self_training),
)
hydrological_models = [m for m in hydrological_models if m not in _SELF_TRAINING_MODELS]
if not hydrological_models:
return None
# Detect coupled groundwater calibration — when a GROUNDWATER_MODEL
# is configured alongside the land-surface model, route to the
# COUPLED_GW optimizer so that GW parameters (K, SY, etc.) are
# included in the calibration parameter space.
gw_model = self._get_config_value(
lambda: self.config.model.groundwater_model,
None,
)
if gw_model and str(gw_model).strip().upper() == 'MODFLOW':
hydrological_models = ['COUPLED_GW']
self.logger.info(
"Using COUPLED_GW calibration pipeline "
"(GROUNDWATER_MODEL: MODFLOW detected)"
)
# Detect coupled routing calibration — when a *calibratable* routing
# model (one exposing a registered parameter manager, e.g. DROUTE) is
# configured alongside the land model, route to the generic COUPLED
# optimizer so the routing parameters join the calibration space and
# the land->routing run goes through the dCoupler graph. (mizuRoute /
# t-route expose no parameter manager and so stay land-only here.)
elif self._coupled_routing_requested():
routing_model = str(self._get_config_value(
lambda: self.config.model.routing_model, '', dict_key='ROUTING_MODEL'
)).split(',')[0].strip().upper()
hydrological_models = ['COUPLED']
self.logger.info(
"Using COUPLED calibration pipeline "
f"(calibratable routing model {routing_model} detected)"
)
results = []
for model in hydrological_models:
result = self._calibrate_with_registry(model, opt_algorithm)
if result:
results.append(result)
if not results:
return None
if len(results) > 1:
self.logger.info(f"Completed calibration for {len(results)} model(s)")
# Generate model comparison overview after calibration
if self.reporting_manager:
self.reporting_manager.generate_model_comparison_overview(
experiment_id=self.experiment_id,
context='calibrate_model'
)
return results[-1]
except (ValueError, KeyError, TypeError, AttributeError, RuntimeError) as e:
self.logger.error(f"Error during model calibration: {str(e)}")
import traceback
self.logger.error(traceback.format_exc())
return None
def _coupled_routing_requested(self) -> bool:
"""True when a calibratable dCoupler routing model is coupled to the land model.
A routing model is "calibratable" when it exposes a registered parameter manager (e.g.
DROUTE). mizuRoute / t-route route but expose no parameter manager, so they stay land-only.
Honours an explicit ``CALIBRATE_ROUTING`` opt-out.
"""
routing_model = str(self._get_config_value(
lambda: self.config.model.routing_model, '', dict_key='ROUTING_MODEL'
)).split(',')[0].strip().upper()
if not routing_model or routing_model in ('', 'NONE', 'N/A'):
return False
if R.parameter_managers.get(routing_model) is None:
return False
calibrate = self._get_config_value(
lambda: self.config.calibration.calibrate_routing, default=True,
dict_key='CALIBRATE_ROUTING',
)
return bool(calibrate)
def _calibrate_with_registry(self, model_name: str, algorithm: str) -> Optional[Path]:
"""
Calibrate a model using the OptimizerRegistry.
This method uses the new unified optimizer infrastructure based on
BaseModelOptimizer. It provides a cleaner, more maintainable approach
to model calibration with consistent algorithm support across all models.
Args:
model_name: Name of the model (e.g., 'SUMMA', 'FUSE', 'NGEN')
algorithm: Optimization algorithm to use
Returns:
Optional[Path]: Path to results file or None if calibration failed
"""
# Import model optimizers and parameter managers to trigger registration
from symfluence.optimization import (
model_optimizers, # noqa: F401
parameter_managers, # noqa: F401
)
# Get optimizer class from registry
optimizer_cls = R.optimizers.get(model_name)
if optimizer_cls is None:
self.logger.error(f"No optimizer registered for model: {model_name}")
self.logger.info(f"Registered models: {R.optimizers.keys()}")
return None
# Create optimization directory
opt_dir = self.project_dir / "optimization"
opt_dir.mkdir(parents=True, exist_ok=True)
try:
# Initialize model-specific optimizer
self.logger.debug(f"Using {algorithm} optimization for {model_name} (registry-based)")
optimizer = optimizer_cls(self.config, self.logger, None, reporting_manager=self.reporting_manager)
# Map algorithm name to method
algorithm_methods = {
'DDS': optimizer.run_dds,
'ASYNC-DDS': optimizer.run_async_dds,
'ASYNCDDS': optimizer.run_async_dds,
'ASYNC_DDS': optimizer.run_async_dds,
'PSO': optimizer.run_pso,
'SCE-UA': optimizer.run_sce,
'DE': optimizer.run_de,
'NSGA-II': optimizer.run_nsga2,
'ADAM': lambda: optimizer.run_adam(
steps=self._get_config_value(
lambda: self.config.optimization.adam_steps,
100
),
lr=self._get_config_value(
lambda: self.config.optimization.adam_learning_rate,
0.01
)
),
'LBFGS': lambda: optimizer.run_lbfgs(
steps=self._get_config_value(
lambda: self.config.optimization.lbfgs_steps,
50
),
lr=self._get_config_value(
lambda: self.config.optimization.lbfgs_learning_rate,
0.1
)
),
'CMA-ES': optimizer.run_cmaes,
'CMAES': optimizer.run_cmaes,
'DREAM': optimizer.run_dream,
'GLUE': optimizer.run_glue,
'BASIN-HOPPING': optimizer.run_basin_hopping,
'BASINHOPPING': optimizer.run_basin_hopping,
'BH': optimizer.run_basin_hopping,
'NELDER-MEAD': optimizer.run_nelder_mead,
'NELDERMEAD': optimizer.run_nelder_mead,
'NM': optimizer.run_nelder_mead,
'SIMPLEX': optimizer.run_nelder_mead,
'GA': optimizer.run_ga,
'BAYESIAN-OPT': optimizer.run_bayesian_opt,
'BAYESIAN_OPT': optimizer.run_bayesian_opt,
'BAYESIAN': optimizer.run_bayesian_opt,
'BO': optimizer.run_bayesian_opt,
'MOEAD': optimizer.run_moead,
'MOEA-D': optimizer.run_moead,
'MOEA_D': optimizer.run_moead,
'SIMULATED-ANNEALING': optimizer.run_simulated_annealing,
'SIMULATED_ANNEALING': optimizer.run_simulated_annealing,
'SA': optimizer.run_simulated_annealing,
'ANNEALING': optimizer.run_simulated_annealing,
'ABC': optimizer.run_abc,
'ABC-SMC': optimizer.run_abc,
'ABC_SMC': optimizer.run_abc,
'APPROXIMATE-BAYESIAN': optimizer.run_abc,
}
# Get algorithm method
run_method = algorithm_methods.get(algorithm)
if run_method is None:
self.logger.error(f"Algorithm {algorithm} not supported for registry-based optimization")
self.logger.info(f"Supported algorithms: {list(algorithm_methods.keys())}")
return None
# Run optimization
results_file = run_method()
if results_file and Path(results_file).exists():
self.logger.info(f"{model_name} calibration completed: {results_file}")
# Generate calibration diagnostics
if self.reporting_manager:
try:
# Load results for diagnostic visualization
results_df = pd.read_csv(results_file)
# Extract optimization history
history = []
iter_candidates = ['iteration', 'Iteration']
iter_col = next((c for c in iter_candidates if c in results_df.columns), None)
if iter_col:
obj_cols = [c for c in results_df.columns
if any(k in c.lower() for k in ['objective', 'kge', 'fitness', 'score', 'nse', 'rmse'])]
obj_col = obj_cols[0] if obj_cols else None
if obj_col:
for _, row in results_df.iterrows():
history.append({
'iteration': row[iter_col],
'objective': row[obj_col]
})
# Extract best parameters - find actual best row, not just row 0
non_param = {'iteration', 'Iteration', 'objective', 'Objective',
'kge', 'KGE', 'fitness', 'score', 'timestamp',
'crash_count', 'crash_rate', 'Calib_RMSE',
'Calib_KGE', 'Calib_KGEp', 'Calib_KGEnp',
'Calib_NSE', 'Calib_MAE'}
param_cols = [c for c in results_df.columns if c not in non_param]
best_params = {}
if not results_df.empty and param_cols:
# Try to load best params from JSON file first (most reliable)
import json
best_params_json = results_file.parent / f"{self.experiment_id}_{algorithm.lower().replace('/', '_')}_best_params.json"
if best_params_json.exists():
try:
with open(best_params_json, 'r', encoding='utf-8') as f:
best_data = json.load(f)
best_params = best_data.get('best_params', {})
except (json.JSONDecodeError, IOError):
pass
# Fallback: find best row by score column
if not best_params:
score_cols = [c for c in results_df.columns if c.lower() in ['score', 'objective', 'kge', 'fitness', 'nse']]
if score_cols:
score_col = score_cols[0]
# Determine if we should minimize or maximize
metric = self._get_config_value(lambda: self.config.optimization.metric, 'KGE').upper()
minimize_metrics = {'RMSE', 'MAE', 'BIAS', 'MSE', 'MARE', 'PBIAS', 'NRMSE'}
if metric in minimize_metrics:
best_idx = results_df[score_col].idxmin()
else:
best_idx = results_df[score_col].idxmax()
best_row = results_df.loc[best_idx]
else:
# No score column found, fall back to last row (most recent)
best_row = results_df.iloc[-1]
for col in param_cols:
try:
best_params[col] = float(best_row[col])
except (ValueError, TypeError):
pass
self.reporting_manager.diagnostic_calibration(
history=history if history else None,
best_params=best_params if best_params else None,
obs_vs_sim=None, # Would need to load from model outputs
model_name=model_name
)
except (OSError, FileNotFoundError, KeyError, ValueError, TypeError, RuntimeError) as e:
self.logger.debug(f"Could not generate calibration diagnostics: {e}")
except Exception as e: # noqa: BLE001 — must-not-raise contract
self.logger.exception(f"Unexpected calibration diagnostics failure: {e}")
return results_file
else:
self.logger.warning(f"{model_name} calibration completed but results file not found")
return None
except (KeyError, TypeError, ValueError, RuntimeError) as e:
self.logger.error(f"Error during {model_name} {algorithm} optimization: {str(e)}")
import traceback
self.logger.error(traceback.format_exc())
return None
[docs]
def get_optimization_status(self) -> Dict[str, Any]:
"""
Get status of optimization operations.
Returns:
Dict[str, Any]: Dictionary containing optimization status information
"""
optimization_methods = self._get_config_value(
lambda: self.config.optimization.methods,
[]
)
algorithm = self._get_config_value(
lambda: self.config.optimization.algorithm,
'PSO'
).lower()
status = {
'iterative_optimization_enabled': 'iteration' in optimization_methods,
'optimization_algorithm': algorithm.upper(),
'optimization_metric': self._get_config_value(
lambda: self.config.optimization.metric,
'KGE'
),
'optimization_dir': str(self.project_dir / "optimization"),
'results_exist': False,
}
# Check for optimization results - include algorithm subdirectory
# BaseModelOptimizer saves to: {project_dir}/optimization/{algorithm}_{experiment_id}/...
results_dir = self.project_dir / "optimization" / f"{algorithm}_{self.experiment_id}"
results_file = results_dir / f"{self.experiment_id}_parallel_iteration_results.csv"
status['results_exist'] = results_file.exists()
# Also check legacy path for backward compatibility
if not status['results_exist']:
legacy_file = self.project_dir / "optimization" / f"{self.experiment_id}_parallel_iteration_results.csv"
status['results_exist'] = legacy_file.exists()
return status
[docs]
def validate_optimization_configuration(self) -> Dict[str, bool]:
"""
Validate optimization configuration settings.
.. deprecated::
Use :meth:`validate_readiness` instead. Algorithm and metric
validation is now handled by Pydantic validators at config
construction time.
Returns:
Dict[str, bool]: Dictionary containing validation results
"""
import warnings
warnings.warn(
"validate_optimization_configuration() is deprecated, use validate_readiness()",
DeprecationWarning,
stacklevel=2,
)
readiness = self.validate_readiness()
# Return legacy-compatible shape
return {
'model_supported': readiness.get('model_supported', False),
'parameters_defined': readiness.get('parameters_defined', False),
}
[docs]
def validate_readiness(self) -> Dict[str, bool]:
"""
Validate that this manager is ready for execution.
Checks runtime prerequisites that Pydantic cannot verify:
whether the configured model has a registered optimizer and
whether calibration parameters are defined.
Returns:
Dict mapping check names to pass/fail booleans.
"""
results = {}
# Check model support via optimizer registry
models_str = self._get_config_value(
lambda: self.config.model.hydrological_model,
''
)
models = [m.strip().upper() for m in str(models_str).split(',') if m.strip()]
model_supported = any(
R.optimizers.get(m) is not None for m in models
) if models else False
results['model_supported'] = model_supported
# Check parameters to calibrate
local_params = self._get_config_value(
lambda: self.config.optimization.params_to_calibrate,
''
)
basin_params = self._get_config_value(
lambda: self.config.optimization.basin_params_to_calibrate,
''
)
results['parameters_defined'] = bool(local_params or basin_params)
return results
[docs]
def get_available_optimizers(self) -> Dict[str, str]:
"""
Get list of available optimization algorithms.
Returns:
Dict[str, str]: Dictionary mapping algorithm identifiers to their descriptions
"""
return {
'PSO': 'Particle Swarm Optimization',
'SCE-UA': 'Shuffled Complex Evolution',
'DDS': 'Dynamically Dimensioned Search',
'DE': 'Differential Evolution',
'NSGA-II': 'Non-dominated Sorting Genetic Algorithm II',
'ASYNC-DDS': 'Asynchronous Dynamically Dimensioned Search',
'POP-DDS': 'Population Dynamically Dimensioned Search',
}
def _apply_parameter_transformations(self, params: Dict[str, float], settings_dir: Path) -> bool:
"""
Applies transformations to parameters (e.g., soil depth multipliers).
"""
return self.transformation_manager.transform(params, settings_dir)
def _calculate_multivariate_objective(self, sim_results: Dict[str, pd.Series]) -> float:
"""
Calculates a composite objective score from multivariate simulation results.
"""
# 1. Get the multivariate objective handler
obj_cls = R.objectives.get('MULTIVARIATE')
objective_handler = obj_cls(self.config, self.logger) if obj_cls else None
if not objective_handler:
return 1000.0
# 2. Use AnalysisManager to evaluate variables
from symfluence.evaluation.analysis_manager import AnalysisManager
am = AnalysisManager(self.config, self.logger)
eval_results = am.run_multivariate_evaluation(sim_results)
# 3. Calculate scalar objective
return objective_handler.calculate(eval_results)
[docs]
def load_optimization_results(self, filename: str = None) -> Optional[Dict]:
"""
Load optimization results from file.
Args:
filename (str, optional): Name of results file to load. If None, uses
the default filename based on experiment_id.
Returns:
Optional[Dict]: Dictionary with optimization results. Returns None if loading fails.
"""
try:
results_df = self.results_manager.load_optimization_results(filename)
if results_df is None:
return None
# Find best iteration by score instead of assuming row 0
score_cols = [c for c in results_df.columns if c.lower() in ['score', 'objective', 'kge', 'fitness', 'nse']]
if score_cols and not results_df.empty:
score_col = score_cols[0]
# Determine if we should minimize or maximize
metric = self._get_config_value(lambda: self.config.optimization.metric, 'KGE').upper()
minimize_metrics = {'RMSE', 'MAE', 'BIAS', 'MSE', 'MARE', 'PBIAS', 'NRMSE'}
if metric in minimize_metrics:
best_idx = results_df[score_col].idxmin()
else:
best_idx = results_df[score_col].idxmax()
best_iteration = results_df.loc[best_idx].to_dict()
else:
# Fall back to last row if no score column found
best_iteration = results_df.iloc[-1].to_dict() if not results_df.empty else {}
# Convert DataFrame to dictionary format
results = {
'parameters': results_df.to_dict(orient='records'),
'best_iteration': best_iteration,
'columns': results_df.columns.tolist()
}
return results
except (FileNotFoundError, ValueError, KeyError) as e:
self.logger.error(f"Error loading optimization results: {str(e)}")
return None