# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2024-2026 SYMFLUENCE Team <dev@symfluence.org>
"""Base Model Optimizer
Abstract base class providing unified optimization infrastructure for all hydrological
models. Implements template method pattern to delegate model-specific operations while
centralizing algorithm execution, parallel processing, results tracking, and final
evaluation workflows.
Mixin Components:
ConfigurableMixin, ParallelExecutionMixin, ResultsTrackingMixin,
RetryExecutionMixin, GradientOptimizationMixin
Abstract Methods (Must Implement in Subclass):
_get_model_name() -> str
_run_model_for_final_evaluation(output_dir) -> bool
_get_final_file_manager_path() -> Path
Optional Overrides:
_create_parameter_manager(), _create_calibration_target(), _create_worker(),
_apply_best_parameters_for_final(), _get_settings_directory()
"""
from __future__ import annotations
import logging
import random
import tempfile
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from symfluence.core import ConfigurableMixin
from symfluence.core.constants import ModelDefaults
from symfluence.core.exceptions import OptimizationError, require_not_none
from ..mixins import GradientOptimizationMixin, ParallelExecutionMixin, ResultsTrackingMixin, RetryExecutionMixin
from ..mixins.parallel.local_scratch_manager import LocalScratchManager
from ..workers.base_worker import BaseWorker
from .algorithms import ALGORITHM_REGISTRY, get_algorithm
from .component_factory import OptimizerComponentFactory
from .evaluators import PopulationEvaluator, TaskBuilder
from .final_evaluation import FinalEvaluationOrchestrator, FinalResultsSaver
from .metrics_tracker import EvaluationMetricsTracker
if TYPE_CHECKING:
from symfluence.core.config.models import SymfluenceConfig
[docs]
class BaseModelOptimizer(
ConfigurableMixin,
ParallelExecutionMixin,
ResultsTrackingMixin,
RetryExecutionMixin,
GradientOptimizationMixin,
ABC
):
"""Abstract base class for model-specific optimizers.
Implements template method pattern providing unified optimization across all
hydrological models. Uses mixins for parallel execution, results tracking,
retry logic, and gradient-based optimization.
Subclasses must implement: _get_model_name(), _run_model_for_final_evaluation(),
_get_final_file_manager_path(). Components (param_manager, worker,
calibration_target) are created via overridable factory methods with
registry-based defaults.
"""
# Default algorithm parameters
DEFAULT_ITERATIONS = 100
DEFAULT_POPULATION_SIZE = 30
DEFAULT_PENALTY_SCORE = ModelDefaults.PENALTY_SCORE
[docs]
def __init__(
self,
config: Union['SymfluenceConfig', Dict[str, Any]],
logger: logging.Logger,
optimization_settings_dir: Optional[Path] = None,
reporting_manager: Optional[Any] = None
):
"""
Initialize the model optimizer.
Args:
config: Configuration (typed SymfluenceConfig or legacy dict)
logger: Logger instance
optimization_settings_dir: Optional path to optimization settings
reporting_manager: ReportingManager instance
"""
from symfluence.core.config.coercion import coerce_config
self._config = coerce_config(config, warn=True)
self.logger = logger
self.reporting_manager = reporting_manager
if isinstance(self._config, dict):
self.logger.warning(
"Configuration could not be fully validated — running with "
"raw dict. Some typed config accessors may return defaults "
"instead of configured values. Set SYMFLUENCE_STRICT_CONFIG=true "
"to surface the underlying validation error."
)
# Setup paths using typed config accessors
# Note: dict_key enables fallback for legacy dict-based configs
self.data_dir = Path(self._get_config_value(
lambda: self.config.system.data_dir, default='.',
dict_key='SYMFLUENCE_DATA_DIR'
))
self.domain_name = self._get_config_value(
lambda: self.config.domain.name, default='default',
dict_key='DOMAIN_NAME'
)
self.project_dir = self.data_dir / f"domain_{self.domain_name}"
# Note: experiment_id is provided by ConfigMixin property
# Optimization settings directory
if optimization_settings_dir is not None:
self.optimization_settings_dir = Path(optimization_settings_dir)
else:
model_name = self._get_model_name()
self.optimization_settings_dir = (
self.project_dir / 'settings' / model_name
)
# Results directory - now includes model name to avoid overwrites between models
algorithm = self._get_config_value(
lambda: self.config.optimization.algorithm, default='optimization',
dict_key='OPTIMIZATION_ALGORITHM'
).lower()
model_name = self._get_model_name()
self.results_dir = (
self.project_dir / 'optimization' / model_name /
f"{algorithm}_{self.experiment_id}"
)
self.results_dir.mkdir(parents=True, exist_ok=True)
# Initialize results tracking
self.__init_results_tracking__()
# Composed helpers (must be created before model-specific components)
self._factory = OptimizerComponentFactory(self.config, self.logger, self.project_dir)
# Create model-specific components
self.param_manager = self._create_parameter_manager()
self.calibration_target = self._create_calibration_target()
self.worker = self._create_worker()
# Algorithm parameters (using typed config with dict_key fallback)
self.max_iterations = self._get_config_value(
lambda: self.config.optimization.iterations, default=self.DEFAULT_ITERATIONS,
dict_key='OPTIMIZATION_MAX_ITERATIONS'
)
self.population_size = self._get_config_value(
lambda: self.config.optimization.population_size, default=self.DEFAULT_POPULATION_SIZE,
dict_key='OPTIMIZATION_POPULATION_SIZE'
)
self.target_metric = self._get_config_value(
lambda: self.config.optimization.metric, default='KGE',
dict_key='OPTIMIZATION_METRIC'
)
# Random seed
self.random_seed = self._get_config_value(
lambda: self.config.system.random_seed, dict_key='RANDOM_SEED'
)
if self.random_seed is not None and self.random_seed != 'None':
self._set_random_seeds(int(self.random_seed))
# Parallel processing state
self.parallel_dirs: Dict[int, Dict[str, Any]] = {}
self.default_sim_dir = self.results_dir # Initialize with results_dir as fallback
self._scratch_manager: Optional[LocalScratchManager] = None
self._original_project_dir: Optional[Path] = None
# Setup directories if NUM_PROCESSES is set, regardless of count (for isolation)
num_processes = self._get_config_value(
lambda: self.config.system.num_processes, default=1,
dict_key='NUM_PROCESSES'
)
if num_processes >= 1:
self._setup_parallel_dirs()
# Runtime config overrides (for algorithm-specific settings like Adam/LBFGS)
self._runtime_overrides: Dict[str, Any] = {}
# Algorithm registry
self._registry = ALGORITHM_REGISTRY
# Lazy-initialized components
self._task_builder: Optional[TaskBuilder] = None
self._population_evaluator: Optional[PopulationEvaluator] = None
self._final_evaluation_runner: Optional[Any] = None
self._results_saver: Optional[FinalResultsSaver] = None
# Composed helpers (factory created earlier, before component creation)
self._metrics_tracker = EvaluationMetricsTracker(
self.max_iterations, self.logger, self.format_elapsed_time
)
self._final_orchestrator: Optional[FinalEvaluationOrchestrator] = None
# =========================================================================
# Lazy-initialized component properties
# =========================================================================
@property
def task_builder(self) -> TaskBuilder:
"""Lazy-initialized task builder."""
if self._task_builder is None:
self._task_builder = TaskBuilder(
config=self.config,
project_dir=self.project_dir,
domain_name=self.domain_name,
optimization_settings_dir=self.optimization_settings_dir,
default_sim_dir=self.default_sim_dir,
parallel_dirs=self.parallel_dirs,
num_processes=self.num_processes,
target_metric=self.target_metric,
param_manager=self.param_manager,
logger=self.logger
)
if hasattr(self, 'summa_exe_path'):
self._task_builder.set_summa_exe_path(self.summa_exe_path)
return require_not_none(self._task_builder, "task_builder", OptimizationError)
@property
def population_evaluator(self) -> PopulationEvaluator:
"""Lazy-initialized population evaluator."""
if self._population_evaluator is None:
self._population_evaluator = PopulationEvaluator(
task_builder=self.task_builder,
worker=self.worker,
execute_batch=self.execute_batch,
use_parallel=self.use_parallel,
num_processes=self.num_processes,
model_name=self._get_model_name(),
logger=self.logger
)
return require_not_none(self._population_evaluator, "population_evaluator", OptimizationError)
@property
def results_saver(self) -> FinalResultsSaver:
"""Lazy-initialized results saver."""
if self._results_saver is None:
self._results_saver = FinalResultsSaver(
results_dir=self.results_dir,
experiment_id=self.experiment_id,
domain_name=self.domain_name,
logger=self.logger
)
return require_not_none(self._results_saver, "results_saver", OptimizationError)
@property
def final_orchestrator(self) -> FinalEvaluationOrchestrator:
"""Lazy-initialized final evaluation orchestrator."""
if self._final_orchestrator is None:
self._final_orchestrator = FinalEvaluationOrchestrator(
config=self.config,
logger=self.logger,
optimization_settings_dir=self.optimization_settings_dir,
results_saver=self.results_saver,
)
return self._final_orchestrator
def _visualize_progress(self, algorithm: str) -> None:
"""Helper to visualize optimization progress if reporting manager available."""
if self.reporting_manager:
calibration_variable = self._get_config_value(
lambda: self.config.optimization.calibration_variable, default='streamflow'
)
self.reporting_manager.visualize_optimization_progress(
self._iteration_history,
self.results_dir.parent / f"{algorithm.lower()}_{self.experiment_id}", # Matches results_dir logic or pass results_dir
calibration_variable,
self.target_metric
)
calibrate_depth = self._get_config_value(
lambda: self.config.model.summa.calibrate_depth, default=False
)
if calibrate_depth:
self.reporting_manager.visualize_optimization_depth_parameters(
self._iteration_history,
self.results_dir.parent / f"{algorithm.lower()}_{self.experiment_id}"
)
# =========================================================================
# Default factory methods using registry-based discovery
# =========================================================================
def _create_parameter_manager_default(self):
"""Default factory for parameter managers using registry discovery.
Delegates to OptimizerComponentFactory for registry lookup and
instantiation. Override _create_parameter_manager() if a
non-standard constructor is needed.
"""
return self._factory.create_parameter_manager(self._get_model_name())
def _create_worker_default(self) -> BaseWorker:
"""Default factory for workers using registry discovery.
Delegates to OptimizerComponentFactory.
"""
return self._factory.create_worker(self._get_model_name())
def _create_calibration_target_default(self):
"""Default factory for calibration targets using centralized factory.
Delegates to OptimizerComponentFactory.
"""
model_name = self._get_model_name()
target_type = str(self._get_config_value(
lambda: self.config.optimization.target,
default='streamflow', dict_key='OPTIMIZATION_TARGET'
)).lower()
return self._factory.create_calibration_target(model_name, target_type)
def _get_settings_directory(self) -> Path:
"""Get model settings directory. Delegates to OptimizerComponentFactory."""
return self._factory.get_settings_directory(self._get_model_name())
# =========================================================================
# Abstract methods - must be implemented by subclasses
# =========================================================================
@abstractmethod
def _get_model_name(self) -> str:
"""Return the name of the model being optimized."""
pass
@abstractmethod
def _run_model_for_final_evaluation(self, output_dir: Path) -> bool:
"""Run the model for final evaluation (model-specific implementation)."""
pass
@abstractmethod
def _get_final_file_manager_path(self) -> Path:
"""Get path to the file manager used for final evaluation."""
pass
def _load_best_previous_params(self) -> Optional[Dict[str, float]]:
"""Load best parameters from previous optimization runs (warm-start).
Scans sibling run directories for best_params.json files and returns
the parameters from the run with the highest score. Prefers runs
that used the same optimization metric to avoid comparing scores
across incompatible metrics (e.g. KGE vs COMPOSITE). Accepts partial
matches when at least 50% of expected parameters are present —
missing parameters are filled with log-aware midpoints from the
parameter manager.
"""
import json
parent_dir = self.results_dir.parent
if not parent_dir.exists():
return None
# Get current model's expected parameter names for validation
expected_params = set(self.param_manager.all_param_names)
min_overlap = max(1, len(expected_params) // 2) # 50% threshold
# Determine current metric for same-metric preference
current_metric = self._get_config_value(
lambda: self.config.optimization.metric,
default='KGE',
dict_key='OPTIMIZATION_METRIC'
)
# Collect all valid candidates grouped by metric match
same_metric_best: Dict[str, Any] = {'score': -float('inf'), 'params': None, 'run': None, 'missing': set()}
other_metric_best: Dict[str, Any] = {'score': -float('inf'), 'params': None, 'run': None, 'missing': set()}
for run_dir in parent_dir.iterdir():
if not run_dir.is_dir() or run_dir == self.results_dir:
continue
for f in run_dir.iterdir():
if f.name.endswith('_best_params.json'):
try:
data = json.loads(f.read_text(encoding='utf-8'))
score = data.get('best_score', -float('inf'))
params = data.get('best_params')
run_metric = data.get('metric') # may be None for legacy runs
if not params:
continue
loaded_keys = set(params.keys())
overlap = expected_params & loaded_keys
missing = expected_params - loaded_keys
if len(overlap) < min_overlap:
self.logger.debug(
f"Skipping {run_dir.name}: insufficient "
f"overlap ({len(overlap)}/{len(expected_params)}, "
f"need {min_overlap})"
)
continue
# Categorise by metric match
metrics_match = (
run_metric is not None
and run_metric.upper() == current_metric.upper()
)
target = same_metric_best if metrics_match else other_metric_best
if score > target['score']:
target['score'] = score
target['params'] = params
target['run'] = run_dir.name
target['missing'] = missing
except (json.JSONDecodeError, OSError):
continue
# Prefer same-metric runs; fall back to cross-metric only if none
if same_metric_best['params'] is not None:
best_score = same_metric_best['score']
best_params = same_metric_best['params']
best_run = same_metric_best['run']
best_missing = same_metric_best['missing']
if other_metric_best['params'] is not None and other_metric_best['score'] > best_score:
self.logger.info(
f"Ignoring higher-scoring {other_metric_best['run']} "
f"(score={other_metric_best['score']:.4f}, different metric) "
f"in favour of same-metric run {best_run} "
f"(score={best_score:.4f}, metric={current_metric})"
)
elif other_metric_best['params'] is not None:
best_score = other_metric_best['score']
best_params = other_metric_best['params']
best_run = other_metric_best['run']
best_missing = other_metric_best['missing']
self.logger.warning(
f"No same-metric ({current_metric}) runs found; "
f"warm-starting from {best_run} which used a different metric "
f"(score={best_score:.4f})"
)
else:
best_params = None
best_run = None
best_missing = set()
if best_params is not None:
# Fill missing parameters with raw file values (not midpoints).
# These params weren't calibrated in the previous run, so their
# file values represent what the model was actually running with
# when it achieved its best score.
if best_missing:
try:
defaults = self.param_manager.get_initial_parameters(
skip_boundary_check=True
) or {}
except TypeError:
# Model's param manager doesn't support the kwarg
defaults = self.param_manager.get_initial_parameters() or {}
filled: Dict[str, float] = {}
for p in sorted(best_missing):
if p in defaults:
val = defaults[p]
try:
val = float(val)
except (TypeError, ValueError):
import numpy as np
val = float(np.asarray(val).flat[0])
best_params[p] = val
filled[p] = val
else:
# Fallback: midpoint from bounds
bounds = self.param_manager.param_bounds.get(p, {'min': 0.1, 'max': 10.0})
transform = bounds.get('transform', 'linear')
if transform == 'log' and bounds['min'] > 0:
import numpy as np
mid = float(np.exp((np.log(bounds['min']) + np.log(bounds['max'])) / 2))
else:
mid = float((bounds['min'] + bounds['max']) / 2)
best_params[p] = mid
filled[p] = mid
self.logger.info(
f"Warm-starting from {best_run} ({current_metric}={best_score:.4f}); "
f"filled {len(filled)} missing params: "
+ ", ".join(f"{k}={v:.4g}" for k, v in filled.items())
)
else:
self.logger.info(
f"Warm-starting from {best_run} ({current_metric}={best_score:.4f})"
)
# Only return expected params (drop any extra from previous run)
return {k: v for k, v in best_params.items() if k in expected_params}
return None
def _create_parameter_manager(self):
"""
Create the model-specific parameter manager.
Default implementation uses registry-based discovery via
_create_parameter_manager_default(). Override if:
- Non-standard constructor signature needed
- Pre-initialization logic required
- Custom path resolution needed
Examples of when to override:
- FUSE: Needs fuse_sim_dir computed before super().__init__()
- SUMMA: Uses summa_settings_dir instead of generic settings_dir
- GR: Uses gr_setup_dir instead of generic settings_dir
Returns:
Parameter manager instance
"""
return self._create_parameter_manager_default()
def _create_calibration_target(self):
"""
Create the model-specific calibration target.
Default implementation uses centralized factory via
_create_calibration_target_default(). Override rarely needed as
the factory handles registry lookup and fallbacks.
Returns:
Calibration target instance
"""
return self._create_calibration_target_default()
def _create_worker(self) -> BaseWorker:
"""
Create the model-specific worker.
Default implementation uses registry-based discovery via
_create_worker_default(). Override rarely needed as all workers
use the same constructor signature.
Returns:
Worker instance
"""
return self._create_worker_default()
# =========================================================================
# Utility methods
# =========================================================================
def _get_nsga2_objective_names(self) -> List[str]:
"""Resolve NSGA-II objective metric names in priority order."""
primary_metric = self._get_config_value(
lambda: self.config.optimization.nsga2.primary_metric, default=self.target_metric
)
secondary_metric = self._get_config_value(
lambda: self.config.optimization.nsga2.secondary_metric, default=self.target_metric
)
return [str(primary_metric).upper(), str(secondary_metric).upper()]
def _get_moead_objective_names(self) -> List[str]:
"""Resolve MOEA/D objective metric names in priority order."""
primary_metric = self._get_config_value(
lambda: self.config.optimization.moead.primary_metric, default=self.target_metric,
dict_key='MOEAD_PRIMARY_METRIC'
)
secondary_metric = self._get_config_value(
lambda: self.config.optimization.moead.secondary_metric, default=self.target_metric,
dict_key='MOEAD_SECONDARY_METRIC'
)
return [str(primary_metric).upper(), str(secondary_metric).upper()]
def _supports_multi_objective(self) -> bool:
"""Check if this model supports multi-objective optimization (NSGA-II).
All models support multi-objective optimization. SUMMA workers return
explicit 'objectives' lists; JAX/in-memory workers return 'metrics'
dicts from which objectives are extracted by name (e.g., KGE, NSE).
Returns:
True if multi-objective is supported, False otherwise.
"""
return True
def _get_config_initial_guess(self) -> Optional[Dict[str, Any]]:
"""Get and validate a config-specified optimizer initial guess."""
initial_guess = self._get_config_value(
lambda: self.config.optimization.initial_guess,
default=None,
dict_key='INITIAL_GUESS',
)
if not initial_guess:
return None
if not isinstance(initial_guess, dict):
raise ValueError(
"INITIAL_GUESS must be a dictionary of parameter names to values"
)
unknown_params = sorted(
set(initial_guess) - set(self.param_manager.all_param_names)
)
if unknown_params:
raise ValueError(
"INITIAL_GUESS contains unknown parameters: "
+ ", ".join(unknown_params)
)
return dict(initial_guess)
def _log_calibration_alignment(self) -> None:
"""Log basic calibration alignment info before optimization starts."""
try:
# Check if this is a multivariate target
if hasattr(self.calibration_target, 'variables') and self.calibration_target.variables:
# Multivariate calibration: check each variable's observed data
from symfluence.evaluation.registry import EvaluationRegistry
all_found = True
for var in self.calibration_target.variables:
evaluator = EvaluationRegistry.get_evaluator(
var, self.config, self.logger, self.project_dir, target=var
)
if evaluator and hasattr(evaluator, '_load_observed_data'):
obs = evaluator._load_observed_data()
if obs is None or obs.empty:
self.logger.warning(f"Calibration check: no observed data found for {var}")
all_found = False
else:
self.logger.info(f"Calibration check: {var} has {len(obs)} observed data points")
if not all_found:
self.logger.warning("Some variables in multivariate calibration lack observed data")
return
# Single-target calibration
if not hasattr(self.calibration_target, '_load_observed_data'):
return
obs = self.calibration_target._load_observed_data()
if obs is None or obs.empty:
self.logger.warning("Calibration check: no observed data found")
return
if not isinstance(obs.index, pd.DatetimeIndex):
obs.index = pd.to_datetime(obs.index)
calib_period = self.calibration_target._parse_date_range(
self._get_config_value(lambda: self.config.domain.calibration_period, default='')
)
obs_period = obs.copy()
if calib_period[0] and calib_period[1]:
obs_period = obs_period[(obs_period.index >= calib_period[0]) & (obs_period.index <= calib_period[1])]
eval_timestep = str(self._get_config_value(
lambda: self.config.optimization.calibration_timestep, default='native'
)).lower()
if eval_timestep != 'native' and hasattr(self.calibration_target, '_resample_to_timestep'):
obs_period = self.calibration_target._resample_to_timestep(obs_period, eval_timestep)
sim_start = self._get_config_value(lambda: self.config.domain.time_start)
sim_end = self._get_config_value(lambda: self.config.domain.time_end)
overlap = obs_period
if sim_start and sim_end:
sim_start = pd.Timestamp(sim_start)
sim_end = pd.Timestamp(sim_end)
overlap = obs_period[(obs_period.index >= sim_start) & (obs_period.index <= sim_end)]
self.logger.debug(
"Calibration data check | timestep=%s | obs=%d | calib_window=%d | overlap_with_sim=%d",
eval_timestep,
len(obs),
len(obs_period),
len(overlap)
)
except (KeyError, IndexError, TypeError, ValueError) as e:
self.logger.debug(f"Calibration alignment check failed: {e}")
def _set_random_seeds(self, seed: int) -> None:
"""Set random seeds for reproducibility."""
random.seed(seed)
np.random.seed(seed)
def _adjust_end_time_for_forcing(self, end_time_str: str) -> str:
"""Adjust end time to align with forcing data timestep."""
try:
forcing_timestep_seconds = self._get_config_value(
lambda: self.config.forcing.time_step_size, default=3600
)
if forcing_timestep_seconds >= 3600: # Hourly or coarser
# Parse the end time
end_time = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M')
# Calculate the last valid hour based on timestep
forcing_timestep_hours = forcing_timestep_seconds / 3600
last_hour = int(24 - (24 % forcing_timestep_hours)) - forcing_timestep_hours
if last_hour < 0:
last_hour = 0
# Adjust if needed
if end_time.hour > last_hour or (end_time.hour == 23 and last_hour < 23):
end_time = end_time.replace(hour=int(last_hour), minute=0)
adjusted_str = end_time.strftime('%Y-%m-%d %H:%M')
self.logger.info(f"Adjusted end time from {end_time_str} to {adjusted_str} for {forcing_timestep_hours}h forcing")
return adjusted_str
return end_time_str
except (ValueError, TypeError) as e:
self.logger.warning(f"Could not adjust end time: {e}")
return end_time_str
def _resolve_sim_base_dir(self, algorithm: str) -> Path:
"""Resolve the base simulation directory, activating local scratch if configured.
When USE_LOCAL_SCRATCH is enabled and SLURM_TMPDIR is available,
settings, forcing, and observation data are copied to node-local
storage via LocalScratchManager. Results are staged back to the
permanent filesystem during cleanup().
Subclass overrides of ``_setup_parallel_dirs`` should call this instead
of hardcoding ``self.project_dir / 'simulations' / ...`` so that every
model benefits from scratch redirection on HPC.
Returns the ``base_dir`` that should be passed to
``setup_parallel_processing()``.
"""
permanent_base = self.project_dir / 'simulations' / f'run_{algorithm}'
use_local_scratch = self._get_config_value(
lambda: self.config.system.use_local_scratch, default=False,
dict_key='USE_LOCAL_SCRATCH',
)
self._scratch_base_dir = None
self._permanent_base_dir = permanent_base
if use_local_scratch:
mgr = LocalScratchManager(
config=self.config,
logger=self.logger,
project_dir=self.project_dir,
algorithm_name=algorithm,
)
if mgr.use_scratch and mgr.setup_scratch_space():
self._scratch_manager = mgr
scratch_project = mgr.get_effective_project_dir()
base_dir = scratch_project / 'simulations' / f'run_{algorithm}'
base_dir.mkdir(parents=True, exist_ok=True)
self._scratch_base_dir = base_dir
self._original_project_dir = self.project_dir
self.project_dir = scratch_project
self.logger.info(
f"USE_LOCAL_SCRATCH enabled — project_dir redirected to "
f"node-local storage: {self.project_dir}"
)
else:
self.logger.warning(
"USE_LOCAL_SCRATCH requested but scratch setup failed or "
"SLURM_TMPDIR unavailable. Using standard filesystem."
)
base_dir = permanent_base
else:
base_dir = permanent_base
try:
base_dir.mkdir(parents=True, exist_ok=True)
except PermissionError:
fallback = Path(tempfile.gettempdir()) / "symfluence" / self.domain_name / f'run_{algorithm}'
fallback.mkdir(parents=True, exist_ok=True)
self.logger.warning(
f"Simulations directory not writable: {base_dir}. "
f"Falling back to scratch: {fallback}"
)
base_dir = fallback
return base_dir
def _setup_parallel_dirs(self) -> None:
"""Setup parallel processing directories."""
algorithm = self._get_config_value(
lambda: self.config.optimization.algorithm, default='optimization'
).lower()
base_dir = self._resolve_sim_base_dir(algorithm)
self.parallel_dirs = self.setup_parallel_processing(
base_dir,
self._get_model_name(),
self.experiment_id
)
if not self.use_parallel and self.parallel_dirs:
self.default_sim_dir = self.parallel_dirs[0].get('sim_dir', self.results_dir)
else:
self.default_sim_dir = self.results_dir
# =========================================================================
# Evaluation methods
# =========================================================================
[docs]
def log_iteration_progress(
self,
algorithm_name: str,
iteration: int,
best_score: float,
secondary_score: Optional[float] = None,
secondary_label: Optional[str] = None,
n_improved: Optional[int] = None,
population_size: Optional[int] = None,
crash_stats: Optional[Dict[str, Any]] = None
) -> None:
"""Log optimization progress. Delegates to EvaluationMetricsTracker."""
self._metrics_tracker.log_iteration_progress(
algorithm_name, iteration, best_score,
secondary_score=secondary_score, secondary_label=secondary_label,
n_improved=n_improved, population_size=population_size,
crash_stats=crash_stats
)
[docs]
def log_initial_population(
self,
algorithm_name: str,
population_size: int,
best_score: float
) -> None:
"""Log initial population completion. Delegates to EvaluationMetricsTracker."""
self._metrics_tracker.log_initial_population(algorithm_name, population_size, best_score)
def _evaluate_solution(
self,
normalized_params: np.ndarray,
proc_id: int = 0
) -> float:
"""
Evaluate a normalized parameter set.
Args:
normalized_params: Normalized parameters [0, 1]
proc_id: Process ID for parallel execution
Returns:
Fitness score
"""
score = self.population_evaluator.evaluate_solution(normalized_params, proc_id)
self._metrics_tracker.track_evaluation(score)
return score
[docs]
def get_crash_stats(self) -> Dict[str, Any]:
"""Return crash rate statistics. Delegates to EvaluationMetricsTracker."""
return self._metrics_tracker.get_crash_stats()
def _create_gradient_callback(self) -> Optional[Callable]:
"""Create native gradient callback if worker supports autodiff.
Returns:
Callable (x_normalized -> (loss, gradient)) or None if unsupported.
"""
# Check if worker supports native gradients
if not hasattr(self, 'worker') or self.worker is None:
return None
if not hasattr(self.worker, 'supports_native_gradients'):
return None
if not self.worker.supports_native_gradients():
return None
# Get optimization metric from config
# Uses self.target_metric which is set in __init__ from config.optimization.metric
# This matches _extract_primary_score in base_worker.py to ensure FD and native
# gradient paths optimize the same objective
metric = self.target_metric.lower()
# Get parameter names and bounds for gradient transformation
param_names = self.param_manager.all_param_names
bounds = self.param_manager.get_parameter_bounds()
# Compute scale factors for gradient chain rule
# d(loss)/d(x_norm) = d(loss)/d(x_phys) * d(x_phys)/d(x_norm)
# where d(x_phys)/d(x_norm) = (upper - lower) for linear scaling
scale_factors = np.array([
bounds[name]['max'] - bounds[name]['min']
for name in param_names
])
def gradient_callback(x_normalized: np.ndarray) -> Tuple[float, np.ndarray]:
"""
Compute loss and gradient for normalized parameters.
Args:
x_normalized: Parameters in [0,1] normalized space
Returns:
Tuple of (loss, gradient_normalized) where:
- loss: Scalar loss value (negative of metric, for minimization)
- gradient_normalized: Gradient in normalized [0,1] space
"""
# Denormalize to physical parameters
params_dict = self.param_manager.denormalize_parameters(x_normalized)
# Call worker's evaluate_with_gradient
loss, grad_dict = self.worker.evaluate_with_gradient(params_dict, metric)
if grad_dict is None:
raise RuntimeError(
f"Worker returned None gradient despite supporting native gradients. "
f"Check {self.worker.__class__.__name__}.evaluate_with_gradient() implementation."
)
# Convert gradient dict to array (same order as param_names)
grad_physical = np.array([grad_dict[name] for name in param_names])
# Transform gradient from physical to normalized space via chain rule
grad_normalized = grad_physical * scale_factors
return loss, grad_normalized
self.logger.info(
f"Native gradient callback created for {self._get_model_name()} "
f"({len(param_names)} parameters)"
)
return gradient_callback
def _get_gradient_mode(self) -> str:
"""
Get gradient computation mode from configuration.
Returns:
One of: 'auto', 'native', 'finite_difference'
- 'auto': Use native gradients if available, else finite differences
- 'native': Require native gradients (error if unavailable)
- 'finite_difference': Always use FD (useful for comparison/debugging)
"""
return self._get_config_value(
lambda: self.config.optimization.gradient_mode,
default='auto',
dict_key='GRADIENT_MODE'
)
def _evaluate_population(
self,
population: np.ndarray,
iteration: int = 0
) -> np.ndarray:
"""Evaluate a population of normalized parameter sets in parallel.
Args:
population: Array shape (n_individuals, n_parameters) in [0, 1]
iteration: Current generation number
Returns:
Array shape (n_individuals,) with fitness scores
"""
base_seed = self.random_seed if hasattr(self, 'random_seed') else None
return self.population_evaluator.evaluate_population(
population, iteration, base_random_seed=base_seed
)
def _evaluate_population_objectives(
self,
population: np.ndarray,
objective_names: List[str],
iteration: int = 0
) -> np.ndarray:
"""Evaluate population for multiple objectives (NSGA-II/MOEA-D).
Args:
population: Array shape (n_individuals, n_parameters) in [0, 1]
objective_names: List of metric names to extract (e.g., ['KGE', 'NSE'])
iteration: Current generation number
Returns:
Array shape (n_individuals, n_objectives)
"""
base_seed = self.random_seed if hasattr(self, 'random_seed') else None
return self.population_evaluator.evaluate_population_objectives(
population, objective_names, iteration, base_random_seed=base_seed
)
# =========================================================================
# Algorithm implementations
# =========================================================================
def _save_pareto_front(self, result: Dict[str, Any], algorithm_name: str) -> Optional[Path]:
"""Save Pareto front from multi-objective optimization results.
Args:
result: Algorithm result dict containing 'pareto_front' and 'pareto_objectives'
algorithm_name: Name of the algorithm
Returns:
Path to saved Pareto front CSV, or None if saving failed
"""
import pandas as pd
pareto_solutions = result.get('pareto_front')
pareto_objectives = result.get('pareto_objectives')
if pareto_solutions is None or pareto_objectives is None:
return None
if len(pareto_solutions) == 0:
self.logger.warning("Pareto front is empty, nothing to save")
return None
# Build records with objectives and parameters
records = []
param_names = self.param_manager.all_param_names
for i, (solution, objectives) in enumerate(zip(pareto_solutions, pareto_objectives)):
record = {'solution_id': i}
# Add objectives
for j, obj in enumerate(objectives):
record[f'obj_{j}'] = obj
# Add denormalized parameters
params_dict = self.param_manager.denormalize_parameters(solution)
for param_name in param_names:
val = params_dict.get(param_name)
if isinstance(val, (list, np.ndarray)):
val = val[0] if len(val) > 0 else None
record[param_name] = val
records.append(record)
df = pd.DataFrame(records)
# Generate filename (sanitize algorithm name for filesystem)
safe_algorithm = algorithm_name.lower().replace('/', '_')
filename = f"{self.experiment_id}_{safe_algorithm}_pareto_front.csv"
pareto_path = self.results_dir / filename
df.to_csv(pareto_path, index=False)
self.logger.info(f"Saved Pareto front ({len(records)} solutions) to {pareto_path}")
return pareto_path
def _run_default_only(self, algorithm_name: str) -> Path:
"""
Run a single default evaluation when no parameters are configured.
"""
self.start_timing()
self.logger.info(
f"No parameters configured for {self._get_model_name()} - running default evaluation only"
)
score = self.DEFAULT_PENALTY_SCORE
final_result = self.run_final_evaluation({})
if final_result and isinstance(final_result, dict):
metrics = final_result.get('final_metrics', {})
score = metrics.get(self.target_metric, self.DEFAULT_PENALTY_SCORE)
self.record_iteration(0, score, {})
self.update_best(score, {}, 0)
self.save_best_params(algorithm_name)
# Save results
results_path = self.save_results(algorithm_name, standard_filename=True)
if results_path is None:
raise OptimizationError(f"Failed to save results for {algorithm_name} default evaluation")
return results_path
def _build_algorithm_callbacks(self, algorithm_name: str = 'optimization') -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Build callback functions and kwargs dict for algorithm.optimize().
Args:
algorithm_name: Algorithm name used for checkpoint file naming.
Returns:
Tuple of (callbacks_dict, kwargs_dict) where callbacks_dict contains
the core function bindings and kwargs_dict contains additional settings.
"""
checkpoint_interval = self._get_config_value(
lambda: self.config.optimization.checkpoint_interval,
default=10, dict_key='CHECKPOINT_INTERVAL',
)
def evaluate_solution(normalized_params, proc_id=0):
return self._evaluate_solution(normalized_params, proc_id)
def evaluate_population(population, iteration=0):
return self._evaluate_population(population, iteration)
def denormalize_params(normalized):
return self.param_manager.denormalize_parameters(normalized)
def save_checkpoint(iteration: int):
try:
self.save_results(algorithm_name, standard_filename=True)
self.save_best_params(algorithm_name)
self.logger.debug(f"Checkpoint saved at iteration {iteration}")
except Exception as e: # noqa: BLE001 — must-not-raise contract
self.logger.warning(f"Checkpoint save failed at iteration {iteration}: {e}")
def record_iteration(iteration, score, params, additional_metrics=None):
crash_stats = self.get_crash_stats()
merged = dict(additional_metrics or {}, **{
'crash_count': crash_stats['crash_count'],
'crash_rate': crash_stats['crash_rate'],
})
self.record_iteration(iteration, score, params, additional_metrics=merged)
if iteration > 0 and iteration % checkpoint_interval == 0:
save_checkpoint(iteration)
def update_best(score, params, iteration):
self.update_best(score, params, iteration)
def log_progress(alg_name, iteration, best_score, n_improved=None, pop_size=None, secondary_score=None, secondary_label=None):
self.log_iteration_progress(
alg_name, iteration, best_score,
secondary_score=secondary_score, secondary_label=secondary_label,
n_improved=n_improved, population_size=pop_size,
crash_stats=self.get_crash_stats()
)
callbacks = {
'evaluate_solution': evaluate_solution,
'evaluate_population': evaluate_population,
'denormalize_params': denormalize_params,
'record_iteration': record_iteration,
'update_best': update_best,
'log_progress': log_progress,
}
kwargs = {
'log_initial_population': self.log_initial_population,
'num_processes': self.num_processes if hasattr(self, 'num_processes') else 1,
'results_dir': self.results_dir,
'experiment_id': self.experiment_id,
}
return callbacks, kwargs
[docs]
def run_optimization(self, algorithm_name: str) -> Path:
"""Run optimization using a specified algorithm from the registry.
Args:
algorithm_name: Algorithm name (case-insensitive)
Returns:
Path to results JSON file
"""
self.start_timing()
self.logger.info(f"Starting {algorithm_name.upper()} optimization for {self._get_model_name()}")
self._log_calibration_alignment()
n_params = len(self.param_manager.all_param_names)
if n_params == 0:
return self._run_default_only(algorithm_name)
# Get algorithm instance from registry
algorithm = get_algorithm(algorithm_name, self.config, self.logger)
# Build callbacks and kwargs for the algorithm
callbacks, kwargs = self._build_algorithm_callbacks(algorithm.name)
# Seed optimization using INITIAL_GUESS, previous best parameters, then defaults
skip_warm_start = self._get_config_value(lambda: None, default=False, dict_key='SKIP_WARM_START')
initial_params_dict = self._get_config_initial_guess()
try:
if initial_params_dict:
self.logger.info(
"Using config-specified INITIAL_GUESS for optimization seeding"
)
elif skip_warm_start:
self.logger.info(
"SKIP_WARM_START is set — skipping warm-start from previous runs. "
"Optimization will start from model default initial parameters."
)
initial_params_dict = self.param_manager.get_initial_parameters()
else:
initial_params_dict = self._load_best_previous_params()
if initial_params_dict is None:
initial_params_dict = self.param_manager.get_initial_parameters()
if initial_params_dict:
self.logger.debug("Using initial parameter guess for optimization seeding")
if initial_params_dict:
initial_guess = self.param_manager.normalize_parameters(initial_params_dict)
kwargs['initial_guess'] = initial_guess
self.logger.info(
f"Initial guess prepared: {len(initial_guess)} params, "
f"mean={initial_guess.mean():.4f}, "
f"sample: {list(initial_params_dict.items())[:3]}"
)
else:
self.logger.warning("No initial parameter guess available (dict was None/empty)")
except (KeyError, AttributeError, ValueError) as e:
self.logger.warning(f"Failed to prepare initial parameter guess: {e}")
# Guard multi-objective algorithms: only SUMMA supports multi-objective
if algorithm_name.lower() in ['nsga2', 'nsga-ii', 'moead', 'moea-d', 'moea_d']:
if not self._supports_multi_objective():
raise OptimizationError(
f"{algorithm_name.upper()} multi-objective optimization is not supported for {self._get_model_name()}. "
f"Only SUMMA models have the worker infrastructure to return multiple objectives. "
f"Use single-objective algorithms like DDS, PSO, DE, or SCE-UA instead."
)
# For NSGA-II, add multi-objective support
if algorithm_name.lower() in ['nsga2', 'nsga-ii']:
kwargs['evaluate_population_objectives'] = self._evaluate_population_objectives
kwargs['objective_names'] = self._get_nsga2_objective_names()
kwargs['multiobjective'] = bool(self._get_config_value(
lambda: self.config.optimization.nsga2.multi_target, default=False
))
# For MOEA/D, add multi-objective support only when explicitly enabled
if algorithm_name.lower() in ['moead', 'moea-d', 'moea_d']:
moead_multi = bool(self._get_config_value(
lambda: self.config.optimization.moead.multi_target, default=False,
dict_key='MOEAD_MULTI_TARGET'
))
kwargs['multiobjective'] = moead_multi
if moead_multi:
kwargs['evaluate_population_objectives'] = self._evaluate_population_objectives
kwargs['objective_names'] = self._get_moead_objective_names()
# For gradient-based algorithms (Adam, L-BFGS), add native gradient support
if algorithm_name.lower() in ['adam', 'lbfgs']:
gradient_callback = self._create_gradient_callback()
gradient_mode = self._get_gradient_mode()
if gradient_callback is not None:
kwargs['compute_gradient'] = gradient_callback
self.logger.info(
f"Native gradient support enabled for {algorithm_name.upper()} "
f"(mode: {gradient_mode})"
)
else:
self.logger.info(
f"Using finite-difference gradients for {algorithm_name.upper()} "
f"(native gradients not available for {self._get_model_name()})"
)
kwargs['gradient_mode'] = gradient_mode
# Run the algorithm
result = algorithm.optimize(
n_params=n_params,
**callbacks,
**kwargs
)
# Save results
results_path = self.save_results(algorithm.name, standard_filename=True)
self.save_best_params(algorithm.name)
self._visualize_progress(algorithm.name)
# Save Pareto front for multi-objective algorithms
if result.get('pareto_objectives') is not None and result.get('pareto_front') is not None:
self._save_pareto_front(result, algorithm.name)
self.logger.info(f"{algorithm.name} completed in {self.format_elapsed_time()}")
# Run final evaluation on full period
if result.get('best_params'):
final_result = self.run_final_evaluation(result['best_params'])
if final_result:
self._save_final_evaluation_results(final_result, algorithm.name)
return results_path # type: ignore[return-value]
# =========================================================================
# Algorithm convenience methods - delegate to run_optimization()
# =========================================================================
[docs]
def run_dds(self) -> Path:
"""Run Dynamically Dimensioned Search (DDS) optimization."""
return self.run_optimization('dds')
[docs]
def run_pso(self) -> Path:
"""Run Particle Swarm Optimization (PSO)."""
return self.run_optimization('pso')
[docs]
def run_de(self) -> Path:
"""Run Differential Evolution (DE) optimization."""
return self.run_optimization('de')
[docs]
def run_sce(self) -> Path:
"""Run Shuffled Complex Evolution (SCE-UA) optimization."""
return self.run_optimization('sce-ua')
[docs]
def run_async_dds(self) -> Path:
"""Run Asynchronous Parallel DDS optimization."""
return self.run_optimization('async_dds')
[docs]
def run_nsga2(self) -> Path:
"""Run NSGA-II multi-objective optimization."""
return self.run_optimization('nsga2')
[docs]
def run_cmaes(self) -> Path:
"""Run CMA-ES (Covariance Matrix Adaptation Evolution Strategy) optimization."""
return self.run_optimization('cmaes')
[docs]
def run_dream(self) -> Path:
"""Run DREAM (DiffeRential Evolution Adaptive Metropolis) optimization."""
return self.run_optimization('dream')
[docs]
def run_glue(self) -> Path:
"""Run GLUE (Generalized Likelihood Uncertainty Estimation) analysis."""
return self.run_optimization('glue')
[docs]
def run_basin_hopping(self) -> Path:
"""Run Basin Hopping global optimization."""
return self.run_optimization('basin_hopping')
[docs]
def run_nelder_mead(self) -> Path:
"""Run Nelder-Mead simplex optimization."""
return self.run_optimization('nelder_mead')
[docs]
def run_ga(self) -> Path:
"""Run Genetic Algorithm (GA) optimization."""
return self.run_optimization('ga')
[docs]
def run_bayesian_opt(self) -> Path:
"""Run Bayesian Optimization with Gaussian Process surrogate."""
return self.run_optimization('bayesian_opt')
[docs]
def run_moead(self) -> Path:
"""Run MOEA/D (Multi-Objective Evolutionary Algorithm based on Decomposition)."""
return self.run_optimization('moead')
[docs]
def run_simulated_annealing(self) -> Path:
"""Run Simulated Annealing optimization."""
return self.run_optimization('simulated_annealing')
[docs]
def run_abc(self) -> Path:
"""Run Approximate Bayesian Computation (ABC-SMC) for likelihood-free inference."""
return self.run_optimization('abc')
[docs]
def run_adam(self, steps: int = 100, lr: float = 0.01) -> Path:
"""
Run Adam gradient-based optimization.
Args:
steps: Number of optimization steps (passed via config ADAM_STEPS)
lr: Learning rate (passed via config ADAM_LR)
Returns:
Path to results file
"""
# Store parameters in runtime overrides for the algorithm to use
self._runtime_overrides['ADAM_STEPS'] = steps
self._runtime_overrides['ADAM_LR'] = lr
return self.run_optimization('adam')
[docs]
def run_lbfgs(self, steps: int = 50, lr: float = 0.1) -> Path:
"""
Run L-BFGS gradient-based optimization.
Args:
steps: Maximum number of steps (passed via config LBFGS_STEPS)
lr: Initial step size (passed via config LBFGS_LR)
Returns:
Path to results file
"""
# Store parameters in runtime overrides for the algorithm to use
self._runtime_overrides['LBFGS_STEPS'] = steps
self._runtime_overrides['LBFGS_LR'] = lr
return self.run_optimization('lbfgs')
# =========================================================================
# Final Evaluation
# =========================================================================
[docs]
def run_final_evaluation(self, best_params: Dict[str, float]) -> Optional[Dict[str, Any]]:
"""Run final evaluation with best parameters over full experiment period.
Evaluates the model on both calibration and evaluation windows, then restores
settings for reproducibility. Subclasses may override for custom behavior.
Args:
best_params: Best parameters from optimization (physical units)
Returns:
Dict with 'final_metrics', 'calibration_metrics', 'evaluation_metrics',
'success', 'best_params', or None if failed
"""
self.logger.info("=" * 60)
self.logger.info("RUNNING FINAL EVALUATION")
self.logger.info("=" * 60)
self.logger.info("Running model with best parameters over full simulation period...")
try:
# Update file manager for full period
self._update_file_manager_for_final_run()
# Apply best parameters directly
if not self._apply_best_parameters_for_final(best_params):
self.logger.error("Failed to apply best parameters for final evaluation")
return None
# Setup output directory
final_output_dir = self.results_dir / 'final_evaluation'
final_output_dir.mkdir(parents=True, exist_ok=True)
# Update file manager output path
self._update_file_manager_output_path(final_output_dir)
# Run model directly using specific hook
if not self._run_model_for_final_evaluation(final_output_dir):
self.logger.error(f"{self._get_model_name()} run failed during final evaluation")
return None
# Calculate metrics for both periods (calibration_only=False)
metrics = self.calibration_target.calculate_metrics(
final_output_dir,
calibration_only=False
)
if not metrics:
self.logger.error("Failed to calculate final evaluation metrics")
return None
# Extract period-specific metrics
calib_metrics = self._extract_period_metrics(metrics, 'Calib')
eval_metrics = self._extract_period_metrics(metrics, 'Eval')
# Log detailed results
self._log_final_evaluation_results(calib_metrics, eval_metrics)
final_result = {
'final_metrics': metrics,
'calibration_metrics': calib_metrics,
'evaluation_metrics': eval_metrics,
'success': True,
'best_params': best_params
}
return final_result
except (IOError, OSError, ValueError, RuntimeError) as e:
self.logger.error(f"Error in final evaluation: {e}")
import traceback
self.logger.error(f"Traceback: {traceback.format_exc()}")
return None
finally:
# Restore optimization settings
self._restore_model_decisions_for_optimization()
self._restore_file_manager_for_optimization()
def _extract_period_metrics(self, all_metrics: Dict, period_prefix: str) -> Dict:
"""Extract metrics for a specific period. Delegates to FinalEvaluationOrchestrator."""
return self.final_orchestrator.extract_period_metrics(all_metrics, period_prefix)
def _log_final_evaluation_results(self, calib_metrics: Dict, eval_metrics: Dict) -> None:
"""Log final evaluation results. Delegates to FinalEvaluationOrchestrator."""
self.final_orchestrator.log_results(calib_metrics, eval_metrics)
def _update_file_manager_for_final_run(self) -> None:
"""Update file manager for full experiment period. Delegates to FinalEvaluationOrchestrator."""
self.final_orchestrator.update_file_manager_for_final_run(
self._get_final_file_manager_path()
)
def _restore_model_decisions_for_optimization(self) -> None:
"""Restore model decisions. Delegates to FinalEvaluationOrchestrator."""
self.final_orchestrator.restore_model_decisions()
def _restore_file_manager_for_optimization(self) -> None:
"""Restore file manager to calibration period. Delegates to FinalEvaluationOrchestrator."""
self.final_orchestrator.restore_file_manager(self._get_final_file_manager_path())
def _apply_best_parameters_for_final(self, best_params: Dict[str, float]) -> bool:
"""Apply best parameters for final evaluation."""
try:
return self.worker.apply_parameters(
best_params,
self.optimization_settings_dir,
config=self.config
)
except (ValueError, IOError, RuntimeError) as e:
self.logger.error(f"Error applying parameters for final evaluation: {e}")
return False
def _update_file_manager_output_path(self, output_dir: Path) -> None:
"""Update file manager output path. Delegates to FinalEvaluationOrchestrator."""
self.final_orchestrator.update_file_manager_output_path(
self._get_final_file_manager_path(), output_dir
)
def _save_final_evaluation_results(
self,
final_result: Dict[str, Any],
algorithm: str
) -> None:
"""
Save final evaluation results to JSON file.
Args:
final_result: Final evaluation results dictionary
algorithm: Algorithm name (e.g., 'PSO', 'DDS')
"""
self.final_orchestrator.save_results(final_result, algorithm)
# =========================================================================
# Cleanup
# =========================================================================
[docs]
def cleanup(self) -> None:
"""Cleanup parallel processing directories and temporary files."""
self._shutdown_mpi_strategy()
# Stage results from local scratch back to permanent storage
if getattr(self, '_scratch_manager', None) is not None:
self._scratch_manager.stage_results_back()
if self._original_project_dir is not None:
self.project_dir = self._original_project_dir
if self.parallel_dirs:
self.cleanup_parallel_processing(self.parallel_dirs)